LCOV - code coverage report
Current view: top level - home/h/core/forks/m4-libzmq/src - socks_connecter.cpp (source / functions) Hit Total Coverage
Test: zeromq-4.2.0 Code Coverage Lines: 0 198 0.0 %
Date: 2016-05-09 Functions: 0 18 0.0 %
Legend: Lines: hit not hit

          Line data    Source code
       1             : /*
       2             :     Copyright (c) 2007-2016 Contributors as noted in the AUTHORS file
       3             : 
       4             :     This file is part of libzmq, the ZeroMQ core engine in C++.
       5             : 
       6             :     libzmq is free software; you can redistribute it and/or modify it under
       7             :     the terms of the GNU Lesser General Public License (LGPL) as published
       8             :     by the Free Software Foundation; either version 3 of the License, or
       9             :     (at your option) any later version.
      10             : 
      11             :     As a special exception, the Contributors give you permission to link
      12             :     this library with independent modules to produce an executable,
      13             :     regardless of the license terms of these independent modules, and to
      14             :     copy and distribute the resulting executable under terms of your choice,
      15             :     provided that you also meet, for each linked independent module, the
      16             :     terms and conditions of the license of that module. An independent
      17             :     module is a module which is not derived from or based on this library.
      18             :     If you modify this library, you must extend this exception to your
      19             :     version of the library.
      20             : 
      21             :     libzmq is distributed in the hope that it will be useful, but WITHOUT
      22             :     ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
      23             :     FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public
      24             :     License for more details.
      25             : 
      26             :     You should have received a copy of the GNU Lesser General Public License
      27             :     along with this program.  If not, see <http://www.gnu.org/licenses/>.
      28             : */
      29             : 
      30             : #include "precompiled.hpp"
      31             : #include <new>
      32             : #include <string>
      33             : 
      34             : #include "macros.hpp"
      35             : #include "socks_connecter.hpp"
      36             : #include "stream_engine.hpp"
      37             : #include "platform.hpp"
      38             : #include "random.hpp"
      39             : #include "err.hpp"
      40             : #include "ip.hpp"
      41             : #include "tcp.hpp"
      42             : #include "address.hpp"
      43             : #include "tcp_address.hpp"
      44             : #include "session_base.hpp"
      45             : #include "socks.hpp"
      46             : 
      47             : #ifdef ZMQ_HAVE_WINDOWS
      48             : #include "windows.hpp"
      49             : #else
      50             : #include <unistd.h>
      51             : #include <sys/types.h>
      52             : #include <sys/socket.h>
      53             : #endif
      54             : 
      55           0 : zmq::socks_connecter_t::socks_connecter_t (class io_thread_t *io_thread_,
      56             :       class session_base_t *session_, const options_t &options_,
      57             :       address_t *addr_, address_t *proxy_addr_, bool delayed_start_) :
      58             :     own_t (io_thread_, options_),
      59             :     io_object_t (io_thread_),
      60             :     addr (addr_),
      61             :     proxy_addr (proxy_addr_),
      62             :     status (unplugged),
      63             :     s (retired_fd),
      64             :     handle(NULL),
      65             :     handle_valid(false),
      66             :     delayed_start (delayed_start_),
      67             :     timer_started(false),
      68             :     session (session_),
      69           0 :     current_reconnect_ivl (options.reconnect_ivl)
      70             : {
      71           0 :     zmq_assert (addr);
      72           0 :     zmq_assert (addr->protocol == "tcp");
      73           0 :     proxy_addr->to_string (endpoint);
      74           0 :     socket = session->get_socket ();
      75           0 : }
      76             : 
      77           0 : zmq::socks_connecter_t::~socks_connecter_t ()
      78             : {
      79           0 :     zmq_assert (s == retired_fd);
      80           0 :     LIBZMQ_DELETE(proxy_addr);
      81           0 : }
      82             : 
      83           0 : void zmq::socks_connecter_t::process_plug ()
      84             : {
      85           0 :     if (delayed_start)
      86           0 :         start_timer ();
      87             :     else
      88           0 :         initiate_connect ();
      89           0 : }
      90             : 
      91           0 : void zmq::socks_connecter_t::process_term (int linger_)
      92             : {
      93           0 :     switch (status) {
      94             :         case unplugged:
      95             :             break;
      96             :         case waiting_for_reconnect_time:
      97           0 :             cancel_timer (reconnect_timer_id);
      98           0 :             break;
      99             :         case waiting_for_proxy_connection:
     100             :         case sending_greeting:
     101             :         case waiting_for_choice:
     102             :         case sending_request:
     103             :         case waiting_for_response:
     104           0 :             rm_fd (handle);
     105           0 :             if (s != retired_fd)
     106           0 :                 close ();
     107             :             break;
     108             :     }
     109             : 
     110           0 :     own_t::process_term (linger_);
     111           0 : }
     112             : 
     113           0 : void zmq::socks_connecter_t::in_event ()
     114             : {
     115           0 :     zmq_assert (status != unplugged
     116             :              && status != waiting_for_reconnect_time);
     117             : 
     118           0 :     if (status == waiting_for_choice) {
     119           0 :         int rc = choice_decoder.input (s);
     120           0 :         if (rc == 0 || rc == -1)
     121           0 :             error ();
     122             :         else
     123           0 :         if (choice_decoder.message_ready ()) {
     124           0 :              const socks_choice_t choice = choice_decoder.decode ();
     125           0 :              rc = process_server_response (choice);
     126           0 :              if (rc == -1)
     127           0 :                  error ();
     128             :              else {
     129           0 :                  std::string hostname = "";
     130           0 :                  uint16_t port = 0;
     131           0 :                  if (parse_address (addr->address, hostname, port) == -1)
     132           0 :                      error ();
     133             :                  else {
     134             :                      request_encoder.encode (
     135           0 :                          socks_request_t (1, hostname, port));
     136           0 :                      reset_pollin (handle);
     137           0 :                      set_pollout (handle);
     138           0 :                      status = sending_request;
     139             :                  }
     140             :              }
     141             :         }
     142             :     }
     143             :     else
     144           0 :     if (status == waiting_for_response) {
     145           0 :         int rc = response_decoder.input (s);
     146           0 :         if (rc == 0 || rc == -1)
     147           0 :             error ();
     148             :         else
     149           0 :         if (response_decoder.message_ready ()) {
     150           0 :             const socks_response_t response = response_decoder.decode ();
     151           0 :             rc = process_server_response (response);
     152           0 :             if (rc == -1)
     153           0 :                 error ();
     154             :             else {
     155             :                 //  Create the engine object for this connection.
     156             :                 stream_engine_t *engine = new (std::nothrow)
     157           0 :                     stream_engine_t (s, options, endpoint);
     158           0 :                 alloc_assert (engine);
     159             : 
     160             :                 //  Attach the engine to the corresponding session object.
     161           0 :                 send_attach (session, engine);
     162             : 
     163           0 :                 socket->event_connected (endpoint, (int) s);
     164             : 
     165           0 :                 rm_fd (handle);
     166           0 :                 s = -1;
     167           0 :                 status = unplugged;
     168             : 
     169             :                 //  Shut the connecter down.
     170           0 :                 terminate ();
     171             :             }
     172             :         }
     173             :     }
     174             :     else
     175           0 :         error ();
     176           0 : }
     177             : 
     178           0 : void zmq::socks_connecter_t::out_event ()
     179             : {
     180           0 :     zmq_assert (status == waiting_for_proxy_connection
     181             :              || status == sending_greeting
     182             :              || status == sending_request);
     183             : 
     184           0 :     if (status == waiting_for_proxy_connection) {
     185           0 :         const int rc = (int) check_proxy_connection ();
     186           0 :         if (rc == -1)
     187           0 :             error ();
     188             :         else {
     189             :             greeting_encoder.encode (
     190           0 :                 socks_greeting_t (socks_no_auth_required));
     191           0 :             status = sending_greeting;
     192             :         }
     193             :     }
     194             :     else
     195           0 :     if (status == sending_greeting) {
     196           0 :         zmq_assert (greeting_encoder.has_pending_data ());
     197           0 :         const int rc = greeting_encoder.output (s);
     198           0 :         if (rc == -1 || rc == 0)
     199           0 :             error ();
     200             :         else
     201           0 :         if (!greeting_encoder.has_pending_data ()) {
     202           0 :             reset_pollout (handle);
     203           0 :             set_pollin (handle);
     204           0 :             status = waiting_for_choice;
     205             :         }
     206             :     }
     207             :     else {
     208           0 :         zmq_assert (request_encoder.has_pending_data ());
     209           0 :         const int rc = request_encoder.output (s);
     210           0 :         if (rc == -1 || rc == 0)
     211           0 :             error ();
     212             :         else
     213           0 :         if (!request_encoder.has_pending_data ()) {
     214           0 :             reset_pollout (handle);
     215           0 :             set_pollin (handle);
     216           0 :             status = waiting_for_response;
     217             :         }
     218             :     }
     219           0 : }
     220             : 
     221           0 : void zmq::socks_connecter_t::initiate_connect ()
     222             : {
     223             :     //  Open the connecting socket.
     224           0 :     const int rc = connect_to_proxy ();
     225             : 
     226             :     //  Connect may succeed in synchronous manner.
     227           0 :     if (rc == 0) {
     228           0 :         handle = add_fd (s);
     229           0 :         set_pollout (handle);
     230           0 :         status = sending_greeting;
     231             :     }
     232             :     //  Connection establishment may be delayed. Poll for its completion.
     233             :     else
     234           0 :     if (errno == EINPROGRESS) {
     235           0 :         handle = add_fd (s);
     236           0 :         set_pollout (handle);
     237           0 :         status = waiting_for_proxy_connection;
     238           0 :         socket->event_connect_delayed (endpoint, zmq_errno ());
     239             :     }
     240             :     //  Handle any other error condition by eventual reconnect.
     241             :     else {
     242           0 :         if (s != retired_fd)
     243           0 :             close ();
     244           0 :         start_timer ();
     245             :     }
     246           0 : }
     247             : 
     248           0 : int zmq::socks_connecter_t::process_server_response (
     249             :         const socks_choice_t &response)
     250             : {
     251             :     //  We do not support any authentication method for now.
     252           0 :     return response.method == 0? 0: -1;
     253             : }
     254             : 
     255           0 : int zmq::socks_connecter_t::process_server_response (
     256             :         const socks_response_t &response)
     257             : {
     258           0 :     return response.response_code == 0? 0: -1;
     259             : }
     260             : 
     261           0 : void zmq::socks_connecter_t::timer_event (int id_)
     262             : {
     263           0 :     zmq_assert (status == waiting_for_reconnect_time);
     264           0 :     zmq_assert (id_ == reconnect_timer_id);
     265           0 :     initiate_connect ();
     266           0 : }
     267             : 
     268           0 : void zmq::socks_connecter_t::error ()
     269             : {
     270           0 :     rm_fd (handle);
     271           0 :     close ();
     272           0 :     greeting_encoder.reset ();
     273           0 :     choice_decoder.reset ();
     274           0 :     request_encoder.reset ();
     275           0 :     response_decoder.reset ();
     276           0 :     start_timer ();
     277           0 : }
     278             : 
     279           0 : void zmq::socks_connecter_t::start_timer ()
     280             : {
     281           0 :     const int interval = get_new_reconnect_ivl ();
     282           0 :     add_timer (interval, reconnect_timer_id);
     283           0 :     status = waiting_for_reconnect_time;
     284           0 :     socket->event_connect_retried (endpoint, interval);
     285           0 : }
     286             : 
     287           0 : int zmq::socks_connecter_t::get_new_reconnect_ivl ()
     288             : {
     289             :     //  The new interval is the current interval + random value.
     290           0 :     const int interval = current_reconnect_ivl +
     291           0 :         generate_random () % options.reconnect_ivl;
     292             : 
     293             :     //  Only change the current reconnect interval  if the maximum reconnect
     294             :     //  interval was set and if it's larger than the reconnect interval.
     295           0 :     if (options.reconnect_ivl_max > 0 &&
     296             :         options.reconnect_ivl_max > options.reconnect_ivl)
     297             :         //  Calculate the next interval
     298             :         current_reconnect_ivl =
     299           0 :             std::min (current_reconnect_ivl * 2, options.reconnect_ivl_max);
     300           0 :     return interval;
     301             : }
     302             : 
     303           0 : int zmq::socks_connecter_t::connect_to_proxy ()
     304             : {
     305           0 :     zmq_assert (s == retired_fd);
     306             : 
     307             :     //  Resolve the address
     308           0 :     LIBZMQ_DELETE(proxy_addr->resolved.tcp_addr);
     309           0 :     proxy_addr->resolved.tcp_addr = new (std::nothrow) tcp_address_t ();
     310           0 :     alloc_assert (proxy_addr->resolved.tcp_addr);
     311             : 
     312             :     int rc = proxy_addr->resolved.tcp_addr->resolve (
     313           0 :         proxy_addr->address.c_str (), false, options.ipv6);
     314           0 :     if (rc != 0) {
     315           0 :         LIBZMQ_DELETE(proxy_addr->resolved.tcp_addr);
     316           0 :         return -1;
     317             :     }
     318           0 :     zmq_assert (proxy_addr->resolved.tcp_addr != NULL);
     319           0 :     const tcp_address_t *tcp_addr = proxy_addr->resolved.tcp_addr;
     320             : 
     321             :     //  Create the socket.
     322           0 :     s = open_socket (tcp_addr->family (), SOCK_STREAM, IPPROTO_TCP);
     323             : #ifdef ZMQ_HAVE_WINDOWS
     324             :     if (s == INVALID_SOCKET)
     325             :         return -1;
     326             : #else
     327           0 :     if (s == -1)
     328             :         return -1;
     329             : #endif
     330             : 
     331             :     //  On some systems, IPv4 mapping in IPv6 sockets is disabled by default.
     332             :     //  Switch it on in such cases.
     333           0 :     if (tcp_addr->family () == AF_INET6)
     334           0 :         enable_ipv4_mapping (s);
     335             : 
     336             :     // Set the IP Type-Of-Service priority for this socket
     337           0 :     if (options.tos != 0)
     338           0 :         set_ip_type_of_service (s, options.tos);
     339             : 
     340             :     // Set the socket to non-blocking mode so that we get async connect().
     341           0 :     unblock_socket (s);
     342             : 
     343             :     //  Set the socket buffer limits for the underlying socket.
     344           0 :     if (options.sndbuf >= 0)
     345           0 :         set_tcp_send_buffer (s, options.sndbuf);
     346           0 :     if (options.rcvbuf >= 0)
     347           0 :         set_tcp_receive_buffer (s, options.rcvbuf);
     348             : 
     349             :     // Set the IP Type-Of-Service for the underlying socket
     350           0 :     if (options.tos != 0)
     351           0 :         set_ip_type_of_service (s, options.tos);
     352             : 
     353             :     // Set a source address for conversations
     354           0 :     if (tcp_addr->has_src_addr ()) {
     355           0 :         rc = ::bind (s, tcp_addr->src_addr (), tcp_addr->src_addrlen ());
     356           0 :         if (rc == -1) {
     357           0 :             close ();
     358           0 :             return -1;
     359             :         }
     360             :     }
     361             : 
     362             :     //  Connect to the remote peer.
     363           0 :     rc = ::connect (s, tcp_addr->addr (), tcp_addr->addrlen ());
     364             : 
     365             :     //  Connect was successful immediately.
     366           0 :     if (rc == 0)
     367             :         return 0;
     368             : 
     369             :     //  Translate error codes indicating asynchronous connect has been
     370             :     //  launched to a uniform EINPROGRESS.
     371             : #ifdef ZMQ_HAVE_WINDOWS
     372             :     const int last_error = WSAGetLastError();
     373             :     if (last_error == WSAEINPROGRESS || last_error == WSAEWOULDBLOCK)
     374             :         errno = EINPROGRESS;
     375             :     else {
     376             :         errno = wsa_error_to_errno (last_error);
     377             :         close ();
     378             :     }
     379             : #else
     380           0 :     if (errno == EINTR)
     381           0 :         errno = EINPROGRESS;
     382             : #endif
     383             :     return -1;
     384             : }
     385             : 
     386           0 : zmq::fd_t zmq::socks_connecter_t::check_proxy_connection ()
     387             : {
     388             :     //  Async connect has finished. Check whether an error occurred
     389           0 :     int err = 0;
     390             : #ifdef ZMQ_HAVE_HPUX
     391             :     int len = sizeof err;
     392             : #else
     393           0 :     socklen_t len = sizeof err;
     394             : #endif
     395             : 
     396           0 :     const int rc = getsockopt (s, SOL_SOCKET, SO_ERROR, (char*) &err, &len);
     397             : 
     398             :     //  Assert if the error was caused by 0MQ bug.
     399             :     //  Networking problems are OK. No need to assert.
     400             : #ifdef ZMQ_HAVE_WINDOWS
     401             :     zmq_assert (rc == 0);
     402             :     if (err != 0) {
     403             :         wsa_assert (err == WSAECONNREFUSED
     404             :                  || err == WSAETIMEDOUT
     405             :                  || err == WSAECONNABORTED
     406             :                  || err == WSAEHOSTUNREACH
     407             :                  || err == WSAENETUNREACH
     408             :                  || err == WSAENETDOWN
     409             :                  || err == WSAEACCES
     410             :                  || err == WSAEINVAL
     411             :                  || err == WSAEADDRINUSE);
     412             :         return -1;
     413             :     }
     414             : #else
     415             :     //  Following code should handle both Berkeley-derived socket
     416             :     //  implementations and Solaris.
     417           0 :     if (rc == -1)
     418           0 :         err = errno;
     419           0 :     if (err != 0) {
     420           0 :         errno = err;
     421           0 :         errno_assert (
     422             :             errno == ECONNREFUSED ||
     423             :             errno == ECONNRESET ||
     424             :             errno == ETIMEDOUT ||
     425             :             errno == EHOSTUNREACH ||
     426             :             errno == ENETUNREACH ||
     427             :             errno == ENETDOWN ||
     428             :             errno == EINVAL);
     429             :         return -1;
     430             :     }
     431             : #endif
     432             : 
     433           0 :     tune_tcp_socket (s);
     434             :     tune_tcp_keepalives (s, options.tcp_keepalive, options.tcp_keepalive_cnt,
     435           0 :         options.tcp_keepalive_idle, options.tcp_keepalive_intvl);
     436             : 
     437             :     return 0;
     438             : }
     439             : 
     440           0 : void zmq::socks_connecter_t::close ()
     441             : {
     442           0 :     zmq_assert (s != retired_fd);
     443             : #ifdef ZMQ_HAVE_WINDOWS
     444             :     const int rc = closesocket (s);
     445             :     wsa_assert (rc != SOCKET_ERROR);
     446             : #else
     447           0 :     const int rc = ::close (s);
     448           0 :     errno_assert (rc == 0);
     449             : #endif
     450           0 :     socket->event_closed (endpoint, (int) s);
     451           0 :     s = retired_fd;
     452           0 : }
     453             : 
     454           0 : int zmq::socks_connecter_t::parse_address (
     455             :         const std::string &address_, std::string &hostname_, uint16_t &port_)
     456             : {
     457             :     //  Find the ':' at end that separates address from the port number.
     458           0 :     const size_t idx = address_.rfind (':');
     459           0 :     if (idx == std::string::npos) {
     460           0 :         errno = EINVAL;
     461           0 :         return -1;
     462             :     }
     463             : 
     464             :     //  Extract hostname
     465           0 :     if (idx < 2 || address_ [0] != '[' || address_ [idx - 1] != ']')
     466           0 :         hostname_ = address_.substr (0, idx);
     467             :     else
     468           0 :         hostname_ = address_.substr (1, idx - 2);
     469             : 
     470             :     //  Separate the hostname/port.
     471           0 :     const std::string port_str = address_.substr (idx + 1);
     472             :     //  Parse the port number (0 is not a valid port).
     473           0 :     port_ = (uint16_t) atoi (port_str.c_str ());
     474           0 :     if (port_ == 0) {
     475           0 :         errno = EINVAL;
     476           0 :         return -1;
     477             :     }
     478             :     return 0;
     479             : }

Generated by: LCOV version 1.10