Line data Source code
1 : /*
2 : Copyright (c) 2007-2016 Contributors as noted in the AUTHORS file
3 :
4 : This file is part of libzmq, the ZeroMQ core engine in C++.
5 :
6 : libzmq is free software; you can redistribute it and/or modify it under
7 : the terms of the GNU Lesser General Public License (LGPL) as published
8 : by the Free Software Foundation; either version 3 of the License, or
9 : (at your option) any later version.
10 :
11 : As a special exception, the Contributors give you permission to link
12 : this library with independent modules to produce an executable,
13 : regardless of the license terms of these independent modules, and to
14 : copy and distribute the resulting executable under terms of your choice,
15 : provided that you also meet, for each linked independent module, the
16 : terms and conditions of the license of that module. An independent
17 : module is a module which is not derived from or based on this library.
18 : If you modify this library, you must extend this exception to your
19 : version of the library.
20 :
21 : libzmq is distributed in the hope that it will be useful, but WITHOUT
22 : ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
23 : FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public
24 : License for more details.
25 :
26 : You should have received a copy of the GNU Lesser General Public License
27 : along with this program. If not, see <http://www.gnu.org/licenses/>.
28 : */
29 :
30 : #include "precompiled.hpp"
31 : #include <new>
32 : #include <string>
33 :
34 : #include "macros.hpp"
35 : #include "tcp_connecter.hpp"
36 : #include "stream_engine.hpp"
37 : #include "io_thread.hpp"
38 : #include "platform.hpp"
39 : #include "random.hpp"
40 : #include "err.hpp"
41 : #include "ip.hpp"
42 : #include "tcp.hpp"
43 : #include "address.hpp"
44 : #include "tcp_address.hpp"
45 : #include "session_base.hpp"
46 :
47 : #if defined ZMQ_HAVE_WINDOWS
48 : #include "windows.hpp"
49 : #else
50 : #include <unistd.h>
51 : #include <sys/types.h>
52 : #include <sys/socket.h>
53 : #include <arpa/inet.h>
54 : #include <netinet/tcp.h>
55 : #include <netinet/in.h>
56 : #include <netdb.h>
57 : #include <fcntl.h>
58 : #ifdef ZMQ_HAVE_OPENVMS
59 : #include <ioctl.h>
60 : #endif
61 : #endif
62 :
63 3877 : zmq::tcp_connecter_t::tcp_connecter_t (class io_thread_t *io_thread_,
64 : class session_base_t *session_, const options_t &options_,
65 : address_t *addr_, bool delayed_start_) :
66 : own_t (io_thread_, options_),
67 : io_object_t (io_thread_),
68 : addr (addr_),
69 : s (retired_fd),
70 : handle(NULL),
71 : handle_valid (false),
72 : delayed_start (delayed_start_),
73 : connect_timer_started (false),
74 : reconnect_timer_started (false),
75 : session (session_),
76 3877 : current_reconnect_ivl (options.reconnect_ivl)
77 : {
78 3876 : zmq_assert (addr);
79 7752 : zmq_assert (addr->protocol == "tcp");
80 3876 : addr->to_string (endpoint);
81 3877 : socket = session->get_socket ();
82 3877 : }
83 :
84 15508 : zmq::tcp_connecter_t::~tcp_connecter_t ()
85 : {
86 3877 : zmq_assert (!connect_timer_started);
87 3877 : zmq_assert (!reconnect_timer_started);
88 3877 : zmq_assert (!handle_valid);
89 3877 : zmq_assert (s == retired_fd);
90 7754 : }
91 :
92 3876 : void zmq::tcp_connecter_t::process_plug ()
93 : {
94 3876 : if (delayed_start)
95 199 : add_reconnect_timer ();
96 : else
97 3677 : start_connecting ();
98 3877 : }
99 :
100 3876 : void zmq::tcp_connecter_t::process_term (int linger_)
101 : {
102 3876 : if (connect_timer_started) {
103 0 : cancel_timer (connect_timer_id);
104 0 : connect_timer_started = false;
105 : }
106 :
107 3876 : if (reconnect_timer_started) {
108 205 : cancel_timer (reconnect_timer_id);
109 205 : reconnect_timer_started = false;
110 : }
111 :
112 3876 : if (handle_valid) {
113 488 : rm_fd (handle);
114 488 : handle_valid = false;
115 : }
116 :
117 3876 : if (s != retired_fd)
118 488 : close ();
119 :
120 3876 : own_t::process_term (linger_);
121 3877 : }
122 :
123 114 : void zmq::tcp_connecter_t::in_event ()
124 : {
125 : // We are not polling for incoming data, so we are actually called
126 : // because of error here. However, we can get error on out event as well
127 : // on some platforms, so we'll simply handle both events in the same way.
128 114 : out_event ();
129 114 : }
130 :
131 3297 : void zmq::tcp_connecter_t::out_event ()
132 : {
133 3297 : if (connect_timer_started) {
134 0 : cancel_timer (connect_timer_id);
135 0 : connect_timer_started = false;
136 : }
137 :
138 3297 : rm_fd (handle);
139 3298 : handle_valid = false;
140 :
141 3298 : const fd_t fd = connect ();
142 : // Handle the error condition by attempt to reconnect.
143 3298 : if (fd == retired_fd) {
144 114 : close ();
145 114 : add_reconnect_timer ();
146 3412 : return;
147 : }
148 :
149 3184 : tune_tcp_socket (fd);
150 : tune_tcp_keepalives (fd, options.tcp_keepalive, options.tcp_keepalive_cnt,
151 3184 : options.tcp_keepalive_idle, options.tcp_keepalive_intvl);
152 3184 : tune_tcp_maxrt (fd, options.tcp_maxrt);
153 :
154 : // Create the engine object for this connection.
155 : stream_engine_t *engine = new (std::nothrow)
156 3184 : stream_engine_t (fd, options, endpoint);
157 3184 : alloc_assert (engine);
158 :
159 : // Attach the engine to the corresponding session object.
160 3184 : send_attach (session, engine);
161 :
162 : // Shut the connecter down.
163 3184 : terminate ();
164 :
165 3184 : socket->event_connected (endpoint, (int) fd);
166 : }
167 :
168 117 : void zmq::tcp_connecter_t::timer_event (int id_)
169 : {
170 117 : zmq_assert (id_ == reconnect_timer_id || id_ == connect_timer_id);
171 117 : if (id_ == connect_timer_id) {
172 0 : connect_timer_started = false;
173 :
174 0 : rm_fd (handle);
175 0 : handle_valid = false;
176 :
177 0 : close ();
178 0 : add_reconnect_timer ();
179 : }
180 117 : else if (id_ == reconnect_timer_id) {
181 117 : reconnect_timer_started = false;
182 117 : start_connecting ();
183 : }
184 117 : }
185 :
186 3794 : void zmq::tcp_connecter_t::start_connecting ()
187 : {
188 : // Open the connecting socket.
189 3794 : const int rc = open ();
190 :
191 : // Connect may succeed in synchronous manner.
192 3795 : if (rc == 0) {
193 0 : handle = add_fd (s);
194 0 : handle_valid = true;
195 0 : out_event ();
196 : }
197 :
198 : // Connection establishment may be delayed. Poll for its completion.
199 : else
200 3795 : if (rc == -1 && errno == EINPROGRESS) {
201 3786 : handle = add_fd (s);
202 3786 : handle_valid = true;
203 3786 : set_pollout (handle);
204 3786 : socket->event_connect_delayed (endpoint, zmq_errno());
205 :
206 : // add userspace connect timeout
207 : add_connect_timer ();
208 : }
209 :
210 : // Handle any other error condition by eventual reconnect.
211 : else {
212 9 : if (s != retired_fd)
213 3 : close ();
214 9 : add_reconnect_timer ();
215 : }
216 3795 : }
217 :
218 0 : void zmq::tcp_connecter_t::add_connect_timer ()
219 : {
220 3786 : if (options.connect_timeout > 0) {
221 0 : add_timer (options.connect_timeout, connect_timer_id);
222 0 : connect_timer_started = true;
223 : }
224 0 : }
225 :
226 322 : void zmq::tcp_connecter_t::add_reconnect_timer ()
227 : {
228 322 : const int interval = get_new_reconnect_ivl ();
229 322 : add_timer (interval, reconnect_timer_id);
230 322 : socket->event_connect_retried (endpoint, interval);
231 322 : reconnect_timer_started = true;
232 322 : }
233 :
234 322 : int zmq::tcp_connecter_t::get_new_reconnect_ivl ()
235 : {
236 : // The new interval is the current interval + random value.
237 644 : const int interval = current_reconnect_ivl +
238 644 : generate_random () % options.reconnect_ivl;
239 :
240 : // Only change the current reconnect interval if the maximum reconnect
241 : // interval was set and if it's larger than the reconnect interval.
242 322 : if (options.reconnect_ivl_max > 0 &&
243 : options.reconnect_ivl_max > options.reconnect_ivl)
244 : // Calculate the next interval
245 : current_reconnect_ivl =
246 0 : std::min (current_reconnect_ivl * 2, options.reconnect_ivl_max);
247 322 : return interval;
248 : }
249 :
250 3794 : int zmq::tcp_connecter_t::open ()
251 : {
252 3794 : zmq_assert (s == retired_fd);
253 :
254 : // Resolve the address
255 3794 : if (addr->resolved.tcp_addr != NULL) {
256 117 : LIBZMQ_DELETE(addr->resolved.tcp_addr);
257 : }
258 :
259 3794 : addr->resolved.tcp_addr = new (std::nothrow) tcp_address_t ();
260 3795 : alloc_assert (addr->resolved.tcp_addr);
261 : int rc = addr->resolved.tcp_addr->resolve (
262 7590 : addr->address.c_str (), false, options.ipv6);
263 3795 : if (rc != 0) {
264 6 : LIBZMQ_DELETE(addr->resolved.tcp_addr);
265 6 : return -1;
266 : }
267 3789 : zmq_assert (addr->resolved.tcp_addr != NULL);
268 3789 : tcp_address_t * const tcp_addr = addr->resolved.tcp_addr;
269 :
270 : // Create the socket.
271 3789 : s = open_socket (tcp_addr->family (), SOCK_STREAM, IPPROTO_TCP);
272 :
273 : // IPv6 address family not supported, try automatic downgrade to IPv4.
274 3789 : if (s == zmq::retired_fd && tcp_addr->family () == AF_INET6
275 0 : && errno == EAFNOSUPPORT
276 3789 : && options.ipv6) {
277 : rc = addr->resolved.tcp_addr->resolve (
278 0 : addr->address.c_str (), false, false);
279 0 : if (rc != 0) {
280 0 : LIBZMQ_DELETE(addr->resolved.tcp_addr);
281 0 : return -1;
282 : }
283 0 : s = open_socket (AF_INET, SOCK_STREAM, IPPROTO_TCP);
284 : }
285 :
286 : #ifdef ZMQ_HAVE_WINDOWS
287 : if (s == INVALID_SOCKET) {
288 : errno = wsa_error_to_errno (WSAGetLastError ());
289 : return -1;
290 : }
291 : #else
292 3789 : if (s == -1)
293 : return -1;
294 : #endif
295 :
296 : // On some systems, IPv4 mapping in IPv6 sockets is disabled by default.
297 : // Switch it on in such cases.
298 3789 : if (tcp_addr->family () == AF_INET6)
299 15 : enable_ipv4_mapping (s);
300 :
301 : // Set the IP Type-Of-Service priority for this socket
302 3789 : if (options.tos != 0)
303 3 : set_ip_type_of_service (s, options.tos);
304 :
305 : // Set the socket to non-blocking mode so that we get async connect().
306 3789 : unblock_socket (s);
307 :
308 : // Set the socket buffer limits for the underlying socket.
309 3789 : if (options.sndbuf >= 0)
310 0 : set_tcp_send_buffer (s, options.sndbuf);
311 3789 : if (options.rcvbuf >= 0)
312 0 : set_tcp_receive_buffer (s, options.rcvbuf);
313 :
314 : // Set the IP Type-Of-Service for the underlying socket
315 3789 : if (options.tos != 0)
316 3 : set_ip_type_of_service (s, options.tos);
317 :
318 : // Set a source address for conversations
319 3789 : if (tcp_addr->has_src_addr ()) {
320 6 : rc = ::bind (s, tcp_addr->src_addr (), tcp_addr->src_addrlen ());
321 6 : if (rc == -1)
322 : return -1;
323 : }
324 :
325 : // Connect to the remote peer.
326 3786 : rc = ::connect (s, tcp_addr->addr (), tcp_addr->addrlen ());
327 :
328 : // Connect was successful immediately.
329 3786 : if (rc == 0)
330 : return 0;
331 :
332 : // Translate error codes indicating asynchronous connect has been
333 : // launched to a uniform EINPROGRESS.
334 : #ifdef ZMQ_HAVE_WINDOWS
335 : const int last_error = WSAGetLastError();
336 : if (last_error == WSAEINPROGRESS || last_error == WSAEWOULDBLOCK)
337 : errno = EINPROGRESS;
338 : else
339 : errno = wsa_error_to_errno (last_error);
340 : #else
341 3786 : if (errno == EINTR)
342 0 : errno = EINPROGRESS;
343 : #endif
344 : return -1;
345 : }
346 :
347 3298 : zmq::fd_t zmq::tcp_connecter_t::connect ()
348 : {
349 : // Async connect has finished. Check whether an error occurred
350 3298 : int err = 0;
351 : #ifdef ZMQ_HAVE_HPUX
352 : int len = sizeof err;
353 : #else
354 3298 : socklen_t len = sizeof err;
355 : #endif
356 :
357 3298 : const int rc = getsockopt (s, SOL_SOCKET, SO_ERROR, (char*) &err, &len);
358 :
359 : // Assert if the error was caused by 0MQ bug.
360 : // Networking problems are OK. No need to assert.
361 : #ifdef ZMQ_HAVE_WINDOWS
362 : zmq_assert (rc == 0);
363 : if (err != 0) {
364 : if (err == WSAEBADF ||
365 : err == WSAENOPROTOOPT ||
366 : err == WSAENOTSOCK ||
367 : err == WSAENOBUFS)
368 : {
369 : wsa_assert_no (err);
370 : }
371 : return retired_fd;
372 : }
373 : #else
374 : // Following code should handle both Berkeley-derived socket
375 : // implementations and Solaris.
376 3298 : if (rc == -1)
377 0 : err = errno;
378 3298 : if (err != 0) {
379 114 : errno = err;
380 114 : errno_assert (
381 : errno != EBADF &&
382 : errno != ENOPROTOOPT &&
383 : errno != ENOTSOCK &&
384 : errno != ENOBUFS);
385 : return retired_fd;
386 : }
387 : #endif
388 :
389 : // Return the newly connected socket.
390 3184 : const fd_t result = s;
391 3184 : s = retired_fd;
392 3184 : return result;
393 : }
394 :
395 605 : void zmq::tcp_connecter_t::close ()
396 : {
397 605 : zmq_assert (s != retired_fd);
398 : #ifdef ZMQ_HAVE_WINDOWS
399 : const int rc = closesocket (s);
400 : wsa_assert (rc != SOCKET_ERROR);
401 : #else
402 605 : const int rc = ::close (s);
403 605 : errno_assert (rc == 0);
404 : #endif
405 605 : socket->event_closed (endpoint, (int) s);
406 605 : s = retired_fd;
407 605 : }
|