LCOV - code coverage report
Current view: top level - home/h/core/forks/m4-libzmq/src - ipc_connecter.cpp (source / functions) Hit Total Coverage
Test: zeromq-4.2.0 Code Coverage Lines: 70 109 64.2 %
Date: 2016-05-09 Functions: 11 14 78.6 %
Legend: Lines: hit not hit

          Line data    Source code
       1             : /*
       2             :     Copyright (c) 2007-2016 Contributors as noted in the AUTHORS file
       3             : 
       4             :     This file is part of libzmq, the ZeroMQ core engine in C++.
       5             : 
       6             :     libzmq is free software; you can redistribute it and/or modify it under
       7             :     the terms of the GNU Lesser General Public License (LGPL) as published
       8             :     by the Free Software Foundation; either version 3 of the License, or
       9             :     (at your option) any later version.
      10             : 
      11             :     As a special exception, the Contributors give you permission to link
      12             :     this library with independent modules to produce an executable,
      13             :     regardless of the license terms of these independent modules, and to
      14             :     copy and distribute the resulting executable under terms of your choice,
      15             :     provided that you also meet, for each linked independent module, the
      16             :     terms and conditions of the license of that module. An independent
      17             :     module is a module which is not derived from or based on this library.
      18             :     If you modify this library, you must extend this exception to your
      19             :     version of the library.
      20             : 
      21             :     libzmq is distributed in the hope that it will be useful, but WITHOUT
      22             :     ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
      23             :     FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public
      24             :     License for more details.
      25             : 
      26             :     You should have received a copy of the GNU Lesser General Public License
      27             :     along with this program.  If not, see <http://www.gnu.org/licenses/>.
      28             : */
      29             : 
      30             : #include "precompiled.hpp"
      31             : #include "ipc_connecter.hpp"
      32             : 
      33             : #if !defined ZMQ_HAVE_WINDOWS && !defined ZMQ_HAVE_OPENVMS
      34             : 
      35             : #include <new>
      36             : #include <string>
      37             : 
      38             : #include "stream_engine.hpp"
      39             : #include "io_thread.hpp"
      40             : #include "platform.hpp"
      41             : #include "random.hpp"
      42             : #include "err.hpp"
      43             : #include "ip.hpp"
      44             : #include "address.hpp"
      45             : #include "ipc_address.hpp"
      46             : #include "session_base.hpp"
      47             : 
      48             : #include <unistd.h>
      49             : #include <sys/types.h>
      50             : #include <sys/socket.h>
      51             : #include <sys/un.h>
      52             : 
      53          65 : zmq::ipc_connecter_t::ipc_connecter_t (class io_thread_t *io_thread_,
      54             :       class session_base_t *session_, const options_t &options_,
      55             :       const address_t *addr_, bool delayed_start_) :
      56             :     own_t (io_thread_, options_),
      57             :     io_object_t (io_thread_),
      58             :     addr (addr_),
      59             :     s (retired_fd),
      60             :     handle_valid (false),
      61             :     delayed_start (delayed_start_),
      62             :     timer_started (false),
      63             :     session (session_),
      64          65 :     current_reconnect_ivl(options.reconnect_ivl)
      65             : {
      66          65 :     zmq_assert (addr);
      67         130 :     zmq_assert (addr->protocol == "ipc");
      68          65 :     addr->to_string (endpoint);
      69          65 :     socket = session-> get_socket();
      70          65 : }
      71             : 
      72         260 : zmq::ipc_connecter_t::~ipc_connecter_t ()
      73             : {
      74          65 :     zmq_assert (!timer_started);
      75          65 :     zmq_assert (!handle_valid);
      76          65 :     zmq_assert (s == retired_fd);
      77         130 : }
      78             : 
      79          65 : void zmq::ipc_connecter_t::process_plug ()
      80             : {
      81          65 :     if (delayed_start)
      82           2 :         add_reconnect_timer ();
      83             :     else
      84          63 :         start_connecting ();
      85          65 : }
      86             : 
      87          65 : void zmq::ipc_connecter_t::process_term (int linger_)
      88             : {
      89          65 :     if (timer_started) {
      90           2 :         cancel_timer (reconnect_timer_id);
      91           2 :         timer_started = false;
      92             :     }
      93             : 
      94          65 :     if (handle_valid) {
      95           0 :         rm_fd (handle);
      96           0 :         handle_valid = false;
      97             :     }
      98             : 
      99          65 :     if (s != retired_fd)
     100           0 :         close ();
     101             : 
     102          65 :     own_t::process_term (linger_);
     103          65 : }
     104             : 
     105           0 : void zmq::ipc_connecter_t::in_event ()
     106             : {
     107             :     //  We are not polling for incoming data, so we are actually called
     108             :     //  because of an error here. However, we can get an error on the out
     109             :     //  event as well on some platforms, so we handle both events the same way.
     110           0 :     out_event ();
     111           0 : }
     112             : 
     113          63 : void zmq::ipc_connecter_t::out_event ()
     114             : {
     115          63 :     fd_t fd = connect ();
     116          63 :     rm_fd (handle);
     117          63 :     handle_valid = false;
     118             : 
     119             :     //  Handle the error condition by attempting to reconnect.
     120          63 :     if (fd == retired_fd) {
     121           0 :         close ();
     122           0 :         add_reconnect_timer();
     123          63 :         return;
     124             :     }
     125             :     //  Create the engine object for this connection.
     126             :     stream_engine_t *engine = new (std::nothrow)
     127          63 :         stream_engine_t (fd, options, endpoint);
     128          63 :     alloc_assert (engine);
     129             : 
     130             :     //  Attach the engine to the corresponding session object.
     131          63 :     send_attach (session, engine);
     132             : 
     133             :     //  Shut the connecter down.
     134          63 :     terminate ();
     135             : 
     136          63 :     socket->event_connected (endpoint, fd);
     137             : }
     138             : 
     139           0 : void zmq::ipc_connecter_t::timer_event (int id_)
     140             : {
     141           0 :     zmq_assert (id_ == reconnect_timer_id);
     142           0 :     timer_started = false;
     143           0 :     start_connecting ();
     144           0 : }
     145             : 
     146          63 : void zmq::ipc_connecter_t::start_connecting ()
     147             : {
     148             :     //  Open the connecting socket.
     149          63 :     int rc = open ();
     150             : 
     151             :     //  Connect may succeed in a synchronous manner.
     152          63 :     if (rc == 0) {
     153          63 :         handle = add_fd (s);
     154          63 :         handle_valid = true;
     155          63 :         out_event ();
     156             :     }
     157             : 
     158             :     //  Connection establishment may be delayed. Poll for its completion.
     159             :     else
     160           0 :     if (rc == -1 && errno == EINPROGRESS) {
     161           0 :         handle = add_fd (s);
     162           0 :         handle_valid = true;
     163           0 :         set_pollout (handle);
     164           0 :         socket->event_connect_delayed (endpoint, zmq_errno());
     165             :     }
     166             : 
     167             :     //  Handle any other error condition by eventual reconnect.
     168             :     else {
     169           0 :         if (s != retired_fd)
     170           0 :             close ();
     171           0 :         add_reconnect_timer ();
     172             :     }
     173          63 : }
     174             : 
     175           2 : void zmq::ipc_connecter_t::add_reconnect_timer()
     176             : {
     177           2 :     int rc_ivl = get_new_reconnect_ivl();
     178           2 :     add_timer (rc_ivl, reconnect_timer_id);
     179           2 :     socket->event_connect_retried (endpoint, rc_ivl);
     180           2 :     timer_started = true;
     181           2 : }
     182             : 
     183           2 : int zmq::ipc_connecter_t::get_new_reconnect_ivl ()
     184             : {
     185             :     //  The new interval is the current interval + random value.
     186           4 :     int this_interval = current_reconnect_ivl +
     187           4 :         (generate_random () % options.reconnect_ivl);
     188             : 
     189             :     //  Only change the current reconnect interval if the maximum reconnect
     190             :     //  interval was set and if it's larger than the reconnect interval.
     191           2 :     if (options.reconnect_ivl_max > 0 &&
     192             :         options.reconnect_ivl_max > options.reconnect_ivl) {
     193             : 
     194             :         //  Calculate the next interval
     195           0 :         current_reconnect_ivl = current_reconnect_ivl * 2;
     196           0 :         if(current_reconnect_ivl >= options.reconnect_ivl_max) {
     197           0 :             current_reconnect_ivl = options.reconnect_ivl_max;
     198             :         }
     199             :     }
     200           2 :     return this_interval;
     201             : }
     202             : 
     203          63 : int zmq::ipc_connecter_t::open ()
     204             : {
     205          63 :     zmq_assert (s == retired_fd);
     206             : 
     207             :     //  Create the socket.
     208          63 :     s = open_socket (AF_UNIX, SOCK_STREAM, 0);
     209          63 :     if (s == -1)
     210             :         return -1;
     211             : 
     212             :     //  Set the non-blocking flag.
     213          63 :     unblock_socket (s);
     214             : 
     215             :     //  Connect to the remote peer.
     216             :     int rc = ::connect (
     217             :         s, addr->resolved.ipc_addr->addr (),
     218          63 :         addr->resolved.ipc_addr->addrlen ());
     219             : 
     220             :     //  Connect was successful immediately.
     221          63 :     if (rc == 0)
     222             :         return 0;
     223             : 
     224             :     //  Translate other error codes indicating asynchronous connect has been
     225             :     //  launched to a uniform EINPROGRESS.
     226           0 :     if (rc == -1 && errno == EINTR) {
     227           0 :         errno = EINPROGRESS;
     228           0 :         return -1;
     229             :     }
     230             : 
     231             :     //  Forward the error.
     232             :     return -1;
     233             : }
     234             : 
     235           0 : int zmq::ipc_connecter_t::close ()
     236             : {
     237           0 :     zmq_assert (s != retired_fd);
     238           0 :     int rc = ::close (s);
     239           0 :     errno_assert (rc == 0);
     240           0 :     socket->event_closed (endpoint, s);
     241           0 :     s = retired_fd;
     242           0 :     return 0;
     243             : }
     244             : 
     245          63 : zmq::fd_t zmq::ipc_connecter_t::connect ()
     246             : {
     247             :     //  Following code should handle both Berkeley-derived socket
     248             :     //  implementations and Solaris.
     249          63 :     int err = 0;
     250             : #if defined ZMQ_HAVE_HPUX
     251             :     int len = sizeof (err);
     252             : #else
     253          63 :     socklen_t len = sizeof (err);
     254             : #endif
     255          63 :     int rc = getsockopt (s, SOL_SOCKET, SO_ERROR, (char*) &err, &len);
     256          63 :     if (rc == -1) {
     257           0 :         if (errno == ENOPROTOOPT)
     258           0 :             errno = 0;
     259           0 :         err = errno;
     260             :     }
     261          63 :     if (err != 0) {
     262             : 
     263             :         //  Assert if the error was caused by 0MQ bug.
     264             :         //  Networking problems are OK. No need to assert.
     265           0 :         errno = err;
     266           0 :         errno_assert (errno == ECONNREFUSED || errno == ECONNRESET ||
     267             :             errno == ETIMEDOUT || errno == EHOSTUNREACH ||
     268             :             errno == ENETUNREACH || errno == ENETDOWN);
     269             : 
     270             :         return retired_fd;
     271             :     }
     272             : 
     273          63 :     fd_t result = s;
     274          63 :     s = retired_fd;
     275          63 :     return result;
     276             : }
     277             : 
     278             : #endif
     279             : 

Generated by: LCOV version 1.10