LCOV - code coverage report
Current view: top level - home/h/core/forks/m4-libzmq/src - socket_base.cpp (source / functions) Hit Total Coverage
Test: zeromq-4.2.0 Code Coverage Lines: 621 754 82.4 %
Date: 2016-05-09 Functions: 50 65 76.9 %
Legend: Lines: hit not hit

          Line data    Source code
       1             : /*
       2             :     Copyright (c) 2007-2016 Contributors as noted in the AUTHORS file
       3             : 
       4             :     This file is part of libzmq, the ZeroMQ core engine in C++.
       5             : 
       6             :     libzmq is free software; you can redistribute it and/or modify it under
       7             :     the terms of the GNU Lesser General Public License (LGPL) as published
       8             :     by the Free Software Foundation; either version 3 of the License, or
       9             :     (at your option) any later version.
      10             : 
      11             :     As a special exception, the Contributors give you permission to link
      12             :     this library with independent modules to produce an executable,
      13             :     regardless of the license terms of these independent modules, and to
      14             :     copy and distribute the resulting executable under terms of your choice,
      15             :     provided that you also meet, for each linked independent module, the
      16             :     terms and conditions of the license of that module. An independent
      17             :     module is a module which is not derived from or based on this library.
      18             :     If you modify this library, you must extend this exception to your
      19             :     version of the library.
      20             : 
      21             :     libzmq is distributed in the hope that it will be useful, but WITHOUT
      22             :     ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
      23             :     FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public
      24             :     License for more details.
      25             : 
      26             :     You should have received a copy of the GNU Lesser General Public License
      27             :     along with this program.  If not, see <http://www.gnu.org/licenses/>.
      28             : */
      29             : 
      30             : #include "precompiled.hpp"
      31             : #include <new>
      32             : #include <string>
      33             : #include <algorithm>
      34             : 
      35             : #include "macros.hpp"
      36             : #include "platform.hpp"
      37             : 
      38             : #if defined ZMQ_HAVE_WINDOWS
      39             : #include "windows.hpp"
      40             : #if defined _MSC_VER
      41             : #if defined _WIN32_WCE
      42             : #include <cmnintrin.h>
      43             : #else
      44             : #include <intrin.h>
      45             : #endif
      46             : #endif
      47             : #else
      48             : #include <unistd.h>
      49             : #include <ctype.h>
      50             : #endif
      51             : 
      52             : #include "socket_base.hpp"
      53             : #include "tcp_listener.hpp"
      54             : #include "ipc_listener.hpp"
      55             : #include "tipc_listener.hpp"
      56             : #include "tcp_connecter.hpp"
      57             : #include "io_thread.hpp"
      58             : #include "session_base.hpp"
      59             : #include "config.hpp"
      60             : #include "pipe.hpp"
      61             : #include "err.hpp"
      62             : #include "ctx.hpp"
      63             : #include "platform.hpp"
      64             : #include "likely.hpp"
      65             : #include "msg.hpp"
      66             : #include "address.hpp"
      67             : #include "ipc_address.hpp"
      68             : #include "tcp_address.hpp"
      69             : #include "udp_address.hpp"
      70             : #include "tipc_address.hpp"
      71             : #include "mailbox.hpp"
      72             : #include "mailbox_safe.hpp"
      73             : 
      74             : #if defined ZMQ_HAVE_VMCI
      75             : #include "vmci_address.hpp"
      76             : #include "vmci_listener.hpp"
      77             : #endif
      78             : 
      79             : #ifdef ZMQ_HAVE_OPENPGM
      80             : #include "pgm_socket.hpp"
      81             : #endif
      82             : 
      83             : #include "pair.hpp"
      84             : #include "pub.hpp"
      85             : #include "sub.hpp"
      86             : #include "req.hpp"
      87             : #include "rep.hpp"
      88             : #include "pull.hpp"
      89             : #include "push.hpp"
      90             : #include "dealer.hpp"
      91             : #include "router.hpp"
      92             : #include "xpub.hpp"
      93             : #include "xsub.hpp"
      94             : #include "stream.hpp"
      95             : #include "server.hpp"
      96             : #include "client.hpp"
      97             : #include "radio.hpp"
      98             : #include "dish.hpp"
      99             : #include "gather.hpp"
     100             : #include "scatter.hpp"
     101             : 
     102             : #define ENTER_MUTEX() \
     103             :     if (thread_safe) \
     104             :         sync.lock();
     105             : 
     106             : #define EXIT_MUTEX(); \
     107             :     if (thread_safe) \
     108             :         sync.unlock();
     109             : 
     110     7508899 : bool zmq::socket_base_t::check_tag ()
     111             : {
     112     7508899 :     return tag == 0xbaddecaf;
     113             : }
     114             : 
     115       11197 : zmq::socket_base_t *zmq::socket_base_t::create (int type_, class ctx_t *parent_,
     116             :     uint32_t tid_, int sid_)
     117             : {
     118       11197 :     socket_base_t *s = NULL;
     119       11197 :     switch (type_) {
     120             :         case ZMQ_PAIR:
     121        6376 :             s = new (std::nothrow) pair_t (parent_, tid_, sid_);
     122        6376 :             break;
     123             :         case ZMQ_PUB:
     124         234 :             s = new (std::nothrow) pub_t (parent_, tid_, sid_);
     125         234 :             break;
     126             :         case ZMQ_SUB:
     127        3243 :             s = new (std::nothrow) sub_t (parent_, tid_, sid_);
     128        3243 :             break;
     129             :         case ZMQ_REQ:
     130         108 :             s = new (std::nothrow) req_t (parent_, tid_, sid_);
     131         108 :             break;
     132             :         case ZMQ_REP:
     133         150 :             s = new (std::nothrow) rep_t (parent_, tid_, sid_);
     134         150 :             break;
     135             :         case ZMQ_DEALER:
     136         600 :             s = new (std::nothrow) dealer_t (parent_, tid_, sid_);
     137         600 :             break;
     138             :         case ZMQ_ROUTER:
     139          87 :             s = new (std::nothrow) router_t (parent_, tid_, sid_);
     140          87 :             break;
     141             :         case ZMQ_PULL:
     142         117 :             s = new (std::nothrow) pull_t (parent_, tid_, sid_);
     143         117 :             break;
     144             :         case ZMQ_PUSH:
     145         156 :             s = new (std::nothrow) push_t (parent_, tid_, sid_);
     146         156 :             break;
     147             :         case ZMQ_XPUB:
     148          24 :             s = new (std::nothrow) xpub_t (parent_, tid_, sid_);
     149          24 :             break;
     150             :         case ZMQ_XSUB:
     151          18 :             s = new (std::nothrow) xsub_t (parent_, tid_, sid_);
     152          18 :             break;
     153             :         case ZMQ_STREAM:
     154          33 :             s = new (std::nothrow) stream_t (parent_, tid_, sid_);
     155          33 :             break;
     156             :         case ZMQ_SERVER:
     157          15 :             s = new (std::nothrow) server_t (parent_, tid_, sid_);
     158          15 :             break;
     159             :         case ZMQ_CLIENT:
     160          15 :             s = new (std::nothrow) client_t (parent_, tid_, sid_);
     161          15 :             break;
     162             :         case ZMQ_RADIO:
     163           6 :             s = new (std::nothrow) radio_t (parent_, tid_, sid_);
     164           6 :             break;
     165             :         case ZMQ_DISH:
     166           6 :             s = new (std::nothrow) dish_t (parent_, tid_, sid_);
     167           6 :             break;
     168             :         case ZMQ_GATHER:
     169           6 :             s = new (std::nothrow) gather_t (parent_, tid_, sid_);
     170           6 :             break;
     171             :         case ZMQ_SCATTER:
     172           3 :             s = new (std::nothrow) scatter_t (parent_, tid_, sid_);
     173           3 :             break;
     174             :         default:
     175           0 :             errno = EINVAL;
     176           0 :             return NULL;
     177             :     }
     178             : 
     179       11197 :     alloc_assert (s);
     180             : 
     181       11197 :     if (s->mailbox == NULL) {
     182          66 :         s->destroyed = true;
     183          66 :         LIBZMQ_DELETE(s);
     184             :         return NULL;
     185             :     }
     186             : 
     187             :     return s;
     188             : }
     189             : 
     190       11197 : zmq::socket_base_t::socket_base_t (ctx_t *parent_, uint32_t tid_, int sid_, bool thread_safe_) :
     191             :     own_t (parent_, tid_),
     192             :     tag (0xbaddecaf),
     193             :     ctx_terminated (false),
     194             :     destroyed (false),
     195             :     poller(NULL),
     196             :     handle(NULL),
     197             :     last_tsc (0),
     198             :     ticks (0),
     199             :     rcvmore (false),
     200             :     monitor_socket (NULL),
     201             :     monitor_events (0),
     202             :     thread_safe (thread_safe_),
     203      100773 :     reaper_signaler (NULL)
     204             : {
     205       11197 :     options.socket_id = sid_;
     206       11197 :     options.ipv6 = (parent_->get (ZMQ_IPV6) != 0);
     207       11197 :     options.linger = parent_->get (ZMQ_BLOCKY)? -1: 0;
     208             : 
     209       11197 :     if (thread_safe)
     210          51 :         mailbox = new mailbox_safe_t(&sync);
     211             :     else {
     212       11146 :         mailbox_t *m = new mailbox_t();
     213       11146 :         if (m->get_fd () != retired_fd)
     214       11080 :             mailbox = m;
     215             :         else {
     216          66 :             LIBZMQ_DELETE (m);
     217          66 :             mailbox = NULL;
     218             :         }
     219             :     }
     220       11197 : }
     221             : 
     222      111970 : zmq::socket_base_t::~socket_base_t ()
     223             : {
     224       11197 :     if (mailbox)
     225       11131 :         LIBZMQ_DELETE(mailbox);
     226             : 
     227       11197 :     if (reaper_signaler)
     228          51 :         LIBZMQ_DELETE(reaper_signaler);
     229             : 
     230       11197 :     stop_monitor ();
     231       11197 :     zmq_assert (destroyed);
     232       11197 : }
     233             : 
     234       11131 : zmq::i_mailbox *zmq::socket_base_t::get_mailbox ()
     235             : {
     236       11131 :     return mailbox;
     237             : }
     238             : 
     239        8247 : void zmq::socket_base_t::stop ()
     240             : {
     241             :     //  Called by ctx when it is terminated (zmq_ctx_term).
     242             :     //  'stop' command is sent from the threads that called zmq_ctx_term to
     243             :     //  the thread owning the socket. This way, blocking call in the
     244             :     //  owner thread can be interrupted.
     245        8247 :     send_stop ();
     246        8247 : }
     247             : 
     248        5143 : int zmq::socket_base_t::parse_uri (const char *uri_,
     249             :                         std::string &protocol_, std::string &address_)
     250             : {
     251        5143 :     zmq_assert (uri_ != NULL);
     252             : 
     253        5143 :     std::string uri (uri_);
     254        5143 :     std::string::size_type pos = uri.find ("://");
     255        5143 :     if (pos == std::string::npos) {
     256           0 :         errno = EINVAL;
     257           0 :         return -1;
     258             :     }
     259       10286 :     protocol_ = uri.substr (0, pos);
     260       10285 :     address_ = uri.substr (pos + 3);
     261             : 
     262       10284 :     if (protocol_.empty () || address_.empty ()) {
     263           3 :         errno = EINVAL;
     264           3 :         return -1;
     265             :     }
     266             :     return 0;
     267             : }
     268             : 
     269        5139 : int zmq::socket_base_t::check_protocol (const std::string &protocol_)
     270             : {
     271             :     //  First check out whether the protocol is something we are aware of.
     272        5139 :     if (protocol_ != "inproc"
     273             : #if !defined ZMQ_HAVE_WINDOWS && !defined ZMQ_HAVE_OPENVMS
     274        4181 :     &&  protocol_ != "ipc"
     275             : #endif
     276        4049 :     &&  protocol_ != "tcp"
     277             : #if defined ZMQ_HAVE_OPENPGM
     278             :     //  pgm/epgm transports only available if 0MQ is compiled with OpenPGM.
     279             :     &&  protocol_ != "pgm"
     280             :     &&  protocol_ != "epgm"
     281             : #endif
     282             : #if defined ZMQ_HAVE_TIPC
     283             :     // TIPC transport is only available on Linux.
     284             :     &&  protocol_ != "tipc"
     285             : #endif
     286             : #if defined ZMQ_HAVE_NORM
     287             :     &&  protocol_ != "norm"
     288             : #endif
     289             : #if defined ZMQ_HAVE_VMCI
     290             :     &&  protocol_ != "vmci"
     291             : #endif
     292        5151 :     &&  protocol_ != "udp") {
     293           3 :         errno = EPROTONOSUPPORT;
     294           3 :         return -1;
     295             :     }
     296             : 
     297             :     //  Check whether socket type and transport protocol match.
     298             :     //  Specifically, multicast protocols can't be combined with
     299             :     //  bi-directional messaging patterns (socket types).
     300             : #if defined ZMQ_HAVE_OPENPGM || defined ZMQ_HAVE_NORM
     301             :     if ((protocol_ == "pgm" || protocol_ == "epgm" || protocol_ == "norm") &&
     302             :           options.type != ZMQ_PUB && options.type != ZMQ_SUB &&
     303             :           options.type != ZMQ_XPUB && options.type != ZMQ_XSUB) {
     304             :         errno = ENOCOMPATPROTO;
     305             :         return -1;
     306             :     }
     307             : #endif
     308             : 
     309        5135 :     if (protocol_ == "udp" && (options.type != ZMQ_DISH &&
     310             :                                options.type != ZMQ_RADIO)) {
     311           0 :         errno = ENOCOMPATPROTO;
     312           0 :         return -1;
     313             :     }
     314             : 
     315             :     //  Protocol is available.
     316             :     return 0;
     317             : }
     318             : 
     319        8417 : void zmq::socket_base_t::attach_pipe (pipe_t *pipe_, bool subscribe_to_all_)
     320             : {
     321             :     //  First, register the pipe so that we can terminate it later on.
     322        8417 :     pipe_->set_event_sink (this);
     323        8417 :     pipes.push_back (pipe_);
     324             : 
     325             :     //  Let the derived socket type know about new pipe.
     326        8417 :     xattach_pipe (pipe_, subscribe_to_all_);
     327             : 
     328             :     //  If the socket is already being closed, ask any new pipes to terminate
     329             :     //  straight away.
     330        8417 :     if (is_terminating ()) {
     331        2823 :         register_term_acks (1);
     332        2823 :         pipe_->terminate (false);
     333             :     }
     334        8417 : }
     335             : 
     336        2613 : int zmq::socket_base_t::setsockopt (int option_, const void *optval_,
     337             :     size_t optvallen_)
     338             : {
     339        2613 :     ENTER_MUTEX ();
     340             : 
     341        2613 :     if (!options.is_valid(option_)) {
     342           0 :         errno = EINVAL;
     343           0 :         EXIT_MUTEX ();
     344             :         return -1;
     345             :     }
     346             : 
     347        2613 :     if (unlikely (ctx_terminated)) {
     348           6 :         errno = ETERM;
     349           6 :         EXIT_MUTEX ();
     350             :         return -1;
     351             :     }
     352             : 
     353             :     //  First, check whether specific socket type overloads the option.
     354        2607 :     int rc = xsetsockopt (option_, optval_, optvallen_);
     355        2607 :     if (rc == 0 || errno != EINVAL) {
     356         210 :         EXIT_MUTEX ();
     357         210 :         return rc;
     358             :     }
     359             : 
     360             :     //  If the socket type doesn't support the option, pass it to
     361             :     //  the generic option parser.
     362        2397 :     rc = options.setsockopt (option_, optval_, optvallen_);
     363        2397 :     update_pipe_options(option_);
     364             : 
     365        2397 :     EXIT_MUTEX ();
     366        2397 :     return rc;
     367             : }
     368             : 
     369     2654195 : int zmq::socket_base_t::getsockopt (int option_, void *optval_,
     370             :     size_t *optvallen_)
     371             : {
     372     2654195 :     ENTER_MUTEX ();
     373             : 
     374     2654195 :     if (unlikely (ctx_terminated)) {
     375           0 :         errno = ETERM;
     376           0 :         EXIT_MUTEX ();
     377             :         return -1;
     378             :     }
     379             : 
     380     2654195 :     if (option_ == ZMQ_RCVMORE) {
     381        1020 :         if (*optvallen_ < sizeof (int)) {
     382           0 :             errno = EINVAL;
     383           0 :             EXIT_MUTEX ();
     384             :             return -1;
     385             :         }
     386        1020 :         memset(optval_, 0, *optvallen_);
     387        1020 :         *((int*) optval_) = rcvmore ? 1 : 0;
     388        1020 :         *optvallen_ = sizeof (int);
     389        1020 :         EXIT_MUTEX ();
     390             :         return 0;
     391             :     }
     392             : 
     393     2653175 :     if (option_ == ZMQ_FD) {
     394     1325879 :         if (*optvallen_ < sizeof (fd_t)) {
     395           0 :             errno = EINVAL;
     396           0 :             EXIT_MUTEX ();
     397             :             return -1;
     398             :         }
     399             : 
     400     1325879 :         if (thread_safe) {
     401             :             // thread safe socket doesn't provide file descriptor
     402           0 :             errno = EINVAL;
     403           0 :             EXIT_MUTEX ();
     404             :             return -1;
     405             :         }
     406             : 
     407     1325879 :         *((fd_t*)optval_) = ((mailbox_t*)mailbox)->get_fd();
     408     1325879 :         *optvallen_ = sizeof(fd_t);
     409             : 
     410     1325879 :         EXIT_MUTEX ();
     411             :         return 0;
     412             :     }
     413             : 
     414     1327296 :     if (option_ == ZMQ_EVENTS) {
     415     1327173 :         if (*optvallen_ < sizeof (int)) {
     416           0 :             errno = EINVAL;
     417           0 :             EXIT_MUTEX ();
     418             :             return -1;
     419             :         }
     420     1327173 :         int rc = process_commands (0, false);
     421     1327173 :         if (rc != 0 && (errno == EINTR || errno == ETERM)) {
     422           0 :             EXIT_MUTEX ();
     423             :             return -1;
     424             :         }
     425     1327173 :         errno_assert (rc == 0);
     426     1327173 :         *((int*) optval_) = 0;
     427     1327171 :         if (has_out ())
     428        1366 :             *((int*) optval_) |= ZMQ_POLLOUT;
     429     1327171 :         if (has_in ())
     430      530030 :             *((int*) optval_) |= ZMQ_POLLIN;
     431     1327171 :         *optvallen_ = sizeof (int);
     432     1327171 :         EXIT_MUTEX ();
     433             :         return 0;
     434             :     }
     435             : 
     436         123 :     if (option_ == ZMQ_LAST_ENDPOINT) {
     437          66 :         if (*optvallen_ < last_endpoint.size () + 1) {
     438           0 :             errno = EINVAL;
     439           0 :             EXIT_MUTEX ();
     440             :             return -1;
     441             :         }
     442          33 :         strncpy(static_cast <char *> (optval_), last_endpoint.c_str(), last_endpoint.size() + 1);
     443          66 :         *optvallen_ = last_endpoint.size () + 1;
     444          33 :         EXIT_MUTEX ();
     445             :         return 0;
     446             :     }
     447             : 
     448          90 :     if (option_ == ZMQ_THREAD_SAFE) {
     449          30 :         if (*optvallen_ < sizeof (int)) {
     450           0 :             errno = EINVAL;
     451           0 :             EXIT_MUTEX ();
     452             :             return -1;
     453             :         }
     454          30 :         memset(optval_, 0, *optvallen_);
     455          30 :         *((int*) optval_) = thread_safe ? 1 : 0;
     456          30 :         *optvallen_ = sizeof (int);
     457          30 :         EXIT_MUTEX ();
     458             :         return 0;
     459             :     }
     460             : 
     461          60 :     int rc = options.getsockopt (option_, optval_, optvallen_);
     462          60 :     EXIT_MUTEX ();
     463          60 :     return rc;
     464             : }
     465             : 
     466          15 : int zmq::socket_base_t::join (const char* group_)
     467             : {
     468          15 :     ENTER_MUTEX ();
     469             : 
     470          15 :     int rc = xjoin (group_);
     471             : 
     472          15 :     EXIT_MUTEX();
     473             : 
     474          15 :     return rc;
     475             : }
     476             : 
     477           6 : int zmq::socket_base_t::leave (const char* group_)
     478             : {
     479           6 :     ENTER_MUTEX ();
     480             : 
     481           6 :     int rc = xleave (group_);
     482             : 
     483           6 :     EXIT_MUTEX();
     484             : 
     485           6 :     return rc;
     486             : }
     487             : 
     488           3 : int zmq::socket_base_t::add_signaler(signaler_t *s_)
     489             : {
     490           3 :     ENTER_MUTEX ();
     491             : 
     492           3 :     if (!thread_safe) {
     493           0 :         errno = EINVAL;
     494           0 :         EXIT_MUTEX ();
     495             :         return -1;
     496             :     }
     497             : 
     498           3 :     ((mailbox_safe_t*)mailbox)->add_signaler(s_);
     499             : 
     500           3 :     EXIT_MUTEX ();
     501             :     return 0;
     502             : }
     503             : 
     504           0 : int zmq::socket_base_t::remove_signaler(signaler_t *s_)
     505             : {
     506           0 :     ENTER_MUTEX ();
     507             : 
     508           0 :     if (!thread_safe) {
     509           0 :         errno = EINVAL;
     510           0 :         EXIT_MUTEX ();
     511             :         return -1;
     512             :     }
     513             : 
     514           0 :     ((mailbox_safe_t*)mailbox)->remove_signaler(s_);
     515             : 
     516           0 :     EXIT_MUTEX ();
     517             :     return 0;
     518             : }
     519             : 
     520         703 : int zmq::socket_base_t::bind (const char *addr_)
     521             : {
     522         703 :     ENTER_MUTEX ();
     523             : 
     524         703 :     if (unlikely (ctx_terminated)) {
     525           0 :         errno = ETERM;
     526           0 :         EXIT_MUTEX ();
     527             :         return -1;
     528             :     }
     529             : 
     530             :     //  Process pending commands, if any.
     531         703 :     int rc = process_commands (0, false);
     532         703 :     if (unlikely (rc != 0)) {
     533           0 :         EXIT_MUTEX ();
     534             :         return -1;
     535             :     }
     536             : 
     537             :     //  Parse addr_ string.
     538             :     std::string protocol;
     539             :     std::string address;
     540         703 :     if (parse_uri (addr_, protocol, address) || check_protocol (protocol)) {
     541           0 :         EXIT_MUTEX ();
     542             :         return -1;
     543             :     }
     544             : 
     545         703 :     if (protocol == "inproc") {
     546         358 :         const endpoint_t endpoint = { this, options };
     547         358 :         rc = register_endpoint (addr_, endpoint);
     548         358 :         if (rc == 0) {
     549         358 :             connect_pending (addr_, this);
     550         358 :             last_endpoint.assign (addr_);
     551         358 :             options.connected = true;
     552             :         }
     553         358 :         EXIT_MUTEX ();
     554         358 :         return rc;
     555             :     }
     556             : 
     557        1380 :     if (protocol == "pgm" || protocol == "epgm" || protocol == "norm" || protocol == "udp") {
     558             :         //  For convenience's sake, bind can be used interchangeable with
     559             :         //  connect for PGM, EPGM, NORM and UDP transports.
     560           3 :         EXIT_MUTEX ();
     561           3 :         rc = connect (addr_);
     562           3 :         if (rc != -1)
     563           3 :             options.connected = true;
     564           3 :         return rc;
     565             :     }
     566             : 
     567             :     //  Remaining transports require to be run in an I/O thread, so at this
     568             :     //  point we'll choose one.
     569         342 :     io_thread_t *io_thread = choose_io_thread (options.affinity);
     570         342 :     if (!io_thread) {
     571           0 :         errno = EMTHREAD;
     572           0 :         EXIT_MUTEX ();
     573             :         return -1;
     574             :     }
     575             : 
     576         342 :     if (protocol == "tcp") {
     577             :         tcp_listener_t *listener = new (std::nothrow) tcp_listener_t (
     578         279 :             io_thread, this, options);
     579         279 :         alloc_assert (listener);
     580         279 :         rc = listener->set_address (address.c_str ());
     581         279 :         if (rc != 0) {
     582           0 :             LIBZMQ_DELETE(listener);
     583           0 :             event_bind_failed (address, zmq_errno());
     584           0 :             EXIT_MUTEX ();
     585             :             return -1;
     586             :         }
     587             : 
     588             :         // Save last endpoint URI
     589         279 :         listener->get_address (last_endpoint);
     590             : 
     591         558 :         add_endpoint (last_endpoint.c_str (), (own_t *) listener, NULL);
     592         279 :         options.connected = true;
     593         279 :         EXIT_MUTEX ();
     594             :         return 0;
     595             :     }
     596             : 
     597             : #if !defined ZMQ_HAVE_WINDOWS && !defined ZMQ_HAVE_OPENVMS
     598          63 :     if (protocol == "ipc") {
     599             :         ipc_listener_t *listener = new (std::nothrow) ipc_listener_t (
     600          63 :             io_thread, this, options);
     601          63 :         alloc_assert (listener);
     602          63 :         int rc = listener->set_address (address.c_str ());
     603          63 :         if (rc != 0) {
     604           0 :             LIBZMQ_DELETE(listener);
     605           0 :             event_bind_failed (address, zmq_errno());
     606           0 :             EXIT_MUTEX ();
     607             :             return -1;
     608             :         }
     609             : 
     610             :         // Save last endpoint URI
     611          63 :         listener->get_address (last_endpoint);
     612             : 
     613         126 :         add_endpoint (last_endpoint.c_str (), (own_t *) listener, NULL);
     614          63 :         options.connected = true;
     615          63 :         EXIT_MUTEX ();
     616             :         return 0;
     617             :     }
     618             : #endif
     619             : #if defined ZMQ_HAVE_TIPC
     620             :     if (protocol == "tipc") {
     621             :          tipc_listener_t *listener = new (std::nothrow) tipc_listener_t (
     622             :               io_thread, this, options);
     623             :          alloc_assert (listener);
     624             :          int rc = listener->set_address (address.c_str ());
     625             :          if (rc != 0) {
     626             :              LIBZMQ_DELETE(listener);
     627             :              event_bind_failed (address, zmq_errno());
     628             :              EXIT_MUTEX ();
     629             :              return -1;
     630             :          }
     631             : 
     632             :         // Save last endpoint URI
     633             :         listener->get_address (last_endpoint);
     634             : 
     635             :         add_endpoint (addr_, (own_t *) listener, NULL);
     636             :         options.connected = true;
     637             :         EXIT_MUTEX ();
     638             :         return 0;
     639             :     }
     640             : #endif
     641             : #if defined ZMQ_HAVE_VMCI
     642             :     if (protocol == "vmci") {
     643             :         vmci_listener_t *listener = new (std::nothrow) vmci_listener_t (
     644             :             io_thread, this, options);
     645             :         alloc_assert (listener);
     646             :         int rc = listener->set_address (address.c_str ());
     647             :         if (rc != 0) {
     648             :             LIBZMQ_DELETE(listener);
     649             :             event_bind_failed (address, zmq_errno ());
     650             :             EXIT_MUTEX ();
     651             :             return -1;
     652             :         }
     653             : 
     654             :         listener->get_address (last_endpoint);
     655             : 
     656             :         add_endpoint (last_endpoint.c_str(), (own_t *) listener, NULL);
     657             :         options.connected = true;
     658             :         EXIT_MUTEX ();
     659             :         return 0;
     660             :     }
     661             : #endif
     662             : 
     663           0 :     EXIT_MUTEX ();
     664           0 :     zmq_assert (false);
     665             :     return -1;
     666             : }
     667             : 
     668        4329 : int zmq::socket_base_t::connect (const char *addr_)  //  Connect this socket to the endpoint URI addr_.
     669             : {  //  Returns 0 on success and -1 on failure; this function itself sets errno to ETERM, EMTHREAD or EINVAL on the paths that assign it.
     670        4329 :     ENTER_MUTEX ();
     671             :     //  Every return path below calls EXIT_MUTEX () first, matching ENTER_MUTEX () above.
     672        4329 :     if (unlikely (ctx_terminated)) {
     673           0 :         errno = ETERM;
     674           0 :         EXIT_MUTEX ();
     675             :         return -1;
     676             :     }
     677             : 
     678             :     //  Process pending commands, if any.
     679        4329 :     int rc = process_commands (0, false);
     680        4329 :     if (unlikely (rc != 0)) {
     681           0 :         EXIT_MUTEX ();
     682             :         return -1;
     683             :     }
     684             : 
     685             :     //  Split addr_ into protocol and address; fail if parse_uri or check_protocol reports an error.
     686             :     std::string protocol;
     687             :     std::string address;
     688        4329 :     if (parse_uri (addr_, protocol, address) || check_protocol (protocol)) {
     689           6 :         EXIT_MUTEX ();
     690             :         return -1;
     691             :     }
     692             : 
     693        4323 :     if (protocol == "inproc") {  //  inproc is handled entirely in this branch, which returns at its end
     694             : 
     695             :         //  TODO: inproc connect is specific with respect to creating pipes
     696             :         //  as there's no 'reconnect' functionality implemented. Once that
     697             :         //  is in place we should follow generic pipe creation algorithm.
     698             : 
     699             :         //  Find the peer endpoint.
     700         567 :         endpoint_t peer = find_endpoint (addr_);  //  peer.socket == NULL is treated below as "peer doesn't exist yet"
     701             : 
     702             :         // The total HWM for an inproc connection should be the sum of
     703             :         // the binder's HWM and the connector's HWM.
     704         567 :         int sndhwm = 0;
     705         567 :         if (peer.socket == NULL)
     706         276 :             sndhwm = options.sndhwm;
     707         291 :         else if (options.sndhwm != 0 && peer.options.rcvhwm != 0)  //  if either value is 0 the total stays 0 (presumably "no limit" -- TODO confirm)
     708         282 :             sndhwm = options.sndhwm + peer.options.rcvhwm;
     709         567 :         int rcvhwm = 0;
     710         567 :         if (peer.socket == NULL)
     711         276 :             rcvhwm = options.rcvhwm;
     712             :         else
     713         291 :         if (options.rcvhwm != 0 && peer.options.sndhwm != 0)  //  same rule as for sndhwm above, with the directions swapped
     714         291 :             rcvhwm = options.rcvhwm + peer.options.sndhwm;
     715             : 
     716             :         //  Create a bi-directional pipe to connect the peers.
     717         567 :         object_t *parents [2] = {this, peer.socket == NULL ? this : peer.socket};  //  with no peer yet, both ends are parented to this socket
     718         567 :         pipe_t *new_pipes [2] = {NULL, NULL};
     719             :         //  Conflation is applied only when options.conflate is set and the socket type is DEALER, PULL, PUSH, PUB or SUB.
     720         567 :         bool conflate = options.conflate &&
     721           0 :             (options.type == ZMQ_DEALER ||
     722           0 :              options.type == ZMQ_PULL ||
     723           0 :              options.type == ZMQ_PUSH ||
     724           0 :              options.type == ZMQ_PUB ||
     725         567 :              options.type == ZMQ_SUB);
     726             : 
     727         567 :         int hwms [2] = {conflate? -1 : sndhwm, conflate? -1 : rcvhwm};  //  -1 is passed as the HWM of both directions when conflating
     728         567 :         bool conflates [2] = {conflate, conflate};
     729         567 :         rc = pipepair (parents, new_pipes, hwms, conflates);
     730         567 :         if (!conflate) {  //  NOTE(review): new_pipes is dereferenced here before rc is asserted below -- confirm pipepair cannot return with NULL pipes
     731         567 :             new_pipes[0]->set_hwms_boost(peer.options.sndhwm, peer.options.rcvhwm);
     732         567 :             new_pipes[1]->set_hwms_boost(options.sndhwm, options.rcvhwm);
     733             :         }
     734             : 
     735         567 :         errno_assert (rc == 0);
     736             : 
     737         567 :         if (!peer.socket) {
     738             :             //  The peer doesn't exist yet so we don't know whether
     739             :             //  to send the identity message or not. To resolve this,
     740             :             //  we always send our identity and drop it later if
     741             :             //  the peer doesn't expect it.
     742             :             msg_t id;
     743         276 :             rc = id.init_size (options.identity_size);
     744         276 :             errno_assert (rc == 0);
     745         276 :             memcpy (id.data (), options.identity, options.identity_size);
     746         276 :             id.set_flags (msg_t::identity);
     747         276 :             bool written = new_pipes [0]->write (&id);
     748         276 :             zmq_assert (written);
     749         276 :             new_pipes [0]->flush ();
     750             : 
     751         276 :             const endpoint_t endpoint = {this, options};
     752         552 :             pend_connection (std::string (addr_), endpoint, new_pipes);  //  presumably parks the pipe pair until a socket binds addr_ -- TODO confirm in pend_connection
     753             :         }
     754             :         else {
     755             :             //  If required, send the identity of the local socket to the peer.
     756         291 :             if (peer.options.recv_identity) {
     757             :                 msg_t id;
     758          42 :                 rc = id.init_size (options.identity_size);
     759          42 :                 errno_assert (rc == 0);
     760          42 :                 memcpy (id.data (), options.identity, options.identity_size);
     761          42 :                 id.set_flags (msg_t::identity);
     762          42 :                 bool written = new_pipes [0]->write (&id);
     763          42 :                 zmq_assert (written);
     764          42 :                 new_pipes [0]->flush ();
     765             :             }
     766             : 
     767             :             //  If required, send the identity of the peer to the local socket.
     768         291 :             if (options.recv_identity) {
     769             :                 msg_t id;
     770          33 :                 rc = id.init_size (peer.options.identity_size);
     771          33 :                 errno_assert (rc == 0);
     772          33 :                 memcpy (id.data (), peer.options.identity, peer.options.identity_size);
     773          33 :                 id.set_flags (msg_t::identity);
     774          33 :                 bool written = new_pipes [1]->write (&id);  //  written into the peer's end of the pipe pair
     775          33 :                 zmq_assert (written);
     776          33 :                 new_pipes [1]->flush ();
     777             :             }
     778             : 
     779             :             //  Attach remote end of the pipe to the peer socket. Note that peer's
     780             :             //  seqnum was incremented in find_endpoint function. We don't need it
     781             :             //  increased here.
     782         291 :             send_bind (peer.socket, new_pipes [1], false);
     783             :         }
     784             : 
     785             :         //  Attach local end of the pipe to this socket object.
     786         567 :         attach_pipe (new_pipes [0]);
     787             : 
     788             :         // Save last endpoint URI
     789         567 :         last_endpoint.assign (addr_);
     790             : 
     791             :         // remember inproc connections for disconnect
     792        2268 :         inprocs.insert (inprocs_t::value_type (std::string (addr_), new_pipes [0]));
     793             : 
     794         567 :         options.connected = true;
     795         567 :         EXIT_MUTEX ();
     796         567 :         return 0;
     797             :     }
     798        3756 :     bool is_single_connect = (options.type == ZMQ_DEALER ||  //  socket types for which a repeated connect to an already-known addr_ is a silent no-op
     799        3756 :                               options.type == ZMQ_SUB ||
     800        3756 :                               options.type == ZMQ_REQ);
     801        3756 :     if (unlikely (is_single_connect)) {
     802       10627 :         const endpoints_t::iterator it = endpoints.find (addr_);
     803        7086 :         if (it != endpoints.end ()) {
     804             :             // There is no valid use for multiple connects for SUB-PUB nor
     805             :             // DEALER-ROUTER nor REQ-REP. Multiple connects produces
     806             :             // nonsensical results.
     807           0 :             EXIT_MUTEX ();
     808             :             return 0;
     809             :         }
     810             :     }
     811             : 
     812             :     //  Choose the I/O thread to run the session in.
     813        3756 :     io_thread_t *io_thread = choose_io_thread (options.affinity);
     814        3753 :     if (!io_thread) {
     815           0 :         errno = EMTHREAD;
     816           0 :         EXIT_MUTEX ();
     817             :         return -1;
     818             :     }
     819             : 
     820        3753 :     address_t *paddr = new (std::nothrow) address_t (protocol, address, this->get_ctx ());  //  released with LIBZMQ_DELETE on most error paths below; passed to session_base_t::create on success
     821        3756 :     alloc_assert (paddr);
     822             : 
     823             :     //  Resolve address (if needed by the protocol)
     824        3753 :     if (protocol == "tcp") {
     825             :         //  Do some basic sanity checks on tcp:// address syntax
     826             :         //  - hostname starts with digit or letter, with embedded '-' or '.'
     827             :         //  - IPv6 address may contain hex chars and colons.
     828             :         //  - IPv6 link local address may contain % followed by interface name / zone_id
     829             :         //    (Reference: https://tools.ietf.org/html/rfc4007)
     830             :         //  - IPv4 address may contain decimal digits and dots.
     831             :         //  - Address must end in ":port" where port is *, or numeric
     832             :         //  - Address may contain two parts separated by ':'
     833             :         //  Following code is quick and dirty check to catch obvious errors,
     834             :         //  without trying to be fully accurate.
     835        3684 :         const char *check = address.c_str ();
     836        3684 :         if (isalnum (*check) || isxdigit (*check) || *check == '[') {
     837        3686 :             check++;
     838       55340 :             while (isalnum  (*check)
     839       17814 :                 || isxdigit (*check)
     840       17813 :                 || *check == '.' || *check == '-' || *check == ':' || *check == '%'
     841        3706 :                 || *check == ';' || *check == ']' || *check == '_'
     842             :             ) {
     843       47968 :                 check++;
     844             :             }
     845             :         }
     846             :         //  Assume the worst, now look for success
     847        3684 :         rc = -1;
     848             :         //  Did we reach the end of the address safely?
     849        3684 :         if (*check == 0) {
     850             :             //  Do we have a valid port string? (cannot be '*' in connect)
     851        7354 :             check = strrchr (address.c_str (), ':');
     852        3677 :             if (check) {
     853        3678 :                 check++;
     854        3678 :                 if (*check && (isdigit (*check)))  //  only the first character after the last ':' is inspected
     855        3675 :                     rc = 0;     //  Valid
     856             :             }
     857             :         }
     858        3684 :         if (rc == -1) {
     859           9 :             errno = EINVAL;
     860           9 :             LIBZMQ_DELETE(paddr);
     861           9 :             EXIT_MUTEX ();
     862             :             return -1;
     863             :         }
     864             :         //  Defer resolution until a socket is opened
     865        3675 :         paddr->resolved.tcp_addr = NULL;
     866             :     }
     867             : #if !defined ZMQ_HAVE_WINDOWS && !defined ZMQ_HAVE_OPENVMS
     868             :     else
     869          69 :     if (protocol == "ipc") {
     870          63 :         paddr->resolved.ipc_addr = new (std::nothrow) ipc_address_t ();
     871          63 :         alloc_assert (paddr->resolved.ipc_addr);
     872          63 :         int rc = paddr->resolved.ipc_addr->resolve (address.c_str ());  //  note: this declaration shadows the outer rc
     873          63 :         if (rc != 0) {
     874           0 :             LIBZMQ_DELETE(paddr);
     875           0 :             EXIT_MUTEX ();
     876             :             return -1;
     877             :         }
     878             :     }
     879             : #endif
     880             : 
     881        3744 : if (protocol  == "udp") {
     882           6 :     paddr->resolved.udp_addr = new (std::nothrow) udp_address_t ();
     883           6 :     alloc_assert (paddr->resolved.udp_addr);
     884          12 :     rc = paddr->resolved.udp_addr->resolve (address.c_str(), options.type == ZMQ_DISH);  //  second argument is true only for DISH sockets
     885           6 :     if (rc != 0) {
     886           0 :         LIBZMQ_DELETE(paddr);
     887           0 :         EXIT_MUTEX ();
     888             :         return -1;
     889             :     }
     890             : }
     891             : 
     892             : // TBD - Should we check address for ZMQ_HAVE_NORM???
     893             : 
     894             : #ifdef ZMQ_HAVE_OPENPGM
     895             :     if (protocol == "pgm" || protocol == "epgm") {
     896             :         struct pgm_addrinfo_t *res = NULL;
     897             :         uint16_t port_number = 0;
     898             :         int rc = pgm_socket_t::init_address(address.c_str(), &res, &port_number);  //  used only to validate the address; res is freed immediately below
     899             :         if (res != NULL)
     900             :             pgm_freeaddrinfo (res);
     901             :         if (rc != 0 || port_number == 0) {  //  NOTE(review): unlike the other branches, paddr is not deleted before this return -- looks like a leak; confirm
     902             :           EXIT_MUTEX ();
     903             :           return -1;
     904             :         }
     905             :     }
     906             : #endif
     907             : #if defined ZMQ_HAVE_TIPC
     908             :     else  //  NOTE(review): binds to whichever 'if' precedes it after preprocessing (the pgm one with ZMQ_HAVE_OPENPGM, otherwise the udp one)
     909             :     if (protocol == "tipc") {
     910             :         paddr->resolved.tipc_addr = new (std::nothrow) tipc_address_t ();
     911             :         alloc_assert (paddr->resolved.tipc_addr);
     912             :         int rc = paddr->resolved.tipc_addr->resolve (address.c_str());
     913             :         if (rc != 0) {
     914             :             LIBZMQ_DELETE(paddr);
     915             :             EXIT_MUTEX ();
     916             :             return -1;
     917             :         }
     918             :     }
     919             : #endif
     920             : #if defined ZMQ_HAVE_VMCI
     921             :     else  //  NOTE(review): likewise attaches to the last 'if' left by the preprocessor blocks above
     922             :     if (protocol == "vmci") {
     923             :         paddr->resolved.vmci_addr = new (std::nothrow) vmci_address_t (this->get_ctx ());
     924             :         alloc_assert (paddr->resolved.vmci_addr);
     925             :         int rc = paddr->resolved.vmci_addr->resolve (address.c_str ());
     926             :         if (rc != 0) {
     927             :             LIBZMQ_DELETE(paddr);
     928             :             EXIT_MUTEX ();
     929             :             return -1;
     930             :         }
     931             :     }
     932             : #endif
     933             : 
     934             :     //  Create session.
     935             :     session_base_t *session = session_base_t::create (io_thread, true, this,
     936        3744 :         options, paddr);
     937        3744 :     errno_assert (session);
     938             : 
     939             :     //  PGM does not support subscription forwarding; ask for all data to be
     940             :     //  sent to this pipe. (same for NORM, currently?)
     941       14984 :     bool subscribe_to_all = protocol == "pgm" || protocol == "epgm" || protocol == "norm" || protocol == "udp";
     942        3749 :     pipe_t *newpipe = NULL;
     943             : 
     944        3749 :     if (options.immediate != 1 || subscribe_to_all) {  //  otherwise no pipe is created here and newpipe stays NULL
     945             :         //  Create a bi-directional pipe.
     946        3738 :         object_t *parents [2] = {this, session};
     947        3738 :         pipe_t *new_pipes [2] = {NULL, NULL};
     948             :         //  Same conflate rule as in the inproc branch above.
     949        3738 :         bool conflate = options.conflate &&
     950           0 :             (options.type == ZMQ_DEALER ||
     951           0 :              options.type == ZMQ_PULL ||
     952           0 :              options.type == ZMQ_PUSH ||
     953           0 :              options.type == ZMQ_PUB ||
     954        3738 :              options.type == ZMQ_SUB);
     955             : 
     956             :         int hwms [2] = {conflate? -1 : options.sndhwm,
     957        3738 :             conflate? -1 : options.rcvhwm};
     958        3738 :         bool conflates [2] = {conflate, conflate};
     959        3738 :         rc = pipepair (parents, new_pipes, hwms, conflates);
     960        3738 :         errno_assert (rc == 0);
     961             : 
     962             :         //  Attach local end of the pipe to the socket object.
     963        3738 :         attach_pipe (new_pipes [0], subscribe_to_all);
     964        3738 :         newpipe = new_pipes [0];
     965             : 
     966             :         //  Attach remote end of the pipe to the session object later on.
     967        3738 :         session->attach_pipe (new_pipes [1]);
     968             :     }
     969             : 
     970             :     //  Save last endpoint URI
     971        3749 :     paddr->to_string (last_endpoint);
     972             : 
     973        3746 :     add_endpoint (addr_, (own_t *) session, newpipe);  //  the session is recorded under the user-supplied addr_, not under last_endpoint
     974        3747 :     EXIT_MUTEX ();
     975             :     return 0;
     976             : }
     977             : 
     978        4088 : void zmq::socket_base_t::add_endpoint (const char *addr_, own_t *endpoint_, pipe_t *pipe)  //  Launch endpoint_ as a child of this socket and record it, with its optional pipe, in endpoints under addr_.
     979             : {
     980             :     //  Activate the endpoint object (a session, or a listener when called from bind). Make it a child of this socket.
     981        4088 :     launch_child (endpoint_);
     982       20445 :     endpoints.insert (endpoints_t::value_type (std::string (addr_), endpoint_pipe_t (endpoint_, pipe)));  //  pipe may be NULL: the bind paths in this file pass NULL, and connect passes NULL when it created no pipe
     983        4089 : }
     984             : 
     985          84 : int zmq::socket_base_t::term_endpoint (const char *addr_)  //  Terminate the endpoint(s) this socket holds for the URI addr_.
     986             : {  //  Returns 0 on success and -1 on failure; this function itself sets errno to ETERM, EINVAL (NULL addr_) or ENOENT (no matching endpoint).
     987          84 :     ENTER_MUTEX ();
     988             : 
     989             :     //  Check whether the library hasn't been shut down yet.
     990          84 :     if (unlikely (ctx_terminated)) {
     991           0 :         errno = ETERM;
     992           0 :         EXIT_MUTEX ();
     993             :         return -1;
     994             :     }
     995             : 
     996             :     //  Check whether endpoint address passed to the function is valid.
     997          84 :     if (unlikely (!addr_)) {
     998           0 :         errno = EINVAL;
     999           0 :         EXIT_MUTEX ();
    1000             :         return -1;
    1001             :     }
    1002             : 
    1003             :     //  Process pending commands, if any, since there could be pending unprocessed process_own()'s
    1004             :     //  (from launch_child() for example) we're asked to terminate now.
    1005          84 :     int rc = process_commands (0, false);
    1006          84 :     if (unlikely(rc != 0)) {
    1007           0 :         EXIT_MUTEX ();
    1008             :         return -1;
    1009             :     }
    1010             : 
    1011             :     //  Split addr_ into protocol and address; fail if parse_uri or check_protocol reports an error.
    1012             :     std::string protocol;
    1013             :     std::string address;
    1014          84 :     if (parse_uri(addr_, protocol, address) || check_protocol(protocol)) {
    1015           0 :         EXIT_MUTEX ();
    1016             :         return -1;
    1017             :     }
    1018             : 
    1019             :     // Disconnect an inproc socket
    1020          84 :     if (protocol == "inproc") {
    1021          18 :         if (unregister_endpoint (std::string(addr_), this) == 0) {  //  presumably succeeds when this socket had bound addr_ -- TODO confirm in unregister_endpoint
    1022           6 :             EXIT_MUTEX ();
    1023             :             return 0;
    1024             :         }
    1025           9 :         std::pair <inprocs_t::iterator, inprocs_t::iterator> range = inprocs.equal_range (std::string (addr_));  //  inprocs holds the pipes remembered by connect () for inproc addresses
    1026           3 :         if (range.first == range.second) {
    1027           0 :             errno = ENOENT;
    1028           0 :             EXIT_MUTEX ();
    1029             :             return -1;
    1030             :         }
    1031             :         //  Terminate every pipe recorded for this address, then forget them.
    1032           6 :         for (inprocs_t::iterator it = range.first; it != range.second; ++it)
    1033           3 :             it->second->terminate (true);
    1034           3 :         inprocs.erase (range.first, range.second);
    1035           3 :         EXIT_MUTEX ();
    1036             :         return 0;
    1037             :     }
    1038             : 
    1039          75 :     std::string resolved_addr = std::string (addr_);  //  lookup key; starts as the user-supplied string and may be replaced by a resolved form below
    1040             :     std::pair <endpoints_t::iterator, endpoints_t::iterator> range;
    1041             : 
    1042             :     // The resolved last_endpoint is used as a key in the endpoints map.
    1043             :     // The address passed by the user might not match in the TCP case due to
    1044             :     // IPv4-in-IPv6 mapping (EG: tcp://[::ffff:127.0.0.1]:9999), so try to
    1045             :     // resolve before giving up. Given at this stage we don't know whether a
    1046             :     // socket is connected or bound, try with both.
    1047          75 :     if (protocol == "tcp") {
    1048         207 :         range = endpoints.equal_range (resolved_addr);
    1049          69 :         if (range.first == range.second) {
    1050           6 :             tcp_address_t *tcp_addr = new (std::nothrow) tcp_address_t ();
    1051           6 :             alloc_assert (tcp_addr);
    1052          12 :             rc = tcp_addr->resolve (address.c_str (), false, options.ipv6);  //  second argument presumably selects connect-style (false) vs. bind-style (true) resolution -- TODO confirm
    1053             : 
    1054           6 :             if (rc == 0) {
    1055           6 :                 tcp_addr->to_string (resolved_addr);
    1056          18 :                 range = endpoints.equal_range (resolved_addr);
    1057             : 
    1058           6 :                 if (range.first == range.second) {
    1059           6 :                     rc = tcp_addr->resolve (address.c_str (), true, options.ipv6);
    1060           3 :                     if (rc == 0) {  //  on failure resolved_addr keeps the value from the first resolve; rc is not checked again after this block
    1061           3 :                         tcp_addr->to_string (resolved_addr);
    1062             :                     }
    1063             :                 }
    1064             :             }
    1065           6 :             LIBZMQ_DELETE(tcp_addr);
    1066             :         }
    1067             :     }
    1068             : 
    1069             :     //  Find the endpoints range (if any) corresponding to the (possibly resolved) address string.
    1070         225 :     range = endpoints.equal_range (resolved_addr);
    1071          75 :     if (range.first == range.second) {
    1072           6 :         errno = ENOENT;
    1073           6 :         EXIT_MUTEX ();
    1074             :         return -1;
    1075             :     }
    1076             : 
    1077         138 :     for (endpoints_t::iterator it = range.first; it != range.second; ++it) {
    1078             :         //  If we have an associated pipe, terminate it.
    1079          69 :         if (it->second.second != NULL)
    1080          27 :             it->second.second->terminate (false);
    1081          69 :         term_child (it->second.first);  //  it->second.first is the endpoint object stored by add_endpoint ()
    1082             :     }
    1083          69 :     endpoints.erase (range.first, range.second);
    1084          69 :     EXIT_MUTEX ();
    1085             :     return 0;
    1086             : }
    1087             : 
    1088      864514 : int zmq::socket_base_t::send (msg_t *msg_, int flags_)  //  Send msg_ through the socket-type specific xsend (), optionally waiting up to options.sndtimeo.
    1089             : {  //  Returns 0 on success and -1 on failure; this function itself sets errno to ETERM, EFAULT or (on timeout) EAGAIN.
    1090      864514 :     ENTER_MUTEX ();
    1091             : 
    1092             :     //  Check whether the library hasn't been shut down yet.
    1093      864522 :     if (unlikely (ctx_terminated)) {
    1094           0 :         errno = ETERM;
    1095           0 :         EXIT_MUTEX ();
    1096             :         return -1;
    1097             :     }
    1098             : 
    1099             :     //  Check whether message passed to the function is valid.
    1100      864522 :     if (unlikely (!msg_ || !msg_->check ())) {
    1101           0 :         errno = EFAULT;
    1102           0 :         EXIT_MUTEX ();
    1103             :         return -1;
    1104             :     }
    1105             : 
    1106             :     //  Process pending commands, if any.
    1107      864522 :     int rc = process_commands (0, true);
    1108      864522 :     if (unlikely (rc != 0)) {
    1109           0 :         EXIT_MUTEX ();
    1110             :         return -1;
    1111             :     }
    1112             : 
    1113             :     //  Clear any user-visible flags that are set on the message.
    1114      864522 :     msg_->reset_flags (msg_t::more);
    1115             : 
    1116             :     //  At this point we impose the flags on the message.
    1117      864522 :     if (flags_ & ZMQ_SNDMORE)
    1118        1177 :         msg_->set_flags (msg_t::more);
    1119             : 
    1120      864522 :     msg_->reset_metadata ();
    1121             : 
    1122             :     //  Try to send the message using method in each socket class
    1123      864522 :     rc = xsend (msg_);
    1124      864522 :     if (rc == 0) {
    1125      851560 :         EXIT_MUTEX ();
    1126             :         return 0;
    1127             :     }
    1128       12962 :     if (unlikely (errno != EAGAIN)) {  //  any error other than EAGAIN is returned to the caller as-is
    1129          25 :         EXIT_MUTEX ();
    1130             :         return -1;
    1131             :     }
    1132             : 
    1133             :     //  In case of non-blocking send we'll simply propagate
    1134             :     //  the error - including EAGAIN - up the stack.
    1135       12937 :     if (flags_ & ZMQ_DONTWAIT || options.sndtimeo == 0) {
    1136       11861 :         EXIT_MUTEX ();
    1137             :         return -1;
    1138             :     }
    1139             : 
    1140             :     //  Compute the time when the timeout should occur.
    1141             :     //  If the timeout is infinite, don't care.
    1142        1076 :     int timeout = options.sndtimeo;
    1143        1076 :     uint64_t end = timeout < 0 ? 0 : (clock.now_ms () + timeout);  //  end is only read when timeout > 0; a negative timeout is the infinite case
    1144             : 
    1145             :     //  Oops, we couldn't send the message. Wait for the next
    1146             :     //  command, process it and try to send the message again.
    1147             :     //  If timeout is reached in the meantime, return EAGAIN.
    1148             :     while (true) {
    1149        1433 :         if (unlikely (process_commands (timeout, false) != 0)) {
    1150           0 :             EXIT_MUTEX ();
    1151             :             return -1;
    1152             :         }
    1153        1433 :         rc = xsend (msg_);
    1154        1433 :         if (rc == 0)
    1155             :             break;
    1156         420 :         if (unlikely (errno != EAGAIN)) {
    1157           0 :             EXIT_MUTEX ();
    1158             :             return -1;
    1159             :         }
    1160         420 :         if (timeout > 0) {  //  recompute the remaining time; with a negative timeout the loop simply retries
    1161          63 :             timeout = (int) (end - clock.now_ms ());
    1162          63 :             if (timeout <= 0) {
    1163          63 :                 errno = EAGAIN;
    1164          63 :                 EXIT_MUTEX ();
    1165             :                 return -1;
    1166             :             }
    1167             :         }
    1168             :     }
    1169             : 
    1170        1013 :     EXIT_MUTEX ();
    1171             :     return 0;
    1172             : }
    1173             : 
    1174     3971399 : int zmq::socket_base_t::recv (msg_t *msg_, int flags_)
    1175             : {
    1176     3971399 :     ENTER_MUTEX ();
    1177             : 
    1178             :     //  Check whether the library haven't been shut down yet.
    1179     3970009 :     if (unlikely (ctx_terminated)) {
    1180           0 :         errno = ETERM;
    1181           0 :         EXIT_MUTEX ();
    1182             :         return -1;
    1183             :     }
    1184             : 
    1185             :     //  Check whether message passed to the function is valid.
    1186     3970009 :     if (unlikely (!msg_ || !msg_->check ())) {
    1187           0 :         errno = EFAULT;
    1188           0 :         EXIT_MUTEX ();
    1189             :         return -1;
    1190             :     }
    1191             : 
    1192             :     //  Once every inbound_poll_rate messages check for signals and process
    1193             :     //  incoming commands. This happens only if we are not polling altogether
    1194             :     //  because there are messages available all the time. If poll occurs,
    1195             :     //  ticks is set to zero and thus we avoid this code.
    1196             :     //
    1197             :     //  Note that 'recv' uses different command throttling algorithm (the one
    1198             :     //  described above) from the one used by 'send'. This is because counting
    1199             :     //  ticks is more efficient than doing RDTSC all the time.
    1200     3967580 :     if (++ticks == inbound_poll_rate) {
    1201        8493 :         if (unlikely (process_commands (0, false) != 0)) {
    1202           0 :             EXIT_MUTEX ();
    1203             :             return -1;
    1204             :         }
    1205        8493 :         ticks = 0;
    1206             :     }
    1207             : 
    1208             :     //  Get the message.
    1209     3967580 :     int rc = xrecv (msg_);
    1210     3588989 :     if (unlikely (rc != 0 && errno != EAGAIN)) {
    1211           0 :         EXIT_MUTEX ();
    1212             :         return -1;
    1213             :     }
    1214             : 
    1215             :     //  If we have the message, return immediately.
    1216     3589043 :     if (rc == 0) {
    1217      850909 :         extract_flags (msg_);
    1218      850909 :         EXIT_MUTEX ();
    1219             :         return 0;
    1220             :     }
    1221             : 
    1222             :     //  If the message cannot be fetched immediately, there are two scenarios.
    1223             :     //  For non-blocking recv, commands are processed in case there's an
    1224             :     //  activate_reader command already waiting in a command pipe.
    1225             :     //  If it's not, return EAGAIN.
    1226     2738134 :     if (flags_ & ZMQ_DONTWAIT || options.rcvtimeo == 0) {
    1227     2736980 :         if (unlikely (process_commands (0, false) != 0)) {
    1228           0 :             EXIT_MUTEX ();
    1229             :             return -1;
    1230             :         }
    1231     3149908 :         ticks = 0;
    1232             : 
    1233     3149908 :         rc = xrecv (msg_);
    1234     3116642 :         if (rc < 0) {
    1235     3116552 :             EXIT_MUTEX ();
    1236     3119671 :             return rc;
    1237             :         }
    1238          90 :         extract_flags (msg_);
    1239             : 
    1240          90 :         EXIT_MUTEX ();
    1241             :         return 0;
    1242             :     }
    1243             : 
    1244             :     //  Compute the time when the timeout should occur.
    1245             :     //  If the timeout is infinite, don't care.
    1246        1154 :     int timeout = options.rcvtimeo;
    1247        1154 :     uint64_t end = timeout < 0 ? 0 : (clock.now_ms () + timeout);
    1248             : 
    1249             :     //  In blocking scenario, commands are processed over and over again until
    1250             :     //  we are able to fetch a message.
    1251        1154 :     bool block = (ticks != 0);
    1252             :     while (true) {
    1253        2358 :         if (unlikely (process_commands (block ? timeout : 0, false) != 0)) {
    1254          15 :             EXIT_MUTEX ();
    1255             :             return -1;
    1256             :         }
    1257        2343 :         rc = xrecv (msg_);
    1258        2343 :         if (rc == 0) {
    1259        1058 :             ticks = 0;
    1260             :             break;
    1261             :         }
    1262        1285 :         if (unlikely (errno != EAGAIN)) {
    1263           0 :             EXIT_MUTEX ();
    1264             :             return -1;
    1265             :         }
    1266        1285 :         block = true;
    1267        1285 :         if (timeout > 0) {
    1268         698 :             timeout = (int) (end - clock.now_ms ());
    1269         698 :             if (timeout <= 0) {
    1270          81 :                 errno = EAGAIN;
    1271          81 :                 EXIT_MUTEX ();
    1272             :                 return -1;
    1273             :             }
    1274             :         }
    1275             :     }
    1276             : 
    1277        1058 :     extract_flags (msg_);
    1278        1208 :     EXIT_MUTEX ();
    1279             :     return 0;
    1280             : }
    1281             : 
    1282       11131 : int zmq::socket_base_t::close ()
    1283             : {
    1284       11131 :     ENTER_MUTEX ();
    1285             :     
    1286             :     //  Remove all existing signalers for thread safe sockets
    1287       11131 :     if (thread_safe)
    1288          51 :         ((mailbox_safe_t*)mailbox)->clear_signalers();
    1289             : 
    1290             :     //  Mark the socket as dead
    1291       11131 :     tag = 0xdeadbeef;
    1292             : 
    1293       11131 :     EXIT_MUTEX ();
    1294             : 
    1295             :     //  Transfer the ownership of the socket from this application thread
    1296             :     //  to the reaper thread which will take care of the rest of shutdown
    1297             :     //  process.
    1298       11131 :     send_reap (this);
    1299             : 
    1300       11131 :     return 0;
    1301             : }
    1302             : 
    1303           0 : bool zmq::socket_base_t::has_in ()
    1304             : {
    1305     1327171 :     return xhas_in ();
    1306             : }
    1307             : 
    1308           0 : bool zmq::socket_base_t::has_out ()
    1309             : {
    1310     1327173 :     return xhas_out ();
    1311             : }
    1312             : 
    1313       11131 : void zmq::socket_base_t::start_reaping (poller_t *poller_)
    1314             : {
    1315             :     //  Plug the socket to the reaper thread.
    1316       11131 :     poller = poller_;
    1317             : 
    1318             :     fd_t fd;
    1319             : 
    1320       11131 :     if (!thread_safe)
    1321       11080 :         fd = ((mailbox_t*)mailbox)->get_fd();
    1322             :     else {
    1323          51 :         ENTER_MUTEX ();
    1324             : 
    1325          51 :         reaper_signaler =  new signaler_t();
    1326             : 
    1327             :         //  Add signaler to the safe mailbox
    1328          51 :         fd = reaper_signaler->get_fd();
    1329          51 :         ((mailbox_safe_t*)mailbox)->add_signaler(reaper_signaler);
    1330             : 
    1331             :         //  Send a signal to make sure the reaper handles existing commands
    1332          51 :         reaper_signaler->send();
    1333             : 
    1334          51 :         EXIT_MUTEX ();
    1335             :     }
    1336             : 
    1337       11131 :     handle = poller->add_fd (fd, this);
    1338       11131 :     poller->set_pollin (handle);
    1339             : 
    1340             :     //  Initialise the termination and check whether it can be deallocated
    1341             :     //  immediately.
    1342       11131 :     terminate ();
    1343       11131 :     check_destroy ();
    1344       11131 : }
    1345             : 
    1346     4958756 : int zmq::socket_base_t::process_commands (int timeout_, bool throttle_)
    1347             : {
    1348             :     int rc;
    1349             :     command_t cmd;
    1350     4958756 :     if (timeout_ != 0) {
    1351             : 
    1352             :         //  If we are asked to wait, simply ask mailbox to wait.
    1353        3687 :         rc = mailbox->recv (&cmd, timeout_);
    1354             :     }
    1355             :     else {
    1356             : 
    1357             :         //  If we are asked not to wait, check whether we haven't processed
    1358             :         //  commands recently, so that we can throttle the new commands.
    1359             : 
    1360             :         //  Get the CPU's tick counter. If 0, the counter is not available.
    1361     4955069 :         const uint64_t tsc = zmq::clock_t::rdtsc ();
    1362             : 
    1363             :         //  Optimised version of command processing - it doesn't have to check
    1364             :         //  for incoming commands each time. It does so only if certain time
    1365             :         //  elapsed since last command processing. Command delay varies
    1366             :         //  depending on CPU speed: It's ~1ms on 3GHz CPU, ~2ms on 1.5GHz CPU
    1367             :         //  etc. The optimisation makes sense only on platforms where getting
    1368             :         //  a timestamp is a very cheap operation (tens of nanoseconds).
    1369     4955087 :         if (tsc && throttle_) {
    1370             : 
    1371             :             //  Check whether TSC hasn't jumped backwards (in case of migration
    1372             :             //  between CPU cores) and whether certain time has elapsed since
    1373             :             //  last command processing. If it hasn't, do nothing.
    1374      864522 :             if (tsc >= last_tsc && tsc - last_tsc <= max_command_delay)
    1375             :                 return 0;
    1376        1756 :             last_tsc = tsc;
    1377             :         }
    1378             : 
    1379             :         //  Check whether there are any commands pending for this thread.
    1380     4092321 :         rc = mailbox->recv (&cmd, 0);
    1381             :     }
    1382             : 
    1383             :     //  Process all available commands.
    1384     4539117 :     while (rc == 0) {
    1385       29796 :         cmd.destination->process_command (cmd);
    1386       29794 :         rc = mailbox->recv (&cmd, 0);
    1387             :     }
    1388             : 
    1389     4509321 :     if (errno == EINTR)
    1390             :         return -1;
    1391             : 
    1392     4509318 :     zmq_assert (errno == EAGAIN);
    1393             : 
    1394     4509312 :     if (ctx_terminated) {
    1395        5924 :         errno = ETERM;
    1396        5924 :         return -1;
    1397             :     }
    1398             : 
    1399             :     return 0;
    1400             : }
    1401             : 
    1402        2263 : void zmq::socket_base_t::process_stop ()
    1403             : {
    1404             :     //  Here, someone has called zmq_ctx_term while the socket was still alive.
    1405             :     //  We'll remember the fact so that any blocking call is interrupted and any
    1406             :     //  further attempt to use the socket will return ETERM. The user is still
    1407             :     //  responsible for calling zmq_close on the socket though!
    1408        2263 :     stop_monitor ();
    1409        2263 :     ctx_terminated = true;
    1410        2263 : }
    1411             : 
    1412        4112 : void zmq::socket_base_t::process_bind (pipe_t *pipe_)
    1413             : {
    1414        4112 :     attach_pipe (pipe_);
    1415        4112 : }
    1416             : 
    1417       11131 : void zmq::socket_base_t::process_term (int linger_)
    1418             : {
    1419             :     //  Unregister all inproc endpoints associated with this socket.
    1420             :     //  Doing this we make sure that no new pipes from other sockets (inproc)
    1421             :     //  will be initiated.
    1422       11131 :     unregister_endpoints (this);
    1423             : 
    1424             :     //  Ask all attached pipes to terminate.
    1425       33046 :     for (pipes_t::size_type i = 0; i != pipes.size (); ++i)
    1426       10784 :         pipes [i]->terminate (false);
    1427       22262 :     register_term_acks ((int) pipes.size ());
    1428             : 
    1429             :     //  Continue the termination process immediately.
    1430       11131 :     own_t::process_term (linger_);
    1431       11131 : }
    1432             : 
    1433        2397 : void zmq::socket_base_t::update_pipe_options(int option_)
    1434             : {
    1435        2397 :     if (option_ == ZMQ_SNDHWM || option_ == ZMQ_RCVHWM)
    1436             :     {
    1437         114 :         for (pipes_t::size_type i = 0; i != pipes.size(); ++i)
    1438             :         {
    1439          12 :             pipes[i]->set_hwms(options.rcvhwm, options.sndhwm);
    1440             :         }
    1441             :     }
    1442             : 
    1443        2397 : }
    1444             : 
    1445       11131 : void zmq::socket_base_t::process_destroy ()
    1446             : {
    1447       11131 :     destroyed = true;
    1448       11131 : }
    1449             : 
    1450         270 : int zmq::socket_base_t::xsetsockopt (int, const void *, size_t)
    1451             : {
    1452         270 :     errno = EINVAL;
    1453         270 :     return -1;
    1454             : }
    1455             : 
    1456           9 : bool zmq::socket_base_t::xhas_out ()
    1457             : {
    1458           9 :     return false;
    1459             : }
    1460             : 
    1461           0 : int zmq::socket_base_t::xsend (msg_t *)
    1462             : {
    1463           0 :     errno = ENOTSUP;
    1464           0 :     return -1;
    1465             : }
    1466             : 
    1467      529850 : bool zmq::socket_base_t::xhas_in ()
    1468             : {
    1469      529850 :     return false;
    1470             : }
    1471             : 
    1472           0 : int zmq::socket_base_t::xjoin (const char *group_)
    1473             : {
    1474             :     LIBZMQ_UNUSED (group_);
    1475           0 :     errno = ENOTSUP;
    1476           0 :     return -1;
    1477             : }
    1478             : 
    1479           0 : int zmq::socket_base_t::xleave (const char *group_)
    1480             : {
    1481             :     LIBZMQ_UNUSED (group_);
    1482           0 :     errno = ENOTSUP;
    1483           0 :     return -1;
    1484             : }
    1485             : 
    1486           0 : int zmq::socket_base_t::xrecv (msg_t *)
    1487             : {
    1488           0 :     errno = ENOTSUP;
    1489           0 :     return -1;
    1490             : }
    1491             : 
    1492           0 : zmq::blob_t zmq::socket_base_t::get_credential () const
    1493             : {
    1494           0 :     return blob_t ();
    1495             : }
    1496             : 
    1497           0 : void zmq::socket_base_t::xread_activated (pipe_t *)
    1498             : {
    1499           0 :     zmq_assert (false);
    1500           0 : }
    1501           0 : void zmq::socket_base_t::xwrite_activated (pipe_t *)
    1502             : {
    1503           0 :     zmq_assert (false);
    1504           0 : }
    1505             : 
    1506           0 : void zmq::socket_base_t::xhiccuped (pipe_t *)
    1507             : {
    1508           0 :     zmq_assert (false);
    1509           0 : }
    1510             : 
    1511       13186 : void zmq::socket_base_t::in_event ()
    1512             : {
    1513             :     //  This function is invoked only once the socket is running in the context
    1514             :     //  of the reaper thread. Process any commands from other threads/sockets
    1515             :     //  that may be available at the moment. Ultimately, the socket will
    1516             :     //  be destroyed.
    1517       13186 :     ENTER_MUTEX ();
    1518             : 
    1519             :     //  If the socket is thread safe we need to unsignal the reaper signaler
    1520       13186 :     if (thread_safe)
    1521          82 :         reaper_signaler->recv();
    1522             : 
    1523       13186 :     process_commands (0, false);
    1524       13186 :     EXIT_MUTEX();
    1525       13186 :     check_destroy();
    1526       13186 : }
    1527             : 
    1528           0 : void zmq::socket_base_t::out_event ()
    1529             : {
    1530           0 :     zmq_assert (false);
    1531           0 : }
    1532             : 
    1533           0 : void zmq::socket_base_t::timer_event (int)
    1534             : {
    1535           0 :     zmq_assert (false);
    1536           0 : }
    1537             : 
    1538       24317 : void zmq::socket_base_t::check_destroy ()
    1539             : {
    1540             :     //  If the object was already marked as destroyed, finish the deallocation.
    1541       24317 :     if (destroyed) {
    1542             : 
    1543             :         //  Remove the socket from the reaper's poller.
    1544       11131 :         poller->rm_fd (handle);
    1545             : 
    1546             :         //  Remove the socket from the context.
    1547       11131 :         destroy_socket (this);
    1548             : 
    1549             :         //  Notify the reaper about the fact.
    1550       11131 :         send_reaped ();
    1551             : 
    1552             :         //  Deallocate.
    1553       11131 :         own_t::process_destroy ();
    1554             :     }
    1555       24317 : }
    1556             : 
    1557        1370 : void zmq::socket_base_t::read_activated (pipe_t *pipe_)
    1558             : {
    1559        1370 :     xread_activated (pipe_);
    1560        1370 : }
    1561             : 
    1562         880 : void zmq::socket_base_t::write_activated (pipe_t *pipe_)
    1563             : {
    1564         880 :     xwrite_activated (pipe_);
    1565         880 : }
    1566             : 
    1567           3 : void zmq::socket_base_t::hiccuped (pipe_t *pipe_)
    1568             : {
    1569           3 :     if (options.immediate == 1)
    1570           3 :         pipe_->terminate (false);
    1571             :     else
    1572             :         // Notify derived sockets of the hiccup
    1573           0 :         xhiccuped (pipe_);
    1574           3 : }
    1575             : 
    1576        8417 : void zmq::socket_base_t::pipe_terminated (pipe_t *pipe_)
    1577             : {
    1578             :     //  Notify the specific socket type about the pipe termination.
    1579        8417 :     xpipe_terminated (pipe_);
    1580             : 
    1581             :     // Remove pipe from inproc pipes
    1582       33668 :     for (inprocs_t::iterator it = inprocs.begin (); it != inprocs.end (); ++it)
    1583         564 :         if (it->second == pipe_) {
    1584         564 :             inprocs.erase (it);
    1585             :             break;
    1586             :         }
    1587             : 
    1588             :     //  Remove the pipe from the list of attached pipes and confirm its
    1589             :     //  termination if we are already shutting down.
    1590        8417 :     pipes.erase (pipe_);
    1591        8417 :     if (is_terminating ())
    1592        8215 :         unregister_term_ack ();
    1593        8417 : }
    1594             : 
    1595      852057 : void zmq::socket_base_t::extract_flags (msg_t *msg_)
    1596             : {
    1597             :     //  Test whether IDENTITY flag is valid for this socket type.
    1598      852057 :     if (unlikely (msg_->flags () & msg_t::identity))
    1599           0 :         zmq_assert (options.recv_identity);
    1600             : 
    1601             :     //  Record whether the MORE flag is set on this message.
    1602      852057 :     rcvmore = msg_->flags () & msg_t::more ? true : false;
    1603      852057 : }
    1604             : 
    1605          27 : int zmq::socket_base_t::monitor (const char *addr_, int events_)
    1606             : {
    1607          27 :     if (unlikely (ctx_terminated)) {
    1608           0 :         errno = ETERM;
    1609           0 :         return -1;
    1610             :     }
    1611             :     //  Support deregistering monitoring endpoints as well
    1612          27 :     if (addr_ == NULL) {
    1613           0 :         stop_monitor ();
    1614             :         return 0;
    1615             :     }
    1616             :     //  Parse addr_ string.
    1617             :     std::string protocol;
    1618             :     std::string address;
    1619          27 :     if (parse_uri (addr_, protocol, address) || check_protocol (protocol))
    1620             :         return -1;
    1621             : 
    1622             :     //  Event notification only supported over inproc://
    1623          27 :     if (protocol != "inproc") {
    1624           3 :         errno = EPROTONOSUPPORT;
    1625           3 :         return -1;
    1626             :     }
    1627             :     // already monitoring. Stop previous monitor before starting new one.
    1628          24 :     if (monitor_socket != NULL) {
    1629           0 :         stop_monitor (true);
    1630             :     }
    1631             :     //  Register events to monitor
    1632          24 :     monitor_events = events_;
    1633          24 :     monitor_socket = zmq_socket (get_ctx (), ZMQ_PAIR);
    1634          24 :     if (monitor_socket == NULL)
    1635             :         return -1;
    1636             : 
    1637             :     //  Never block context termination on pending event messages
    1638          24 :     int linger = 0;
    1639          24 :     int rc = zmq_setsockopt (monitor_socket, ZMQ_LINGER, &linger, sizeof (linger));
    1640          24 :     if (rc == -1)
    1641           0 :         stop_monitor (false);
    1642             : 
    1643             :     //  Spawn the monitor socket endpoint
    1644          24 :     rc = zmq_bind (monitor_socket, addr_);
    1645          24 :     if (rc == -1)
    1646           0 :          stop_monitor (false);
    1647          24 :     return rc;
    1648             : }
    1649             : 
    1650        3247 : void zmq::socket_base_t::event_connected (const std::string &addr_, int fd_)
    1651             : {
    1652        3247 :     if (monitor_events & ZMQ_EVENT_CONNECTED)
    1653           6 :         monitor_event (ZMQ_EVENT_CONNECTED, fd_, addr_);
    1654        3247 : }
    1655             : 
    1656        3786 : void zmq::socket_base_t::event_connect_delayed (const std::string &addr_, int err_)
    1657             : {
    1658        3786 :     if (monitor_events & ZMQ_EVENT_CONNECT_DELAYED)
    1659           3 :         monitor_event (ZMQ_EVENT_CONNECT_DELAYED, err_, addr_);
    1660        3786 : }
    1661             : 
    1662         324 : void zmq::socket_base_t::event_connect_retried (const std::string &addr_, int interval_)
    1663             : {
    1664         324 :     if (monitor_events & ZMQ_EVENT_CONNECT_RETRIED)
    1665           0 :         monitor_event (ZMQ_EVENT_CONNECT_RETRIED, interval_, addr_);
    1666         324 : }
    1667             : 
    1668         342 : void zmq::socket_base_t::event_listening (const std::string &addr_, int fd_)
    1669             : {
    1670         342 :     if (monitor_events & ZMQ_EVENT_LISTENING)
    1671           3 :         monitor_event (ZMQ_EVENT_LISTENING, fd_, addr_);
    1672         342 : }
    1673             : 
    1674           0 : void zmq::socket_base_t::event_bind_failed (const std::string &addr_, int err_)
    1675             : {
    1676           0 :     if (monitor_events & ZMQ_EVENT_BIND_FAILED)
    1677           0 :         monitor_event (ZMQ_EVENT_BIND_FAILED, err_, addr_);
    1678           0 : }
    1679             : 
    1680        3515 : void zmq::socket_base_t::event_accepted (const std::string &addr_, int fd_)
    1681             : {
    1682        3515 :     if (monitor_events & ZMQ_EVENT_ACCEPTED)
    1683          21 :         monitor_event (ZMQ_EVENT_ACCEPTED, fd_, addr_);
    1684        3515 : }
    1685             : 
    1686           9 : void zmq::socket_base_t::event_accept_failed (const std::string &addr_, int err_)
    1687             : {
    1688           9 :     if (monitor_events & ZMQ_EVENT_ACCEPT_FAILED)
    1689           0 :         monitor_event (ZMQ_EVENT_ACCEPT_FAILED, err_, addr_);
    1690           9 : }
    1691             : 
    1692         944 : void zmq::socket_base_t::event_closed (const std::string &addr_, int fd_)
    1693             : {
    1694         944 :     if (monitor_events & ZMQ_EVENT_CLOSED)
    1695           3 :         monitor_event (ZMQ_EVENT_CLOSED, fd_, addr_);
    1696         944 : }
    1697             : 
    1698           3 : void zmq::socket_base_t::event_close_failed (const std::string &addr_, int err_)
    1699             : {
    1700           3 :     if (monitor_events & ZMQ_EVENT_CLOSE_FAILED)
    1701           0 :         monitor_event (ZMQ_EVENT_CLOSE_FAILED, err_, addr_);
    1702           3 : }
    1703             : 
    1704        2401 : void zmq::socket_base_t::event_disconnected (const std::string &addr_, int fd_)
    1705             : {
    1706        2401 :     if (monitor_events & ZMQ_EVENT_DISCONNECTED)
    1707          12 :         monitor_event (ZMQ_EVENT_DISCONNECTED, fd_, addr_);
    1708        2401 : }
    1709             : 
    1710             : //  Send a monitor event
    1711          54 : void zmq::socket_base_t::monitor_event (int event_, int value_, const std::string &addr_)
    1712             : {
    1713          54 :     if (monitor_socket) {
    1714             :         //  Send event in first frame
    1715             :         zmq_msg_t msg;
    1716          54 :         zmq_msg_init_size (&msg, 6);
    1717          54 :         uint8_t *data = (uint8_t *) zmq_msg_data (&msg);
    1718             :         //  Avoid dereferencing uint32_t on unaligned address
    1719          54 :         uint16_t event = (uint16_t) event_;
    1720          54 :         uint32_t value = (uint32_t) value_;
    1721             :         memcpy (data + 0, &event, sizeof(event));
    1722          54 :         memcpy (data + 2, &value, sizeof(value));
    1723          54 :         zmq_sendmsg (monitor_socket, &msg, ZMQ_SNDMORE);
    1724             : 
    1725             :         //  Send address in second frame
    1726          54 :         zmq_msg_init_size (&msg, addr_.size());
    1727          54 :         memcpy (zmq_msg_data (&msg), addr_.c_str (), addr_.size ());
    1728          54 :         zmq_sendmsg (monitor_socket, &msg, 0);
    1729             :     }
    1730          54 : }
    1731             : 
    1732       13459 : void zmq::socket_base_t::stop_monitor (bool send_monitor_stopped_event_)
    1733             : {
    1734       13459 :     if (monitor_socket) {
    1735          24 :         if ((monitor_events & ZMQ_EVENT_MONITOR_STOPPED) && send_monitor_stopped_event_)
    1736          12 :             monitor_event (ZMQ_EVENT_MONITOR_STOPPED, 0, "");
    1737          24 :         zmq_close (monitor_socket);
    1738          24 :         monitor_socket = NULL;
    1739          24 :         monitor_events = 0;
    1740             :     }
    1741       13459 : }

Generated by: LCOV version 1.10