LCOV - code coverage report
Current view: top level - home/h/core/forks/m4-libzmq/src - session_base.cpp (source / functions) Hit Total Coverage
Test: zeromq-4.2.0 Code Coverage Lines: 231 267 86.5 %
Date: 2016-05-09 Functions: 24 26 92.3 %
Legend: Lines: hit not hit

          Line data    Source code
       1             : /*
       2             :     Copyright (c) 2007-2016 Contributors as noted in the AUTHORS file
       3             : 
       4             :     This file is part of libzmq, the ZeroMQ core engine in C++.
       5             : 
       6             :     libzmq is free software; you can redistribute it and/or modify it under
       7             :     the terms of the GNU Lesser General Public License (LGPL) as published
       8             :     by the Free Software Foundation; either version 3 of the License, or
       9             :     (at your option) any later version.
      10             : 
      11             :     As a special exception, the Contributors give you permission to link
      12             :     this library with independent modules to produce an executable,
      13             :     regardless of the license terms of these independent modules, and to
      14             :     copy and distribute the resulting executable under terms of your choice,
      15             :     provided that you also meet, for each linked independent module, the
      16             :     terms and conditions of the license of that module. An independent
      17             :     module is a module which is not derived from or based on this library.
      18             :     If you modify this library, you must extend this exception to your
      19             :     version of the library.
      20             : 
      21             :     libzmq is distributed in the hope that it will be useful, but WITHOUT
      22             :     ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
      23             :     FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public
      24             :     License for more details.
      25             : 
      26             :     You should have received a copy of the GNU Lesser General Public License
      27             :     along with this program.  If not, see <http://www.gnu.org/licenses/>.
      28             : */
      29             : 
      30             : #include "precompiled.hpp"
      31             : #include "macros.hpp"
      32             : #include "session_base.hpp"
      33             : #include "i_engine.hpp"
      34             : #include "err.hpp"
      35             : #include "pipe.hpp"
      36             : #include "likely.hpp"
      37             : #include "tcp_connecter.hpp"
      38             : #include "ipc_connecter.hpp"
      39             : #include "tipc_connecter.hpp"
      40             : #include "socks_connecter.hpp"
      41             : #include "vmci_connecter.hpp"
      42             : #include "pgm_sender.hpp"
      43             : #include "pgm_receiver.hpp"
      44             : #include "address.hpp"
      45             : #include "norm_engine.hpp"
      46             : #include "udp_engine.hpp"
      47             : 
      48             : #include "ctx.hpp"
      49             : #include "req.hpp"
      50             : #include "radio.hpp"
      51             : #include "dish.hpp"
      52             : 
      53        7258 : zmq::session_base_t *zmq::session_base_t::create (class io_thread_t *io_thread_,
      54             :     bool active_, class socket_base_t *socket_, const options_t &options_,
      55             :     address_t *addr_)
      56             : {
      57        7258 :     session_base_t *s = NULL;
      58        7258 :     switch (options_.type) {
      59             :     case ZMQ_REQ:
      60             :         s = new (std::nothrow) req_session_t (io_thread_, active_,
      61          99 :             socket_, options_, addr_);
      62          99 :         break;
      63             :     case ZMQ_RADIO:
      64             :         s = new (std::nothrow) radio_session_t (io_thread_, active_,
      65           6 :             socket_, options_, addr_);
      66           6 :         break;
      67             :     case ZMQ_DISH:
      68             :         s = new (std::nothrow) dish_session_t (io_thread_, active_,
      69           6 :             socket_, options_, addr_);
      70           6 :             break;
      71             :     case ZMQ_DEALER:
      72             :     case ZMQ_REP:
      73             :     case ZMQ_ROUTER:
      74             :     case ZMQ_PUB:
      75             :     case ZMQ_XPUB:
      76             :     case ZMQ_SUB:
      77             :     case ZMQ_XSUB:
      78             :     case ZMQ_PUSH:
      79             :     case ZMQ_PULL:
      80             :     case ZMQ_PAIR:
      81             :     case ZMQ_STREAM:
      82             :     case ZMQ_SERVER:
      83             :     case ZMQ_CLIENT:
      84             :     case ZMQ_GATHER:
      85             :     case ZMQ_SCATTER:
      86             :         s = new (std::nothrow) session_base_t (io_thread_, active_,
      87        7147 :             socket_, options_, addr_);
      88        7146 :         break;
      89             :     default:
      90           0 :         errno = EINVAL;
      91           0 :         return NULL;
      92             :     }
      93        7257 :     alloc_assert (s);
      94        7257 :     return s;
      95             : }
      96             : 
      97        7262 : zmq::session_base_t::session_base_t (class io_thread_t *io_thread_,
      98             :       bool active_, class socket_base_t *socket_, const options_t &options_,
      99             :       address_t *addr_) :
     100             :     own_t (io_thread_, options_),
     101             :     io_object_t (io_thread_),
     102             :     active (active_),
     103             :     pipe (NULL),
     104             :     zap_pipe (NULL),
     105             :     incomplete_in (false),
     106             :     pending (false),
     107             :     engine (NULL),
     108             :     socket (socket_),
     109             :     io_thread (io_thread_),
     110             :     has_linger_timer (false),
     111       21778 :     addr (addr_)
     112             : {
     113        7258 : }
     114             : 
     115       36196 : zmq::session_base_t::~session_base_t ()
     116             : {
     117        7262 :     zmq_assert (!pipe);
     118        7262 :     zmq_assert (!zap_pipe);
     119             : 
     120             :     //  If there's still a pending linger timer, remove it.
     121        7262 :     if (has_linger_timer) {
     122           0 :         cancel_timer (linger_timer_id);
     123           0 :         has_linger_timer = false;
     124             :     }
     125             : 
     126             :     //  Close the engine.
     127        7262 :     if (engine)
     128        4347 :         engine->terminate ();
     129             : 
     130        7262 :     LIBZMQ_DELETE(addr);
     131       14406 : }
     132             : 
     133        3738 : void zmq::session_base_t::attach_pipe (pipe_t *pipe_)
     134             : {
     135        3738 :     zmq_assert (!is_terminating ());
     136        3738 :     zmq_assert (!pipe);
     137        3738 :     zmq_assert (pipe_);
     138        3738 :     pipe = pipe_;
     139        3738 :     pipe->set_event_sink (this);
     140        3738 : }
     141             : 
     142      649832 : int zmq::session_base_t::pull_msg (msg_t *msg_)
     143             : {
     144      649832 :     if (!pipe || !pipe->read (msg_)) {
     145       14828 :         errno = EAGAIN;
     146       14828 :         return -1;
     147             :     }
     148             : 
     149      635004 :     incomplete_in = msg_->flags () & msg_t::more ? true : false;
     150             : 
     151      635004 :     return 0;
     152             : }
     153             : 
     154      636982 : int zmq::session_base_t::push_msg (msg_t *msg_)
     155             : {
     156      636982 :     if(msg_->flags() & msg_t::command)
     157             :         return 0;
     158      636772 :     if (pipe && pipe->write (msg_)) {
     159      635565 :         int rc = msg_->init ();
     160      635565 :         errno_assert (rc == 0);
     161             :         return 0;
     162             :     }
     163             : 
     164        1207 :     errno = EAGAIN;
     165        1207 :     return -1;
     166             : }
     167             : 
     168         166 : int zmq::session_base_t::read_zap_msg (msg_t *msg_)
     169             : {
     170         166 :     if (zap_pipe == NULL) {
     171           0 :         errno = ENOTCONN;
     172           0 :         return -1;
     173             :     }
     174             : 
     175         166 :     if (!zap_pipe->read (msg_)) {
     176          19 :         errno = EAGAIN;
     177          19 :         return -1;
     178             :     }
     179             : 
     180             :     return 0;
     181             : }
     182             : 
     183         165 : int zmq::session_base_t::write_zap_msg (msg_t *msg_)
     184             : {
     185         165 :     if (zap_pipe == NULL) {
     186           0 :         errno = ENOTCONN;
     187           0 :         return -1;
     188             :     }
     189             : 
     190         165 :     const bool ok = zap_pipe->write (msg_);
     191         165 :     zmq_assert (ok);
     192             : 
     193         165 :     if ((msg_->flags () & msg_t::more) == 0)
     194          21 :         zap_pipe->flush ();
     195             : 
     196         165 :     const int rc = msg_->init ();
     197         165 :     errno_assert (rc == 0);
     198             :     return 0;
     199             : }
     200             : 
     201         210 : void zmq::session_base_t::reset ()
     202             : {
     203         210 : }
     204             : 
     205        9562 : void zmq::session_base_t::flush ()
     206             : {
     207        9562 :     if (pipe)
     208        9498 :         pipe->flush ();
     209        9563 : }
     210             : 
     211        2373 : void zmq::session_base_t::clean_pipes ()
     212             : {
     213        2373 :     zmq_assert (pipe != NULL);
     214             : 
     215             :     //  Get rid of half-processed messages in the out pipe. Flush any
     216             :     //  unflushed messages upstream.
     217        2373 :     pipe->rollback ();
     218        2373 :     pipe->flush ();
     219             : 
     220             :     //  Remove any half-read message from the in pipe.
     221        4746 :     while (incomplete_in) {
     222             :         msg_t msg;
     223           0 :         int rc = msg.init ();
     224           0 :         errno_assert (rc == 0);
     225           0 :         rc = pull_msg (&msg);
     226           0 :         errno_assert (rc == 0);
     227           0 :         rc = msg.close ();
     228           0 :         errno_assert (rc == 0);
     229             :     }
     230        2373 : }
     231             : 
     232        7278 : void zmq::session_base_t::pipe_terminated (pipe_t *pipe_)
     233             : {
     234             :     // Drop the reference to the deallocated pipe if required.
     235        7281 :     zmq_assert (pipe_ == pipe
     236             :              || pipe_ == zap_pipe
     237             :              || terminating_pipes.count (pipe_) == 1);
     238             : 
     239        7278 :     if (pipe_ == pipe) {
     240             :         // If this is our current pipe, remove it
     241        7254 :         pipe = NULL;
     242        7254 :         if (has_linger_timer) {
     243           3 :             cancel_timer (linger_timer_id);
     244           3 :             has_linger_timer = false;
     245             :         }
     246             :     }
     247             :     else
     248          24 :     if (pipe_ == zap_pipe)
     249          21 :         zap_pipe = NULL;
     250             :     else
     251             :         // Remove the pipe from the detached pipes set
     252           3 :         terminating_pipes.erase (pipe_);
     253             : 
     254        7278 :     if (!is_terminating () && options.raw_socket) {
     255          33 :         if (engine) {
     256          20 :             engine->terminate ();
     257          20 :             engine = NULL;
     258             :         }
     259          33 :         terminate ();
     260             :     }
     261             : 
     262             :     //  If we are waiting for pending messages to be sent, at this point
     263             :     //  we are sure that there will be no more messages and we can proceed
     264             :     //  with termination safely.
     265       14182 :     if (pending && !pipe && !zap_pipe && terminating_pipes.empty ()) {
     266        6904 :         pending = false;
     267        6904 :         own_t::process_term (0);
     268             :     }
     269        7281 : }
     270             : 
     271        8048 : void zmq::session_base_t::read_activated (pipe_t *pipe_)
     272             : {
     273             :     // Skip activating if we're detaching this pipe
     274        8048 :     if (unlikely (pipe_ != pipe && pipe_ != zap_pipe)) {
     275           0 :         zmq_assert (terminating_pipes.count (pipe_) == 1);
     276             :         return;
     277             :     }
     278             : 
     279        8048 :     if (unlikely (engine == NULL)) {
     280         148 :         pipe->check_read ();
     281         148 :         return;
     282             :     }
     283             : 
     284        7900 :     if (likely (pipe_ == pipe))
     285        7881 :         engine->restart_output ();
     286             :     else
     287          19 :         engine->zap_msg_available ();
     288             : }
     289             : 
     290        1191 : void zmq::session_base_t::write_activated (pipe_t *pipe_)
     291             : {
     292             :     // Skip activating if we're detaching this pipe
     293        1191 :     if (pipe != pipe_) {
     294           0 :         zmq_assert (terminating_pipes.count (pipe_) == 1);
     295        1191 :         return;
     296             :     }
     297             : 
     298        1191 :     if (engine)
     299        1191 :         engine->restart_input ();
     300             : }
     301             : 
     302           0 : void zmq::session_base_t::hiccuped (pipe_t *)
     303             : {
     304             :     //  Hiccups are always sent from session to socket, not the other
     305             :     //  way round.
     306           0 :     zmq_assert (false);
     307           0 : }
     308             : 
     309       10704 : zmq::socket_base_t *zmq::session_base_t::get_socket ()
     310             : {
     311       10704 :     return socket;
     312             : }
     313             : 
     314        7262 : void zmq::session_base_t::process_plug ()
     315             : {
     316        7262 :     if (active)
     317        3747 :         start_connecting (false);
     318        7262 : }
     319             : 
     320          24 : int zmq::session_base_t::zap_connect ()
     321             : {
     322          24 :     zmq_assert (zap_pipe == NULL);
     323             : 
     324          24 :     endpoint_t peer = find_endpoint ("inproc://zeromq.zap.01");
     325          24 :     if (peer.socket == NULL) {
     326           3 :         errno = ECONNREFUSED;
     327           3 :         return -1;
     328             :     }
     329          21 :     if (peer.options.type != ZMQ_REP
     330          21 :     &&  peer.options.type != ZMQ_ROUTER
     331          21 :     &&  peer.options.type != ZMQ_SERVER) {
     332           0 :         errno = ECONNREFUSED;
     333           0 :         return -1;
     334             :     }
     335             : 
     336             :     //  Create a bi-directional pipe that will connect
     337             :     //  session with zap socket.
     338          21 :     object_t *parents [2] = {this, peer.socket};
     339          21 :     pipe_t *new_pipes [2] = {NULL, NULL};
     340          21 :     int hwms [2] = {0, 0};
     341          21 :     bool conflates [2] = {false, false};
     342          21 :     int rc = pipepair (parents, new_pipes, hwms, conflates);
     343          21 :     errno_assert (rc == 0);
     344             : 
     345             :     //  Attach local end of the pipe to this socket object.
     346          21 :     zap_pipe = new_pipes [0];
     347          21 :     zap_pipe->set_nodelay ();
     348          21 :     zap_pipe->set_event_sink (this);
     349             : 
     350          21 :     send_bind (peer.socket, new_pipes [1], false);
     351             : 
     352             :     //  Send empty identity if required by the peer.
     353          21 :     if (peer.options.recv_identity) {
     354             :         msg_t id;
     355          21 :         rc = id.init ();
     356          21 :         errno_assert (rc == 0);
     357          21 :         id.set_flags (msg_t::identity);
     358          21 :         bool ok = zap_pipe->write (&id);
     359          21 :         zmq_assert (ok);
     360          21 :         zap_pipe->flush ();
     361             :     }
     362             : 
     363             :     return 0;
     364             : }
     365             : 
     366           9 : bool zmq::session_base_t::zap_enabled ()
     367             : {
     368             :     return (
     369           9 :          options.mechanism != ZMQ_NULL ||
     370           6 :         (options.mechanism == ZMQ_NULL && options.zap_domain.length() > 0)
     371           9 :     );
     372             : }
     373             : 
     374        6768 : void zmq::session_base_t::process_attach (i_engine *engine_)
     375             : {
     376        6768 :     zmq_assert (engine_ != NULL);
     377             : 
     378             :     //  Create the pipe if it does not exist yet.
     379        6768 :     if (!pipe && !is_terminating ()) {
     380        3524 :         object_t *parents [2] = {this, socket};
     381        3524 :         pipe_t *pipes [2] = {NULL, NULL};
     382             : 
     383        3527 :         bool conflate = options.conflate &&
     384           3 :             (options.type == ZMQ_DEALER ||
     385           0 :              options.type == ZMQ_PULL ||
     386           0 :              options.type == ZMQ_PUSH ||
     387           0 :              options.type == ZMQ_PUB ||
     388        3524 :              options.type == ZMQ_SUB);
     389             : 
     390             :         int hwms [2] = {conflate? -1 : options.rcvhwm,
     391        3524 :             conflate? -1 : options.sndhwm};
     392        3524 :         bool conflates [2] = {conflate, conflate};
     393        3524 :         int rc = pipepair (parents, pipes, hwms, conflates);
     394        3524 :         errno_assert (rc == 0);
     395             : 
     396             :         //  Plug the local end of the pipe.
     397        3524 :         pipes [0]->set_event_sink (this);
     398             : 
     399             :         //  Remember the local end of the pipe.
     400        3524 :         zmq_assert (!pipe);
     401        3524 :         pipe = pipes [0];
     402             : 
     403             :         //  Ask socket to plug into the remote end of the pipe.
     404        3524 :         send_bind (socket, pipes [1]);
     405             :     }
     406             : 
     407             :     //  Plug in the engine.
     408        6768 :     zmq_assert (!engine);
     409        6768 :     engine = engine_;
     410        6768 :     engine->plug (io_thread, this);
     411        6768 : }
     412             : 
     413        2401 : void zmq::session_base_t::engine_error (
     414             :         zmq::stream_engine_t::error_reason_t reason)
     415             : {
     416             :     //  Engine is dead. Let's forget about it.
     417        2401 :     engine = NULL;
     418             : 
     419             :     //  Remove any half-done messages from the pipes.
     420        2401 :     if (pipe)
     421        2373 :         clean_pipes ();
     422             : 
     423        2401 :     zmq_assert (reason == stream_engine_t::connection_error
     424             :              || reason == stream_engine_t::timeout_error
     425             :              || reason == stream_engine_t::protocol_error);
     426             : 
     427        2401 :     switch (reason) {
     428             :         case stream_engine_t::timeout_error:
     429             :         case stream_engine_t::connection_error:
     430        2341 :             if (active)
     431         210 :                 reconnect ();
     432             :             else
     433        2131 :                 terminate ();
     434             :             break;
     435             :         case stream_engine_t::protocol_error:
     436          60 :             terminate ();
     437          60 :             break;
     438             :     }
     439             : 
     440             :     //  Just in case there's only a delimiter in the pipe.
     441        2401 :     if (pipe)
     442        2370 :         pipe->check_read ();
     443             : 
     444        2401 :     if (zap_pipe)
     445          18 :         zap_pipe->check_read ();
     446        2401 : }
     447             : 
     448        7259 : void zmq::session_base_t::process_term (int linger_)
     449             : {
     450        7259 :     zmq_assert (!pending);
     451             : 
     452             :     //  If the termination of the pipe happens before the term command is
     453             :     //  delivered there's nothing much to do. We can proceed with the
     454             :     //  standard termination immediately.
     455        7613 :     if (!pipe && !zap_pipe && terminating_pipes.empty ()) {
     456         354 :         own_t::process_term (0);
     457        7615 :         return;
     458             :     }
     459             : 
     460        6905 :     pending = true;
     461             : 
     462        6905 :     if (pipe != NULL) {
     463             :         //  If there's finite linger value, delay the termination.
     464             :         //  If linger is infinite (negative) we don't even have to set
     465             :         //  the timer.
     466        6902 :         if (linger_ > 0) {
     467           3 :             zmq_assert (!has_linger_timer);
     468           3 :             add_timer (linger_, linger_timer_id);
     469           3 :             has_linger_timer = true;
     470             :         }
     471             : 
     472             :         //  Start pipe termination process. Delay the termination till all messages
     473             :         //  are processed in case the linger time is non-zero.
     474        6902 :         pipe->terminate (linger_ != 0);
     475             : 
     476             :         //  TODO: Should this go into pipe_t::terminate ?
     477             :         //  In case there's no engine and there's only delimiter in the
     478             :         //  pipe it wouldn't be ever read. Thus we check for it explicitly.
     479        6904 :         if (!engine)
     480        3186 :             pipe->check_read ();
     481             :     }
     482             : 
     483        6907 :     if (zap_pipe != NULL)
     484          21 :         zap_pipe->terminate (false);
     485             : }
     486             : 
     487           0 : void zmq::session_base_t::timer_event (int id_)
     488             : {
     489             :     //  Linger period expired. We can proceed with termination even though
     490             :     //  there are still pending messages to be sent.
     491           0 :     zmq_assert (id_ == linger_timer_id);
     492           0 :     has_linger_timer = false;
     493             : 
     494             :     //  Ask pipe to terminate even though there may be pending messages in it.
     495           0 :     zmq_assert (pipe);
     496           0 :     pipe->terminate (false);
     497           0 : }
     498             : 
     499         210 : void zmq::session_base_t::reconnect ()
     500             : {
     501             :     //  For delayed connect situations, terminate the pipe
     502             :     //  and reestablish later on
     503         397 :     if (pipe && options.immediate == 1
     504           9 :         && addr->protocol != "pgm" && addr->protocol != "epgm"
     505         219 :         && addr->protocol != "norm" && addr->protocol != "udp") {
     506           3 :         pipe->hiccup ();
     507           3 :         pipe->terminate (false);
     508           3 :         terminating_pipes.insert (pipe);
     509           3 :         pipe = NULL;
     510             :     }
     511             : 
     512         210 :     reset ();
     513             : 
     514             :     //  Reconnect.
     515         210 :     if (options.reconnect_ivl != -1)
     516         201 :         start_connecting (true);
     517             : 
     518             :     //  For subscriber sockets we hiccup the inbound pipe, which will cause
     519             :     //  the socket object to resend all the subscriptions.
     520         210 :     if (pipe && (options.type == ZMQ_SUB || options.type == ZMQ_XSUB))
     521         112 :         pipe->hiccup ();
     522         210 : }
     523             : 
     524        3948 : void zmq::session_base_t::start_connecting (bool wait_)
     525             : {
     526        3948 :     zmq_assert (active);
     527             : 
     528             :     //  Choose I/O thread to run connecter in. Given that we are already
     529             :     //  running in an I/O thread, there must be at least one available.
     530        3948 :     io_thread_t *io_thread = choose_io_thread (options.affinity);
     531        3947 :     zmq_assert (io_thread);
     532             : 
     533             :     //  Create the connecter object.
     534             : 
     535        7893 :     if (addr->protocol == "tcp") {
     536        7750 :         if (!options.socks_proxy_address.empty()) {
     537             :             address_t *proxy_address = new (std::nothrow)
     538           0 :                 address_t ("tcp", options.socks_proxy_address, this->get_ctx ());
     539           0 :             alloc_assert (proxy_address);
     540             :             socks_connecter_t *connecter =
     541             :                 new (std::nothrow) socks_connecter_t (
     542           0 :                     io_thread, this, options, addr, proxy_address, wait_);
     543           0 :             alloc_assert (connecter);
     544           0 :             launch_child (connecter);
     545             :         }
     546             :         else {
     547             :             tcp_connecter_t *connecter = new (std::nothrow)
     548        3875 :                 tcp_connecter_t (io_thread, this, options, addr, wait_);
     549        3877 :             alloc_assert (connecter);
     550        3877 :             launch_child (connecter);
     551             :         }
     552             :         return;
     553             :     }
     554             : 
     555             : #if !defined ZMQ_HAVE_WINDOWS && !defined ZMQ_HAVE_OPENVMS
     556         142 :     if (addr->protocol == "ipc") {
     557             :         ipc_connecter_t *connecter = new (std::nothrow) ipc_connecter_t (
     558          65 :             io_thread, this, options, addr, wait_);
     559          65 :         alloc_assert (connecter);
     560          65 :         launch_child (connecter);
     561          65 :         return;
     562             :     }
     563             : #endif
     564             : #if defined ZMQ_HAVE_TIPC
     565             :     if (addr->protocol == "tipc") {
     566             :         tipc_connecter_t *connecter = new (std::nothrow) tipc_connecter_t (
     567             :             io_thread, this, options, addr, wait_);
     568             :         alloc_assert (connecter);
     569             :         launch_child (connecter);
     570             :         return;
     571             :     }
     572             : #endif
     573             : 
     574          12 : if (addr->protocol == "udp") {
     575           6 :     zmq_assert (options.type == ZMQ_DISH || options.type == ZMQ_RADIO);
     576             : 
     577           6 :     udp_engine_t* engine = new (std::nothrow) udp_engine_t ();
     578           6 :     alloc_assert (engine);
     579             : 
     580           6 :     bool recv = false;
     581           6 :     bool send = false;
     582             : 
     583           6 :     if (options.type == ZMQ_RADIO) {
     584             :         send = true;
     585             :         recv = false;
     586             :     }
     587           3 :     else if (options.type == ZMQ_DISH) {
     588           3 :         send = false;
     589           3 :         recv = true;
     590             :     }
     591             : 
     592           6 :     int rc = engine->init (addr, send, recv);
     593           6 :     errno_assert (rc == 0);
     594             : 
     595           6 :     send_attach (this, engine);
     596             : 
     597           6 :     return;
     598             : }
     599             : 
     600             : #ifdef ZMQ_HAVE_OPENPGM
     601             : 
     602             :     //  Both PGM and EPGM transports are using the same infrastructure.
     603             :     if (addr->protocol == "pgm" || addr->protocol == "epgm") {
     604             : 
     605             :         zmq_assert (options.type == ZMQ_PUB || options.type == ZMQ_XPUB
     606             :                  || options.type == ZMQ_SUB || options.type == ZMQ_XSUB);
     607             : 
     608             :         //  For EPGM transport with UDP encapsulation of PGM is used.
     609             :         bool const udp_encapsulation = addr->protocol == "epgm";
     610             : 
     611             :         //  At this point we'll create message pipes to the session straight
     612             :         //  away. There's no point in delaying it as no concept of 'connect'
     613             :         //  exists with PGM anyway.
     614             :         if (options.type == ZMQ_PUB || options.type == ZMQ_XPUB) {
     615             : 
     616             :             //  PGM sender.
     617             :             pgm_sender_t *pgm_sender = new (std::nothrow) pgm_sender_t (
     618             :                 io_thread, options);
     619             :             alloc_assert (pgm_sender);
     620             : 
     621             :             int rc = pgm_sender->init (udp_encapsulation, addr->address.c_str ());
     622             :             errno_assert (rc == 0);
     623             : 
     624             :             send_attach (this, pgm_sender);
     625             :         }
     626             :         else {
     627             : 
     628             :             //  PGM receiver.
     629             :             pgm_receiver_t *pgm_receiver = new (std::nothrow) pgm_receiver_t (
     630             :                 io_thread, options);
     631             :             alloc_assert (pgm_receiver);
     632             : 
     633             :             int rc = pgm_receiver->init (udp_encapsulation, addr->address.c_str ());
     634             :             errno_assert (rc == 0);
     635             : 
     636             :             send_attach (this, pgm_receiver);
     637             :         }
     638             : 
     639             :         return;
     640             :     }
     641             : #endif
     642             : 
     643             : #ifdef ZMQ_HAVE_NORM
     644             :     if (addr->protocol == "norm") {
     645             :         //  At this point we'll create message pipes to the session straight
     646             :         //  away. There's no point in delaying it as no concept of 'connect'
     647             :         //  exists with NORM anyway.
     648             :         if (options.type == ZMQ_PUB || options.type == ZMQ_XPUB) {
     649             : 
     650             :             //  NORM sender.
     651             :             norm_engine_t* norm_sender = new (std::nothrow) norm_engine_t(io_thread, options);
     652             :             alloc_assert (norm_sender);
     653             : 
     654             :             int rc = norm_sender->init (addr->address.c_str (), true, false);
     655             :             errno_assert (rc == 0);
     656             : 
     657             :             send_attach (this, norm_sender);
     658             :         }
     659             :         else {  // ZMQ_SUB or ZMQ_XSUB
     660             : 
     661             :             //  NORM receiver.
     662             :             norm_engine_t* norm_receiver = new (std::nothrow) norm_engine_t (io_thread, options);
     663             :             alloc_assert (norm_receiver);
     664             : 
     665             :             int rc = norm_receiver->init (addr->address.c_str (), false, true);
     666             :             errno_assert (rc == 0);
     667             : 
     668             :             send_attach (this, norm_receiver);
     669             :         }
     670             :         return;
     671             :     }
     672             : #endif // ZMQ_HAVE_NORM
     673             : 
     674             : #if defined ZMQ_HAVE_VMCI
     675             :     if (addr->protocol == "vmci") {
     676             :         vmci_connecter_t *connecter = new (std::nothrow) vmci_connecter_t (
     677             :                 io_thread, this, options, addr, wait_);
     678             :         alloc_assert (connecter);
     679             :         launch_child (connecter);
     680             :         return;
     681             :     }
     682             : #endif
     683             : 
     684           0 :     zmq_assert (false);
     685             : }

Generated by: LCOV version 1.10