32 #if defined ZMQ_HAVE_TIPC 39 #include "platform.hpp" 48 #include <sys/types.h> 49 #include <sys/socket.h> 51 zmq::tipc_connecter_t::tipc_connecter_t (
class io_thread_t *io_thread_,
52 class session_base_t *session_,
const options_t &options_,
53 const address_t *addr_,
bool delayed_start_) :
54 own_t (io_thread_, options_),
55 io_object_t (io_thread_),
59 delayed_start (delayed_start_),
60 timer_started (false),
62 current_reconnect_ivl(options.reconnect_ivl)
66 addr->to_string (endpoint);
67 socket = session-> get_socket();
70 zmq::tipc_connecter_t::~tipc_connecter_t ()
77 void zmq::tipc_connecter_t::process_plug ()
80 add_reconnect_timer ();
85 void zmq::tipc_connecter_t::process_term (
int linger_)
88 cancel_timer (reconnect_timer_id);
89 timer_started =
false;
103 void zmq::tipc_connecter_t::in_event ()
111 void zmq::tipc_connecter_t::out_event ()
113 fd_t fd = connect ();
115 handle_valid =
false;
120 add_reconnect_timer();
124 stream_engine_t *engine =
new (std::nothrow) stream_engine_t (fd, options, endpoint);
128 send_attach (session, engine);
133 socket->event_connected (endpoint, fd);
136 void zmq::tipc_connecter_t::timer_event (
int id_)
139 timer_started =
false;
143 void zmq::tipc_connecter_t::start_connecting ()
160 set_pollout (handle);
161 socket->event_connect_delayed (endpoint,
zmq_errno());
168 add_reconnect_timer ();
172 void zmq::tipc_connecter_t::add_reconnect_timer()
174 int rc_ivl = get_new_reconnect_ivl();
175 add_timer (rc_ivl, reconnect_timer_id);
176 socket->event_connect_retried (endpoint, rc_ivl);
177 timer_started =
true;
180 int zmq::tipc_connecter_t::get_new_reconnect_ivl ()
183 int this_interval = current_reconnect_ivl +
188 if (options.reconnect_ivl_max > 0 &&
189 options.reconnect_ivl_max > options.reconnect_ivl) {
192 current_reconnect_ivl = current_reconnect_ivl * 2;
193 if(current_reconnect_ivl >= options.reconnect_ivl_max) {
194 current_reconnect_ivl = options.reconnect_ivl_max;
197 return this_interval;
200 int zmq::tipc_connecter_t::open ()
213 s, addr->resolved.tipc_addr->addr (),
214 addr->resolved.tipc_addr->addrlen ());
222 if (rc == -1 && errno == EINTR) {
230 void zmq::tipc_connecter_t::close ()
233 int rc = ::close (s);
235 socket->event_closed (endpoint, s);
239 zmq::fd_t zmq::tipc_connecter_t::connect ()
244 socklen_t len =
sizeof (err);
246 int rc = getsockopt (s, SOL_SOCKET, SO_ERROR, (
char*) &err, &len);
void process_term(int linger_)
fd_t open_socket(int domain_, int type_, int protocol_)
void unblock_socket(fd_t s_)
ZMQ_EXPORT int zmq_errno(void)
uint32_t generate_random()