libzmq  master
ZeroMQ C++ Core Engine (LIBZMQ)
socks_connecter.cpp
Go to the documentation of this file.
1 /*
2  Copyright (c) 2007-2016 Contributors as noted in the AUTHORS file
3 
4  This file is part of libzmq, the ZeroMQ core engine in C++.
5 
6  libzmq is free software; you can redistribute it and/or modify it under
7  the terms of the GNU Lesser General Public License (LGPL) as published
8  by the Free Software Foundation; either version 3 of the License, or
9  (at your option) any later version.
10 
11  As a special exception, the Contributors give you permission to link
12  this library with independent modules to produce an executable,
13  regardless of the license terms of these independent modules, and to
14  copy and distribute the resulting executable under terms of your choice,
15  provided that you also meet, for each linked independent module, the
16  terms and conditions of the license of that module. An independent
17  module is a module which is not derived from or based on this library.
18  If you modify this library, you must extend this exception to your
19  version of the library.
20 
21  libzmq is distributed in the hope that it will be useful, but WITHOUT
22  ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
23  FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public
24  License for more details.
25 
26  You should have received a copy of the GNU Lesser General Public License
27  along with this program. If not, see <http://www.gnu.org/licenses/>.
28 */
29 
30 #include "precompiled.hpp"
31 #include <new>
32 #include <string>
33 
34 #include "macros.hpp"
35 #include "socks_connecter.hpp"
36 #include "stream_engine.hpp"
37 #include "platform.hpp"
38 #include "random.hpp"
39 #include "err.hpp"
40 #include "ip.hpp"
41 #include "tcp.hpp"
42 #include "address.hpp"
43 #include "tcp_address.hpp"
44 #include "session_base.hpp"
45 #include "socks.hpp"
46 
47 #ifdef ZMQ_HAVE_WINDOWS
48 #include "windows.hpp"
49 #else
50 #include <unistd.h>
51 #include <sys/types.h>
52 #include <sys/socket.h>
53 #endif
54 
// Constructor of the SOCKS connecter. The opening line of the signature
// (listing line 55) is not part of this extract; what follows is the rest of
// the parameter list and the member-initializer list.
56  class session_base_t *session_, const options_t &options_,
57  address_t *addr_, address_t *proxy_addr_, bool delayed_start_) :
58  own_t (io_thread_, options_),
59  io_object_t (io_thread_),
// addr is the final destination, proxy_addr the SOCKS proxy to go through.
60  addr (addr_),
61  proxy_addr (proxy_addr_),
// Initial state: unplugged, no socket open and no poller handle registered.
62  status (unplugged),
63  s (retired_fd),
64  handle(NULL),
65  handle_valid(false),
66  delayed_start (delayed_start_),
67  timer_started(false),
68  session (session_),
// The reconnect interval starts at the configured options.reconnect_ivl.
69  current_reconnect_ivl (options.reconnect_ivl)
70 {
// A destination address is mandatory and it must use the "tcp" protocol.
71  zmq_assert (addr);
72  zmq_assert (addr->protocol == "tcp");
// NOTE(review): listing lines 73-74 are missing from this extract.
75 }
76 
// Presumably the destructor body (its signature, listing line 77, is not part
// of this extract — confirm).
78 {
// The socket must have been released (s reset to retired_fd) beforehand.
79  zmq_assert (s == retired_fd);
// NOTE(review): listing line 80 is missing from this extract.
81 }
82 
// Presumably process_plug () (signature, listing line 83, is not part of this
// extract; name inferred from the symbol index — confirm).
84 {
// With a delayed start, wait for the reconnect timer instead of acting now.
85  if (delayed_start)
86  start_timer ();
87  else
// NOTE(review): the else-branch statement (listing line 88) is missing from
// this extract.
89 }
90 
// Termination handler; the body forwards linger_ to own_t::process_term, so
// this is presumably process_term (int linger_) (signature, listing line 91,
// is not part of this extract). Releases whatever the current state holds
// before delegating to the base class.
92 {
93  switch (status) {
// Nothing is held while unplugged.
94  case unplugged:
95  break;
// NOTE(review): listing lines 96-97 (a case label and its action) are missing
// from this extract.
98  break;
// NOTE(review): listing lines 99 and 103 (further case labels) are missing
// from this extract.
100  case sending_greeting:
101  case waiting_for_choice:
102  case sending_request:
// These states have a descriptor registered with the poller: deregister it
// and close the socket if it is still open.
104  rm_fd (handle);
105  if (s != retired_fd)
106  close ();
107  break;
108  }
109 
110  own_t::process_term (linger_);
111 }
112 
// Presumably the read-readiness handler in_event () (signature, listing line
// 113, and lines 115-116 are not part of this extract — confirm).
// Depending on `status` it consumes either the proxy's method choice or the
// proxy's reply to the request; any other state is routed to error ().
114 {
117 
118  if (status == waiting_for_choice) {
// Feed the decoder from the socket; both 0 and -1 are treated as failure.
119  int rc = choice_decoder.input (s);
120  if (rc == 0 || rc == -1)
121  error ();
122  else
// Act only once a complete choice message has been accumulated.
123  if (choice_decoder.message_ready ()) {
124  const socks_choice_t choice = choice_decoder.decode ();
125  rc = process_server_response (choice);
126  if (rc == -1)
127  error ();
128  else {
// Split the destination address into host and port for the request.
129  std::string hostname = "";
130  uint16_t port = 0;
131  if (parse_address (addr->address, hostname, port) == -1)
132  error ();
133  else {
// NOTE(review): listing line 134 (start of this statement) is missing. The
// first socks_request_t argument (1) is presumably the SOCKS5 CONNECT
// command — confirm against socks.hpp.
135  socks_request_t (1, hostname, port));
// NOTE(review): listing lines 136-138 are missing from this extract.
139  }
140  }
141  }
142  }
143  else
144  if (status == waiting_for_response) {
// Same failure convention as above: 0 and -1 both lead to error ().
145  int rc = response_decoder.input (s);
146  if (rc == 0 || rc == -1)
147  error ();
148  else
// NOTE(review): listing line 149 (presumably the message_ready () check that
// opens this brace level) is missing from this extract.
150  const socks_response_t response = response_decoder.decode ();
151  rc = process_server_response (response);
152  if (rc == -1)
153  error ();
154  else {
155  // Create the engine object for this connection.
156  stream_engine_t *engine = new (std::nothrow)
// NOTE(review): listing line 157 (the constructor call) is missing.
158  alloc_assert (engine);
159 
160  // Attach the engine to the corresponding session object.
161  send_attach (session, engine);
162 
163  socket->event_connected (endpoint, (int) s);
164 
// Stop polling the descriptor here and forget it without closing it;
// presumably the engine owns it from now on — confirm.
165  rm_fd (handle);
// NOTE(review): uses the literal -1 while other code in this file compares s
// against retired_fd; confirm the two are equal on every platform.
166  s = -1;
167  status = unplugged;
168 
169  // Shut the connecter down.
170  terminate ();
171  }
172  }
173  }
174  else
175  error ();
176 }
177 
// Presumably the write-readiness handler out_event () (signature, listing
// line 178, is not part of this extract — confirm). Many interior lines are
// missing (180-181, 184, 189-191, 196, 201-202, 204, 208, 213-214, 216), so
// only the visible skeleton is described.
179 {
// Tail of a condition listing the states in which this handler may run;
// the beginning of the statement is missing.
182  || status == sending_request);
183 
// First branch (its `if` line is missing): check whether the pending connect
// to the proxy succeeded; the result is used as a 0 / -1 status code.
185  const int rc = (int) check_proxy_connection ();
186  if (rc == -1)
187  error ();
188  else {
192  }
193  }
194  else
195  if (status == sending_greeting) {
// Push greeting bytes to the socket; 0 and -1 are both treated as failure.
197  const int rc = greeting_encoder.output (s);
198  if (rc == -1 || rc == 0)
199  error ();
200  else
// Start polling for input; the guarding condition (lines 201-202) is
// missing from this extract.
203  set_pollin (handle);
205  }
206  }
207  else {
// Remaining state: push request bytes, same failure convention.
209  const int rc = request_encoder.output (s);
210  if (rc == -1 || rc == 0)
211  error ();
212  else
// Start polling for input; the guarding condition (lines 213-214) is
// missing from this extract.
215  set_pollin (handle);
217  }
218  }
219 }
220 
// Presumably initiate_connect () (signature, listing line 221, is not part of
// this extract; name inferred from the call in the timer handler — confirm).
// Starts a connection attempt to the proxy and either registers the socket
// with the poller or schedules a retry.
222 {
223  // Open the connecting socket.
224  const int rc = connect_to_proxy ();
225 
226  // Connect may succeed in synchronous manner.
227  if (rc == 0) {
228  handle = add_fd (s);
// NOTE(review): listing lines 229-230 are missing from this extract.
231  }
232  // Connection establishment may be delayed. Poll for its completion.
233  else
// connect_to_proxy () normalises "connect in progress" to errno EINPROGRESS.
234  if (errno == EINPROGRESS) {
235  handle = add_fd (s);
// NOTE(review): listing lines 236-238 are missing from this extract.
239  }
240  // Handle any other error condition by eventual reconnect.
241  else {
// The socket may or may not exist, depending on where the attempt failed.
242  if (s != retired_fd)
243  close ();
244  start_timer ();
245  }
246 }
247 
// process_server_response overload for the proxy's method choice (the first
// signature line, listing line 248, is not part of this extract).
// Returns 0 when the choice is acceptable, -1 otherwise.
249  const socks_choice_t &response)
250 {
251  // We do not support any authentication method for now.
// Only method 0 is accepted (presumably "no authentication required").
252  return response.method == 0? 0: -1;
253 }
254 
// process_server_response overload for the proxy's reply to the request (the
// first signature line, listing line 255, is not part of this extract).
// Returns 0 when response_code is 0, -1 for any other code.
256  const socks_response_t &response)
257 {
258  return response.response_code == 0? 0: -1;
259 }
260 
// Presumably timer_event (int id_) (signature, listing line 261, is not part
// of this extract; name inferred from the symbol index — confirm).
262 {
// NOTE(review): listing lines 263-264 are missing from this extract.
// Retry the connection to the proxy.
265  initiate_connect ();
266 }
267 
// Presumably error () (signature, listing line 268, is not part of this
// extract; name inferred from the call sites — confirm). Abandons the
// current attempt and schedules a new one.
269 {
// Deregister the descriptor from the poller and close the socket.
270  rm_fd (handle);
271  close ();
// NOTE(review): listing lines 272-275 are missing from this extract.
276  start_timer ();
277 }
278 
// Presumably start_timer () (signature, listing line 279, is not part of this
// extract; name inferred from the call sites — confirm). Arms the reconnect
// timer with a freshly computed interval.
280 {
281  const int interval = get_new_reconnect_ivl ();
282  add_timer (interval, reconnect_timer_id);
// NOTE(review): listing lines 283-284 are missing from this extract.
285 }
286 
// Presumably get_new_reconnect_ivl () (signature, listing line 287, is not
// part of this extract; name inferred from the caller — confirm). Returns
// the interval to wait before the next connection attempt.
288 {
289  // The new interval is the current interval + random value.
290  const int interval = current_reconnect_ivl +
// NOTE(review): listing line 291 (the random term) is missing.
292 
293  // Only change the current reconnect interval if the maximum reconnect
294  // interval was set and if it's larger than the reconnect interval.
295  if (options.reconnect_ivl_max > 0 &&
// NOTE(review): listing lines 296 and 298-299 (rest of the condition and the
// update of the current interval) are missing from this extract.
297  // Calculate the next interval
300  return interval;
301 }
302 
// Presumably connect_to_proxy () (signature, listing line 303, is not part of
// this extract; name inferred from the caller — confirm).
// Resolves the proxy address, creates and configures a non-blocking socket
// and starts connecting. Returns 0 when the connect completed immediately and
// -1 otherwise; the caller tests errno == EINPROGRESS to detect a connect
// that is still in progress.
304 {
// A previous socket must have been released before a new attempt.
305  zmq_assert (s == retired_fd);
306 
307  // Resolve the address
// NOTE(review): listing lines 308 and 310 are missing; whether the result of
// this nothrow allocation is checked before use is not visible here.
309  proxy_addr->resolved.tcp_addr = new (std::nothrow) tcp_address_t ();
311 
312  int rc = proxy_addr->resolved.tcp_addr->resolve (
313  proxy_addr->address.c_str (), false, options.ipv6);
314  if (rc != 0) {
// NOTE(review): listing line 315 (cleanup before returning) is missing.
316  return -1;
317  }
// NOTE(review): listing line 318 is missing from this extract.
319  const tcp_address_t *tcp_addr = proxy_addr->resolved.tcp_addr;
320 
321  // Create the socket.
322  s = open_socket (tcp_addr->family (), SOCK_STREAM, IPPROTO_TCP);
// NOTE(review): errno is not set here; the caller reads whatever
// open_socket () left behind — confirm that it can never be EINPROGRESS.
323 #ifdef ZMQ_HAVE_WINDOWS
324  if (s == INVALID_SOCKET)
325  return -1;
326 #else
327  if (s == -1)
328  return -1;
329 #endif
330 
331  // On some systems, IPv4 mapping in IPv6 sockets is disabled by default.
332  // Switch it on in such cases.
333  if (tcp_addr->family () == AF_INET6)
// NOTE(review): the guarded statement (listing line 334) is missing.
335 
336  // Set the IP Type-Of-Service priority for this socket
337  if (options.tos != 0)
// NOTE(review): the guarded statement (listing line 338) is missing.
339 
340  // Set the socket to non-blocking mode so that we get async connect().
341  unblock_socket (s);
342 
343  // Set the socket buffer limits for the underlying socket.
344  if (options.sndbuf >= 0)
345  if (options.rcvbuf >= 0)
// NOTE(review): the statements guarded by the two buffer checks (listing
// lines 345 and 347) are missing from this extract.
348 
349  // Set the IP Type-Of-Service for the underlying socket
// NOTE(review): this repeats the options.tos check made a few lines above;
// if both guard the same call, one of them is redundant — confirm.
350  if (options.tos != 0)
352 
353  // Set a source address for conversations
354  if (tcp_addr->has_src_addr ()) {
355  rc = ::bind (s, tcp_addr->src_addr (), tcp_addr->src_addrlen ());
356  if (rc == -1) {
// NOTE(review): close () runs before returning, so the errno from bind ()
// may not be the value the caller finally observes — confirm.
357  close ();
358  return -1;
359  }
360  }
361 
362  // Connect to the remote peer.
363  rc = ::connect (s, tcp_addr->addr (), tcp_addr->addrlen ());
364 
365  // Connect was successful immediately.
366  if (rc == 0)
367  return 0;
368 
369  // Translate error codes indicating asynchronous connect has been
370  // launched to a uniform EINPROGRESS.
371 #ifdef ZMQ_HAVE_WINDOWS
372  const int last_error = WSAGetLastError();
373  if (last_error == WSAEINPROGRESS || last_error == WSAEWOULDBLOCK)
374  errno = EINPROGRESS;
375  else {
// NOTE(review): errno is assigned before close (); if close () or anything
// it calls modifies errno, the caller sees the wrong value — confirm.
376  errno = wsa_error_to_errno (last_error);
377  close ();
378  }
379 #else
// An interrupted connect is reported to the caller as still in progress.
// Other failures leave the socket open; the caller closes it.
380  if (errno == EINTR)
381  errno = EINPROGRESS;
382 #endif
383  return -1;
384 }
385 
// check_proxy_connection () (signature, listing line 386, is not part of this
// extract). Called once the asynchronous connect has finished: reads the
// socket's pending error via SO_ERROR and returns 0 if there is none, -1
// otherwise (with errno set in the non-Windows branch).
// NOTE(review): the symbol index declares the return type as zmq::fd_t, yet
// only the status codes 0 / -1 are returned and the caller casts the result
// to int; a plain int would state the intent better — confirm in the header.
387 {
388  // Async connect has finished. Check whether an error occurred
389  int err = 0;
// The length argument of getsockopt () is an int on HP-UX.
390 #ifdef ZMQ_HAVE_HPUX
391  int len = sizeof err;
392 #else
393  socklen_t len = sizeof err;
394 #endif
395 
396  const int rc = getsockopt (s, SOL_SOCKET, SO_ERROR, (char*) &err, &len);
397 
398  // Assert if the error was caused by 0MQ bug.
399  // Networking problems are OK. No need to assert.
400 #ifdef ZMQ_HAVE_WINDOWS
401  zmq_assert (rc == 0);
402  if (err != 0) {
// Only these network-level errors are accepted; anything else asserts.
// errno is not set in this branch.
403  wsa_assert (err == WSAECONNREFUSED
404  || err == WSAETIMEDOUT
405  || err == WSAECONNABORTED
406  || err == WSAEHOSTUNREACH
407  || err == WSAENETUNREACH
408  || err == WSAENETDOWN
409  || err == WSAEACCES
410  || err == WSAEINVAL
411  || err == WSAEADDRINUSE);
412  return -1;
413  }
414 #else
415  // Following code should handle both Berkeley-derived socket
416  // implementations and Solaris.
// The error arrives either through errno (getsockopt itself returned -1)
// or through the SO_ERROR value; both are funnelled into err.
417  if (rc == -1)
418  err = errno;
419  if (err != 0) {
420  errno = err;
// Only these network-level errors are accepted; anything else asserts.
421  errno_assert (
422  errno == ECONNREFUSED ||
423  errno == ECONNRESET ||
424  errno == ETIMEDOUT ||
425  errno == EHOSTUNREACH ||
426  errno == ENETUNREACH ||
427  errno == ENETDOWN ||
428  errno == EINVAL);
429  return -1;
430  }
431 #endif
432 
// The connection is up: apply TCP-level tuning to the socket.
433  tune_tcp_socket (s);
// NOTE(review): listing lines 434-435 are missing from this extract.
436 
437  return 0;
438 }
439 
// Presumably close () (signature, listing line 440, is not part of this
// extract; name inferred from the call sites — confirm). Closes the
// underlying socket, reports the closure on `socket` via event_closed and
// marks the descriptor as retired.
441 {
// Must only be called while a socket is open.
442  zmq_assert (s != retired_fd);
443 #ifdef ZMQ_HAVE_WINDOWS
444  const int rc = closesocket (s);
445  wsa_assert (rc != SOCKET_ERROR);
446 #else
447  const int rc = ::close (s);
448  errno_assert (rc == 0);
449 #endif
// The event is raised after the descriptor has been closed; s still holds the
// old value at this point and is passed along as an int.
450  socket->event_closed (endpoint, (int) s);
451  s = retired_fd;
452 }
453 
// parse_address: splits "host:port" into its two parts (the first signature
// line, listing line 454, is not part of this extract).
//   address_  - input string; the LAST ':' separates host from port.
//   hostname_ - receives the host; if the host part is written as "[...]",
//               the surrounding brackets are stripped.
//   port_     - receives the port number.
// Returns 0 on success. Returns -1 with errno set to EINVAL when there is no
// ':' or when the port evaluates to 0.
455  const std::string &address_, std::string &hostname_, uint16_t &port_)
456 {
457  // Find the ':' at end that separates address from the port number.
458  const size_t idx = address_.rfind (':');
459  if (idx == std::string::npos) {
460  errno = EINVAL;
461  return -1;
462  }
463 
464  // Extract hostname
// Anything not of the bracketed form "[...]:port" is taken verbatim up to the
// colon; the idx < 2 test keeps the idx - 1 and idx - 2 arithmetic in range.
465  if (idx < 2 || address_ [0] != '[' || address_ [idx - 1] != ']')
466  hostname_ = address_.substr (0, idx);
467  else
// Drop the leading '[' and the trailing ']'.
468  hostname_ = address_.substr (1, idx - 2);
469 
470  // Separate the hostname/port.
471  const std::string port_str = address_.substr (idx + 1);
472  // Parse the port number (0 is not a valid port).
// NOTE(review): atoi () reports no errors and the cast truncates to 16 bits,
// so "65537" silently becomes port 1, "80abc" is accepted as 80 and negative
// values wrap. A checked conversion (e.g. strtol with end-pointer and range
// validation against 1..65535) would reject such input.
473  port_ = (uint16_t) atoi (port_str.c_str ());
474  if (port_ == 0) {
475  errno = EINVAL;
476  return -1;
477  }
478  return 0;
479 }
void encode(const socks_request_t &req)
Definition: socks.cpp:142
#define LIBZMQ_DELETE(p_object)
Definition: macros.hpp:7
zmq::socket_base_t * socket
bool message_ready() const
Definition: socks.cpp:261
void send_attach(zmq::session_base_t *destination_, zmq::i_engine *engine_, bool inc_seqnum_=true)
Definition: object.cpp:225
void tune_tcp_keepalives(fd_t s_, int keepalive_, int keepalive_cnt_, int keepalive_idle_, int keepalive_intvl_)
Definition: tcp.cpp:96
socks_choice_decoder_t choice_decoder
void process_term(int linger_)
Definition: own.cpp:158
void set_pollout(handle_t handle_)
Definition: io_object.cpp:84
socks_greeting_encoder_t greeting_encoder
fd_t open_socket(int domain_, int type_, int protocol_)
Definition: ip.cpp:50
tcp_address_t * tcp_addr
Definition: address.hpp:60
const std::string address
Definition: address.hpp:55
void unblock_socket(fd_t s_)
Definition: ip.cpp:84
#define zmq_assert(x)
Definition: err.hpp:119
void cancel_timer(int id_)
Definition: io_object.cpp:99
bool has_src_addr() const
socklen_t src_addrlen() const
Definition: command.hpp:84
int parse_address(const std::string &address_, std::string &hostname_, uint16_t &port_)
bool has_pending_data() const
Definition: socks.cpp:84
ZMQ_EXPORT int zmq_errno(void)
Definition: zmq.cpp:107
void event_connected(const std::string &addr_, int fd_)
void terminate()
Definition: own.cpp:135
int reconnect_ivl_max
Definition: options.hpp:118
int process_server_response(const socks_choice_t &response)
zmq::fd_t check_proxy_connection()
zmq::session_base_t * session
int resolve(const char *name_, bool local_, bool ipv6_, bool is_src_=false)
void set_pollin(handle_t handle_)
Definition: io_object.cpp:74
void event_connect_delayed(const std::string &addr_, int err_)
int tcp_keepalive_idle
Definition: options.hpp:159
socket_base_t * get_socket()
void set_tcp_receive_buffer(fd_t sockfd_, int bufsize_)
Definition: tcp.cpp:85
socks_response_decoder_t response_decoder
void tune_tcp_socket(fd_t s_)
Definition: tcp.cpp:51
void enable_ipv4_mapping(fd_t s_)
Definition: ip.cpp:103
uint32_t generate_random()
Definition: random.cpp:54
const std::string protocol
Definition: address.hpp:54
#define EHOSTUNREACH
Definition: zmq.h:160
uint8_t method
Definition: socks.hpp:68
void set_tcp_send_buffer(fd_t sockfd_, int bufsize_)
Definition: tcp.cpp:74
bool message_ready() const
Definition: socks.cpp:114
socks_connecter_t(zmq::io_thread_t *io_thread_, zmq::session_base_t *session_, const options_t &options_, address_t *addr_, address_t *proxy_addr_, bool delayed_start_)
#define ETIMEDOUT
Definition: zmq.h:157
socks_choice_t decode()
Definition: socks.cpp:119
#define ECONNREFUSED
Definition: zmq.h:130
void encode(const socks_greeting_t &greeting_)
Definition: socks.cpp:62
void reset_pollout(handle_t handle_)
Definition: io_object.cpp:89
void add_timer(int timout_, int id_)
Definition: io_object.cpp:94
socks_request_encoder_t request_encoder
void event_closed(const std::string &addr_, int fd_)
const sockaddr * addr() const
#define ECONNRESET
Definition: zmq.h:151
void event_connect_retried(const std::string &addr_, int interval_)
void reset_pollin(handle_t handle_)
Definition: io_object.cpp:79
#define ENETDOWN
Definition: zmq.h:121
socklen_t addrlen() const
int tcp_keepalive_cnt
Definition: options.hpp:158
#define EINPROGRESS
Definition: zmq.h:133
sa_family_t family() const
#define alloc_assert(x)
Definition: err.hpp:159
#define errno_assert(x)
Definition: err.hpp:129
bool has_pending_data() const
Definition: socks.cpp:204
#define ENETUNREACH
Definition: zmq.h:145
uint8_t response_code
Definition: socks.hpp:114
virtual void process_plug()
handle_t add_fd(fd_t fd_)
Definition: io_object.cpp:64
options_t options
Definition: own.hpp:109
union zmq::address_t::@0 resolved
int to_string(std::string &addr_) const
Definition: address.cpp:95
const sockaddr * src_addr() const
void rm_fd(handle_t handle_)
Definition: io_object.cpp:69
int tcp_keepalive_intvl
Definition: options.hpp:160
socks_response_t decode()
Definition: socks.cpp:278
virtual void timer_event(int id_)
void set_ip_type_of_service(fd_t s_, int iptos)
Definition: ip.cpp:169
virtual void process_term(int linger_)