//  Constructor of the VMCI connecter. This translation unit is guarded by
//  ZMQ_HAVE_VMCI.
//
//  io_thread_     - forwarded to both the own_t and io_object_t bases.
//  session_       - NOTE(review): presumably stored in `session`, from which
//                   the owning socket is fetched below; the member
//                   initializer is not visible in this excerpt.
//  options_       - forwarded to the own_t base.
//  addr_          - NOTE(review): presumably stored in `addr`, whose string
//                   form is written into `endpoint` below.
//  delayed_start_ - copied into `delayed_start`.
32 #if defined ZMQ_HAVE_VMCI 38 #include "platform.hpp" 47 zmq::vmci_connecter_t::vmci_connecter_t (
class io_thread_t *io_thread_,
48 class session_base_t *session_,
const options_t &options_,
49 const address_t *addr_,
bool delayed_start_) :
50 own_t (io_thread_, options_),
51 io_object_t (io_thread_),
55 delayed_start (delayed_start_),
//  No reconnect timer is armed at construction time.
56 timer_started (false),
//  The reconnect interval starts at the configured base value.
58 current_reconnect_ivl(options.reconnect_ivl)
//  Keep the textual endpoint; it is passed to the socket event calls
//  made elsewhere in this file.
62 addr->to_string (endpoint);
//  The socket the session belongs to is the target of those event calls.
63 socket = session-> get_socket();
//  Destructor. NOTE(review): its body is not visible in this excerpt.
66 zmq::vmci_connecter_t::~vmci_connecter_t ()
//  Invoked when the object is plugged in.
//  NOTE(review): only the reconnect-timer call is visible here; the
//  condition selecting it (presumably `delayed_start`) and any alternative
//  path are not visible in this excerpt.
73 void zmq::vmci_connecter_t::process_plug ()
76 add_reconnect_timer ();
//  Termination handler; linger_ is the linger value supplied by the caller.
//  NOTE(review): the guard around the timer cancellation and the remaining
//  shutdown steps are not visible in this excerpt.
81 void zmq::vmci_connecter_t::process_term (
int linger_)
//  Drop the pending reconnect timer and record that none is armed.
84 cancel_timer (reconnect_timer_id);
85 timer_started =
false;
//  in_event handler. NOTE(review): its body is not visible in this excerpt.
99 void zmq::vmci_connecter_t::in_event ()
//  out_event handler: picks up the result of the connect attempt, tunes the
//  descriptor, creates a stream engine for it and attaches the engine to the
//  session.
107 void zmq::vmci_connecter_t::out_event ()
//  connect () (defined later in this file) yields the descriptor to use.
109 fd_t fd = connect ();
//  The poll handle is marked as no longer valid.
111 handle_valid =
false;
//  NOTE(review): the condition guarding this retry is not visible in this
//  excerpt; presumably it runs only when connect () failed.
116 add_reconnect_timer();
//  Apply the configured VMCI buffer sizes (default/min/max) to the descriptor.
120 tune_vmci_buffer_size (this->get_ctx (), fd, options.vmci_buffer_size,
121 options.vmci_buffer_min_size, options.vmci_buffer_max_size);
//  A connect timeout is applied only when the option is positive.
123 if (options.vmci_connect_timeout > 0)
//  On Windows the option value is passed through unchanged.
125 #if defined ZMQ_HAVE_WINDOWS 126 tune_vmci_connect_timeout (this->get_ctx (), fd, options.vmci_connect_timeout);
//  NOTE(review): the whole timeout is scaled by 1000 into the second field
//  of timeval (tv_usec on POSIX) while the seconds field stays 0. If the
//  option is in milliseconds, any value >= 1000 yields tv_usec >= 1000000,
//  which is outside the valid range; splitting it into
//  {ms / 1000, (ms % 1000) * 1000} would avoid that. Confirm the option's
//  unit before changing.
128 struct timeval timeout = {0, options.vmci_connect_timeout * 1000};
129 tune_vmci_connect_timeout (this->get_ctx (), fd, timeout);
//  std::nothrow makes a failed allocation return a null pointer instead of
//  throwing. NOTE(review): the null check is not visible in this excerpt.
134 stream_engine_t *engine =
new (std::nothrow)
135 stream_engine_t (fd, options, endpoint);
//  Hand the engine over to the session.
139 send_attach (session, engine);
//  Notify the socket that the endpoint is now connected on fd.
144 socket->event_connected (endpoint, fd);
//  Timer callback; id_ identifies the timer that fired.
//  NOTE(review): only the flag reset is visible here; what follows it
//  (presumably a new connection attempt) is not visible in this excerpt.
147 void zmq::vmci_connecter_t::timer_event (
int id_)
//  The reconnect timer has fired, so it is no longer pending.
150 timer_started =
false;
//  Starts a connection attempt.
//  NOTE(review): the call that opens the connection and the branches that
//  select between the statements below are not visible in this excerpt.
154 void zmq::vmci_connecter_t::start_connecting ()
//  Ask the poller for out-events on the handle and tell the socket that the
//  connect is delayed, passing the current zmq_errno () value along.
171 set_pollout (handle);
172 socket->event_connect_delayed (endpoint,
zmq_errno());
//  Schedule a retry. NOTE(review): presumably the failure path - confirm.
179 add_reconnect_timer ();
//  Arms the reconnect timer with the next interval, reports the retry to the
//  socket and records that the timer is pending.
183 void zmq::vmci_connecter_t::add_reconnect_timer()
//  The interval comes from get_new_reconnect_ivl (), which may also update
//  current_reconnect_ivl for later calls.
185 int rc_ivl = get_new_reconnect_ivl();
186 add_timer (rc_ivl, reconnect_timer_id);
//  The same interval is passed to the socket with the retry notification.
187 socket->event_connect_retried (endpoint, rc_ivl);
//  process_term () and timer_event () reset this flag.
188 timer_started =
true;
//  Returns the interval to use for the next reconnect timer. When a maximum
//  interval is configured, current_reconnect_ivl is also grown for later
//  calls.
191 int zmq::vmci_connecter_t::get_new_reconnect_ivl ()
//  NOTE(review): the right-hand operand of this sum is not visible in this
//  excerpt.
194 int this_interval = current_reconnect_ivl +
//  The interval grows only if reconnect_ivl_max is positive and larger than
//  the base reconnect_ivl.
199 if (options.reconnect_ivl_max > 0 &&
200 options.reconnect_ivl_max > options.reconnect_ivl) {
//  Double the interval for the next attempt, clamped to reconnect_ivl_max.
203 current_reconnect_ivl = current_reconnect_ivl * 2;
204 if(current_reconnect_ivl >= options.reconnect_ivl_max) {
205 current_reconnect_ivl = options.reconnect_ivl_max;
//  The returned value was computed before the doubling above.
208 return this_interval;
//  Opens the underlying socket and starts connecting it to the resolved VMCI
//  address.
//  NOTE(review): the socket creation call, the connect call itself and the
//  return statements are not visible in this excerpt, so the return-value
//  convention cannot be stated from here.
211 int zmq::vmci_connecter_t::open ()
//  The VMCI socket family is taken from the context.
215 int family = this->get_ctx ()->get_vmci_socket_family ();
//  Windows: an invalid socket is reported through errno, translated from the
//  WSA error code.
221 #ifdef ZMQ_HAVE_WINDOWS 222 if (s == INVALID_SOCKET) {
223 errno = wsa_error_to_errno(WSAGetLastError());
//  Arguments of a call whose name is not visible here (presumably
//  ::connect): the socket plus the resolved VMCI address and its length.
236 s, addr->resolved.vmci_addr->addr (),
237 addr->resolved.vmci_addr->addrlen ());
//  Windows: WSAEINPROGRESS and WSAEWOULDBLOCK are handled separately from
//  other errors. NOTE(review): the statement they select is not visible.
//  Other codes are translated into errno.
245 #ifdef ZMQ_HAVE_WINDOWS 246 const int error_code = WSAGetLastError();
247 if (error_code == WSAEINPROGRESS || error_code == WSAEWOULDBLOCK)
250 errno = wsa_error_to_errno(error_code);
//  Closes the underlying socket and notifies the owning socket.
260 void zmq::vmci_connecter_t::close ()
//  Windows: closesocket () must not fail (checked with wsa_assert).
263 #ifdef ZMQ_HAVE_WINDOWS 264 const int rc = closesocket (s);
265 wsa_assert (rc != SOCKET_ERROR);
//  Other platforms use ::close (). NOTE(review): the check of rc is not
//  visible in this excerpt.
267 const int rc = ::close (s);
//  The (already closed) descriptor value is passed with the closed event.
270 socket->event_closed (endpoint, s);
//  Retrieves the outcome of the asynchronous connect by reading SO_ERROR
//  from the socket.
//  NOTE(review): the return statements are not visible in this excerpt;
//  out_event () uses the returned fd_t as the connected descriptor.
274 zmq::fd_t zmq::vmci_connecter_t::connect ()
//  HP-UX declares the option length as int; other platforms use socklen_t.
279 #if defined ZMQ_HAVE_HPUX 280 int len =
sizeof (err);
282 socklen_t len =
sizeof (err);
//  Fetch the pending socket error into err.
284 int rc = getsockopt (s, SOL_SOCKET, SO_ERROR, (
char*) &err, &len);
//  Windows: the condition below is true only for error codes outside this
//  list of connection failures. NOTE(review): the statement it guards and
//  the original lines 297-298 are not visible in this excerpt.
288 #ifdef ZMQ_HAVE_WINDOWS 291 if (err != WSAECONNREFUSED
292 && err != WSAETIMEDOUT
293 && err != WSAECONNABORTED
294 && err != WSAEHOSTUNREACH
295 && err != WSAENETUNREACH
296 && err != WSAENETDOWN
299 && err != WSAEADDRINUSE
300 && err != WSAECONNRESET)
void process_term(int linger_)
fd_t open_socket(int domain_, int type_, int protocol_)
void unblock_socket(fd_t s_)
ZMQ_EXPORT int zmq_errno(void)
uint32_t generate_random()