LCOV - code coverage report
Current view: top level - home/h/core/forks/m4-libzmq/src - pipe.hpp (source / functions) Hit Total Coverage
Test: zeromq-4.2.0 Code Coverage Lines: 2 2 100.0 %
Date: 2016-05-09 Functions: 0 2 0.0 %
Legend: Lines: hit not hit

          Line data    Source code
       1             : /*
       2             :     Copyright (c) 2007-2016 Contributors as noted in the AUTHORS file
       3             : 
       4             :     This file is part of libzmq, the ZeroMQ core engine in C++.
       5             : 
       6             :     libzmq is free software; you can redistribute it and/or modify it under
       7             :     the terms of the GNU Lesser General Public License (LGPL) as published
       8             :     by the Free Software Foundation; either version 3 of the License, or
       9             :     (at your option) any later version.
      10             : 
      11             :     As a special exception, the Contributors give you permission to link
      12             :     this library with independent modules to produce an executable,
      13             :     regardless of the license terms of these independent modules, and to
      14             :     copy and distribute the resulting executable under terms of your choice,
      15             :     provided that you also meet, for each linked independent module, the
      16             :     terms and conditions of the license of that module. An independent
      17             :     module is a module which is not derived from or based on this library.
      18             :     If you modify this library, you must extend this exception to your
      19             :     version of the library.
      20             : 
      21             :     libzmq is distributed in the hope that it will be useful, but WITHOUT
      22             :     ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
      23             :     FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public
      24             :     License for more details.
      25             : 
      26             :     You should have received a copy of the GNU Lesser General Public License
      27             :     along with this program.  If not, see <http://www.gnu.org/licenses/>.
      28             : */
      29             : 
      30             : #ifndef __ZMQ_PIPE_HPP_INCLUDED__
      31             : #define __ZMQ_PIPE_HPP_INCLUDED__
      32             : 
      33             : #include "msg.hpp"
      34             : #include "ypipe_base.hpp"
      35             : #include "config.hpp"
      36             : #include "object.hpp"
      37             : #include "stdint.hpp"
      38             : #include "array.hpp"
      39             : #include "blob.hpp"
      40             : 
      41             : namespace zmq
      42             : {
      43             : 
      44             :     class object_t;
      45             :     class pipe_t;
      46             : 
      47             :     //  Create a pipepair for bi-directional transfer of messages.
      48             :     //  First HWM is for messages passed from first pipe to the second pipe.
      49             :     //  Second HWM is for messages passed from second pipe to the first pipe.
      50             :     //  Delay specifies how the pipe behaves when the peer terminates. If true
      51             :     //  pipe receives all the pending messages before terminating, otherwise it
      52             :     //  terminates straight away.
      53             :     //  If conflate is true, only the most recently arrived message could be
      54             :     //  read (older messages are discarded)
      55             :     int pipepair (zmq::object_t *parents_ [2], zmq::pipe_t* pipes_ [2],
      56             :         int hwms_ [2], bool conflate_ [2]);
      57             : 
      58       18455 :     struct i_pipe_events
      59             :     {
      60       18458 :         virtual ~i_pipe_events () {}
      61             : 
      62             :         virtual void read_activated (zmq::pipe_t *pipe_) = 0;
      63             :         virtual void write_activated (zmq::pipe_t *pipe_) = 0;
      64             :         virtual void hiccuped (zmq::pipe_t *pipe_) = 0;
      65             :         virtual void pipe_terminated (zmq::pipe_t *pipe_) = 0;
      66             :     };
      67             : 
      68             :     //  Note that pipe can be stored in three different arrays.
      69             :     //  The array of inbound pipes (1), the array of outbound pipes (2) and
      70             :     //  the generic array of pipes to be deallocated (3).
      71             : 
      72             :     class pipe_t :
      73             :         public object_t,
      74             :         public array_item_t <1>,
      75             :         public array_item_t <2>,
      76             :         public array_item_t <3>
      77             :     {
      78             :         //  This allows pipepair to create pipe objects.
      79             :         friend int pipepair (zmq::object_t *parents_ [2], zmq::pipe_t* pipes_ [2],
      80             :             int hwms_ [2], bool conflate_ [2]);
      81             : 
      82             :     public:
      83             : 
      84             :         //  Specifies the object to send events to.
      85             :         void set_event_sink (i_pipe_events *sink_);
      86             : 
      87             :         //  Pipe endpoint can store an routing ID to be used by its clients.
      88             :         void set_routing_id (uint32_t routing_id_);
      89             :         uint32_t get_routing_id ();
      90             : 
      91             :         //  Pipe endpoint can store an opaque ID to be used by its clients.
      92             :         void set_identity (const blob_t &identity_);
      93             :         blob_t get_identity ();
      94             : 
      95             :         blob_t get_credential () const;
      96             : 
      97             :         //  Returns true if there is at least one message to read in the pipe.
      98             :         bool check_read ();
      99             : 
     100             :         //  Reads a message to the underlying pipe.
     101             :         bool read (msg_t *msg_);
     102             : 
     103             :         //  Checks whether messages can be written to the pipe. If the pipe is
     104             :         //  closed or if writing the message would cause high watermark the
     105             :         //  function returns false.
     106             :         bool check_write ();
     107             : 
     108             :         //  Writes a message to the underlying pipe. Returns false if the
     109             :         //  message does not pass check_write. If false, the message object
     110             :         //  retains ownership of its message buffer.
     111             :         bool write (msg_t *msg_);
     112             : 
     113             :         //  Remove unfinished parts of the outbound message from the pipe.
     114             :         void rollback ();
     115             : 
     116             :         //  Flush the messages downstream.
     117             :         void flush ();
     118             : 
     119             :         //  Temporarily disconnects the inbound message stream and drops
     120             :         //  all the messages on the fly. Causes 'hiccuped' event to be generated
     121             :         //  in the peer.
     122             :         void hiccup ();
     123             : 
     124             :         //  Ensure the pipe won't block on receiving pipe_term.
     125             :         void set_nodelay ();
     126             : 
     127             :         //  Ask pipe to terminate. The termination will happen asynchronously
     128             :         //  and user will be notified about actual deallocation by 'terminated'
     129             :         //  event. If delay is true, the pending messages will be processed
     130             :         //  before actual shutdown.
     131             :         void terminate (bool delay_);
     132             : 
     133             :         //  Set the high water marks.
     134             :         void set_hwms (int inhwm_, int outhwm_);
     135             : 
     136             :         //  Set the boost to high water marks, used by inproc sockets so total hwm are sum of connect and bind sockets watermarks
     137             :         void set_hwms_boost(int inhwmboost_, int outhwmboost_);
     138             : 
     139             :         //  Returns true if HWM is not reached
     140             :         bool check_hwm () const;
     141             :     private:
     142             : 
     143             :         //  Type of the underlying lock-free pipe.
     144             :         typedef ypipe_base_t <msg_t> upipe_t;
     145             : 
     146             :         //  Command handlers.
     147             :         void process_activate_read ();
     148             :         void process_activate_write (uint64_t msgs_read_);
     149             :         void process_hiccup (void *pipe_);
     150             :         void process_pipe_term ();
     151             :         void process_pipe_term_ack ();
     152             : 
     153             :         //  Handler for delimiter read from the pipe.
     154             :         void process_delimiter ();
     155             : 
     156             :         //  Constructor is private. Pipe can only be created using
     157             :         //  pipepair function.
     158             :         pipe_t (object_t *parent_, upipe_t *inpipe_, upipe_t *outpipe_,
     159             :             int inhwm_, int outhwm_, bool conflate_);
     160             : 
     161             :         //  Pipepair uses this function to let us know about
     162             :         //  the peer pipe object.
     163             :         void set_peer (pipe_t *pipe_);
     164             : 
     165             :         //  Destructor is private. Pipe objects destroy themselves.
     166             :         ~pipe_t ();
     167             : 
     168             :         //  Underlying pipes for both directions.
     169             :         upipe_t *inpipe;
     170             :         upipe_t *outpipe;
     171             : 
     172             :         //  Can the pipe be read from / written to?
     173             :         bool in_active;
     174             :         bool out_active;
     175             : 
     176             :         //  High watermark for the outbound pipe.
     177             :         int hwm;
     178             : 
     179             :         //  Low watermark for the inbound pipe.
     180             :         int lwm;
     181             : 
     182             :         // boosts for high and low watermarks, used with inproc sockets so hwm are sum of send and recv hmws on each side of pipe
     183             :         int inhwmboost;
     184             :         int outhwmboost;
     185             : 
     186             :         //  Number of messages read and written so far.
     187             :         uint64_t msgs_read;
     188             :         uint64_t msgs_written;
     189             : 
     190             :         //  Last received peer's msgs_read. The actual number in the peer
     191             :         //  can be higher at the moment.
     192             :         uint64_t peers_msgs_read;
     193             : 
     194             :         //  The pipe object on the other side of the pipepair.
     195             :         pipe_t *peer;
     196             : 
     197             :         //  Sink to send events to.
     198             :         i_pipe_events *sink;
     199             : 
     200             :         //  States of the pipe endpoint:
     201             :         //  active: common state before any termination begins,
     202             :         //  delimiter_received: delimiter was read from pipe before
     203             :         //      term command was received,
     204             :         //  waiting_for_delimiter: term command was already received
     205             :         //      from the peer but there are still pending messages to read,
     206             :         //  term_ack_sent: all pending messages were already read and
     207             :         //      all we are waiting for is ack from the peer,
     208             :         //  term_req_sent1: 'terminate' was explicitly called by the user,
     209             :         //  term_req_sent2: user called 'terminate' and then we've got
     210             :         //      term command from the peer as well.
     211             :         enum {
     212             :             active,
     213             :             delimiter_received,
     214             :             waiting_for_delimiter,
     215             :             term_ack_sent,
     216             :             term_req_sent1,
     217             :             term_req_sent2
     218             :         } state;
     219             : 
     220             :         //  If true, we receive all the pending inbound messages before
     221             :         //  terminating. If false, we terminate immediately when the peer
     222             :         //  asks us to.
     223             :         bool delay;
     224             : 
     225             :         //  Identity of the writer. Used uniquely by the reader side.
     226             :         blob_t identity;
     227             : 
     228             :         //  Identity of the writer. Used uniquely by the reader side.
     229             :         int routing_id;
     230             : 
     231             :         //  Pipe's credential.
     232             :         blob_t credential;
     233             : 
     234             :         //  Returns true if the message is delimiter; false otherwise.
     235             :         static bool is_delimiter (const msg_t &msg_);
     236             : 
     237             :         //  Computes appropriate low watermark from the given high watermark.
     238             :         static int compute_lwm (int hwm_);
     239             : 
     240             :         const bool conflate;
     241             : 
     242             :         //  Disable copying.
     243             :         pipe_t (const pipe_t&);
     244             :         const pipe_t &operator = (const pipe_t&);
     245             :     };
     246             : 
     247             : }
     248             : 
     249             : #endif

Generated by: LCOV version 1.10