LCOV - code coverage report
Current view: top level - home/h/core/forks/m4-libzmq/src - server.cpp (source / functions) Hit Total Coverage
Test: zeromq-4.2.0 Code Coverage Lines: 53 75 70.7 %
Date: 2016-05-09 Functions: 10 12 83.3 %
Legend: Lines: hit not hit

          Line data    Source code
       1             : /*
       2             :     Copyright (c) 2007-2016 Contributors as noted in the AUTHORS file
       3             : 
       4             :     This file is part of libzmq, the ZeroMQ core engine in C++.
       5             : 
       6             :     libzmq is free software; you can redistribute it and/or modify it under
       7             :     the terms of the GNU Lesser General Public License (LGPL) as published
       8             :     by the Free Software Foundation; either version 3 of the License, or
       9             :     (at your option) any later version.
      10             : 
      11             :     As a special exception, the Contributors give you permission to link
      12             :     this library with independent modules to produce an executable,
      13             :     regardless of the license terms of these independent modules, and to
      14             :     copy and distribute the resulting executable under terms of your choice,
      15             :     provided that you also meet, for each linked independent module, the
      16             :     terms and conditions of the license of that module. An independent
      17             :     module is a module which is not derived from or based on this library.
      18             :     If you modify this library, you must extend this exception to your
      19             :     version of the library.
      20             : 
      21             :     libzmq is distributed in the hope that it will be useful, but WITHOUT
      22             :     ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
      23             :     FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public
      24             :     License for more details.
      25             : 
      26             :     You should have received a copy of the GNU Lesser General Public License
      27             :     along with this program.  If not, see <http://www.gnu.org/licenses/>.
      28             : */
      29             : 
      30             : #include "precompiled.hpp"
      31             : #include "macros.hpp"
      32             : #include "server.hpp"
      33             : #include "pipe.hpp"
      34             : #include "wire.hpp"
      35             : #include "random.hpp"
      36             : #include "likely.hpp"
      37             : #include "err.hpp"
      38             : 
      39          15 : zmq::server_t::server_t (class ctx_t *parent_, uint32_t tid_, int sid_) :
      40             :     socket_base_t (parent_, tid_, sid_, true),
      41          30 :     next_rid (generate_random ())
      42             : {
      43          15 :     options.type = ZMQ_SERVER;
      44          15 : }
      45             : 
      46          60 : zmq::server_t::~server_t ()
      47             : {
      48          30 :     zmq_assert (outpipes.empty ());
      49          30 : }
      50             : 
      51          15 : void zmq::server_t::xattach_pipe (pipe_t *pipe_, bool subscribe_to_all_)
      52             : {
      53             :     LIBZMQ_UNUSED (subscribe_to_all_);
      54             : 
      55          15 :     zmq_assert (pipe_);
      56             : 
      57          15 :     uint32_t routing_id = next_rid++;
      58          15 :     if (!routing_id)
      59           0 :         routing_id = next_rid++;        //  Never use RID zero
      60             : 
      61          15 :     pipe_->set_routing_id (routing_id);
      62             :     //  Add the record into output pipes lookup table
      63          15 :     outpipe_t outpipe = {pipe_, true};
      64          45 :     bool ok = outpipes.insert (outpipes_t::value_type (routing_id, outpipe)).second;
      65          15 :     zmq_assert (ok);
      66             : 
      67          15 :     fq.attach (pipe_);
      68          15 : }
      69             : 
      70          15 : void zmq::server_t::xpipe_terminated (pipe_t *pipe_)
      71             : {
      72          30 :     outpipes_t::iterator it = outpipes.find (pipe_->get_routing_id ());
      73          30 :     zmq_assert (it != outpipes.end ());
      74          15 :     outpipes.erase (it);
      75          15 :     fq.pipe_terminated (pipe_);
      76          15 : }
      77             : 
      78         129 : void zmq::server_t::xread_activated (pipe_t *pipe_)
      79             : {
      80         129 :     fq.activated (pipe_);
      81         129 : }
      82             : 
      83           0 : void zmq::server_t::xwrite_activated (pipe_t *pipe_)
      84             : {
      85             :     outpipes_t::iterator it;
      86           0 :     for (it = outpipes.begin (); it != outpipes.end (); ++it)
      87           0 :         if (it->second.pipe == pipe_)
      88             :             break;
      89             : 
      90           0 :     zmq_assert (it != outpipes.end ());
      91           0 :     zmq_assert (!it->second.active);
      92           0 :     it->second.active = true;
      93           0 : }
      94             : 
      95          18 : int zmq::server_t::xsend (msg_t *msg_)
      96             : {
      97             :     //  SERVER sockets do not allow multipart data (ZMQ_SNDMORE)
      98          18 :     if (msg_->flags () & msg_t::more) {
      99           9 :         errno = EINVAL;
     100           9 :         return -1;
     101             :     }
     102             :     //  Find the pipe associated with the routing stored in the message.
     103           9 :     uint32_t routing_id = msg_->get_routing_id ();
     104          18 :     outpipes_t::iterator it = outpipes.find (routing_id);
     105             : 
     106          18 :     if (it != outpipes.end ()) {
     107           9 :         if (!it->second.pipe->check_write ()) {
     108           0 :             it->second.active = false;
     109           0 :             errno = EAGAIN;
     110           0 :             return -1;
     111             :         }
     112             :     }
     113             :     else {
     114           0 :         errno = EHOSTUNREACH;
     115           0 :         return -1;
     116             :     }
     117             : 
     118             :     //  Message might be delivered over inproc, so we reset routing id
     119           9 :     int rc = msg_->reset_routing_id ();
     120           9 :     errno_assert (rc == 0);
     121             : 
     122           9 :     bool ok = it->second.pipe->write (msg_);
     123           9 :     if (unlikely (!ok)) {
     124             :         // Message failed to send - we must close it ourselves.
     125           0 :         rc = msg_->close ();
     126           0 :         errno_assert (rc == 0);
     127             :     }
     128             :     else
     129           9 :         it->second.pipe->flush ();
     130             : 
     131             :     //  Detach the message from the data buffer.
     132           9 :     rc = msg_->init ();
     133           9 :     errno_assert (rc == 0);
     134             : 
     135             :     return 0;
     136             : }
     137             : 
     138      600241 : int zmq::server_t::xrecv (msg_t *msg_)
     139             : {
     140      600241 :     pipe_t *pipe = NULL;
     141      600241 :     int rc = fq.recvpipe (msg_, &pipe);
     142             : 
     143             :     // Drop any messages with more flag
     144      600241 :     while (rc == 0 && msg_->flags () & msg_t::more) {
     145             : 
     146             :         // drop all frames of the current multi-frame message
     147           0 :         rc = fq.recvpipe (msg_, NULL);
     148             : 
     149           0 :         while (rc == 0 && msg_->flags () & msg_t::more)
     150           0 :             rc = fq.recvpipe (msg_, NULL);
     151             : 
     152             :         // get the new message
     153           0 :         if (rc == 0)
     154           0 :             rc = fq.recvpipe (msg_, &pipe);
     155             :     }
     156             : 
     157      600241 :     if (rc != 0)
     158             :         return rc;
     159             : 
     160      600018 :     zmq_assert (pipe != NULL);
     161             : 
     162      600018 :     uint32_t routing_id = pipe->get_routing_id ();
     163      600018 :     msg_->set_routing_id (routing_id);
     164             : 
     165             :     return 0;
     166             : }
     167             : 
     168          11 : bool zmq::server_t::xhas_in ()
     169             : {
     170          11 :     return fq.has_in ();
     171             : }
     172             : 
     173          11 : bool zmq::server_t::xhas_out ()
     174             : {
     175             :     //  In theory, SERVER socket is always ready for writing. Whether actual
     176             :     //  attempt to write succeeds depends on which pipe the message is going
     177             :     //  to be routed to.
     178          11 :     return true;
     179             : }
     180             : 
     181           0 : zmq::blob_t zmq::server_t::get_credential () const
     182             : {
     183           0 :     return fq.get_credential ();
     184             : }

Generated by: LCOV version 1.10