LCOV - code coverage report
Current view: top level - home/h/core/forks/m4-libzmq/src - fq.cpp (source / functions) Hit Total Coverage
Test: zeromq-4.2.0 Code Coverage Lines: 58 60 96.7 %
Date: 2016-05-09 Functions: 8 9 88.9 %
Legend: Lines: hit not hit

          Line data    Source code
       1             : /*
       2             :     Copyright (c) 2007-2016 Contributors as noted in the AUTHORS file
       3             : 
       4             :     This file is part of libzmq, the ZeroMQ core engine in C++.
       5             : 
       6             :     libzmq is free software; you can redistribute it and/or modify it under
       7             :     the terms of the GNU Lesser General Public License (LGPL) as published
       8             :     by the Free Software Foundation; either version 3 of the License, or
       9             :     (at your option) any later version.
      10             : 
      11             :     As a special exception, the Contributors give you permission to link
      12             :     this library with independent modules to produce an executable,
      13             :     regardless of the license terms of these independent modules, and to
      14             :     copy and distribute the resulting executable under terms of your choice,
      15             :     provided that you also meet, for each linked independent module, the
      16             :     terms and conditions of the license of that module. An independent
      17             :     module is a module which is not derived from or based on this library.
      18             :     If you modify this library, you must extend this exception to your
      19             :     version of the library.
      20             : 
      21             :     libzmq is distributed in the hope that it will be useful, but WITHOUT
      22             :     ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
      23             :     FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public
      24             :     License for more details.
      25             : 
      26             :     You should have received a copy of the GNU Lesser General Public License
      27             :     along with this program.  If not, see <http://www.gnu.org/licenses/>.
      28             : */
      29             : 
      30             : #include "precompiled.hpp"
      31             : #include "fq.hpp"
      32             : #include "pipe.hpp"
      33             : #include "err.hpp"
      34             : #include "msg.hpp"
      35             : 
      36        4398 : zmq::fq_t::fq_t () :
      37             :     active (0),
      38             :     last_in (NULL),
      39             :     current (0),
      40        8796 :     more (false)
      41             : {
      42        4398 : }
      43             : 
      44       13194 : zmq::fq_t::~fq_t ()
      45             : {
      46        8796 :     zmq_assert (pipes.empty ());
      47        4398 : }
      48             : 
      49        5001 : void zmq::fq_t::attach (pipe_t *pipe_)
      50             : {
      51        5001 :     pipes.push_back (pipe_);
      52       10002 :     pipes.swap (active, pipes.size () - 1);
      53        5001 :     active++;
      54        5001 : }
      55             : 
      56        5001 : void zmq::fq_t::pipe_terminated (pipe_t *pipe_)
      57             : {
      58       10002 :     const pipes_t::size_type index = pipes.index (pipe_);
      59             : 
      60             :     //  Remove the pipe from the list; adjust number of active pipes
      61             :     //  accordingly.
      62        5001 :     if (index < active) {
      63        4626 :         active--;
      64        4626 :         pipes.swap (index, active);
      65        4626 :         if (current == active)
      66        4208 :             current = 0;
      67             :     }
      68        5001 :     pipes.erase (pipe_);
      69             : 
      70        5001 :     if (last_in == pipe_) {
      71        1978 :         saved_credential = last_in->get_credential ();
      72         989 :         last_in = NULL;
      73             :     }
      74        5001 : }
      75             : 
      76        1129 : void zmq::fq_t::activated (pipe_t *pipe_)
      77             : {
      78             :     //  Move the pipe to the list of active pipes.
      79        2258 :     pipes.swap (pipes.index (pipe_), active);
      80        1129 :     active++;
      81        1129 : }
      82             : 
      83     3683632 : int zmq::fq_t::recv (msg_t *msg_)
      84             : {
      85     3683632 :     return recvpipe (msg_, NULL);
      86             : }
      87             : 
      88     7000717 : int zmq::fq_t::recvpipe (msg_t *msg_, pipe_t **pipe_)
      89             : {
      90             :     //  Deallocate old content of the message.
      91     7000717 :     int rc = msg_->close ();
      92     6999129 :     errno_assert (rc == 0);
      93             : 
      94             :     //  Round-robin over the pipes to get the next message.
      95     7024328 :     while (active > 0) {
      96             : 
      97             :         //  Try to fetch new message. If we've already read part of the message
      98             :         //  subsequent part should be immediately available.
      99     1706814 :         bool fetched = pipes [current]->read (msg_);
     100             : 
     101             :         //  Note that when message is not fetched, current pipe is deactivated
     102             :         //  and replaced by another active pipe. Thus we don't have to increase
     103             :         //  the 'current' pointer.
     104      853407 :         if (fetched) {
     105      852000 :             if (pipe_)
     106     1202274 :                 *pipe_ = pipes [current];
     107      852000 :             more = msg_->flags () & msg_t::more? true: false;
     108      852000 :             if (!more) {
     109     1702248 :                 last_in = pipes [current];
     110      851124 :                 current = (current + 1) % active;
     111             :             }
     112             :             return 0;
     113             :         }
     114             : 
     115             :         //  Check the atomicity of the message.
     116             :         //  If we've already received the first part of the message
     117             :         //  we should get the remaining parts without blocking.
     118        1407 :         zmq_assert (!more);
     119             : 
     120        1407 :         active--;
     121        1407 :         pipes.swap (current, active);
     122        6362 :         if (current == active)
     123        1226 :             current = 0;
     124             :     }
     125             : 
     126             :     //  No message is available. Initialise the output parameter
     127             :     //  to be a 0-byte message.
     128     6170921 :     rc = msg_->init ();
     129     6169440 :     errno_assert (rc == 0);
     130     6169440 :     errno = EAGAIN;
     131     6169319 :     return -1;
     132             : }
     133             : 
     134        1124 : bool zmq::fq_t::has_in ()
     135             : {
     136             :     //  There are subsequent parts of the partly-read message available.
     137        1124 :     if (more)
     138             :         return true;
     139             : 
     140             :     //  Note that messing with current doesn't break the fairness of fair
     141             :     //  queueing algorithm. If there are no messages available current will
     142             :     //  get back to its original value. Otherwise it'll point to the first
     143             :     //  pipe holding messages, skipping only pipes with no messages available.
     144        1221 :     while (active > 0) {
     145         386 :         if (pipes [current]->check_read ())
     146             :             return true;
     147             : 
     148             :         //  Deactivate the pipe.
     149          97 :         active--;
     150          97 :         pipes.swap (current, active);
     151          97 :         if (current == active)
     152          95 :             current = 0;
     153             :     }
     154             : 
     155             :     return false;
     156             : }
     157             : 
     158           0 : zmq::blob_t zmq::fq_t::get_credential () const
     159             : {
     160             :     return last_in?
     161           0 :         last_in->get_credential (): saved_credential;
     162             : }
     163             : 

Generated by: LCOV version 1.10