LCOV - code coverage report
Current view: top level - home/h/core/forks/m4-libzmq/src - pipe.cpp (source / functions) Hit Total Coverage
Test: zeromq-4.2.0 Code Coverage Lines: 212 216 98.1 %
Date: 2016-05-09 Functions: 30 31 96.8 %
Legend: Lines: hit not hit

          Line data    Source code
       1             : /*
       2             :     Copyright (c) 2007-2016 Contributors as noted in the AUTHORS file
       3             : 
       4             :     This file is part of libzmq, the ZeroMQ core engine in C++.
       5             : 
       6             :     libzmq is free software; you can redistribute it and/or modify it under
       7             :     the terms of the GNU Lesser General Public License (LGPL) as published
       8             :     by the Free Software Foundation; either version 3 of the License, or
       9             :     (at your option) any later version.
      10             : 
      11             :     As a special exception, the Contributors give you permission to link
      12             :     this library with independent modules to produce an executable,
      13             :     regardless of the license terms of these independent modules, and to
      14             :     copy and distribute the resulting executable under terms of your choice,
      15             :     provided that you also meet, for each linked independent module, the
      16             :     terms and conditions of the license of that module. An independent
      17             :     module is a module which is not derived from or based on this library.
      18             :     If you modify this library, you must extend this exception to your
      19             :     version of the library.
      20             : 
      21             :     libzmq is distributed in the hope that it will be useful, but WITHOUT
      22             :     ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
      23             :     FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public
      24             :     License for more details.
      25             : 
      26             :     You should have received a copy of the GNU Lesser General Public License
      27             :     along with this program.  If not, see <http://www.gnu.org/licenses/>.
      28             : */
      29             : 
      30             : #include "precompiled.hpp"
      31             : #include <new>
      32             : #include <stddef.h>
      33             : 
      34             : #include "macros.hpp"
      35             : #include "pipe.hpp"
      36             : #include "err.hpp"
      37             : 
      38             : #include "ypipe.hpp"
      39             : #include "ypipe_conflate.hpp"
      40             : 
      41        7846 : int zmq::pipepair (class object_t *parents_ [2], class pipe_t* pipes_ [2],
      42             :     int hwms_ [2], bool conflate_ [2])
      43             : {
      44             :     //   Creates two pipe objects. These objects are connected by two ypipes,
      45             :     //   each to pass messages in one direction.
      46             : 
      47             :     typedef ypipe_t <msg_t, message_pipe_granularity> upipe_normal_t;
      48             :     typedef ypipe_conflate_t <msg_t> upipe_conflate_t;
      49             : 
      50             :     pipe_t::upipe_t *upipe1;
      51        7846 :     if(conflate_ [0])
      52           6 :         upipe1 = new (std::nothrow) upipe_conflate_t ();
      53             :     else
      54        7843 :         upipe1 = new (std::nothrow) upipe_normal_t ();
      55        7849 :     alloc_assert (upipe1);
      56             : 
      57             :     pipe_t::upipe_t *upipe2;
      58        7849 :     if(conflate_ [1])
      59           6 :         upipe2 = new (std::nothrow) upipe_conflate_t ();
      60             :     else
      61        7846 :         upipe2 = new (std::nothrow) upipe_normal_t ();
      62        7850 :     alloc_assert (upipe2);
      63             : 
      64             :     pipes_ [0] = new (std::nothrow) pipe_t (parents_ [0], upipe1, upipe2,
      65        7850 :         hwms_ [1], hwms_ [0], conflate_ [0]);
      66        7850 :     alloc_assert (pipes_ [0]);
      67             :     pipes_ [1] = new (std::nothrow) pipe_t (parents_ [1], upipe2, upipe1,
      68        7850 :         hwms_ [0], hwms_ [1], conflate_ [1]);
      69        7850 :     alloc_assert (pipes_ [1]);
      70             : 
      71        7850 :     pipes_ [0]->set_peer (pipes_ [1]);
      72        7850 :     pipes_ [1]->set_peer (pipes_ [0]);
      73             : 
      74        7850 :     return 0;
      75             : }
      76             : 
      77       15698 : zmq::pipe_t::pipe_t (object_t *parent_, upipe_t *inpipe_, upipe_t *outpipe_,
      78             :       int inhwm_, int outhwm_, bool conflate_) :
      79             :     object_t (parent_),
      80             :     inpipe (inpipe_),
      81             :     outpipe (outpipe_),
      82             :     in_active (true),
      83             :     out_active (true),
      84             :     hwm (outhwm_),
      85             :     lwm (compute_lwm (inhwm_)),
      86             :     inhwmboost(0),
      87             :     outhwmboost(0),
      88             :     msgs_read (0),
      89             :     msgs_written (0),
      90             :     peers_msgs_read (0),
      91             :     peer (NULL),
      92             :     sink (NULL),
      93             :     state (active),
      94             :     delay (true),
      95             :     routing_id(0),
      96      109892 :     conflate (conflate_)
      97             : {
      98       15699 : }
      99             : 
     100      109849 : zmq::pipe_t::~pipe_t ()
     101             : {
     102       31386 : }
     103             : 
     104       15700 : void zmq::pipe_t::set_peer (pipe_t *peer_)
     105             : {
     106             :     //  Peer can be set once only.
     107       15700 :     zmq_assert (!peer);
     108       15700 :     peer = peer_;
     109       15700 : }
     110             : 
     111       15697 : void zmq::pipe_t::set_event_sink (i_pipe_events *sink_)
     112             : {
     113             :     // Sink can be set once only.
     114       15697 :     zmq_assert (!sink);
     115       15697 :     sink = sink_;
     116       15697 : }
     117             : 
     118          15 : void zmq::pipe_t::set_routing_id (uint32_t routing_id_)
     119             : {
     120          15 :     routing_id = routing_id_;
     121          15 : }
     122             : 
     123      600033 : uint32_t zmq::pipe_t::get_routing_id ()
     124             : {
     125      600033 :     return routing_id;
     126             : }
     127             : 
     128         627 : void zmq::pipe_t::set_identity (const blob_t &identity_)
     129             : {
     130         627 :     identity = identity_;
     131         627 : }
     132             : 
     133         996 : zmq::blob_t zmq::pipe_t::get_identity ()
     134             : {
     135        1992 :     return identity;
     136             : }
     137             : 
     138        1064 : zmq::blob_t zmq::pipe_t::get_credential () const
     139             : {
     140        2128 :     return credential;
     141             : }
     142             : 
     143        5915 : bool zmq::pipe_t::check_read ()
     144             : {
     145        5915 :     if (unlikely (!in_active))
     146             :         return false;
     147        3487 :     if (unlikely (state != active && state != waiting_for_delimiter))
     148             :         return false;
     149             : 
     150             :     //  Check if there's an item in the pipe.
     151        1783 :     if (!inpipe->check_read ()) {
     152        1406 :         in_active = false;
     153        1406 :         return false;
     154             :     }
     155             : 
     156             :     //  If the next item in the pipe is message delimiter,
     157             :     //  initiate termination process.
     158         377 :     if (inpipe->probe (is_delimiter)) {
     159             :         msg_t msg;
     160         212 :         bool ok = inpipe->read (&msg);
     161         212 :         zmq_assert (ok);
     162         212 :         process_delimiter ();
     163             :         return false;
     164             :     }
     165             : 
     166             :     return true;
     167             : }
     168             : 
     169     1495454 : bool zmq::pipe_t::read (msg_t *msg_)
     170             : {
     171     1495454 :     if (unlikely (!in_active))
     172             :         return false;
     173     1493100 :     if (unlikely (state != active && state != waiting_for_delimiter))
     174             :         return false;
     175             : 
     176             : read_message:
     177     1490877 :     if (!inpipe->read (msg_)) {
     178       11591 :         in_active = false;
     179       11591 :         return false;
     180             :     }
     181             : 
     182             :     //  If this is a credential, save a copy and receive next message.
     183     1480851 :     if (unlikely (msg_->is_credential ())) {
     184          12 :         const unsigned char *data = static_cast <const unsigned char *> (msg_->data ());
     185          36 :         credential = blob_t (data, msg_->size ());
     186          12 :         const int rc = msg_->close ();
     187          12 :         zmq_assert (rc == 0);
     188             :         goto read_message;
     189             :     }
     190             : 
     191             :     //  If delimiter was read, start termination process of the pipe.
     192     1480819 :     if (msg_->is_delimiter ()) {
     193        3171 :         process_delimiter ();
     194        3171 :         return false;
     195             :     }
     196             : 
     197     1477739 :     if (!(msg_->flags () & msg_t::more) && !msg_->is_identity ())
     198     1475213 :         msgs_read++;
     199             : 
     200     1477725 :     if (lwm > 0 && msgs_read % lwm == 0)
     201        4452 :         send_activate_write (peer, msgs_read);
     202             : 
     203             :     return true;
     204             : }
     205             : 
     206     1487622 : bool zmq::pipe_t::check_write ()
     207             : {
     208     1487622 :     if (unlikely (!out_active || state != active))
     209             :         return false;
     210             : 
     211     1487605 :     bool full = !check_hwm();
     212             : 
     213     1487605 :     if (unlikely (full)) {
     214        2089 :         out_active = false;
     215        2089 :         return false;
     216             :     }
     217             : 
     218             :     return true;
     219             : }
     220             : 
     221     1485909 : bool zmq::pipe_t::write (msg_t *msg_)
     222             : {
     223     1485909 :     if (unlikely (!check_write ()))
     224             :         return false;
     225             : 
     226     1483821 :     bool more = msg_->flags () & msg_t::more ? true : false;
     227     1479839 :     const bool is_identity = msg_->is_identity ();
     228     1480369 :     outpipe->write (*msg_, more);
     229     1480146 :     if (!more && !is_identity)
     230     1477565 :         msgs_written++;
     231             : 
     232             :     return true;
     233             : }
     234             : 
     235       12136 : void zmq::pipe_t::rollback ()
     236             : {
     237             :     //  Remove incomplete message from the outbound pipe.
     238             :     msg_t msg;
     239       12136 :     if (outpipe) {
     240       11881 :         while (outpipe->unwrite (&msg)) {
     241          39 :             zmq_assert (msg.flags () & msg_t::more);
     242          39 :             int rc = msg.close ();
     243          39 :             errno_assert (rc == 0);
     244             :         }
     245             :     }
     246       12136 : }
     247             : 
     248      876740 : void zmq::pipe_t::flush ()
     249             : {
     250             :     //  The peer does not exist anymore at this point.
     251      876740 :     if (state == term_ack_sent)
     252      876773 :         return;
     253             : 
     254      874498 :     if (outpipe && !outpipe->flush ())
     255       11130 :         send_activate_read (peer);
     256             : }
     257             : 
     258       11130 : void zmq::pipe_t::process_activate_read ()
     259             : {
     260       11130 :     if (!in_active && (state == active || state == waiting_for_delimiter)) {
     261        9418 :         in_active = true;
     262        9418 :         sink->read_activated (this);
     263             :     }
     264       11130 : }
     265             : 
     266        4452 : void zmq::pipe_t::process_activate_write (uint64_t msgs_read_)
     267             : {
     268             :     //  Remember the peer's message sequence number.
     269        4452 :     peers_msgs_read = msgs_read_;
     270             : 
     271        4452 :     if (!out_active && state == active) {
     272        2071 :         out_active = true;
     273        2071 :         sink->write_activated (this);
     274             :     }
     275        4452 : }
     276             : 
     277           4 : void zmq::pipe_t::process_hiccup (void *pipe_)
     278             : {
     279             :     //  Destroy old outpipe. Note that the read end of the pipe was already
     280             :     //  migrated to this thread.
     281           4 :     zmq_assert (outpipe);
     282           4 :     outpipe->flush ();
     283             :     msg_t msg;
     284           5 :     while (outpipe->read (&msg)) {
     285           1 :        if (!(msg.flags () & msg_t::more))
     286           1 :             msgs_written--;
     287           1 :        int rc = msg.close ();
     288           1 :        errno_assert (rc == 0);
     289             :     }
     290           4 :     LIBZMQ_DELETE(outpipe);
     291             : 
     292             :     //  Plug in the new outpipe.
     293           4 :     zmq_assert (pipe_);
     294           4 :     outpipe = (upipe_t*) pipe_;
     295           4 :     out_active = true;
     296             : 
     297             :     //  If appropriate, notify the user about the hiccup.
     298           4 :     if (state == active)
     299           3 :         sink->hiccuped (this);
     300           4 : }
     301             : 
     302        9737 : void zmq::pipe_t::process_pipe_term ()
     303             : {
     304        9737 :     zmq_assert (state == active
     305             :             ||  state == delimiter_received
     306             :             ||  state == term_req_sent1);
     307             : 
     308             :     //  This is the simple case of peer-induced termination. If there are no
     309             :     //  more pending messages to read, or if the pipe was configured to drop
     310             :     //  pending messages, we can move directly to the term_ack_sent state.
     311             :     //  Otherwise we'll hang up in waiting_for_delimiter state till all
     312             :     //  pending messages are read.
     313        9738 :     if (state == active) {
     314        5812 :         if (delay)
     315        5806 :             state = waiting_for_delimiter;
     316             :         else {
     317           6 :             state = term_ack_sent;
     318           6 :             outpipe = NULL;
     319           6 :             send_pipe_term_ack (peer);
     320             :         }
     321             :     }
     322             : 
     323             :     //  Delimiter happened to arrive before the term command. Now we have the
     324             :     //  term command as well, so we can move straight to term_ack_sent state.
     325             :     else
     326        3926 :     if (state == delimiter_received) {
     327         139 :         state = term_ack_sent;
     328         139 :         outpipe = NULL;
     329         139 :         send_pipe_term_ack (peer);
     330             :     }
     331             : 
     332             :     //  This is the case where both ends of the pipe are closed in parallel.
     333             :     //  We simply reply to the request by ack and continue waiting for our
     334             :     //  own ack.
     335             :     else
     336        3787 :     if (state == term_req_sent1) {
     337        3787 :         state = term_req_sent2;
     338        3787 :         outpipe = NULL;
     339        3787 :         send_pipe_term_ack (peer);
     340             :     }
     341        9747 : }
     342             : 
     343       15683 : void zmq::pipe_t::process_pipe_term_ack ()
     344             : {
     345             :     //  Notify the user that all the references to the pipe should be dropped.
     346       15683 :     zmq_assert (sink);
     347       15683 :     sink->pipe_terminated (this);
     348             : 
     349             :     //  In term_ack_sent and term_req_sent2 states there's nothing to do.
     350             :     //  Simply deallocate the pipe. In term_req_sent1 state we have to ack
     351             :     //  the peer before deallocating this side of the pipe.
     352             :     //  All the other states are invalid.
     353       15692 :     if (state == term_req_sent1) {
     354        5951 :         outpipe = NULL;
     355        5951 :         send_pipe_term_ack (peer);
     356             :     }
     357             :     else
     358        9741 :         zmq_assert (state == term_ack_sent || state == term_req_sent2);
     359             : 
     360             :     //  We'll deallocate the inbound pipe, the peer will deallocate the outbound
     361             :     //  pipe (which is an inbound pipe from its point of view).
     362             :     //  First, delete all the unread messages in the pipe. We have to do it by
     363             :     //  hand because msg_t doesn't have an automatic destructor. Then deallocate
     364             :     //  the ypipe itself.
     365             : 
     366       15675 :     if (!conflate) {
     367             :         msg_t msg;
     368       22303 :         while (inpipe->read (&msg)) {
     369        6634 :             int rc = msg.close ();
     370        6634 :             errno_assert (rc == 0);
     371             :         }
     372             :     }
     373             : 
     374       15689 :     LIBZMQ_DELETE(inpipe);
     375             : 
     376             :     //  Deallocate the pipe object
     377       15695 :     delete this;
     378       15694 : }
     379             : 
     380        3184 : void zmq::pipe_t::set_nodelay ()
     381             : {
     382        3184 :     this->delay = false;
     383        3184 : }
     384             : 
     385       15177 : void zmq::pipe_t::terminate (bool delay_)
     386             : {
     387             :     //  Override the value specified at pipe creation.
     388       15177 :     delay = delay_;
     389             : 
     390             :     //  If terminate was already called, we can ignore the duplicate invocation.
     391       15177 :     if (state == term_req_sent1 || state == term_req_sent2) {
     392             :         return;
     393             :         }
     394             :     //  If the pipe is in the final phase of async termination, it's going to
     395             :     //  be closed anyway. No need to do anything special here.
     396       15149 :     else if (state == term_ack_sent) {
     397             :         return;
     398             :         }
     399             :     //  The simple sync termination case. Ask the peer to terminate and wait
     400             :     //  for the ack.
     401       13759 :     else if (state == active) {
     402        8312 :         send_pipe_term (peer);
     403        8312 :         state = term_req_sent1;
     404             :     }
     405             :     //  There are still pending messages available, but the user calls
     406             :     //  'terminate'. We can act as if all the pending messages were read.
     407        5447 :     else if (state == waiting_for_delimiter && !delay) {
     408        3996 :         outpipe = NULL;
     409        3996 :         send_pipe_term_ack (peer);
     410        3998 :         state = term_ack_sent;
     411             :     }
     412             :     //  If there are pending messages still available, do nothing.
     413        1451 :     else if (state == waiting_for_delimiter) {
     414             :     }
     415             :     //  We've already got the delimiter, but not the term command yet. We can
     416             :     //  ignore the delimiter and terminate synchronously, as if we were in
     417             :     //  the active state.
     418        1436 :     else if (state == delimiter_received) {
     419        1436 :         send_pipe_term (peer);
     420        1436 :         state = term_req_sent1;
     421             :     }
     422             :     //  There are no other states.
     423             :     else {
     424           0 :         zmq_assert (false);
     425             :         }
     426             : 
     427             :     //  Stop outbound flow of messages.
     428       13761 :     out_active = false;
     429             : 
     430       13761 :     if (outpipe) {
     431             : 
     432             :         //  Drop any unfinished outbound messages.
     433        9763 :         rollback ();
     434             : 
     435             :         //  Write the delimiter into the pipe. Note that watermarks are not
     436             :         //  checked; thus the delimiter can be written even when the pipe is full.
     437             :         msg_t msg;
     438        9763 :         msg.init_delimiter ();
     439        9763 :         outpipe->write (msg, false);
     440        9762 :         flush ();
     441             :     }
     442             : }
     443             : 
     444         377 : bool zmq::pipe_t::is_delimiter (const msg_t &msg_)
     445             : {
     446         377 :     return msg_.is_delimiter ();
     447             : }
     448             : 
     449           0 : int zmq::pipe_t::compute_lwm (int hwm_)
     450             : {
     451             :     //  Compute the low water mark. The following points should be taken
     452             :     //  into consideration:
     453             :     //
     454             :     //  1. LWM has to be less than HWM.
     455             :     //  2. LWM cannot be set to very low value (such as zero) as after filling
     456             :     //     the queue it would start to refill only after all the messages are
     457             :     //     read from it and thus unnecessarily hold the progress back.
     458             :     //  3. LWM cannot be set to very high value (such as HWM-1) as it would
     459             :     //     result in lock-step filling of the queue - if a single message is
     460             :     //     read from a full queue, writer thread is resumed to write exactly one
     461             :     //     message to the queue and go back to sleep immediately. This would
     462             :     //     result in low performance.
     463             :     //
     464             :     //  Given point 3, it would be good to keep HWM and LWM as far apart as
     465             :     //  possible to reduce the thread switching overhead to almost zero.
     466             :     //  Let's make LWM 1/2 of HWM.
     467       16257 :     int result = (hwm_ + 1) / 2;
     468             : 
     469           0 :     return result;
     470             : }
     471             : 
     472        3383 : void zmq::pipe_t::process_delimiter ()
     473             : {
     474        3383 :     zmq_assert (state == active
     475             :             ||  state == waiting_for_delimiter);
     476             : 
     477        3383 :     if (state == active)
     478        1575 :         state = delimiter_received;
     479             :     else {
     480        1808 :         outpipe = NULL;
     481        1808 :         send_pipe_term_ack (peer);
     482        1808 :         state = term_ack_sent;
     483             :     }
     484        3383 : }
     485             : 
     486         115 : void zmq::pipe_t::hiccup ()
     487             : {
     488             :     //  If termination is already under way do nothing.
     489         115 :     if (state != active)
     490         115 :         return;
     491             : 
     492             :     //  We'll drop the pointer to the inpipe. From now on, the peer is
     493             :     //  responsible for deallocating it.
     494           4 :     inpipe = NULL;
     495             : 
     496             :     //  Create new inpipe.
     497           4 :     if (conflate)
     498           0 :         inpipe = new (std::nothrow)ypipe_conflate_t <msg_t>();
     499             :     else
     500           4 :         inpipe = new (std::nothrow)ypipe_t <msg_t, message_pipe_granularity>();
     501             : 
     502           4 :     alloc_assert (inpipe);
     503           4 :     in_active = true;
     504             : 
     505             :     //  Notify the peer about the hiccup.
     506           4 :     send_hiccup (peer, (void*) inpipe);
     507             : }
     508             : 
     509         558 : void zmq::pipe_t::set_hwms (int inhwm_, int outhwm_)
     510             : {
     511         558 :     int in = inhwm_ + inhwmboost;
     512         558 :     int out = outhwm_ + outhwmboost;
     513             : 
     514             :     // If either the send or the recv side has hwm <= 0, it means infinite, so we should set the hwms to infinite.
     515         558 :     if (inhwm_ <= 0 || inhwmboost <= 0)
     516          12 :         in = 0;
     517             : 
     518         558 :     if (outhwm_ <= 0 || outhwmboost <= 0)
     519          12 :         out = 0;
     520             : 
     521         558 :     lwm = compute_lwm(in);
     522         558 :     hwm = out;
     523         558 : }
     524             : 
     525        1686 : void zmq::pipe_t::set_hwms_boost(int inhwmboost_, int outhwmboost_)
     526             : {
     527        1686 :     inhwmboost = inhwmboost_;
     528        1686 :     outhwmboost = outhwmboost_;
     529        1686 : }
     530             : 
     531       38801 : bool zmq::pipe_t::check_hwm () const
     532             : {
     533     1526406 :     bool full = hwm > 0 && msgs_written - peers_msgs_read >= uint64_t (hwm);
     534     1526406 :     return( !full );
     535             : }

Generated by: LCOV version 1.10