libzmq  master
ZeroMQ C++ Core Engine (LIBZMQ)
pipe.hpp
Go to the documentation of this file.
1 /*
2  Copyright (c) 2007-2016 Contributors as noted in the AUTHORS file
3 
4  This file is part of libzmq, the ZeroMQ core engine in C++.
5 
6  libzmq is free software; you can redistribute it and/or modify it under
7  the terms of the GNU Lesser General Public License (LGPL) as published
8  by the Free Software Foundation; either version 3 of the License, or
9  (at your option) any later version.
10 
11  As a special exception, the Contributors give you permission to link
12  this library with independent modules to produce an executable,
13  regardless of the license terms of these independent modules, and to
14  copy and distribute the resulting executable under terms of your choice,
15  provided that you also meet, for each linked independent module, the
16  terms and conditions of the license of that module. An independent
17  module is a module which is not derived from or based on this library.
18  If you modify this library, you must extend this exception to your
19  version of the library.
20 
21  libzmq is distributed in the hope that it will be useful, but WITHOUT
22  ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
23  FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public
24  License for more details.
25 
26  You should have received a copy of the GNU Lesser General Public License
27  along with this program. If not, see <http://www.gnu.org/licenses/>.
28 */
29 
30 #ifndef __ZMQ_PIPE_HPP_INCLUDED__
31 #define __ZMQ_PIPE_HPP_INCLUDED__
32 
33 #include "msg.hpp"
34 #include "ypipe_base.hpp"
35 #include "config.hpp"
36 #include "object.hpp"
37 #include "stdint.hpp"
38 #include "array.hpp"
39 #include "blob.hpp"
40 
41 namespace zmq
42 {
43 
44  class object_t;
45  class pipe_t;
46 
47  // Creates a pipepair for bi-directional transfer of messages.
    //
    // All four arguments are two-element arrays:
    //   parents_  - two object_t pointers (presumably the owner of each
    //               endpoint - confirm in pipe.cpp).
    //   pipes_    - two pipe_t pointers (presumably output slots that receive
    //               the newly created endpoints - confirm in pipe.cpp).
    //   hwms_     - the two high watermarks, see below.
    //   conflate_ - the two conflate flags, see below.
    //
48  // The first HWM is for messages passed from the first pipe to the second pipe.
49  // The second HWM is for messages passed from the second pipe to the first pipe.
50  // Delay specifies how the pipe behaves when the peer terminates. If true,
51  // the pipe receives all the pending messages before terminating, otherwise it
52  // terminates straight away.
    // NOTE(review): the declaration below takes no delay argument, so the
    // paragraph above looks stale for this function; a delay flag is passed to
    // pipe_t::terminate (bool delay_) instead - confirm and update.
53  // If conflate is true, only the most recently arrived message can be
54  // read (older messages are discarded).
    //
    // Returns an int; its meaning is not visible in this header (presumably
    // 0 on success - confirm in pipe.cpp).
55  int pipepair (zmq::object_t *parents_ [2], zmq::pipe_t* pipes_ [2],
56  int hwms_ [2], bool conflate_ [2]);
57 
59  {
    // NOTE(review): the line that opens this declaration (original line 58)
    // is missing from this listing. The destructor below shows the type is
    // named i_pipe_events; restore the header line from upstream.
    //
    // Callback interface for pipe events. pipe_t::set_event_sink accepts an
    // i_pipe_events pointer as "the object to send events to".
    // Every callback takes a pipe_t pointer (presumably the pipe that raised
    // the event - confirm in pipe.cpp).

    // Virtual destructor with an empty body.
60  virtual ~i_pipe_events () {}
61 
    // Presumably signalled when the pipe becomes readable - confirm.
62  virtual void read_activated (zmq::pipe_t *pipe_) = 0;
    // Presumably signalled when the pipe becomes writable - confirm.
63  virtual void write_activated (zmq::pipe_t *pipe_) = 0;
    // The 'hiccuped' event; pipe_t::hiccup is documented as causing it to be
    // generated in the peer.
64  virtual void hiccuped (zmq::pipe_t *pipe_) = 0;
    // Presumably the notification that pipe_t::terminate refers to as the
    // 'terminated' event - confirm.
65  virtual void pipe_terminated (zmq::pipe_t *pipe_) = 0;
66  };
67 
68  // Note that a pipe can be stored in three different arrays:
69  // the array of inbound pipes (1), the array of outbound pipes (2) and
70  // the generic array of pipes to be deallocated (3).
    // This is why pipe_t derives from array_item_t three times, once per
    // array ID.
71 
72  class pipe_t :
73  public object_t,
74  public array_item_t <1>,
75  public array_item_t <2>,
76  public array_item_t <3>
77  {
78  // This allows pipepair to create pipe objects (the constructor is private).
79  friend int pipepair (zmq::object_t *parents_ [2], zmq::pipe_t* pipes_ [2],
80  int hwms_ [2], bool conflate_ [2]);
81 
82  public:
83 
84  // Specifies the object to send events to.
85  void set_event_sink (i_pipe_events *sink_);
86 
87  // Pipe endpoint can store a routing ID to be used by its clients.
88  void set_routing_id (uint32_t routing_id_);
89  uint32_t get_routing_id ();
90 
91  // Pipe endpoint can store an opaque ID to be used by its clients.
92  void set_identity (const blob_t &identity_);
    // NOTE(review): original line 93 is missing from this listing; given
    // set_identity above it is presumably the matching identity getter -
    // restore it from upstream.
94 
    // Returns a blob_t by value (presumably a copy of the pipe's credential
    // member - confirm in pipe.cpp).
95  blob_t get_credential () const;
96 
97  // Returns true if there is at least one message to read in the pipe.
98  bool check_read ();
99 
100  // Reads a message from the underlying pipe.
     // (Presumably returns false when no message could be read - confirm.)
101  bool read (msg_t *msg_);
102 
103  // Checks whether messages can be written to the pipe. If the pipe is
104  // closed or if writing the message would cause the high watermark to be
105  // reached, the function returns false.
106  bool check_write ();
107 
108  // Writes a message to the underlying pipe. Returns false if the
109  // message does not pass check_write. If false, the message object
110  // retains ownership of its message buffer.
111  bool write (msg_t *msg_);
112 
113  // Remove unfinished parts of the outbound message from the pipe.
114  void rollback ();
115 
116  // Flush the messages downstream.
117  void flush ();
118 
119  // Temporarily disconnects the inbound message stream and drops
120  // all the messages on the fly. Causes 'hiccuped' event to be generated
121  // in the peer.
122  void hiccup ();
123 
124  // Ensure the pipe won't block on receiving pipe_term.
125  void set_nodelay ();
126 
127  // Ask pipe to terminate. The termination will happen asynchronously
128  // and the user will be notified about actual deallocation by the
129  // 'pipe_terminated' event. If delay is true, the pending messages will be
130  // processed before actual shutdown.
131  void terminate (bool delay_);
132 
133  // Set the high water marks.
134  void set_hwms (int inhwm_, int outhwm_);
135 
136  // Set the boosts added to the high water marks. Used by inproc sockets so
     // that the total HWM is the sum of the connecting and the binding
     // socket's watermarks.
137  void set_hwms_boost(int inhwmboost_, int outhwmboost_);
138 
139  // Returns true if HWM is not reached.
140  bool check_hwm () const;
141  private:
142 
143  // Type of the underlying lock-free pipe.
     // NOTE(review): the typedef itself (original line 144) is missing from
     // this listing; the Doxygen index at the end of the page gives it as
     // ypipe_base_t< msg_t > upipe_t - restore it from upstream.
145 
146  // Command handlers.
147  void process_activate_read ();
148  void process_activate_write (uint64_t msgs_read_);
149  void process_hiccup (void *pipe_);
150  void process_pipe_term ();
151  void process_pipe_term_ack ();
152 
153  // Handler for delimiter read from the pipe.
154  void process_delimiter ();
155 
156  // Constructor is private. Pipe can only be created using
157  // pipepair function.
158  pipe_t (object_t *parent_, upipe_t *inpipe_, upipe_t *outpipe_,
159  int inhwm_, int outhwm_, bool conflate_);
160 
161  // Pipepair uses this function to let us know about
162  // the peer pipe object.
163  void set_peer (pipe_t *pipe_);
164 
165  // Destructor is private. Pipe objects destroy themselves.
166  ~pipe_t ();
167 
168  // Underlying pipes for both directions.
169  upipe_t *inpipe;
170  upipe_t *outpipe;
171 
172  // Can the pipe be read from / written to?
173  bool in_active;
     // NOTE(review): original line 174 is missing from this listing; the
     // Doxygen index gives it as "bool out_active".
175 
176  // High watermark for the outbound pipe.
177  int hwm;
178 
179  // Low watermark for the inbound pipe.
180  int lwm;
181 
182  // Boosts for the high and low watermarks, used with inproc sockets so that
     // the HWMs are the sum of the send and recv HWMs on each side of the pipe.
     // NOTE(review): original lines 183-184 are missing from this listing; the
     // Doxygen index gives them as "int inhwmboost" and "int outhwmboost".
185 
186  // Number of messages read and written so far.
187  uint64_t msgs_read;
188  uint64_t msgs_written;
189 
190  // Last received peer's msgs_read. The actual number in the peer
191  // can be higher at the moment.
192  uint64_t peers_msgs_read;
193 
194  // The pipe object on the other side of the pipepair.
     // NOTE(review): original line 195 is missing from this listing; the
     // Doxygen index gives it as "pipe_t * peer".
196 
197  // Sink to send events to.
     // NOTE(review): original line 198 is missing from this listing; the
     // Doxygen index gives it as "i_pipe_events * sink".
199 
200  // States of the pipe endpoint:
201  // active: common state before any termination begins,
202  // delimiter_received: delimiter was read from pipe before
203  // term command was received,
204  // waiting_for_delimiter: term command was already received
205  // from the peer but there are still pending messages to read,
206  // term_ack_sent: all pending messages were already read and
207  // all we are waiting for is ack from the peer,
208  // term_req_sent1: 'terminate' was explicitly called by the user,
209  // term_req_sent2: user called 'terminate' and then we've got
210  // term command from the peer as well.
     // NOTE(review): original lines 212-216 are missing from this listing, so
     // only the last enumerator is visible below; presumably they hold the
     // other five states named above, in that order - restore from upstream.
211  enum {
217  term_req_sent2
218  } state;
219 
220  // If true, we receive all the pending inbound messages before
221  // terminating. If false, we terminate immediately when the peer
222  // asks us to.
223  bool delay;
224 
225  // Identity of the writer. Used uniquely by the reader side.
     // NOTE(review): original line 226 is missing from this listing; the
     // Doxygen index gives it as "blob_t identity".
227 
228  // Routing ID stored on this pipe endpoint (see set_routing_id and
     // get_routing_id).
     // NOTE(review): the comment here used to repeat the identity comment
     // word for word. Original line 229 is missing from this listing; the
     // Doxygen index gives it as "int routing_id", whereas the accessors use
     // uint32_t - confirm the intended type.
230 
231  // Pipe's credential.
     // NOTE(review): original line 232 is missing from this listing; the
     // Doxygen index gives it as "blob_t credential".
233 
234  // Returns true if the message is delimiter; false otherwise.
235  static bool is_delimiter (const msg_t &msg_);
236 
237  // Computes appropriate low watermark from the given high watermark.
238  static int compute_lwm (int hwm_);
239 
     // Conflate flag; const, so fixed for the lifetime of the pipe.
     // (Presumably taken from the constructor's conflate_ argument - confirm.)
240  const bool conflate;
241 
242  // Disable copying: the copy constructor and copy assignment operator are
     // declared in the private section and not defined in this header.
243  pipe_t (const pipe_t&);
244  const pipe_t &operator = (const pipe_t&);
245  };
246 
247 }
248 
249 #endif
int lwm
Definition: pipe.hpp:180
upipe_t * inpipe
Definition: pipe.hpp:169
virtual void hiccuped(zmq::pipe_t *pipe_)=0
int pipepair(zmq::object_t *parents_[2], zmq::pipe_t *pipes_[2], int hwms_[2], bool conflate_[2])
Definition: pipe.cpp:41
virtual ~i_pipe_events()
Definition: pipe.hpp:60
bool get_identity(void *socket, char *data, size_t *size)
const bool conflate
Definition: pipe.hpp:240
blob_t credential
Definition: pipe.hpp:232
uint64_t msgs_read
Definition: pipe.hpp:187
blob_t identity
Definition: pipe.hpp:226
int routing_id
Definition: pipe.hpp:229
int outhwmboost
Definition: pipe.hpp:184
i_pipe_events * sink
Definition: pipe.hpp:198
bool delay
Definition: pipe.hpp:223
ypipe_base_t< msg_t > upipe_t
Definition: pipe.hpp:144
virtual void write_activated(zmq::pipe_t *pipe_)=0
std::basic_string< unsigned char > blob_t
Definition: blob.hpp:134
virtual void pipe_terminated(zmq::pipe_t *pipe_)=0
bool in_active
Definition: pipe.hpp:173
uint64_t msgs_written
Definition: pipe.hpp:188
virtual void read_activated(zmq::pipe_t *pipe_)=0
uint64_t peers_msgs_read
Definition: pipe.hpp:192
bool out_active
Definition: pipe.hpp:174
upipe_t * outpipe
Definition: pipe.hpp:170
int hwm
Definition: pipe.hpp:177
int inhwmboost
Definition: pipe.hpp:183
Definition: address.hpp:35
pipe_t * peer
Definition: pipe.hpp:195