LCOV - code coverage report
Current view: top level - home/h/core/forks/m4-libzmq/src - dist.cpp (source / functions) Hit Total Coverage
Test: zeromq-4.2.0 Code Coverage Lines: 77 99 77.8 %
Date: 2016-05-09 Functions: 13 14 92.9 %
Legend: Lines: hit not hit

          Line data    Source code
       1             : /*
       2             :     Copyright (c) 2007-2016 Contributors as noted in the AUTHORS file
       3             : 
       4             :     This file is part of libzmq, the ZeroMQ core engine in C++.
       5             : 
       6             :     libzmq is free software; you can redistribute it and/or modify it under
       7             :     the terms of the GNU Lesser General Public License (LGPL) as published
       8             :     by the Free Software Foundation; either version 3 of the License, or
       9             :     (at your option) any later version.
      10             : 
      11             :     As a special exception, the Contributors give you permission to link
      12             :     this library with independent modules to produce an executable,
      13             :     regardless of the license terms of these independent modules, and to
      14             :     copy and distribute the resulting executable under terms of your choice,
      15             :     provided that you also meet, for each linked independent module, the
      16             :     terms and conditions of the license of that module. An independent
      17             :     module is a module which is not derived from or based on this library.
      18             :     If you modify this library, you must extend this exception to your
      19             :     version of the library.
      20             : 
      21             :     libzmq is distributed in the hope that it will be useful, but WITHOUT
      22             :     ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
      23             :     FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public
      24             :     License for more details.
      25             : 
      26             :     You should have received a copy of the GNU Lesser General Public License
      27             :     along with this program.  If not, see <http://www.gnu.org/licenses/>.
      28             : */
      29             : 
      30             : #include "precompiled.hpp"
      31             : #include "dist.hpp"
      32             : #include "pipe.hpp"
      33             : #include "err.hpp"
      34             : #include "msg.hpp"
      35             : #include "likely.hpp"
      36             : 
      37        3531 : zmq::dist_t::dist_t () :
      38             :     matching (0),
      39             :     active (0),
      40             :     eligible (0),
      41        7062 :     more (false)
      42             : {
      43        3531 : }
      44             : 
      45        7062 : zmq::dist_t::~dist_t ()
      46             : {
      47        7062 :     zmq_assert (pipes.empty ());
      48        3531 : }
      49             : 
      50        6301 : void zmq::dist_t::attach (pipe_t *pipe_)
      51             : {
      52             :     //  If we are in the middle of sending a message, we'll add new pipe
      53             :     //  into the list of eligible pipes. Otherwise we add it to the list
      54             :     //  of active pipes.
      55        6301 :     if (more) {
      56           0 :         pipes.push_back (pipe_);
      57           0 :         pipes.swap (eligible, pipes.size () - 1);
      58           0 :         eligible++;
      59             :     }
      60             :     else {
      61        6301 :         pipes.push_back (pipe_);
      62       12602 :         pipes.swap (active, pipes.size () - 1);
      63        6301 :         active++;
      64        6301 :         eligible++;
      65             :     }
      66        6301 : }
      67             : 
      68       75230 : void zmq::dist_t::match (pipe_t *pipe_)
      69             : {
      70             :     //  If pipe is already matching do nothing.
      71      150460 :     if (pipes.index (pipe_) < matching)
      72             :         return;
      73             : 
      74             :     //  If the pipe isn't eligible, ignore it.
      75      126864 :     if (pipes.index (pipe_) >= eligible)
      76             :         return;
      77             : 
      78             :     //  Mark the pipe as matching.
      79      126864 :     pipes.swap (pipes.index (pipe_), matching);
      80       63432 :     matching++;
      81             : }
      82             : 
      83           6 : void zmq::dist_t::reverse_match ()
      84             : {
      85           6 :     pipes_t::size_type prev_matching = matching;
      86             : 
      87             :     // Reset matching to 0
      88             :     unmatch();
      89             : 
      90             :     // Mark all matching pipes as not matching and vice-versa.
      91             :     // To do this, push all pipes that are eligible but not
      92             :     // matched - i.e. between "matching" and "eligible" -
      93             :     // to the beginning of the queue.
      94          12 :     for (pipes_t::size_type i = prev_matching; i < eligible; ++i) {
      95           6 :         pipes.swap(i, matching++);
      96             :     }
      97           6 : }
      98             : 
      99       63423 : void zmq::dist_t::unmatch ()
     100             : {
     101       63429 :     matching = 0;
     102       63423 : }
     103             : 
     104        6301 : void zmq::dist_t::pipe_terminated (pipe_t *pipe_)
     105             : {
     106             :     //  Remove the pipe from the list; adjust number of matching, active and/or
     107             :     //  eligible pipes accordingly.
     108       12602 :     if (pipes.index (pipe_) < matching) {
     109         156 :         pipes.swap (pipes.index (pipe_), matching - 1);
     110          78 :         matching--;
     111             :     }
     112       12602 :     if (pipes.index (pipe_) < active) {
     113       12602 :         pipes.swap (pipes.index (pipe_), active - 1);
     114        6301 :         active--;
     115             :     }
     116       12602 :     if (pipes.index (pipe_) < eligible) {
     117       12602 :         pipes.swap (pipes.index (pipe_), eligible - 1);
     118        6301 :         eligible--;
     119             :     }
     120             : 
     121        6301 :     pipes.erase (pipe_);
     122        6301 : }
     123             : 
     124           0 : void zmq::dist_t::activated (pipe_t *pipe_)
     125             : {
     126             :     //  Move the pipe from passive to eligible state.
     127           0 :     if (eligible < pipes.size ()) {
     128           0 :         pipes.swap (pipes.index (pipe_), eligible);
     129           0 :         eligible++;
     130             :     }
     131             : 
     132             :     //  If there's no message being sent at the moment, move it to
     133             :     //  the active state.
     134           0 :     if (!more && active < pipes.size ()) {
     135           0 :         pipes.swap (eligible - 1, active);
     136           0 :         active++;
     137             :     }
     138           0 : }
     139             : 
     140         138 : int zmq::dist_t::send_to_all (msg_t *msg_)
     141             : {
     142         138 :     matching = active;
     143         138 :     return send_to_matching (msg_);
     144             : }
     145             : 
     146       63621 : int zmq::dist_t::send_to_matching (msg_t *msg_)
     147             : {
     148             :     //  Is this end of a multipart message?
     149       63621 :     bool msg_more = msg_->flags () & msg_t::more ? true : false;
     150             : 
     151             :     //  Push the message to matching pipes.
     152       63621 :     distribute (msg_);
     153             : 
     154             :     //  If multipart message is fully sent, activate all the eligible pipes.
     155       63621 :     if (!msg_more)
     156       63561 :         active = eligible;
     157             : 
     158       63621 :     more = msg_more;
     159             : 
     160       63621 :     return 0;
     161             : }
     162             : 
     163       63621 : void zmq::dist_t::distribute (msg_t *msg_)
     164             : {
     165             :     //  If there are no matching pipes available, simply drop the message.
     166       63621 :     if (matching == 0) {
     167          84 :         int rc = msg_->close ();
     168          84 :         errno_assert (rc == 0);
     169          84 :         rc = msg_->init ();
     170          84 :         errno_assert (rc == 0);
     171             :         return;
     172             :     }
     173             : 
     174       63537 :     if (msg_->is_vsm ()) {
     175       63543 :         for (pipes_t::size_type i = 0; i < matching; ++i)
     176      127086 :             if(!write (pipes [i], msg_))
     177           0 :                 --i; //  Retry last write because index will have been swapped
     178       63507 :         int rc = msg_->close();
     179       63507 :         errno_assert (rc == 0);
     180       63507 :         rc = msg_->init ();
     181       63507 :         errno_assert (rc == 0);
     182             :         return;
     183             :     }
     184             : 
     185             :     //  Add matching-1 references to the message. We already hold one reference,
     186             :     //  that's why -1.
     187          30 :     msg_->add_refs ((int) matching - 1);
     188             : 
     189             :     //  Push copy of the message to each matching pipe.
     190          30 :     int failed = 0;
     191          60 :     for (pipes_t::size_type i = 0; i < matching; ++i)
     192          60 :         if (!write (pipes [i], msg_)) {
     193           0 :             ++failed;
     194           0 :             --i; //  Retry last write because index will have been swapped
     195             :         }
     196          30 :     if (unlikely (failed))
     197           0 :         msg_->rm_refs (failed);
     198             : 
     199             :     //  Detach the original message from the data buffer. Note that we don't
     200             :     //  close the message. That's because we've already used all the references.
     201          30 :     int rc = msg_->init ();
     202          30 :     errno_assert (rc == 0);
     203             : }
     204             : 
     205          33 : bool zmq::dist_t::has_out ()
     206             : {
     207          33 :     return true;
     208             : }
     209             : 
     210       63573 : bool zmq::dist_t::write (pipe_t *pipe_, msg_t *msg_)
     211             : {
     212       63573 :     if (!pipe_->write (msg_)) {
     213           0 :         pipes.swap (pipes.index (pipe_), matching - 1);
     214           0 :         matching--;
     215           0 :         pipes.swap (pipes.index (pipe_), active - 1);
     216           0 :         active--;
     217           0 :         pipes.swap (active, eligible - 1);
     218           0 :         eligible--;
     219           0 :         return false;
     220             :     }
     221       63573 :     if (!(msg_->flags () & msg_t::more))
     222       63519 :         pipe_->flush ();
     223             :     return true;
     224             : }
     225             : 
     226       38801 : bool zmq::dist_t::check_hwm ()
     227             : {
     228       65801 :     for (pipes_t::size_type i = 0; i < matching; ++i)
     229       77602 :         if (!pipes [i]->check_hwm ())
     230             :             return false;
     231             : 
     232             :     return true;
     233             : }
     234             : 
     235             : 

Generated by: LCOV version 1.10