Line data Source code
1 : /*
2 : Copyright (c) 2007-2016 Contributors as noted in the AUTHORS file
3 :
4 : This file is part of libzmq, the ZeroMQ core engine in C++.
5 :
6 : libzmq is free software; you can redistribute it and/or modify it under
7 : the terms of the GNU Lesser General Public License (LGPL) as published
8 : by the Free Software Foundation; either version 3 of the License, or
9 : (at your option) any later version.
10 :
11 : As a special exception, the Contributors give you permission to link
12 : this library with independent modules to produce an executable,
13 : regardless of the license terms of these independent modules, and to
14 : copy and distribute the resulting executable under terms of your choice,
15 : provided that you also meet, for each linked independent module, the
16 : terms and conditions of the license of that module. An independent
17 : module is a module which is not derived from or based on this library.
18 : If you modify this library, you must extend this exception to your
19 : version of the library.
20 :
21 : libzmq is distributed in the hope that it will be useful, but WITHOUT
22 : ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
23 : FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public
24 : License for more details.
25 :
26 : You should have received a copy of the GNU Lesser General Public License
27 : along with this program. If not, see <http://www.gnu.org/licenses/>.
28 : */
29 :
30 : #include "precompiled.hpp"
31 : #include <new>
32 : #include <string>
33 :
34 : #include "macros.hpp"
35 : #include "socks_connecter.hpp"
36 : #include "stream_engine.hpp"
37 : #include "platform.hpp"
38 : #include "random.hpp"
39 : #include "err.hpp"
40 : #include "ip.hpp"
41 : #include "tcp.hpp"
42 : #include "address.hpp"
43 : #include "tcp_address.hpp"
44 : #include "session_base.hpp"
45 : #include "socks.hpp"
46 :
47 : #ifdef ZMQ_HAVE_WINDOWS
48 : #include "windows.hpp"
49 : #else
50 : #include <unistd.h>
51 : #include <sys/types.h>
52 : #include <sys/socket.h>
53 : #endif
54 :
55 0 : zmq::socks_connecter_t::socks_connecter_t (class io_thread_t *io_thread_,
56 : class session_base_t *session_, const options_t &options_,
57 : address_t *addr_, address_t *proxy_addr_, bool delayed_start_) :
58 : own_t (io_thread_, options_),
59 : io_object_t (io_thread_),
60 : addr (addr_),
61 : proxy_addr (proxy_addr_),
62 : status (unplugged),
63 : s (retired_fd),
64 : handle(NULL),
65 : handle_valid(false),
66 : delayed_start (delayed_start_),
67 : timer_started(false),
68 : session (session_),
69 0 : current_reconnect_ivl (options.reconnect_ivl)
70 : {
71 0 : zmq_assert (addr);
72 0 : zmq_assert (addr->protocol == "tcp");
73 0 : proxy_addr->to_string (endpoint);
74 0 : socket = session->get_socket ();
75 0 : }
76 :
77 0 : zmq::socks_connecter_t::~socks_connecter_t ()
78 : {
79 0 : zmq_assert (s == retired_fd);
80 0 : LIBZMQ_DELETE(proxy_addr);
81 0 : }
82 :
83 0 : void zmq::socks_connecter_t::process_plug ()
84 : {
85 0 : if (delayed_start)
86 0 : start_timer ();
87 : else
88 0 : initiate_connect ();
89 0 : }
90 :
91 0 : void zmq::socks_connecter_t::process_term (int linger_)
92 : {
93 0 : switch (status) {
94 : case unplugged:
95 : break;
96 : case waiting_for_reconnect_time:
97 0 : cancel_timer (reconnect_timer_id);
98 0 : break;
99 : case waiting_for_proxy_connection:
100 : case sending_greeting:
101 : case waiting_for_choice:
102 : case sending_request:
103 : case waiting_for_response:
104 0 : rm_fd (handle);
105 0 : if (s != retired_fd)
106 0 : close ();
107 : break;
108 : }
109 :
110 0 : own_t::process_term (linger_);
111 0 : }
112 :
113 0 : void zmq::socks_connecter_t::in_event ()
114 : {
115 0 : zmq_assert (status != unplugged
116 : && status != waiting_for_reconnect_time);
117 :
118 0 : if (status == waiting_for_choice) {
119 0 : int rc = choice_decoder.input (s);
120 0 : if (rc == 0 || rc == -1)
121 0 : error ();
122 : else
123 0 : if (choice_decoder.message_ready ()) {
124 0 : const socks_choice_t choice = choice_decoder.decode ();
125 0 : rc = process_server_response (choice);
126 0 : if (rc == -1)
127 0 : error ();
128 : else {
129 0 : std::string hostname = "";
130 0 : uint16_t port = 0;
131 0 : if (parse_address (addr->address, hostname, port) == -1)
132 0 : error ();
133 : else {
134 : request_encoder.encode (
135 0 : socks_request_t (1, hostname, port));
136 0 : reset_pollin (handle);
137 0 : set_pollout (handle);
138 0 : status = sending_request;
139 : }
140 : }
141 : }
142 : }
143 : else
144 0 : if (status == waiting_for_response) {
145 0 : int rc = response_decoder.input (s);
146 0 : if (rc == 0 || rc == -1)
147 0 : error ();
148 : else
149 0 : if (response_decoder.message_ready ()) {
150 0 : const socks_response_t response = response_decoder.decode ();
151 0 : rc = process_server_response (response);
152 0 : if (rc == -1)
153 0 : error ();
154 : else {
155 : // Create the engine object for this connection.
156 : stream_engine_t *engine = new (std::nothrow)
157 0 : stream_engine_t (s, options, endpoint);
158 0 : alloc_assert (engine);
159 :
160 : // Attach the engine to the corresponding session object.
161 0 : send_attach (session, engine);
162 :
163 0 : socket->event_connected (endpoint, (int) s);
164 :
165 0 : rm_fd (handle);
166 0 : s = -1;
167 0 : status = unplugged;
168 :
169 : // Shut the connecter down.
170 0 : terminate ();
171 : }
172 : }
173 : }
174 : else
175 0 : error ();
176 0 : }
177 :
178 0 : void zmq::socks_connecter_t::out_event ()
179 : {
180 0 : zmq_assert (status == waiting_for_proxy_connection
181 : || status == sending_greeting
182 : || status == sending_request);
183 :
184 0 : if (status == waiting_for_proxy_connection) {
185 0 : const int rc = (int) check_proxy_connection ();
186 0 : if (rc == -1)
187 0 : error ();
188 : else {
189 : greeting_encoder.encode (
190 0 : socks_greeting_t (socks_no_auth_required));
191 0 : status = sending_greeting;
192 : }
193 : }
194 : else
195 0 : if (status == sending_greeting) {
196 0 : zmq_assert (greeting_encoder.has_pending_data ());
197 0 : const int rc = greeting_encoder.output (s);
198 0 : if (rc == -1 || rc == 0)
199 0 : error ();
200 : else
201 0 : if (!greeting_encoder.has_pending_data ()) {
202 0 : reset_pollout (handle);
203 0 : set_pollin (handle);
204 0 : status = waiting_for_choice;
205 : }
206 : }
207 : else {
208 0 : zmq_assert (request_encoder.has_pending_data ());
209 0 : const int rc = request_encoder.output (s);
210 0 : if (rc == -1 || rc == 0)
211 0 : error ();
212 : else
213 0 : if (!request_encoder.has_pending_data ()) {
214 0 : reset_pollout (handle);
215 0 : set_pollin (handle);
216 0 : status = waiting_for_response;
217 : }
218 : }
219 0 : }
220 :
221 0 : void zmq::socks_connecter_t::initiate_connect ()
222 : {
223 : // Open the connecting socket.
224 0 : const int rc = connect_to_proxy ();
225 :
226 : // Connect may succeed in synchronous manner.
227 0 : if (rc == 0) {
228 0 : handle = add_fd (s);
229 0 : set_pollout (handle);
230 0 : status = sending_greeting;
231 : }
232 : // Connection establishment may be delayed. Poll for its completion.
233 : else
234 0 : if (errno == EINPROGRESS) {
235 0 : handle = add_fd (s);
236 0 : set_pollout (handle);
237 0 : status = waiting_for_proxy_connection;
238 0 : socket->event_connect_delayed (endpoint, zmq_errno ());
239 : }
240 : // Handle any other error condition by eventual reconnect.
241 : else {
242 0 : if (s != retired_fd)
243 0 : close ();
244 0 : start_timer ();
245 : }
246 0 : }
247 :
248 0 : int zmq::socks_connecter_t::process_server_response (
249 : const socks_choice_t &response)
250 : {
251 : // We do not support any authentication method for now.
252 0 : return response.method == 0? 0: -1;
253 : }
254 :
255 0 : int zmq::socks_connecter_t::process_server_response (
256 : const socks_response_t &response)
257 : {
258 0 : return response.response_code == 0? 0: -1;
259 : }
260 :
261 0 : void zmq::socks_connecter_t::timer_event (int id_)
262 : {
263 0 : zmq_assert (status == waiting_for_reconnect_time);
264 0 : zmq_assert (id_ == reconnect_timer_id);
265 0 : initiate_connect ();
266 0 : }
267 :
268 0 : void zmq::socks_connecter_t::error ()
269 : {
270 0 : rm_fd (handle);
271 0 : close ();
272 0 : greeting_encoder.reset ();
273 0 : choice_decoder.reset ();
274 0 : request_encoder.reset ();
275 0 : response_decoder.reset ();
276 0 : start_timer ();
277 0 : }
278 :
279 0 : void zmq::socks_connecter_t::start_timer ()
280 : {
281 0 : const int interval = get_new_reconnect_ivl ();
282 0 : add_timer (interval, reconnect_timer_id);
283 0 : status = waiting_for_reconnect_time;
284 0 : socket->event_connect_retried (endpoint, interval);
285 0 : }
286 :
287 0 : int zmq::socks_connecter_t::get_new_reconnect_ivl ()
288 : {
289 : // The new interval is the current interval + random value.
290 0 : const int interval = current_reconnect_ivl +
291 0 : generate_random () % options.reconnect_ivl;
292 :
293 : // Only change the current reconnect interval if the maximum reconnect
294 : // interval was set and if it's larger than the reconnect interval.
295 0 : if (options.reconnect_ivl_max > 0 &&
296 : options.reconnect_ivl_max > options.reconnect_ivl)
297 : // Calculate the next interval
298 : current_reconnect_ivl =
299 0 : std::min (current_reconnect_ivl * 2, options.reconnect_ivl_max);
300 0 : return interval;
301 : }
302 :
303 0 : int zmq::socks_connecter_t::connect_to_proxy ()
304 : {
305 0 : zmq_assert (s == retired_fd);
306 :
307 : // Resolve the address
308 0 : LIBZMQ_DELETE(proxy_addr->resolved.tcp_addr);
309 0 : proxy_addr->resolved.tcp_addr = new (std::nothrow) tcp_address_t ();
310 0 : alloc_assert (proxy_addr->resolved.tcp_addr);
311 :
312 : int rc = proxy_addr->resolved.tcp_addr->resolve (
313 0 : proxy_addr->address.c_str (), false, options.ipv6);
314 0 : if (rc != 0) {
315 0 : LIBZMQ_DELETE(proxy_addr->resolved.tcp_addr);
316 0 : return -1;
317 : }
318 0 : zmq_assert (proxy_addr->resolved.tcp_addr != NULL);
319 0 : const tcp_address_t *tcp_addr = proxy_addr->resolved.tcp_addr;
320 :
321 : // Create the socket.
322 0 : s = open_socket (tcp_addr->family (), SOCK_STREAM, IPPROTO_TCP);
323 : #ifdef ZMQ_HAVE_WINDOWS
324 : if (s == INVALID_SOCKET)
325 : return -1;
326 : #else
327 0 : if (s == -1)
328 : return -1;
329 : #endif
330 :
331 : // On some systems, IPv4 mapping in IPv6 sockets is disabled by default.
332 : // Switch it on in such cases.
333 0 : if (tcp_addr->family () == AF_INET6)
334 0 : enable_ipv4_mapping (s);
335 :
336 : // Set the IP Type-Of-Service priority for this socket
337 0 : if (options.tos != 0)
338 0 : set_ip_type_of_service (s, options.tos);
339 :
340 : // Set the socket to non-blocking mode so that we get async connect().
341 0 : unblock_socket (s);
342 :
343 : // Set the socket buffer limits for the underlying socket.
344 0 : if (options.sndbuf >= 0)
345 0 : set_tcp_send_buffer (s, options.sndbuf);
346 0 : if (options.rcvbuf >= 0)
347 0 : set_tcp_receive_buffer (s, options.rcvbuf);
348 :
349 : // Set the IP Type-Of-Service for the underlying socket
350 0 : if (options.tos != 0)
351 0 : set_ip_type_of_service (s, options.tos);
352 :
353 : // Set a source address for conversations
354 0 : if (tcp_addr->has_src_addr ()) {
355 0 : rc = ::bind (s, tcp_addr->src_addr (), tcp_addr->src_addrlen ());
356 0 : if (rc == -1) {
357 0 : close ();
358 0 : return -1;
359 : }
360 : }
361 :
362 : // Connect to the remote peer.
363 0 : rc = ::connect (s, tcp_addr->addr (), tcp_addr->addrlen ());
364 :
365 : // Connect was successful immediately.
366 0 : if (rc == 0)
367 : return 0;
368 :
369 : // Translate error codes indicating asynchronous connect has been
370 : // launched to a uniform EINPROGRESS.
371 : #ifdef ZMQ_HAVE_WINDOWS
372 : const int last_error = WSAGetLastError();
373 : if (last_error == WSAEINPROGRESS || last_error == WSAEWOULDBLOCK)
374 : errno = EINPROGRESS;
375 : else {
376 : errno = wsa_error_to_errno (last_error);
377 : close ();
378 : }
379 : #else
380 0 : if (errno == EINTR)
381 0 : errno = EINPROGRESS;
382 : #endif
383 : return -1;
384 : }
385 :
386 0 : zmq::fd_t zmq::socks_connecter_t::check_proxy_connection ()
387 : {
388 : // Async connect has finished. Check whether an error occurred
389 0 : int err = 0;
390 : #ifdef ZMQ_HAVE_HPUX
391 : int len = sizeof err;
392 : #else
393 0 : socklen_t len = sizeof err;
394 : #endif
395 :
396 0 : const int rc = getsockopt (s, SOL_SOCKET, SO_ERROR, (char*) &err, &len);
397 :
398 : // Assert if the error was caused by 0MQ bug.
399 : // Networking problems are OK. No need to assert.
400 : #ifdef ZMQ_HAVE_WINDOWS
401 : zmq_assert (rc == 0);
402 : if (err != 0) {
403 : wsa_assert (err == WSAECONNREFUSED
404 : || err == WSAETIMEDOUT
405 : || err == WSAECONNABORTED
406 : || err == WSAEHOSTUNREACH
407 : || err == WSAENETUNREACH
408 : || err == WSAENETDOWN
409 : || err == WSAEACCES
410 : || err == WSAEINVAL
411 : || err == WSAEADDRINUSE);
412 : return -1;
413 : }
414 : #else
415 : // Following code should handle both Berkeley-derived socket
416 : // implementations and Solaris.
417 0 : if (rc == -1)
418 0 : err = errno;
419 0 : if (err != 0) {
420 0 : errno = err;
421 0 : errno_assert (
422 : errno == ECONNREFUSED ||
423 : errno == ECONNRESET ||
424 : errno == ETIMEDOUT ||
425 : errno == EHOSTUNREACH ||
426 : errno == ENETUNREACH ||
427 : errno == ENETDOWN ||
428 : errno == EINVAL);
429 : return -1;
430 : }
431 : #endif
432 :
433 0 : tune_tcp_socket (s);
434 : tune_tcp_keepalives (s, options.tcp_keepalive, options.tcp_keepalive_cnt,
435 0 : options.tcp_keepalive_idle, options.tcp_keepalive_intvl);
436 :
437 : return 0;
438 : }
439 :
440 0 : void zmq::socks_connecter_t::close ()
441 : {
442 0 : zmq_assert (s != retired_fd);
443 : #ifdef ZMQ_HAVE_WINDOWS
444 : const int rc = closesocket (s);
445 : wsa_assert (rc != SOCKET_ERROR);
446 : #else
447 0 : const int rc = ::close (s);
448 0 : errno_assert (rc == 0);
449 : #endif
450 0 : socket->event_closed (endpoint, (int) s);
451 0 : s = retired_fd;
452 0 : }
453 :
454 0 : int zmq::socks_connecter_t::parse_address (
455 : const std::string &address_, std::string &hostname_, uint16_t &port_)
456 : {
457 : // Find the ':' at end that separates address from the port number.
458 0 : const size_t idx = address_.rfind (':');
459 0 : if (idx == std::string::npos) {
460 0 : errno = EINVAL;
461 0 : return -1;
462 : }
463 :
464 : // Extract hostname
465 0 : if (idx < 2 || address_ [0] != '[' || address_ [idx - 1] != ']')
466 0 : hostname_ = address_.substr (0, idx);
467 : else
468 0 : hostname_ = address_.substr (1, idx - 2);
469 :
470 : // Separate the hostname/port.
471 0 : const std::string port_str = address_.substr (idx + 1);
472 : // Parse the port number (0 is not a valid port).
473 0 : port_ = (uint16_t) atoi (port_str.c_str ());
474 0 : if (port_ == 0) {
475 0 : errno = EINVAL;
476 0 : return -1;
477 : }
478 : return 0;
479 : }
|