LCOV - code coverage report
Current view: top level - home/h/core/forks/m4-libzmq/src - router.cpp (source / functions) Hit Total Coverage
Test: zeromq-4.2.0 Code Coverage Lines: 200 235 85.1 %
Date: 2016-05-09 Functions: 13 15 86.7 %
Legend: Lines: hit not hit

          Line data    Source code
       1             : /*
       2             :     Copyright (c) 2007-2016 Contributors as noted in the AUTHORS file
       3             : 
       4             :     This file is part of libzmq, the ZeroMQ core engine in C++.
       5             : 
       6             :     libzmq is free software; you can redistribute it and/or modify it under
       7             :     the terms of the GNU Lesser General Public License (LGPL) as published
       8             :     by the Free Software Foundation; either version 3 of the License, or
       9             :     (at your option) any later version.
      10             : 
      11             :     As a special exception, the Contributors give you permission to link
      12             :     this library with independent modules to produce an executable,
      13             :     regardless of the license terms of these independent modules, and to
      14             :     copy and distribute the resulting executable under terms of your choice,
      15             :     provided that you also meet, for each linked independent module, the
      16             :     terms and conditions of the license of that module. An independent
      17             :     module is a module which is not derived from or based on this library.
      18             :     If you modify this library, you must extend this exception to your
      19             :     version of the library.
      20             : 
      21             :     libzmq is distributed in the hope that it will be useful, but WITHOUT
      22             :     ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
      23             :     FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public
      24             :     License for more details.
      25             : 
      26             :     You should have received a copy of the GNU Lesser General Public License
      27             :     along with this program.  If not, see <http://www.gnu.org/licenses/>.
      28             : */
      29             : 
      30             : #include "precompiled.hpp"
      31             : #include "macros.hpp"
      32             : #include "router.hpp"
      33             : #include "pipe.hpp"
      34             : #include "wire.hpp"
      35             : #include "random.hpp"
      36             : #include "likely.hpp"
      37             : #include "err.hpp"
      38             : //  Constructor: clears the routing state, seeds next_rid via generate_random () and configures the options for ZMQ_ROUTER.
      39         237 : zmq::router_t::router_t (class ctx_t *parent_, uint32_t tid_, int sid_) :
      40             :     socket_base_t (parent_, tid_, sid_),
      41             :     prefetched (false),
      42             :     identity_sent (false),
      43             :     current_in (NULL),
      44             :     terminate_current_in (false),
      45             :     more_in (false),
      46             :     current_out (NULL),
      47             :     more_out (false),
      48         237 :     next_rid (generate_random ()),
      49             :     mandatory (false),
      50             :     //  raw_socket functionality in ROUTER is deprecated
      51             :     raw_socket (false),
      52             :     probe_router (false),
      53         948 :     handover (false)
      54             : {
      55         237 :     options.type = ZMQ_ROUTER;
      56         237 :     options.recv_identity = true;
      57         237 :     options.raw_socket = false;
      58             :     //  Both prefetch buffers are initialised here and closed again in the destructor.
      59         237 :     prefetched_id.init ();
      60         237 :     prefetched_msg.init ();
      61         237 : }
      62             : //  Destructor: asserts that no anonymous or outbound pipes remain, then closes both prefetch buffers.
      63        1035 : zmq::router_t::~router_t ()
      64             : {
      65         474 :     zmq_assert (anonymous_pipes.empty ());;  //  NOTE(review): the second ';' is a stray empty statement
      66         474 :     zmq_assert (outpipes.empty ());
      67         237 :     prefetched_id.close ();
      68         237 :     prefetched_msg.close ();
      69         324 : }
      70             : //  Registers a newly attached pipe: optionally sends a probe message, then tries to identify the peer.
      71         591 : void zmq::router_t::xattach_pipe (pipe_t *pipe_, bool subscribe_to_all_)
      72             : {
      73             :     LIBZMQ_UNUSED (subscribe_to_all_);
      74             : 
      75         591 :     zmq_assert (pipe_);
      76             :     //  With probe_router set, write a freshly initialised message to the new pipe and flush it.
      77         591 :     if (probe_router) {
      78             :         msg_t probe_msg_;
      79           3 :         int rc = probe_msg_.init ();
      80           3 :         errno_assert (rc == 0);
      81             : 
      82           3 :         rc = pipe_->write (&probe_msg_);
      83             :         //  The result of write () is not asserted here, since a failed write is not a bug.
      84           3 :         pipe_->flush ();
      85             : 
      86           3 :         rc = probe_msg_.close ();
      87           3 :         errno_assert (rc == 0);
      88             :     }
      89             :     //  A pipe that cannot be identified yet is parked in anonymous_pipes; xread_activated () retries it.
      90         591 :     bool identity_ok = identify_peer (pipe_);
      91         591 :     if (identity_ok)
      92         457 :         fq.attach (pipe_);
      93             :     else
      94         134 :         anonymous_pipes.insert (pipe_);
      95         591 : }
      96             : //  Handles the ROUTER-specific socket options; returns 0 on success, otherwise -1 with errno set to EINVAL.
      97         300 : int zmq::router_t::xsetsockopt (int option_, const void *optval_,
      98             :     size_t optvallen_)
      99             : {
     100         300 :     bool is_int = (optvallen_ == sizeof (int));
     101         300 :     int value = 0;
     102         300 :     if (is_int) memcpy(&value, optval_, sizeof (int));  //  NOTE(review): optval_ is not checked for NULL before this copy
     103             :     //  Int-sized option values are pre-decoded into 'value'; a case that rejects its input breaks out to the EINVAL path below.
     104         300 :     switch (option_) {
     105             :         case ZMQ_CONNECT_RID:
     106           6 :             if (optval_ && optvallen_) {
     107           6 :                 connect_rid.assign ((char *) optval_, optvallen_);
     108             :                 return 0;
     109             :             }
     110             :             break;
     111             :         //  Enabling raw mode also turns off recv_identity; passing 0 afterwards does not restore either option.
     112             :         case ZMQ_ROUTER_RAW:
     113           0 :             if (is_int && value >= 0) {
     114           0 :                 raw_socket = (value != 0);
     115           0 :                 if (raw_socket) {
     116           0 :                     options.recv_identity = false;
     117           0 :                     options.raw_socket = true;
     118             :                 }
     119             :                 return 0;
     120             :             }
     121             :             break;
     122             : 
     123             :         case ZMQ_ROUTER_MANDATORY:
     124           9 :             if (is_int && value >= 0) {
     125           9 :                 mandatory = (value != 0);
     126           9 :                 return 0;
     127             :             }
     128             :             break;
     129             : 
     130             :         case ZMQ_PROBE_ROUTER:
     131           3 :             if (is_int && value >= 0) {
     132           3 :                 probe_router = (value != 0);
     133           3 :                 return 0;
     134             :             }
     135             :             break;
     136             : 
     137             :         case ZMQ_ROUTER_HANDOVER:
     138           3 :             if (is_int && value >= 0) {
     139           3 :                 handover = (value != 0);
     140           3 :                 return 0;
     141             :             }
     142             :             break;
     143             : 
     144             :         default:
     145             :             break;
     146             :     }
     147         279 :     errno = EINVAL;
     148         279 :     return -1;
     149             : }
     150             : 
     151             : //  Drops a terminated pipe from whichever table holds it: anonymous_pipes, or outpipes together with fq.
     152         591 : void zmq::router_t::xpipe_terminated (pipe_t *pipe_)
     153             : {
     154        1182 :     std::set <pipe_t*>::iterator it = anonymous_pipes.find (pipe_);
     155        1182 :     if (it != anonymous_pipes.end ())
     156           0 :         anonymous_pipes.erase (it);
     157             :     else {  //  Identified pipe: its outpipes entry is looked up by identity and must exist
     158        1773 :         outpipes_t::iterator iter = outpipes.find (pipe_->get_identity ());
     159        1182 :         zmq_assert (iter != outpipes.end ());
     160         591 :         outpipes.erase (iter);
     161         591 :         fq.pipe_terminated (pipe_);
     162         591 :         if (pipe_ == current_out)  //  Do not keep a pointer to the terminated pipe as the current send target
     163          36 :             current_out = NULL;
     164             :     }
     165         591 : }
     166             : //  Read activation: identified pipes are passed to fq; anonymous pipes get another identification attempt.
     167         289 : void zmq::router_t::xread_activated (pipe_t *pipe_)
     168             : {
     169         578 :     std::set <pipe_t*>::iterator it = anonymous_pipes.find (pipe_);
     170         578 :     if (it == anonymous_pipes.end ())
     171         155 :         fq.activated (pipe_);
     172             :     else {  //  Still anonymous: retry, and on success move the pipe from anonymous_pipes into fq
     173         134 :         bool identity_ok = identify_peer (pipe_);
     174         134 :         if (identity_ok) {
     175         134 :             anonymous_pipes.erase (it);
     176         134 :             fq.attach (pipe_);
     177             :         }
     178             :     }
     179         289 : }
     180             : //  Marks the outpipes entry that owns pipe_ as active again; found by linear scan because outpipes is looked up by identity elsewhere.
     181           3 : void zmq::router_t::xwrite_activated (pipe_t *pipe_)
     182             : {
     183             :     outpipes_t::iterator it;
     184          12 :     for (it = outpipes.begin (); it != outpipes.end (); ++it)
     185           3 :         if (it->second.pipe == pipe_)
     186             :             break;
     187             :     //  The pipe must be registered and currently flagged inactive.
     188           6 :     zmq_assert (it != outpipes.end ());
     189           3 :     zmq_assert (!it->second.active);
     190           3 :     it->second.active = true;
     191           3 : }
     192             : //  Sends one message part: the first part of a message selects the destination pipe by identity, later parts are written to that pipe.
     193        1489 : int zmq::router_t::xsend (msg_t *msg_)
     194             : {
     195             :     //  If this is the first part of the message it's the ID of the
     196             :     //  peer to send the message to.
     197        1489 :     if (!more_out) {
     198         571 :         zmq_assert (!current_out);
     199             : 
     200             :         //  If we have malformed message (prefix with no subsequent message)
     201             :         //  then just silently ignore it.
     202             :         //  TODO: The connections should be killed instead.
     203         571 :         if (msg_->flags () & msg_t::more) {
     204             : 
     205         568 :             more_out = true;
     206             : 
     207             :             //  Find the pipe associated with the identity stored in the prefix.
     208             :             //  If there's no such pipe just silently ignore the message, unless
     209             :             //  router_mandatory is set.
     210        1136 :             blob_t identity ((unsigned char*) msg_->data (), msg_->size ());
     211        1136 :             outpipes_t::iterator it = outpipes.find (identity);
     212             :             //  A pipe that fails check_write () is marked inactive; without router_mandatory the following parts are dropped.
     213        1136 :             if (it != outpipes.end ()) {
     214         561 :                 current_out = it->second.pipe;
     215         561 :                 if (!current_out->check_write ()) {
     216           6 :                     it->second.active = false;
     217           6 :                     current_out = NULL;
     218           6 :                     if (mandatory) {
     219           6 :                         more_out = false;
     220           6 :                         errno = EAGAIN;
     221           6 :                         return -1;
     222             :                     }
     223             :                 }
     224             :             }
     225             :             else
     226           7 :             if (mandatory) {
     227           4 :                 more_out = false;
     228           4 :                 errno = EHOSTUNREACH;
     229           4 :                 return -1;
     230             :             }
     231             :         }
     232             :         //  The identity part itself is never written to a pipe: release it and re-initialise msg_ for the caller.
     233         561 :         int rc = msg_->close ();
     234         561 :         errno_assert (rc == 0);
     235         561 :         rc = msg_->init ();
     236         561 :         errno_assert (rc == 0);
     237             :         return 0;
     238             :     }
     239             : 
     240             :     //  Ignore the MORE flag for raw-sock or assert?
     241         918 :     if (options.raw_socket)
     242           0 :         msg_->reset_flags (msg_t::more);
     243             : 
     244             :     //  Check whether this is the last part of the message.
     245         918 :     more_out = msg_->flags () & msg_t::more ? true : false;
     246             : 
     247             :     //  Push the message into the pipe. If there's no out pipe, just drop it.
     248         918 :     if (current_out) {
     249             : 
     250             :         // Close the remote connection if user has asked to do so
     251             :         // by sending zero length message.
     252             :         // Pending messages in the pipe will be dropped (on receiving term-ack)
     253         915 :         if (raw_socket && msg_->size() == 0) {
     254           0 :             current_out->terminate (false);
     255           0 :             int rc = msg_->close ();
     256           0 :             errno_assert (rc == 0);
     257           0 :             rc = msg_->init ();
     258           0 :             errno_assert (rc == 0);
     259           0 :             current_out = NULL;
     260           0 :             return 0;
     261             :         }
     262             : 
     263         915 :         bool ok = current_out->write (msg_);
     264         915 :         if (unlikely (!ok)) {
     265             :             // Message failed to send - we must close it ourselves.
     266           0 :             int rc = msg_->close ();
     267           0 :             errno_assert (rc == 0);
     268           0 :             current_out = NULL;
     269             :         } else {
     270         915 :           if (!more_out) {  //  Last part written: flush the pipe and forget the send target
     271         519 :               current_out->flush ();
     272         519 :               current_out = NULL;
     273             :           }
     274             :         }
     275             :     }
     276             :     else {
     277           3 :         int rc = msg_->close ();
     278           3 :         errno_assert (rc == 0);
     279             :     }
     280             : 
     281             :     //  Detach the message from the data buffer.
     282         918 :     int rc = msg_->init ();
     283         918 :     errno_assert (rc == 0);
     284             : 
     285             :     return 0;
     286             : }
     287             : //  Receives one message part; each message is handed out as the sender's identity part followed by the parts read from its pipe.
     288        1483 : int zmq::router_t::xrecv (msg_t *msg_)
     289             : {
     290        1483 :     if (prefetched) {  //  A part is already buffered, either by xhas_in () or by an earlier xrecv () call
     291         312 :         if (!identity_sent) {
     292          24 :             int rc = msg_->move (prefetched_id);
     293          24 :             errno_assert (rc == 0);
     294          24 :             identity_sent = true;
     295             :         }
     296             :         else {
     297         288 :             int rc = msg_->move (prefetched_msg);
     298         288 :             errno_assert (rc == 0);
     299         288 :             prefetched = false;
     300             :         }
     301         312 :         more_in = msg_->flags () & msg_t::more ? true : false;
     302             :         //  A termination deferred by identify_peer () (handover) is carried out once the last part has been delivered.
     303         312 :         if (!more_in) {
     304          87 :             if (terminate_current_in) {
     305           0 :                 current_in->terminate (true);
     306           0 :                 terminate_current_in = false;
     307             :             }
     308          87 :             current_in = NULL;
     309             :         }
     310             :         return 0;
     311             :     }
     312             : 
     313        1171 :     pipe_t *pipe = NULL;
     314        1171 :     int rc = fq.recvpipe (msg_, &pipe);
     315             : 
     316             :     //  It's possible that we receive peer's identity. That happens
     317             :     //  after reconnection. The current implementation assumes that
     318             :     //  the peer always uses the same identity.
     319        1177 :     while (rc == 0 && msg_->is_identity ())
     320           6 :         rc = fq.recvpipe (msg_, &pipe);
     321             : 
     322        1171 :     if (rc != 0)
     323             :         return -1;
     324             : 
     325         699 :     zmq_assert (pipe != NULL);
     326             : 
     327             :     //  If we are in the middle of reading a message, just return the next part.
     328         699 :     if (more_in) {
     329         426 :         more_in = msg_->flags () & msg_t::more ? true : false;
     330             : 
     331         426 :         if (!more_in) {
     332         201 :             if (terminate_current_in) {
     333           0 :                 current_in->terminate (true);
     334           0 :                 terminate_current_in = false;
     335             :             }
     336         201 :             current_in = NULL;
     337             :         }
     338             :     }
     339             :     else {
     340             :         //  We are at the beginning of a message.
     341             :         //  Keep the message part we have in the prefetch buffer
     342             :         //  and return the ID of the peer instead.
     343         273 :         rc = prefetched_msg.move (*msg_);
     344         273 :         errno_assert (rc == 0);
     345         273 :         prefetched = true;
     346         273 :         current_in = pipe;
     347             :         //  Build the identity part in msg_, flag it MORE and copy over any metadata of the buffered part.
     348         273 :         blob_t identity = pipe->get_identity ();
     349         273 :         rc = msg_->init_size (identity.size ());
     350         273 :         errno_assert (rc == 0);
     351         273 :         memcpy (msg_->data (), identity.data (), identity.size ());
     352         273 :         msg_->set_flags (msg_t::more);
     353         273 :         if (prefetched_msg.metadata())
     354         174 :             msg_->set_metadata(prefetched_msg.metadata());
     355         273 :         identity_sent = true;
     356             :     }
     357             : 
     358             :     return 0;
     359             : }
     360             : //  If a send target is set, calls rollback () on it and resets current_out and more_out; always returns 0.
     361           0 : int zmq::router_t::rollback (void)
     362             : {
     363           0 :     if (current_out) {
     364           0 :         current_out->rollback ();
     365           0 :         current_out = NULL;
     366           0 :         more_out = false;
     367             :     }
     368           0 :     return 0;
     369             : }
     370             : //  Reports whether a message part can be received, prefetching one part and building the sender's identity part if necessary.
     371         232 : bool zmq::router_t::xhas_in ()
     372             : {
     373             :     //  If we are in the middle of reading the messages, there are
     374             :     //  definitely more parts available.
     375         232 :     if (more_in)
     376             :         return true;
     377             : 
     378             :     //  We may already have a message pre-fetched.
     379         232 :     if (prefetched)
     380             :         return true;
     381             : 
     382             :     //  Try to read the next message.
     383             :     //  The message, if read, is kept in the pre-fetch buffer.
     384         208 :     pipe_t *pipe = NULL;
     385         208 :     int rc = fq.recvpipe (&prefetched_msg, &pipe);
     386             : 
     387             :     //  It's possible that we receive peer's identity. That happens
     388             :     //  after reconnection. The current implementation assumes that
     389             :     //  the peer always uses the same identity.
     390             :     //  TODO: handle the situation when the peer changes its identity.
     391         208 :     while (rc == 0 && prefetched_msg.is_identity ())
     392           0 :         rc = fq.recvpipe (&prefetched_msg, &pipe);
     393             : 
     394         208 :     if (rc != 0)
     395             :         return false;
     396             : 
     397          24 :     zmq_assert (pipe != NULL);
     398             :     //  Store the sender's identity in prefetched_id, flagged MORE.
     399          24 :     blob_t identity = pipe->get_identity ();
     400          24 :     rc = prefetched_id.init_size (identity.size ());
     401          24 :     errno_assert (rc == 0);
     402          24 :     memcpy (prefetched_id.data (), identity.data (), identity.size ());
     403          24 :     prefetched_id.set_flags (msg_t::more);
     404             :     //  identity_sent = false makes xrecv () hand out prefetched_id before prefetched_msg.
     405          24 :     prefetched = true;
     406          24 :     identity_sent = false;
     407          24 :     current_in = pipe;
     408             : 
     409          24 :     return true;
     410             : }
     411             : //  Always reports the socket as writable.
     412         178 : bool zmq::router_t::xhas_out ()
     413             : {
     414             :     //  In theory, ROUTER socket is always ready for writing. Whether actual
     415             :     //  attempt to write succeeds depends on which pipe the message is going
     416             :     //  to be routed to.
     417         178 :     return true;
     418             : }
     419             : //  Returns whatever fq.get_credential () reports.
     420           0 : zmq::blob_t zmq::router_t::get_credential () const
     421             : {
     422           0 :     return fq.get_credential ();
     423             : }
     424             : //  Assigns an identity to pipe_ and records it in outpipes; returns false if no identity message could be read yet or the identity is a rejected duplicate.
     425         725 : bool zmq::router_t::identify_peer (pipe_t *pipe_)
     426             : {
     427             :     msg_t msg;
     428             :     blob_t identity;
     429             :     bool ok;
     430             :     //  An identity preset through ZMQ_CONNECT_RID takes precedence and is consumed by the first pipe identified.
     431        1450 :     if (connect_rid.length()) {
     432          12 :         identity = blob_t ((unsigned char*) connect_rid.c_str (),
     433             :             connect_rid.length());
     434           6 :         connect_rid.clear ();
     435          12 :         outpipes_t::iterator it = outpipes.find (identity);
     436          12 :         if (it != outpipes.end ())
     437           0 :             zmq_assert(false); //  Not allowed to duplicate an existing rid
     438             :     }
     439             :     else
     440         719 :     if (options.raw_socket) { //  Always assign identity for raw-socket
     441             :         unsigned char buf [5];
     442           0 :         buf [0] = 0;
     443           0 :         put_uint32 (buf + 1, next_rid++);
     444           0 :         identity = blob_t (buf, sizeof buf);
     445             :     }
     446             :     else
     447         719 :     if (!options.raw_socket) {  //  NOTE(review): always true here, the previous branch already handled raw_socket
     448             :         //  Pick up handshake cases and also case where next identity is set
     449         719 :         msg.init ();  //  NOTE(review): return value is not checked, unlike other init () calls in this file
     450         719 :         ok = pipe_->read (&msg);
     451         719 :         if (!ok)
     452             :             return false;
     453             : 
     454         585 :         if (msg.size () == 0) {
     455             :             //  Fall back on the auto-generation: a zero byte followed by next_rid as written by put_uint32 ()
     456             :             unsigned char buf [5];
     457         228 :             buf [0] = 0;
     458         228 :             put_uint32 (buf + 1, next_rid++);
     459         228 :             identity = blob_t (buf, sizeof buf);
     460         228 :             msg.close ();
     461             :         }
     462             :         else {
     463         714 :             identity = blob_t ((unsigned char*) msg.data (), msg.size ());
     464         714 :             outpipes_t::iterator it = outpipes.find (identity);
     465         357 :             msg.close ();
     466             : 
     467         714 :             if (it != outpipes.end ()) {
     468           3 :                 if (!handover)
     469             :                     //  Ignore peers with duplicate ID
     470             :                     return false;
     471             :                 else {
     472             :                     //  We will allow the new connection to take over this
     473             :                     //  identity. Temporarily assign a new identity to the
     474             :                     //  existing pipe so we can terminate it asynchronously.
     475             :                     unsigned char buf [5];
     476           3 :                     buf [0] = 0;
     477           3 :                     put_uint32 (buf + 1, next_rid++);
     478           3 :                     blob_t new_identity = blob_t (buf, sizeof buf);
     479             : 
     480           3 :                     it->second.pipe->set_identity (new_identity);
     481             :                     outpipe_t existing_outpipe =
     482           6 :                         {it->second.pipe, it->second.active};
     483             : 
     484             :                     ok = outpipes.insert (outpipes_t::value_type (
     485           9 :                         new_identity, existing_outpipe)).second;
     486           3 :                     zmq_assert (ok);
     487             : 
     488             :                     //  Remove the existing identity entry to allow the new
     489             :                     //  connection to take the identity.
     490           3 :                     outpipes.erase (it);
     491             :                     //  If the old pipe is the one currently being read, xrecv () terminates it after the current message.
     492           3 :                     if (existing_outpipe.pipe == current_in)
     493           0 :                         terminate_current_in = true;
     494             :                     else
     495           3 :                         existing_outpipe.pipe->terminate (true);
     496             :                 }
     497             :             }
     498             :         }
     499             :     }
     500             : 
     501         591 :     pipe_->set_identity (identity);
     502             :     //  Add the record into output pipes lookup table
     503         591 :     outpipe_t outpipe = {pipe_, true};
     504        1773 :     ok = outpipes.insert (outpipes_t::value_type (identity, outpipe)).second;
     505         591 :     zmq_assert (ok);
     506             : 
     507             :     return true;
     508             : }

Generated by: LCOV version 1.10