LCOV - code coverage report
Current view: top level - home/h/core/forks/m4-libzmq/src - ctx.cpp (source / functions) Hit Total Coverage
Test: zeromq-4.2.0 Code Coverage Lines: 236 279 84.6 %
Date: 2016-05-09 Functions: 23 23 100.0 %
Legend: Lines: hit not hit

          Line data    Source code
       1             : /*
       2             :     Copyright (c) 2007-2016 Contributors as noted in the AUTHORS file
       3             : 
       4             :     This file is part of libzmq, the ZeroMQ core engine in C++.
       5             : 
       6             :     libzmq is free software; you can redistribute it and/or modify it under
       7             :     the terms of the GNU Lesser General Public License (LGPL) as published
       8             :     by the Free Software Foundation; either version 3 of the License, or
       9             :     (at your option) any later version.
      10             : 
      11             :     As a special exception, the Contributors give you permission to link
      12             :     this library with independent modules to produce an executable,
      13             :     regardless of the license terms of these independent modules, and to
      14             :     copy and distribute the resulting executable under terms of your choice,
      15             :     provided that you also meet, for each linked independent module, the
      16             :     terms and conditions of the license of that module. An independent
      17             :     module is a module which is not derived from or based on this library.
      18             :     If you modify this library, you must extend this exception to your
      19             :     version of the library.
      20             : 
      21             :     libzmq is distributed in the hope that it will be useful, but WITHOUT
      22             :     ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
      23             :     FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public
      24             :     License for more details.
      25             : 
      26             :     You should have received a copy of the GNU Lesser General Public License
      27             :     along with this program.  If not, see <http://www.gnu.org/licenses/>.
      28             : */
      29             : 
      30             : #include "precompiled.hpp"
      31             : #include "macros.hpp"
      32             : #include "platform.hpp"
      33             : #ifdef ZMQ_HAVE_WINDOWS
      34             : #include "windows.hpp"
      35             : #else
      36             : #include <unistd.h>
      37             : #endif
      38             : 
      39             : #include <limits>
      40             : #include <climits>
      41             : #include <new>
      42             : #include <string.h>
      43             : 
      44             : #include "ctx.hpp"
      45             : #include "socket_base.hpp"
      46             : #include "io_thread.hpp"
      47             : #include "reaper.hpp"
      48             : #include "pipe.hpp"
      49             : #include "err.hpp"
      50             : #include "msg.hpp"
      51             : 
      52             : #if defined (ZMQ_USE_TWEETNACL)
      53             : #   include "tweetnacl.h"
      54             : #elif defined (ZMQ_USE_LIBSODIUM)
      55             : #   include "sodium.h"
      56             : #endif
      57             : 
      58             : #ifdef ZMQ_HAVE_VMCI
      59             : #include <vmci_sockets.h>
      60             : #endif
      61             : 
      62             : #define ZMQ_CTX_TAG_VALUE_GOOD 0xabadcafe
      63             : #define ZMQ_CTX_TAG_VALUE_BAD  0xdeadbeef
      64             : 
      65         429 : int clipped_maxsocket (int max_requested)
      66             : {
      67         429 :     if (max_requested >= zmq::poller_t::max_fds () && zmq::poller_t::max_fds () != -1)
      68             :         // -1 because we need room for the reaper mailbox.
      69           0 :         max_requested = zmq::poller_t::max_fds () - 1;
      70             : 
      71         429 :     return max_requested;
      72             : }
      73             : 
      74         423 : zmq::ctx_t::ctx_t () :
      75             :     tag (ZMQ_CTX_TAG_VALUE_GOOD),
      76             :     starting (true),
      77             :     terminating (false),
      78             :     reaper (NULL),
      79             :     slot_count (0),
      80             :     slots (NULL),
      81         423 :     max_sockets (clipped_maxsocket (ZMQ_MAX_SOCKETS_DFLT)),
      82             :     max_msgsz (INT_MAX),
      83             :     io_thread_count (ZMQ_IO_THREADS_DFLT),
      84             :     blocky (true),
      85             :     ipv6 (false),
      86             :     thread_priority (ZMQ_THREAD_PRIORITY_DFLT),
      87        1692 :     thread_sched_policy (ZMQ_THREAD_SCHED_POLICY_DFLT)
      88             : {
      89             : #ifdef HAVE_FORK
      90         423 :     pid = getpid();
      91             : #endif
      92             : #ifdef ZMQ_HAVE_VMCI
      93             :     vmci_fd = -1;
      94             :     vmci_family = -1;
      95             : #endif
      96             : 
      97         423 :     crypto_sync.lock ();
      98             : #if defined (ZMQ_USE_TWEETNACL)
      99             :     // allow opening of /dev/urandom
     100             :     unsigned char tmpbytes[4];
     101         423 :     randombytes(tmpbytes, 4);
     102             : #elif defined (ZMQ_USE_SODIUM)
     103             :     int rc = sodium_init ();
     104             :     zmq_assert (rc != -1);
     105             : #endif
     106         423 :     crypto_sync.unlock ();
     107         423 : }
     108             : 
     109       11670 : bool zmq::ctx_t::check_tag ()
     110             : {
     111       11670 :     return tag == ZMQ_CTX_TAG_VALUE_GOOD;
     112             : }
     113             : 
     114        2538 : zmq::ctx_t::~ctx_t ()
     115             : {
     116             :     //  Check that there are no remaining sockets.
     117         846 :     zmq_assert (sockets.empty ());
     118             : 
     119             :     //  Ask I/O threads to terminate. If stop signal wasn't sent to I/O
     120             :     //  thread subsequent invocation of destructor would hang-up.
     121        3681 :     for (io_threads_t::size_type i = 0; i != io_threads.size (); i++) {
     122        5913 :         io_threads [i]->stop ();
     123             :     }
     124             : 
     125             :     //  Wait till I/O threads actually terminate.
     126        3681 :     for (io_threads_t::size_type i = 0; i != io_threads.size (); i++) {
     127        2412 :         LIBZMQ_DELETE(io_threads [i]);
     128             :     }
     129             : 
     130             :     //  Deallocate the reaper thread object.
     131         423 :     LIBZMQ_DELETE(reaper);
     132             : 
     133             :     //  Deallocate the array of mailboxes. No special work is
     134             :     //  needed as mailboxes themselves were deallocated with their
     135             :     //  corresponding io_thread/socket objects.
     136         423 :     free (slots);
     137             : 
     138             :     //  If we've done any Curve encryption, we may have a file handle
     139             :     //  to /dev/urandom open that needs to be cleaned up.
     140             : #ifdef ZMQ_HAVE_CURVE
     141         423 :     randombytes_close ();
     142             : #endif
     143             : 
     144             :     //  Remove the tag, so that the object is considered dead.
     145         423 :     tag = ZMQ_CTX_TAG_VALUE_BAD;
     146         423 : }
     147             : 
     148         423 : int zmq::ctx_t::terminate ()
     149             : {
     150         423 :     slot_sync.lock();
     151             : 
     152         423 :     bool saveTerminating = terminating;
     153         423 :     terminating = false;
     154             : 
     155             :     // Connect up any pending inproc connections, otherwise we will hang
     156         423 :     pending_connections_t copy = pending_connections;
     157         862 :     for (pending_connections_t::iterator p = copy.begin (); p != copy.end (); ++p) {
     158          16 :         zmq::socket_base_t *s = create_socket (ZMQ_PAIR);
     159          32 :         s->bind (p->first.c_str ());
     160          16 :         s->close ();
     161             :     }
     162         423 :     terminating = saveTerminating;
     163             : 
     164         423 :     if (!starting) {
     165             : 
     166             : #ifdef HAVE_FORK
     167         423 :         if (pid != getpid ()) {
     168             :             // we are a forked child process. Close all file descriptors
     169             :             // inherited from the parent.
     170           0 :             for (sockets_t::size_type i = 0; i != sockets.size (); i++)
     171           0 :                 sockets [i]->get_mailbox ()->forked ();
     172             : 
     173           0 :             term_mailbox.forked ();
     174             :         }
     175             : #endif
     176             : 
     177             :         //  Check whether termination was already underway, but interrupted and now
     178             :         //  restarted.
     179         423 :         bool restarted = terminating;
     180         423 :         terminating = true;
     181             : 
     182             :         //  First attempt to terminate the context.
     183         423 :         if (!restarted) {
     184             :             //  First send stop command to sockets so that any blocking calls
     185             :             //  can be interrupted. If there are no sockets we can ask reaper
     186             :             //  thread to stop.
     187       16899 :             for (sockets_t::size_type i = 0; i != sockets.size (); i++)
     188       16482 :                 sockets [i]->stop ();
     189         834 :             if (sockets.empty ())
     190          16 :                 reaper->stop ();
     191             :         }
     192         423 :         slot_sync.unlock();
     193             : 
     194             :         //  Wait till reaper thread closes all the sockets.
     195             :         command_t cmd;
     196         423 :         int rc = term_mailbox.recv (&cmd, -1);
     197         423 :         if (rc == -1 && errno == EINTR)
     198           0 :             return -1;
     199         423 :         errno_assert (rc == 0);
     200         423 :         zmq_assert (cmd.type == command_t::done);
     201         423 :         slot_sync.lock ();
     202         846 :         zmq_assert (sockets.empty ());
     203             :     }
     204         423 :     slot_sync.unlock ();
     205             : 
     206             : #ifdef ZMQ_HAVE_VMCI
     207             :     vmci_sync.lock ();
     208             : 
     209             :     VMCISock_ReleaseAFValueFd (vmci_fd);
     210             :     vmci_family = -1;
     211             :     vmci_fd = -1;
     212             : 
     213             :     vmci_sync.unlock ();
     214             : #endif
     215             : 
     216             :     //  Deallocate the resources.
     217         423 :     delete this;
     218             : 
     219             :     return 0;
     220             : }
     221             : 
     222           6 : int zmq::ctx_t::shutdown ()
     223             : {
     224           6 :     slot_sync.lock ();
     225           6 :     if (!starting && !terminating) {
     226           6 :         terminating = true;
     227             : 
     228             :         //  Send stop command to sockets so that any blocking calls
     229             :         //  can be interrupted. If there are no sockets we can ask reaper
     230             :         //  thread to stop.
     231          24 :         for (sockets_t::size_type i = 0; i != sockets.size (); i++)
     232          12 :             sockets [i]->stop ();
     233          12 :         if (sockets.empty ())
     234           0 :             reaper->stop ();
     235             :     }
     236           6 :     slot_sync.unlock ();
     237             : 
     238           6 :     return 0;
     239             : }
     240             : 
     241          42 : int zmq::ctx_t::set (int option_, int optval_)
     242             : {
     243          42 :     int rc = 0;
     244          84 :     if (option_ == ZMQ_MAX_SOCKETS
     245          42 :     &&  optval_ >= 1 && optval_ == clipped_maxsocket (optval_)) {
     246           3 :         opt_sync.lock ();
     247           3 :         max_sockets = optval_;
     248           3 :         opt_sync.unlock ();
     249             :     }
     250             :     else
     251          39 :     if (option_ == ZMQ_IO_THREADS && optval_ >= 0) {
     252          33 :         opt_sync.lock ();
     253          33 :         io_thread_count = optval_;
     254          33 :         opt_sync.unlock ();
     255             :     }
     256             :     else
     257           6 :     if (option_ == ZMQ_IPV6 && optval_ >= 0) {
     258           3 :         opt_sync.lock ();
     259           3 :         ipv6 = (optval_ != 0);
     260           3 :         opt_sync.unlock ();
     261             :     }
     262             :     else
     263           3 :     if (option_ == ZMQ_THREAD_PRIORITY && optval_ >= 0) {
     264           0 :         opt_sync.lock();
     265           0 :         thread_priority = optval_;
     266           0 :         opt_sync.unlock ();
     267             :     }
     268             :     else
     269           3 :     if (option_ == ZMQ_THREAD_SCHED_POLICY && optval_ >= 0) {
     270           0 :         opt_sync.lock();
     271           0 :         thread_sched_policy = optval_;
     272           0 :         opt_sync.unlock ();
     273             :     }
     274             :     else
     275           3 :     if (option_ == ZMQ_BLOCKY && optval_ >= 0) {
     276           3 :         opt_sync.lock ();
     277           3 :         blocky = (optval_ != 0);
     278           3 :         opt_sync.unlock ();
     279             :     }
     280             :     else
     281           0 :     if (option_ == ZMQ_MAX_MSGSZ && optval_ >= 0) {
     282           0 :         opt_sync.lock ();
     283           0 :         max_msgsz = optval_ < INT_MAX? optval_: INT_MAX;
     284           0 :         opt_sync.unlock ();
     285             :     }
     286             :     else {
     287           0 :         errno = EINVAL;
     288           0 :         rc = -1;
     289             :     }
     290          42 :     return rc;
     291             : }
     292             : 
     293       22412 : int zmq::ctx_t::get (int option_)
     294             : {
     295       22412 :     int rc = 0;
     296       22412 :     if (option_ == ZMQ_MAX_SOCKETS)
     297           3 :         rc = max_sockets;
     298             :     else
     299       22409 :     if (option_ == ZMQ_SOCKET_LIMIT)
     300           3 :         rc = clipped_maxsocket (65535);
     301             :     else
     302       22406 :     if (option_ == ZMQ_IO_THREADS)
     303           3 :         rc = io_thread_count;
     304             :     else
     305       22403 :     if (option_ == ZMQ_IPV6)
     306       11203 :         rc = ipv6;
     307             :     else
     308       11200 :     if (option_ == ZMQ_BLOCKY)
     309       11200 :         rc = blocky;
     310             :     else
     311           0 :     if (option_ == ZMQ_MAX_MSGSZ)
     312           0 :         rc = max_msgsz;
     313             :     else {
     314           0 :         errno = EINVAL;
     315           0 :         rc = -1;
     316             :     }
     317       22412 :     return rc;
     318             : }
     319             : 
     320       11197 : zmq::socket_base_t *zmq::ctx_t::create_socket (int type_)
     321             : {
     322       11197 :     slot_sync.lock ();
     323       11197 :     if (unlikely (starting)) {
     324             : 
     325         423 :         starting = false;
     326             :         //  Initialise the array of mailboxes. Additional three slots are for
     327             :         //  zmq_ctx_term thread and reaper thread.
     328         423 :         opt_sync.lock ();
     329         423 :         int mazmq = max_sockets;
     330         423 :         int ios = io_thread_count;
     331         423 :         opt_sync.unlock ();
     332         423 :         slot_count = mazmq + ios + 2;
     333         423 :         slots = (i_mailbox **) malloc (sizeof (i_mailbox*) * slot_count);
     334         423 :         alloc_assert (slots);
     335             : 
     336             :         //  Initialise the infrastructure for zmq_ctx_term thread.
     337         423 :         slots [term_tid] = &term_mailbox;
     338             : 
     339             :         //  Create the reaper thread.
     340         423 :         reaper = new (std::nothrow) reaper_t (this, reaper_tid);
     341         423 :         alloc_assert (reaper);
     342         423 :         slots [reaper_tid] = reaper->get_mailbox ();
     343         423 :         reaper->start ();
     344             : 
     345             :         //  Create I/O thread objects and launch them.
     346         603 :         for (int i = 2; i != ios + 2; i++) {
     347         603 :             io_thread_t *io_thread = new (std::nothrow) io_thread_t (this, i);
     348         603 :             alloc_assert (io_thread);
     349         603 :             io_threads.push_back (io_thread);
     350         603 :             slots [i] = io_thread->get_mailbox ();
     351         603 :             io_thread->start ();
     352             :         }
     353             : 
     354             :         //  In the unused part of the slot array, create a list of empty slots.
     355     1646598 :         for (int32_t i = (int32_t) slot_count - 1;
     356      823299 :               i >= (int32_t) ios + 2; i--) {
     357     1645752 :             empty_slots.push_back (i);
     358      822876 :             slots [i] = NULL;
     359             :         }
     360             :     }
     361             : 
     362             :     //  Once zmq_ctx_term() was called, we can't create new sockets.
     363       11197 :     if (terminating) {
     364           0 :         slot_sync.unlock ();
     365           0 :         errno = ETERM;
     366           0 :         return NULL;
     367             :     }
     368             : 
     369             :     //  If max_sockets limit was reached, return error.
     370       22394 :     if (empty_slots.empty ()) {
     371           0 :         slot_sync.unlock ();
     372           0 :         errno = EMFILE;
     373           0 :         return NULL;
     374             :     }
     375             : 
     376             :     //  Choose a slot for the socket.
     377       22394 :     uint32_t slot = empty_slots.back ();
     378       11197 :     empty_slots.pop_back ();
     379             : 
     380             :     //  Generate new unique socket ID.
     381       11197 :     int sid = ((int) max_socket_id.add (1)) + 1;
     382             : 
     383             :     //  Create the socket and register its mailbox.
     384       11197 :     socket_base_t *s = socket_base_t::create (type_, this, slot, sid);
     385       11197 :     if (!s) {
     386          66 :         empty_slots.push_back (slot);
     387          66 :         slot_sync.unlock ();
     388             :         return NULL;
     389             :     }
     390       11131 :     sockets.push_back (s);
     391       11131 :     slots [slot] = s->get_mailbox ();
     392             : 
     393       11131 :     slot_sync.unlock ();
     394             :     return s;
     395             : }
     396             : 
     397       11131 : void zmq::ctx_t::destroy_socket (class socket_base_t *socket_)
     398             : {
     399       11131 :     slot_sync.lock ();
     400             : 
     401             :     //  Free the associated thread slot.
     402       11131 :     uint32_t tid = socket_->get_tid ();
     403       11131 :     empty_slots.push_back (tid);
     404       11131 :     slots [tid] = NULL;
     405             : 
     406             :     //  Remove the socket from the list of sockets.
     407       11131 :     sockets.erase (socket_);
     408             : 
     409             :     //  If zmq_ctx_term() was already called and there are no more socket
     410             :     //  we can ask reaper thread to terminate.
     411       19381 :     if (terminating && sockets.empty ())
     412         407 :         reaper->stop ();
     413             : 
     414       11131 :     slot_sync.unlock ();
     415       11131 : }
     416             : 
     417       22240 : zmq::object_t *zmq::ctx_t::get_reaper ()
     418             : {
     419       22240 :     return reaper;
     420             : }
     421             : 
     422        1026 : void zmq::ctx_t::start_thread (thread_t &thread_, thread_fn *tfn_, void *arg_) const
     423             : {
     424        1026 :     thread_.start(tfn_, arg_);
     425        1026 :     thread_.setSchedulingParameters(thread_priority, thread_sched_policy);
     426        1026 : }
     427             : 
     428      135236 : void zmq::ctx_t::send_command (uint32_t tid_, const command_t &command_)
     429             : {
     430      135236 :     slots [tid_]->send (command_);
     431      135470 : }
     432             : 
     433       11559 : zmq::io_thread_t *zmq::ctx_t::choose_io_thread (uint64_t affinity_)
     434             : {
     435      196248 :     if (io_threads.empty ())
     436             :         return NULL;
     437             : 
     438             :     //  Find the I/O thread with minimum load.
     439             :     int min_load = -1;
     440             :     io_thread_t *selected_io_thread = NULL;
     441      294865 :     for (io_threads_t::size_type i = 0; i != io_threads.size (); i++) {
     442       65045 :         if (!affinity_ || (affinity_ & (uint64_t (1) << i))) {
     443      130090 :             int load = io_threads [i]->get_load ();
     444       65047 :             if (selected_io_thread == NULL || load < min_load) {
     445       19918 :                 min_load = load;
     446       59754 :                 selected_io_thread = io_threads [i];
     447             :             }
     448             :         }
     449             :     }
     450             :     return selected_io_thread;
     451             : }
     452             : 
     453         358 : int zmq::ctx_t::register_endpoint (const char *addr_,
     454             :         const endpoint_t &endpoint_)
     455             : {
     456         358 :     endpoints_sync.lock ();
     457             : 
     458             :     const bool inserted = endpoints.insert (
     459        1074 :         endpoints_t::value_type (std::string (addr_), endpoint_)).second;
     460             : 
     461         358 :     endpoints_sync.unlock ();
     462             : 
     463         358 :     if (!inserted) {
     464           0 :         errno = EADDRINUSE;
     465           0 :         return -1;
     466             :     }
     467             :     return 0;
     468             : }
     469             : 
     470           9 : int zmq::ctx_t::unregister_endpoint (
     471             :         const std::string &addr_, socket_base_t *socket_)
     472             : {
     473           9 :     endpoints_sync.lock ();
     474             : 
     475          18 :     const endpoints_t::iterator it = endpoints.find (addr_);
     476          27 :     if (it == endpoints.end () || it->second.socket != socket_) {
     477           3 :         endpoints_sync.unlock ();
     478           3 :         errno = ENOENT;
     479           3 :         return -1;
     480             :     }
     481             : 
     482             :     //  Remove endpoint.
     483           6 :     endpoints.erase (it);
     484             : 
     485           6 :     endpoints_sync.unlock ();
     486             : 
     487             :     return 0;
     488             : }
     489             : 
     490       11131 : void zmq::ctx_t::unregister_endpoints (socket_base_t *socket_)
     491             : {
     492       11131 :     endpoints_sync.lock ();
     493             : 
     494       22262 :     endpoints_t::iterator it = endpoints.begin ();
     495       42085 :     while (it != endpoints.end ()) {
     496        4346 :         if (it->second.socket == socket_) {
     497         352 :             endpoints_t::iterator to_erase = it;
     498             :             ++it;
     499         352 :             endpoints.erase (to_erase);
     500             :             continue;
     501             :         }
     502             :         ++it;
     503             :     }
     504             : 
     505       11131 :     endpoints_sync.unlock ();
     506       11131 : }
     507             : 
     508         591 : zmq::endpoint_t zmq::ctx_t::find_endpoint (const char *addr_)
     509             : {
     510         591 :      endpoints_sync.lock ();
     511             : 
     512        1773 :      endpoints_t::iterator it = endpoints.find (addr_);
     513        1182 :      if (it == endpoints.end ()) {
     514         279 :          endpoints_sync.unlock ();
     515         279 :          errno = ECONNREFUSED;
     516         279 :          endpoint_t empty = {NULL, options_t()};
     517             :          return empty;
     518             :      }
     519         312 :      endpoint_t endpoint = it->second;
     520             : 
     521             :      //  Increment the command sequence number of the peer so that it won't
     522             :      //  get deallocated until "bind" command is issued by the caller.
     523             :      //  The subsequent 'bind' has to be called with inc_seqnum parameter
     524             :      //  set to false, so that the seqnum isn't incremented twice.
     525         312 :      endpoint.socket->inc_seqnum ();
     526             : 
     527         312 :      endpoints_sync.unlock ();
     528             :      return endpoint;
     529             : }
     530             : 
     531         276 : void zmq::ctx_t::pend_connection (const std::string &addr_,
     532             :         const endpoint_t &endpoint_, pipe_t **pipes_)
     533             : {
     534             :     const pending_connection_t pending_connection =
     535         276 :         {endpoint_, pipes_ [0], pipes_ [1]};
     536             : 
     537         276 :     endpoints_sync.lock ();
     538             : 
     539         552 :     endpoints_t::iterator it = endpoints.find (addr_);
     540         552 :     if (it == endpoints.end ()) {
     541             :         // Still no bind.
     542         256 :         endpoint_.socket->inc_seqnum ();
     543         512 :         pending_connections.insert (pending_connections_t::value_type (addr_, pending_connection));
     544             :     }
     545             :     else
     546             :         // Bind has happened in the mean time, connect directly
     547          40 :         connect_inproc_sockets (it->second.socket, it->second.options, pending_connection, connect_side);
     548             : 
     549         276 :     endpoints_sync.unlock ();
     550         276 : }
     551             : 
     552         358 : void zmq::ctx_t::connect_pending (const char *addr_, zmq::socket_base_t *bind_socket_)
     553             : {
     554         358 :     endpoints_sync.lock ();
     555             : 
     556        1074 :     std::pair<pending_connections_t::iterator, pending_connections_t::iterator> pending = pending_connections.equal_range(addr_);
     557             : 
     558         972 :     for (pending_connections_t::iterator p = pending.first; p != pending.second; ++p)
     559         512 :         connect_inproc_sockets(bind_socket_, endpoints[addr_].options, p->second, bind_side);
     560             : 
     561         358 :     pending_connections.erase(pending.first, pending.second);
     562         358 :     endpoints_sync.unlock ();
     563         358 : }
     564             : 
     565         276 : void zmq::ctx_t::connect_inproc_sockets (zmq::socket_base_t *bind_socket_,
     566             :     options_t& bind_options, const pending_connection_t &pending_connection_, side side_)
     567             : {
     568         276 :     bind_socket_->inc_seqnum();
     569         276 :     pending_connection_.bind_pipe->set_tid (bind_socket_->get_tid ());
     570             : 
     571         276 :     if (!bind_options.recv_identity) {
     572             :         msg_t msg;
     573         273 :         const bool ok = pending_connection_.bind_pipe->read (&msg);
     574         273 :         zmq_assert (ok);
     575         273 :         const int rc = msg.close ();
     576         273 :         errno_assert (rc == 0);
     577             :     }
     578             : 
     579         276 :     bool conflate = pending_connection_.endpoint.options.conflate &&
     580           0 :             (pending_connection_.endpoint.options.type == ZMQ_DEALER ||
     581           0 :              pending_connection_.endpoint.options.type == ZMQ_PULL ||
     582           0 :              pending_connection_.endpoint.options.type == ZMQ_PUSH ||
     583           0 :              pending_connection_.endpoint.options.type == ZMQ_PUB ||
     584         276 :              pending_connection_.endpoint.options.type == ZMQ_SUB);
     585             : 
     586         276 :     if (!conflate) {
     587         276 :         pending_connection_.connect_pipe->set_hwms_boost(bind_options.sndhwm, bind_options.rcvhwm);
     588         276 :         pending_connection_.bind_pipe->set_hwms_boost(pending_connection_.endpoint.options.sndhwm, pending_connection_.endpoint.options.rcvhwm);
     589             : 
     590         276 :         pending_connection_.connect_pipe->set_hwms(pending_connection_.endpoint.options.rcvhwm, pending_connection_.endpoint.options.sndhwm);
     591         276 :         pending_connection_.bind_pipe->set_hwms(bind_options.rcvhwm, bind_options.sndhwm);
     592             :     }
     593             :     else {
     594           0 :         pending_connection_.connect_pipe->set_hwms(-1, -1);
     595           0 :         pending_connection_.bind_pipe->set_hwms(-1, -1);
     596             :     }
     597             : 
     598         276 :     if (side_ == bind_side) {
     599             :         command_t cmd;
     600         256 :         cmd.type = command_t::bind;
     601         256 :         cmd.args.bind.pipe = pending_connection_.bind_pipe;
     602         256 :         bind_socket_->process_command (cmd);
     603         256 :         bind_socket_->send_inproc_connected (pending_connection_.endpoint.socket);
     604             :     }
     605             :     else
     606          20 :         pending_connection_.connect_pipe->send_bind (bind_socket_, pending_connection_.bind_pipe, false);
     607             : 
     608         276 :     if (pending_connection_.endpoint.options.recv_identity) {
     609             :         msg_t id;
     610           0 :         int rc = id.init_size (bind_options.identity_size);
     611           0 :         errno_assert (rc == 0);
     612           0 :         memcpy (id.data (), bind_options.identity, bind_options.identity_size);
     613           0 :         id.set_flags (msg_t::identity);
     614           0 :         bool written = pending_connection_.bind_pipe->write (&id);
     615           0 :         zmq_assert (written);
     616           0 :         pending_connection_.bind_pipe->flush ();
     617             :     }
     618         276 : }
     619             : 
     620             : #ifdef ZMQ_HAVE_VMCI
     621             : 
     622             : int zmq::ctx_t::get_vmci_socket_family ()
     623             : {
     624             :     vmci_sync.lock ();
     625             : 
     626             :     if (vmci_fd == -1)  {
     627             :         vmci_family = VMCISock_GetAFValueFd (&vmci_fd);
     628             : 
     629             :         if (vmci_fd != -1) {
     630             : #ifdef FD_CLOEXEC
     631             :             int rc = fcntl (vmci_fd, F_SETFD, FD_CLOEXEC);
     632             :             errno_assert (rc != -1);
     633             : #endif
     634             :         }
     635             :     }
     636             : 
     637             :     vmci_sync.unlock ();
     638             : 
     639             :     return vmci_family;
     640             : }
     641             : 
     642             : #endif
     643             : 
     644             : //  The last used socket ID, or 0 if no socket was used so far. Note that this
     645             : //  is a global variable. Thus, even sockets created in different contexts have
     646             : //  unique IDs.
     647         729 : zmq::atomic_counter_t zmq::ctx_t::max_socket_id;

Generated by: LCOV version 1.10