libzmq  master
ZeroMQ C++ Core Engine (LIBZMQ)
tcp_connecter.cpp
Go to the documentation of this file.
1 /*
2  Copyright (c) 2007-2016 Contributors as noted in the AUTHORS file
3 
4  This file is part of libzmq, the ZeroMQ core engine in C++.
5 
6  libzmq is free software; you can redistribute it and/or modify it under
7  the terms of the GNU Lesser General Public License (LGPL) as published
8  by the Free Software Foundation; either version 3 of the License, or
9  (at your option) any later version.
10 
11  As a special exception, the Contributors give you permission to link
12  this library with independent modules to produce an executable,
13  regardless of the license terms of these independent modules, and to
14  copy and distribute the resulting executable under terms of your choice,
15  provided that you also meet, for each linked independent module, the
16  terms and conditions of the license of that module. An independent
17  module is a module which is not derived from or based on this library.
18  If you modify this library, you must extend this exception to your
19  version of the library.
20 
21  libzmq is distributed in the hope that it will be useful, but WITHOUT
22  ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
23  FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public
24  License for more details.
25 
26  You should have received a copy of the GNU Lesser General Public License
27  along with this program. If not, see <http://www.gnu.org/licenses/>.
28 */
29 
30 #include "precompiled.hpp"
31 #include <new>
32 #include <string>
33 
34 #include "macros.hpp"
35 #include "tcp_connecter.hpp"
36 #include "stream_engine.hpp"
37 #include "io_thread.hpp"
38 #include "platform.hpp"
39 #include "random.hpp"
40 #include "err.hpp"
41 #include "ip.hpp"
42 #include "tcp.hpp"
43 #include "address.hpp"
44 #include "tcp_address.hpp"
45 #include "session_base.hpp"
46 
47 #if defined ZMQ_HAVE_WINDOWS
48 #include "windows.hpp"
49 #else
50 #include <unistd.h>
51 #include <sys/types.h>
52 #include <sys/socket.h>
53 #include <arpa/inet.h>
54 #include <netinet/tcp.h>
55 #include <netinet/in.h>
56 #include <netdb.h>
57 #include <fcntl.h>
58 #ifdef ZMQ_HAVE_OPENVMS
59 #include <ioctl.h>
60 #endif
61 #endif
62 
64  class session_base_t *session_, const options_t &options_,
65  address_t *addr_, bool delayed_start_) :
66  own_t (io_thread_, options_),
67  io_object_t (io_thread_),
68  addr (addr_),
69  s (retired_fd),
70  handle(NULL),
71  handle_valid (false),
72  delayed_start (delayed_start_),
73  connect_timer_started (false),
74  reconnect_timer_started (false),
75  session (session_),
76  current_reconnect_ivl (options.reconnect_ivl)
77 {
78  zmq_assert (addr);
79  zmq_assert (addr->protocol == "tcp");
82 }
83 
85 {
89  zmq_assert (s == retired_fd);
90 }
91 
93 {
94  if (delayed_start)
96  else
98 }
99 
101 {
102  if (connect_timer_started) {
104  connect_timer_started = false;
105  }
106 
109  reconnect_timer_started = false;
110  }
111 
112  if (handle_valid) {
113  rm_fd (handle);
114  handle_valid = false;
115  }
116 
117  if (s != retired_fd)
118  close ();
119 
120  own_t::process_term (linger_);
121 }
122 
124 {
125  // We are not polling for incoming data, so we are actually called
126  // because of error here. However, we can get error on out event as well
127  // on some platforms, so we'll simply handle both events in the same way.
128  out_event ();
129 }
130 
132 {
133  if (connect_timer_started) {
135  connect_timer_started = false;
136  }
137 
138  rm_fd (handle);
139  handle_valid = false;
140 
141  const fd_t fd = connect ();
142  // Handle the error condition by attempt to reconnect.
143  if (fd == retired_fd) {
144  close ();
146  return;
147  }
148 
149  tune_tcp_socket (fd);
153 
154  // Create the engine object for this connection.
155  stream_engine_t *engine = new (std::nothrow)
157  alloc_assert (engine);
158 
159  // Attach the engine to the corresponding session object.
160  send_attach (session, engine);
161 
162  // Shut the connecter down.
163  terminate ();
164 
165  socket->event_connected (endpoint, (int) fd);
166 }
167 
169 {
171  if (id_ == connect_timer_id) {
172  connect_timer_started = false;
173 
174  rm_fd (handle);
175  handle_valid = false;
176 
177  close ();
179  }
180  else if (id_ == reconnect_timer_id) {
181  reconnect_timer_started = false;
182  start_connecting ();
183  }
184 }
185 
187 {
188  // Open the connecting socket.
189  const int rc = open ();
190 
191  // Connect may succeed in synchronous manner.
192  if (rc == 0) {
193  handle = add_fd (s);
194  handle_valid = true;
195  out_event ();
196  }
197 
198  // Connection establishment may be delayed. Poll for its completion.
199  else
200  if (rc == -1 && errno == EINPROGRESS) {
201  handle = add_fd (s);
202  handle_valid = true;
205 
206  // add userspace connect timeout
208  }
209 
210  // Handle any other error condition by eventual reconnect.
211  else {
212  if (s != retired_fd)
213  close ();
215  }
216 }
217 
219 {
220  if (options.connect_timeout > 0) {
222  connect_timer_started = true;
223  }
224 }
225 
227 {
228  const int interval = get_new_reconnect_ivl ();
229  add_timer (interval, reconnect_timer_id);
232 }
233 
235 {
236  // The new interval is the current interval + random value.
237  const int interval = current_reconnect_ivl +
239 
240  // Only change the current reconnect interval if the maximum reconnect
241  // interval was set and if it's larger than the reconnect interval.
242  if (options.reconnect_ivl_max > 0 &&
244  // Calculate the next interval
247  return interval;
248 }
249 
251 {
252  zmq_assert (s == retired_fd);
253 
254  // Resolve the address
255  if (addr->resolved.tcp_addr != NULL) {
257  }
258 
259  addr->resolved.tcp_addr = new (std::nothrow) tcp_address_t ();
261  int rc = addr->resolved.tcp_addr->resolve (
262  addr->address.c_str (), false, options.ipv6);
263  if (rc != 0) {
265  return -1;
266  }
267  zmq_assert (addr->resolved.tcp_addr != NULL);
268  tcp_address_t * const tcp_addr = addr->resolved.tcp_addr;
269 
270  // Create the socket.
271  s = open_socket (tcp_addr->family (), SOCK_STREAM, IPPROTO_TCP);
272 
273  // IPv6 address family not supported, try automatic downgrade to IPv4.
274  if (s == zmq::retired_fd && tcp_addr->family () == AF_INET6
275  && errno == EAFNOSUPPORT
276  && options.ipv6) {
277  rc = addr->resolved.tcp_addr->resolve (
278  addr->address.c_str (), false, false);
279  if (rc != 0) {
281  return -1;
282  }
283  s = open_socket (AF_INET, SOCK_STREAM, IPPROTO_TCP);
284  }
285 
286 #ifdef ZMQ_HAVE_WINDOWS
287  if (s == INVALID_SOCKET) {
288  errno = wsa_error_to_errno (WSAGetLastError ());
289  return -1;
290  }
291 #else
292  if (s == -1)
293  return -1;
294 #endif
295 
296  // On some systems, IPv4 mapping in IPv6 sockets is disabled by default.
297  // Switch it on in such cases.
298  if (tcp_addr->family () == AF_INET6)
300 
301  // Set the IP Type-Of-Service priority for this socket
302  if (options.tos != 0)
304 
305  // Set the socket to non-blocking mode so that we get async connect().
306  unblock_socket (s);
307 
308  // Set the socket buffer limits for the underlying socket.
309  if (options.sndbuf >= 0)
311  if (options.rcvbuf >= 0)
313 
314  // Set the IP Type-Of-Service for the underlying socket
315  if (options.tos != 0)
317 
318  // Set a source address for conversations
319  if (tcp_addr->has_src_addr ()) {
320  rc = ::bind (s, tcp_addr->src_addr (), tcp_addr->src_addrlen ());
321  if (rc == -1)
322  return -1;
323  }
324 
325  // Connect to the remote peer.
326  rc = ::connect (s, tcp_addr->addr (), tcp_addr->addrlen ());
327 
328  // Connect was successful immediately.
329  if (rc == 0)
330  return 0;
331 
332  // Translate error codes indicating asynchronous connect has been
333  // launched to a uniform EINPROGRESS.
334 #ifdef ZMQ_HAVE_WINDOWS
335  const int last_error = WSAGetLastError();
336  if (last_error == WSAEINPROGRESS || last_error == WSAEWOULDBLOCK)
337  errno = EINPROGRESS;
338  else
339  errno = wsa_error_to_errno (last_error);
340 #else
341  if (errno == EINTR)
342  errno = EINPROGRESS;
343 #endif
344  return -1;
345 }
346 
348 {
349  // Async connect has finished. Check whether an error occurred
350  int err = 0;
351 #ifdef ZMQ_HAVE_HPUX
352  int len = sizeof err;
353 #else
354  socklen_t len = sizeof err;
355 #endif
356 
357  const int rc = getsockopt (s, SOL_SOCKET, SO_ERROR, (char*) &err, &len);
358 
359  // Assert if the error was caused by 0MQ bug.
360  // Networking problems are OK. No need to assert.
361 #ifdef ZMQ_HAVE_WINDOWS
362  zmq_assert (rc == 0);
363  if (err != 0) {
364  if (err == WSAEBADF ||
365  err == WSAENOPROTOOPT ||
366  err == WSAENOTSOCK ||
367  err == WSAENOBUFS)
368  {
369  wsa_assert_no (err);
370  }
371  return retired_fd;
372  }
373 #else
374  // Following code should handle both Berkeley-derived socket
375  // implementations and Solaris.
376  if (rc == -1)
377  err = errno;
378  if (err != 0) {
379  errno = err;
380  errno_assert (
381  errno != EBADF &&
382  errno != ENOPROTOOPT &&
383  errno != ENOTSOCK &&
384  errno != ENOBUFS);
385  return retired_fd;
386  }
387 #endif
388 
389  // Return the newly connected socket.
390  const fd_t result = s;
391  s = retired_fd;
392  return result;
393 }
394 
396 {
397  zmq_assert (s != retired_fd);
398 #ifdef ZMQ_HAVE_WINDOWS
399  const int rc = closesocket (s);
400  wsa_assert (rc != SOCKET_ERROR);
401 #else
402  const int rc = ::close (s);
403  errno_assert (rc == 0);
404 #endif
405  socket->event_closed (endpoint, (int) s);
406  s = retired_fd;
407 }
#define LIBZMQ_DELETE(p_object)
Definition: macros.hpp:7
void send_attach(zmq::session_base_t *destination_, zmq::i_engine *engine_, bool inc_seqnum_=true)
Definition: object.cpp:225
void tune_tcp_keepalives(fd_t s_, int keepalive_, int keepalive_cnt_, int keepalive_idle_, int keepalive_intvl_)
Definition: tcp.cpp:96
void process_term(int linger_)
Definition: own.cpp:158
void set_pollout(handle_t handle_)
Definition: io_object.cpp:84
zmq::session_base_t * session
fd_t open_socket(int domain_, int type_, int protocol_)
Definition: ip.cpp:50
tcp_address_t * tcp_addr
Definition: address.hpp:60
const std::string address
Definition: address.hpp:55
void unblock_socket(fd_t s_)
Definition: ip.cpp:84
#define zmq_assert(x)
Definition: err.hpp:119
void cancel_timer(int id_)
Definition: io_object.cpp:99
void process_term(int linger_)
bool has_src_addr() const
socklen_t src_addrlen() const
Definition: command.hpp:84
int connect_timeout
Definition: options.hpp:105
#define ENOBUFS
Definition: zmq.h:118
ZMQ_EXPORT int zmq_errno(void)
Definition: zmq.cpp:107
void event_connected(const std::string &addr_, int fd_)
zmq::socket_base_t * socket
void terminate()
Definition: own.cpp:135
int reconnect_ivl_max
Definition: options.hpp:118
int resolve(const char *name_, bool local_, bool ipv6_, bool is_src_=false)
void event_connect_delayed(const std::string &addr_, int err_)
#define ENOTSOCK
Definition: zmq.h:136
int tcp_keepalive_idle
Definition: options.hpp:159
socket_base_t * get_socket()
void set_tcp_receive_buffer(fd_t sockfd_, int bufsize_)
Definition: tcp.cpp:85
void tune_tcp_maxrt(fd_t sockfd_, int timeout_)
Definition: tcp.cpp:166
void tune_tcp_socket(fd_t s_)
Definition: tcp.cpp:51
void enable_ipv4_mapping(fd_t s_)
Definition: ip.cpp:103
uint32_t generate_random()
Definition: random.cpp:54
const std::string protocol
Definition: address.hpp:54
void set_tcp_send_buffer(fd_t sockfd_, int bufsize_)
Definition: tcp.cpp:74
void add_timer(int timout_, int id_)
Definition: io_object.cpp:94
void event_closed(const std::string &addr_, int fd_)
const sockaddr * addr() const
void event_connect_retried(const std::string &addr_, int interval_)
socklen_t addrlen() const
int tcp_keepalive_cnt
Definition: options.hpp:158
#define EINPROGRESS
Definition: zmq.h:133
sa_family_t family() const
#define alloc_assert(x)
Definition: err.hpp:159
#define errno_assert(x)
Definition: err.hpp:129
handle_t add_fd(fd_t fd_)
Definition: io_object.cpp:64
options_t options
Definition: own.hpp:109
union zmq::address_t::@0 resolved
int to_string(std::string &addr_) const
Definition: address.cpp:95
tcp_connecter_t(zmq::io_thread_t *io_thread_, zmq::session_base_t *session_, const options_t &options_, address_t *addr_, bool delayed_start_)
const sockaddr * src_addr() const
void rm_fd(handle_t handle_)
Definition: io_object.cpp:69
int tcp_keepalive_intvl
Definition: options.hpp:160
void timer_event(int id_)
#define EAFNOSUPPORT
Definition: zmq.h:142
void set_ip_type_of_service(fd_t s_, int iptos)
Definition: ip.cpp:169