LCOV - code coverage report
Current view: top level - home/h/core/forks/m4-libzmq/src - lb.cpp (source / functions) Hit Total Coverage
Test: zeromq-4.2.0 Code Coverage Lines: 52 65 80.0 %
Date: 2016-05-09 Functions: 8 8 100.0 %
Legend: Lines: hit not hit

          Line data    Source code
       1             : /*
       2             :     Copyright (c) 2007-2016 Contributors as noted in the AUTHORS file
       3             : 
       4             :     This file is part of libzmq, the ZeroMQ core engine in C++.
       5             : 
       6             :     libzmq is free software; you can redistribute it and/or modify it under
       7             :     the terms of the GNU Lesser General Public License (LGPL) as published
       8             :     by the Free Software Foundation; either version 3 of the License, or
       9             :     (at your option) any later version.
      10             : 
      11             :     As a special exception, the Contributors give you permission to link
      12             :     this library with independent modules to produce an executable,
      13             :     regardless of the license terms of these independent modules, and to
      14             :     copy and distribute the resulting executable under terms of your choice,
      15             :     provided that you also meet, for each linked independent module, the
      16             :     terms and conditions of the license of that module. An independent
      17             :     module is a module which is not derived from or based on this library.
      18             :     If you modify this library, you must extend this exception to your
      19             :     version of the library.
      20             : 
      21             :     libzmq is distributed in the hope that it will be useful, but WITHOUT
      22             :     ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
      23             :     FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public
      24             :     License for more details.
      25             : 
      26             :     You should have received a copy of the GNU Lesser General Public License
      27             :     along with this program.  If not, see <http://www.gnu.org/licenses/>.
      28             : */
      29             : 
      30             : #include "precompiled.hpp"
      31             : #include "lb.hpp"
      32             : #include "pipe.hpp"
      33             : #include "err.hpp"
      34             : #include "msg.hpp"
      35             : 
      36         882 : zmq::lb_t::lb_t () :
      37             :     active (0),
      38             :     current (0),
      39             :     more (false),
      40        1764 :     dropping (false)
      41             : {
      42         882 : }
      43             : 
      44        1764 : zmq::lb_t::~lb_t ()
      45             : {
      46        1764 :     zmq_assert (pipes.empty ());
      47         882 : }
      48             : 
      49        1011 : void zmq::lb_t::attach (pipe_t *pipe_)
      50             : {
      51        1011 :     pipes.push_back (pipe_);
      52        1011 :     activated (pipe_);
      53        1011 : }
      54             : 
      55        1011 : void zmq::lb_t::pipe_terminated (pipe_t *pipe_)
      56             : {
      57        2022 :     pipes_t::size_type index = pipes.index (pipe_);
      58             : 
      59             :     //  If we are in the middle of multipart message and current pipe
      60             :     //  have disconnected, we have to drop the remainder of the message.
      61        1011 :     if (index == current && more)
      62           0 :         dropping = true;
      63             : 
      64             :     //  Remove the pipe from the list; adjust number of active pipes
      65             :     //  accordingly.
      66        1011 :     if (index < active) {
      67         989 :         active--;
      68         989 :         pipes.swap (index, active);
      69         989 :         if (current == active)
      70         849 :             current = 0;
      71             :     }
      72        1011 :     pipes.erase (pipe_);
      73        1011 : }
      74             : 
      75        1888 : void zmq::lb_t::activated (pipe_t *pipe_)
      76             : {
      77             :     //  Move the pipe to the list of active pipes.
      78        3776 :     pipes.swap (pipes.index (pipe_), active);
      79        1888 :     active++;
      80        1888 : }
      81             : 
      82      186846 : int zmq::lb_t::send (msg_t *msg_)
      83             : {
      84      186846 :     return sendpipe (msg_, NULL);
      85             : }
      86             : 
      87      789284 : int zmq::lb_t::sendpipe (msg_t *msg_, pipe_t **pipe_)
      88             : {
      89             :     //  Drop the message if required. If we are at the end of the message
      90             :     //  switch back to non-dropping mode.
      91      789284 :     if (dropping) {
      92             : 
      93           0 :         more = msg_->flags () & msg_t::more ? true : false;
      94           0 :         dropping = more;
      95             : 
      96           0 :         int rc = msg_->close ();
      97           0 :         errno_assert (rc == 0);
      98           0 :         rc = msg_->init ();
      99           0 :         errno_assert (rc == 0);
     100             :         return 0;
     101             :     }
     102             : 
     103      790182 :     while (active > 0) {
     104     1577264 :         if (pipes [current]->write (msg_))
     105             :         {
     106      787734 :             if (pipe_)
     107         312 :                 *pipe_ = pipes [current];
     108             :             break;
     109             :         }
     110             : 
     111             :         // If send fails for multi-part msg rollback other
     112             :         // parts sent earlier and return EAGAIN.
     113             :         // Application should handle this as suitable
     114         898 :         if (more)
     115             :         {
     116           0 :             pipes [current]->rollback ();
     117           0 :             more = 0;
     118           0 :             errno = EAGAIN;
     119           0 :             return -1;
     120             :         }
     121             : 
     122         898 :         active--;
     123         898 :         if (current < active)
     124           0 :             pipes.swap (current, active);
     125             :         else
     126         898 :             current = 0;
     127             :     }
     128             : 
     129             :     //  If there are no pipes we cannot send the message.
     130      789284 :     if (active == 0) {
     131        1550 :         errno = EAGAIN;
     132        1550 :         return -1;
     133             :     }
     134             : 
     135             :     //  If it's final part of the message we can flush it downstream and
     136             :     //  continue round-robining (load balance).
     137      787734 :     more = msg_->flags () & msg_t::more? true: false;
     138      787734 :     if (!more) {
     139     1574478 :         pipes [current]->flush ();
     140      787239 :         current = (current + 1) % active;
     141             :     }
     142             : 
     143             :     //  Detach the message from the data buffer.
     144      787734 :     int rc = msg_->init ();
     145      787734 :     errno_assert (rc == 0);
     146             : 
     147             :     return 0;
     148             : }
     149             : 
     150      530954 : bool zmq::lb_t::has_out ()
     151             : {
     152             :     //  If one part of the message was already written we can definitely
     153             :     //  write the rest of the message.
     154      530954 :     if (more)
     155             :         return true;
     156             : 
     157      530955 :     while (active > 0) {
     158             : 
     159             :         //  Check whether a pipe has room for another message.
     160        2206 :         if (pipes [current]->check_write ())
     161             :             return true;
     162             : 
     163             :         //  Deactivate the pipe.
     164           1 :         active--;
     165           1 :         pipes.swap (current, active);
     166           1 :         if (current == active)
     167           0 :             current = 0;
     168             :     }
     169             : 
     170             :     return false;
     171             : }

Generated by: LCOV version 1.10