LCOV - code coverage report
Current view: top level - home/h/core/forks/m4-libzmq/src - tcp_connecter.cpp (source / functions) Hit Total Coverage
Test: zeromq-4.2.0 Code Coverage Lines: 130 157 82.8 %
Date: 2016-05-09 Functions: 14 15 93.3 %
Legend: Lines: hit not hit

          Line data    Source code
       1             : /*
       2             :     Copyright (c) 2007-2016 Contributors as noted in the AUTHORS file
       3             : 
       4             :     This file is part of libzmq, the ZeroMQ core engine in C++.
       5             : 
       6             :     libzmq is free software; you can redistribute it and/or modify it under
       7             :     the terms of the GNU Lesser General Public License (LGPL) as published
       8             :     by the Free Software Foundation; either version 3 of the License, or
       9             :     (at your option) any later version.
      10             : 
      11             :     As a special exception, the Contributors give you permission to link
      12             :     this library with independent modules to produce an executable,
      13             :     regardless of the license terms of these independent modules, and to
      14             :     copy and distribute the resulting executable under terms of your choice,
      15             :     provided that you also meet, for each linked independent module, the
      16             :     terms and conditions of the license of that module. An independent
      17             :     module is a module which is not derived from or based on this library.
      18             :     If you modify this library, you must extend this exception to your
      19             :     version of the library.
      20             : 
      21             :     libzmq is distributed in the hope that it will be useful, but WITHOUT
      22             :     ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
      23             :     FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public
      24             :     License for more details.
      25             : 
      26             :     You should have received a copy of the GNU Lesser General Public License
      27             :     along with this program.  If not, see <http://www.gnu.org/licenses/>.
      28             : */
      29             : 
      30             : #include "precompiled.hpp"
      31             : #include <new>
      32             : #include <string>
      33             : 
      34             : #include "macros.hpp"
      35             : #include "tcp_connecter.hpp"
      36             : #include "stream_engine.hpp"
      37             : #include "io_thread.hpp"
      38             : #include "platform.hpp"
      39             : #include "random.hpp"
      40             : #include "err.hpp"
      41             : #include "ip.hpp"
      42             : #include "tcp.hpp"
      43             : #include "address.hpp"
      44             : #include "tcp_address.hpp"
      45             : #include "session_base.hpp"
      46             : 
      47             : #if defined ZMQ_HAVE_WINDOWS
      48             : #include "windows.hpp"
      49             : #else
      50             : #include <unistd.h>
      51             : #include <sys/types.h>
      52             : #include <sys/socket.h>
      53             : #include <arpa/inet.h>
      54             : #include <netinet/tcp.h>
      55             : #include <netinet/in.h>
      56             : #include <netdb.h>
      57             : #include <fcntl.h>
      58             : #ifdef ZMQ_HAVE_OPENVMS
      59             : #include <ioctl.h>
      60             : #endif
      61             : #endif
      62             : 
      63        3877 : zmq::tcp_connecter_t::tcp_connecter_t (class io_thread_t *io_thread_,
      64             :       class session_base_t *session_, const options_t &options_,
      65             :       address_t *addr_, bool delayed_start_) :
      66             :     own_t (io_thread_, options_),
      67             :     io_object_t (io_thread_),
      68             :     addr (addr_),
      69             :     s (retired_fd),
      70             :     handle(NULL),
      71             :     handle_valid (false),
      72             :     delayed_start (delayed_start_),
      73             :     connect_timer_started (false),
      74             :     reconnect_timer_started (false),
      75             :     session (session_),
      76        3877 :     current_reconnect_ivl (options.reconnect_ivl)
      77             : {
      78        3876 :     zmq_assert (addr);
      79        7752 :     zmq_assert (addr->protocol == "tcp");
      80        3876 :     addr->to_string (endpoint);
      81        3877 :     socket = session->get_socket ();
      82        3877 : }
      83             : 
      84       15508 : zmq::tcp_connecter_t::~tcp_connecter_t ()
      85             : {
      86        3877 :     zmq_assert (!connect_timer_started);
      87        3877 :     zmq_assert (!reconnect_timer_started);
      88        3877 :     zmq_assert (!handle_valid);
      89        3877 :     zmq_assert (s == retired_fd);
      90        7754 : }
      91             : 
      92        3876 : void zmq::tcp_connecter_t::process_plug ()
      93             : {
      94        3876 :     if (delayed_start)
      95         199 :         add_reconnect_timer ();
      96             :     else
      97        3677 :         start_connecting ();
      98        3877 : }
      99             : 
     100        3876 : void zmq::tcp_connecter_t::process_term (int linger_)
     101             : {
     102        3876 :     if (connect_timer_started) {
     103           0 :         cancel_timer (connect_timer_id);
     104           0 :         connect_timer_started = false;
     105             :     }
     106             : 
     107        3876 :     if (reconnect_timer_started) {
     108         205 :         cancel_timer (reconnect_timer_id);
     109         205 :         reconnect_timer_started = false;
     110             :     }
     111             : 
     112        3876 :     if (handle_valid) {
     113         488 :         rm_fd (handle);
     114         488 :         handle_valid = false;
     115             :     }
     116             : 
     117        3876 :     if (s != retired_fd)
     118         488 :         close ();
     119             : 
     120        3876 :     own_t::process_term (linger_);
     121        3877 : }
     122             : 
     123         114 : void zmq::tcp_connecter_t::in_event ()
     124             : {
     125             :     //  We are not polling for incoming data, so we are actually called
     126             :     //  because of error here. However, we can get error on out event as well
     127             :     //  on some platforms, so we'll simply handle both events in the same way.
     128         114 :     out_event ();
     129         114 : }
     130             : 
     131        3297 : void zmq::tcp_connecter_t::out_event ()
     132             : {
     133        3297 :     if (connect_timer_started) {
     134           0 :         cancel_timer (connect_timer_id);
     135           0 :         connect_timer_started = false;
     136             :     }
     137             : 
     138        3297 :     rm_fd (handle);
     139        3298 :     handle_valid = false;
     140             : 
     141        3298 :     const fd_t fd = connect ();
     142             :     //  Handle the error condition by attempt to reconnect.
     143        3298 :     if (fd == retired_fd) {
     144         114 :         close ();
     145         114 :         add_reconnect_timer ();
     146        3412 :         return;
     147             :     }
     148             : 
     149        3184 :     tune_tcp_socket (fd);
     150             :     tune_tcp_keepalives (fd, options.tcp_keepalive, options.tcp_keepalive_cnt,
     151        3184 :             options.tcp_keepalive_idle, options.tcp_keepalive_intvl);
     152        3184 :     tune_tcp_maxrt (fd, options.tcp_maxrt);
     153             : 
     154             :     //  Create the engine object for this connection.
     155             :     stream_engine_t *engine = new (std::nothrow)
     156        3184 :         stream_engine_t (fd, options, endpoint);
     157        3184 :     alloc_assert (engine);
     158             : 
     159             :     //  Attach the engine to the corresponding session object.
     160        3184 :     send_attach (session, engine);
     161             : 
     162             :     //  Shut the connecter down.
     163        3184 :     terminate ();
     164             : 
     165        3184 :     socket->event_connected (endpoint, (int) fd);
     166             : }
     167             : 
     168         117 : void zmq::tcp_connecter_t::timer_event (int id_)
     169             : {
     170         117 :     zmq_assert (id_ == reconnect_timer_id || id_ == connect_timer_id);
     171         117 :     if (id_ == connect_timer_id) {
     172           0 :         connect_timer_started = false;
     173             : 
     174           0 :         rm_fd (handle);
     175           0 :         handle_valid = false;
     176             : 
     177           0 :         close ();
     178           0 :         add_reconnect_timer ();
     179             :     }
     180         117 :     else if (id_ == reconnect_timer_id) {
     181         117 :         reconnect_timer_started = false;
     182         117 :         start_connecting ();
     183             :     }
     184         117 : }
     185             : 
     186        3794 : void zmq::tcp_connecter_t::start_connecting ()
     187             : {
     188             :     //  Open the connecting socket.
     189        3794 :     const int rc = open ();
     190             : 
     191             :     //  Connect may succeed in synchronous manner.
     192        3795 :     if (rc == 0) {
     193           0 :         handle = add_fd (s);
     194           0 :         handle_valid = true;
     195           0 :         out_event ();
     196             :     }
     197             : 
     198             :     //  Connection establishment may be delayed. Poll for its completion.
     199             :     else
     200        3795 :     if (rc == -1 && errno == EINPROGRESS) {
     201        3786 :         handle = add_fd (s);
     202        3786 :         handle_valid = true;
     203        3786 :         set_pollout (handle);
     204        3786 :         socket->event_connect_delayed (endpoint, zmq_errno());
     205             : 
     206             :         //  add userspace connect timeout
     207             :         add_connect_timer ();
     208             :     }
     209             : 
     210             :     //  Handle any other error condition by eventual reconnect.
     211             :     else {
     212           9 :         if (s != retired_fd)
     213           3 :             close ();
     214           9 :         add_reconnect_timer ();
     215             :     }
     216        3795 : }
     217             : 
     218           0 : void zmq::tcp_connecter_t::add_connect_timer ()
     219             : {
     220        3786 :     if (options.connect_timeout > 0) {
     221           0 :         add_timer (options.connect_timeout, connect_timer_id);
     222           0 :         connect_timer_started = true;
     223             :     }
     224           0 : }
     225             : 
     226         322 : void zmq::tcp_connecter_t::add_reconnect_timer ()
     227             : {
     228         322 :     const int interval = get_new_reconnect_ivl ();
     229         322 :     add_timer (interval, reconnect_timer_id);
     230         322 :     socket->event_connect_retried (endpoint, interval);
     231         322 :     reconnect_timer_started = true;
     232         322 : }
     233             : 
     234         322 : int zmq::tcp_connecter_t::get_new_reconnect_ivl ()
     235             : {
     236             :     //  The new interval is the current interval + random value.
     237         644 :     const int interval = current_reconnect_ivl +
     238         644 :         generate_random () % options.reconnect_ivl;
     239             : 
     240             :     //  Only change the current reconnect interval  if the maximum reconnect
     241             :     //  interval was set and if it's larger than the reconnect interval.
     242         322 :     if (options.reconnect_ivl_max > 0 &&
     243             :         options.reconnect_ivl_max > options.reconnect_ivl)
     244             :         //  Calculate the next interval
     245             :         current_reconnect_ivl =
     246           0 :             std::min (current_reconnect_ivl * 2, options.reconnect_ivl_max);
     247         322 :     return interval;
     248             : }
     249             : 
     250        3794 : int zmq::tcp_connecter_t::open ()
     251             : {
     252        3794 :     zmq_assert (s == retired_fd);
     253             : 
     254             :     //  Resolve the address
     255        3794 :     if (addr->resolved.tcp_addr != NULL) {
     256         117 :         LIBZMQ_DELETE(addr->resolved.tcp_addr);
     257             :     }
     258             : 
     259        3794 :     addr->resolved.tcp_addr = new (std::nothrow) tcp_address_t ();
     260        3795 :     alloc_assert (addr->resolved.tcp_addr);
     261             :     int rc = addr->resolved.tcp_addr->resolve (
     262        7590 :         addr->address.c_str (), false, options.ipv6);
     263        3795 :     if (rc != 0) {
     264           6 :         LIBZMQ_DELETE(addr->resolved.tcp_addr);
     265           6 :         return -1;
     266             :     }
     267        3789 :     zmq_assert (addr->resolved.tcp_addr != NULL);
     268        3789 :     tcp_address_t * const tcp_addr = addr->resolved.tcp_addr;
     269             : 
     270             :     //  Create the socket.
     271        3789 :     s = open_socket (tcp_addr->family (), SOCK_STREAM, IPPROTO_TCP);
     272             : 
     273             :     //  IPv6 address family not supported, try automatic downgrade to IPv4.
     274        3789 :     if (s == zmq::retired_fd && tcp_addr->family () == AF_INET6
     275           0 :     && errno == EAFNOSUPPORT
     276        3789 :     && options.ipv6) {
     277             :         rc = addr->resolved.tcp_addr->resolve (
     278           0 :             addr->address.c_str (), false, false);
     279           0 :         if (rc != 0) {
     280           0 :             LIBZMQ_DELETE(addr->resolved.tcp_addr);
     281           0 :             return -1;
     282             :         }
     283           0 :         s = open_socket (AF_INET, SOCK_STREAM, IPPROTO_TCP);
     284             :     }
     285             : 
     286             : #ifdef ZMQ_HAVE_WINDOWS
     287             :     if (s == INVALID_SOCKET) {
     288             :         errno = wsa_error_to_errno (WSAGetLastError ());
     289             :         return -1;
     290             :     }
     291             : #else
     292        3789 :     if (s == -1)
     293             :         return -1;
     294             : #endif
     295             : 
     296             :     //  On some systems, IPv4 mapping in IPv6 sockets is disabled by default.
     297             :     //  Switch it on in such cases.
     298        3789 :     if (tcp_addr->family () == AF_INET6)
     299          15 :         enable_ipv4_mapping (s);
     300             : 
     301             :     // Set the IP Type-Of-Service priority for this socket
     302        3789 :     if (options.tos != 0)
     303           3 :         set_ip_type_of_service (s, options.tos);
     304             : 
     305             :     // Set the socket to non-blocking mode so that we get async connect().
     306        3789 :     unblock_socket (s);
     307             : 
     308             :     //  Set the socket buffer limits for the underlying socket.
     309        3789 :     if (options.sndbuf >= 0)
     310           0 :         set_tcp_send_buffer (s, options.sndbuf);
     311        3789 :     if (options.rcvbuf >= 0)
     312           0 :         set_tcp_receive_buffer (s, options.rcvbuf);
     313             : 
     314             :     // Set the IP Type-Of-Service for the underlying socket
     315        3789 :     if (options.tos != 0)
     316           3 :         set_ip_type_of_service (s, options.tos);
     317             : 
     318             :     // Set a source address for conversations
     319        3789 :     if (tcp_addr->has_src_addr ()) {
     320           6 :         rc = ::bind (s, tcp_addr->src_addr (), tcp_addr->src_addrlen ());
     321           6 :         if (rc == -1)
     322             :             return -1;
     323             :     }
     324             : 
     325             :     //  Connect to the remote peer.
     326        3786 :     rc = ::connect (s, tcp_addr->addr (), tcp_addr->addrlen ());
     327             : 
     328             :     //  Connect was successful immediately.
     329        3786 :     if (rc == 0)
     330             :         return 0;
     331             : 
     332             :     //  Translate error codes indicating asynchronous connect has been
     333             :     //  launched to a uniform EINPROGRESS.
     334             : #ifdef ZMQ_HAVE_WINDOWS
     335             :     const int last_error = WSAGetLastError();
     336             :     if (last_error == WSAEINPROGRESS || last_error == WSAEWOULDBLOCK)
     337             :         errno = EINPROGRESS;
     338             :     else
     339             :         errno = wsa_error_to_errno (last_error);
     340             : #else
     341        3786 :     if (errno == EINTR)
     342           0 :         errno = EINPROGRESS;
     343             : #endif
     344             :     return -1;
     345             : }
     346             : 
     347        3298 : zmq::fd_t zmq::tcp_connecter_t::connect ()
     348             : {
     349             :     //  Async connect has finished. Check whether an error occurred
     350        3298 :     int err = 0;
     351             : #ifdef ZMQ_HAVE_HPUX
     352             :     int len = sizeof err;
     353             : #else
     354        3298 :     socklen_t len = sizeof err;
     355             : #endif
     356             : 
     357        3298 :     const int rc = getsockopt (s, SOL_SOCKET, SO_ERROR, (char*) &err, &len);
     358             : 
     359             :     //  Assert if the error was caused by 0MQ bug.
     360             :     //  Networking problems are OK. No need to assert.
     361             : #ifdef ZMQ_HAVE_WINDOWS
     362             :     zmq_assert (rc == 0);
     363             :     if (err != 0) {
     364             :         if (err == WSAEBADF ||
     365             :             err == WSAENOPROTOOPT ||
     366             :             err == WSAENOTSOCK ||
     367             :             err == WSAENOBUFS)
     368             :         {
     369             :             wsa_assert_no (err);
     370             :         }
     371             :         return retired_fd;
     372             :     }
     373             : #else
     374             :     //  Following code should handle both Berkeley-derived socket
     375             :     //  implementations and Solaris.
     376        3298 :     if (rc == -1)
     377           0 :         err = errno;
     378        3298 :     if (err != 0) {
     379         114 :         errno = err;
     380         114 :         errno_assert (
     381             :             errno != EBADF &&
     382             :             errno != ENOPROTOOPT &&
     383             :             errno != ENOTSOCK &&
     384             :             errno != ENOBUFS);
     385             :         return retired_fd;
     386             :     }
     387             : #endif
     388             : 
     389             :     //  Return the newly connected socket.
     390        3184 :     const fd_t result = s;
     391        3184 :     s = retired_fd;
     392        3184 :     return result;
     393             : }
     394             : 
     395         605 : void zmq::tcp_connecter_t::close ()
     396             : {
     397         605 :     zmq_assert (s != retired_fd);
     398             : #ifdef ZMQ_HAVE_WINDOWS
     399             :     const int rc = closesocket (s);
     400             :     wsa_assert (rc != SOCKET_ERROR);
     401             : #else
     402         605 :     const int rc = ::close (s);
     403         605 :     errno_assert (rc == 0);
     404             : #endif
     405         605 :     socket->event_closed (endpoint, (int) s);
     406         605 :     s = retired_fd;
     407         605 : }

Generated by: LCOV version 1.10