libzmq  master
ZeroMQ C++ Core Engine (LIBZMQ)
pipe.cpp
Go to the documentation of this file.
1 /*
2  Copyright (c) 2007-2016 Contributors as noted in the AUTHORS file
3 
4  This file is part of libzmq, the ZeroMQ core engine in C++.
5 
6  libzmq is free software; you can redistribute it and/or modify it under
7  the terms of the GNU Lesser General Public License (LGPL) as published
8  by the Free Software Foundation; either version 3 of the License, or
9  (at your option) any later version.
10 
11  As a special exception, the Contributors give you permission to link
12  this library with independent modules to produce an executable,
13  regardless of the license terms of these independent modules, and to
14  copy and distribute the resulting executable under terms of your choice,
15  provided that you also meet, for each linked independent module, the
16  terms and conditions of the license of that module. An independent
17  module is a module which is not derived from or based on this library.
18  If you modify this library, you must extend this exception to your
19  version of the library.
20 
21  libzmq is distributed in the hope that it will be useful, but WITHOUT
22  ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
23  FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public
24  License for more details.
25 
26  You should have received a copy of the GNU Lesser General Public License
27  along with this program. If not, see <http://www.gnu.org/licenses/>.
28 */
29 
30 #include "precompiled.hpp"
31 #include <new>
32 #include <stddef.h>
33 
34 #include "macros.hpp"
35 #include "pipe.hpp"
36 #include "err.hpp"
37 
38 #include "ypipe.hpp"
39 #include "ypipe_conflate.hpp"
40 
41 int zmq::pipepair (class object_t *parents_ [2], class pipe_t* pipes_ [2],
42  int hwms_ [2], bool conflate_ [2])
43 {
44  // Creates two pipe objects. These objects are connected by two ypipes,
45  // each to pass messages in one direction.
46 
47  typedef ypipe_t <msg_t, message_pipe_granularity> upipe_normal_t;
48  typedef ypipe_conflate_t <msg_t> upipe_conflate_t;
49 
50  pipe_t::upipe_t *upipe1;
51  if(conflate_ [0])
52  upipe1 = new (std::nothrow) upipe_conflate_t ();
53  else
54  upipe1 = new (std::nothrow) upipe_normal_t ();
55  alloc_assert (upipe1);
56 
57  pipe_t::upipe_t *upipe2;
58  if(conflate_ [1])
59  upipe2 = new (std::nothrow) upipe_conflate_t ();
60  else
61  upipe2 = new (std::nothrow) upipe_normal_t ();
62  alloc_assert (upipe2);
63 
64  pipes_ [0] = new (std::nothrow) pipe_t (parents_ [0], upipe1, upipe2,
65  hwms_ [1], hwms_ [0], conflate_ [0]);
66  alloc_assert (pipes_ [0]);
67  pipes_ [1] = new (std::nothrow) pipe_t (parents_ [1], upipe2, upipe1,
68  hwms_ [0], hwms_ [1], conflate_ [1]);
69  alloc_assert (pipes_ [1]);
70 
71  pipes_ [0]->set_peer (pipes_ [1]);
72  pipes_ [1]->set_peer (pipes_ [0]);
73 
74  return 0;
75 }
76 
77 zmq::pipe_t::pipe_t (object_t *parent_, upipe_t *inpipe_, upipe_t *outpipe_,
78  int inhwm_, int outhwm_, bool conflate_) :
79  object_t (parent_),
80  inpipe (inpipe_),
81  outpipe (outpipe_),
82  in_active (true),
83  out_active (true),
84  hwm (outhwm_),
85  lwm (compute_lwm (inhwm_)),
86  inhwmboost(0),
87  outhwmboost(0),
88  msgs_read (0),
89  msgs_written (0),
90  peers_msgs_read (0),
91  peer (NULL),
92  sink (NULL),
93  state (active),
94  delay (true),
95  routing_id(0),
96  conflate (conflate_)
97 {
98 }
99 
101 {
102 }
103 
105 {
106  // Peer can be set once only.
107  zmq_assert (!peer);
108  peer = peer_;
109 }
110 
112 {
113  // Sink can be set once only.
114  zmq_assert (!sink);
115  sink = sink_;
116 }
117 
118 void zmq::pipe_t::set_routing_id (uint32_t routing_id_)
119 {
120  routing_id = routing_id_;
121 }
122 
124 {
125  return routing_id;
126 }
127 
128 void zmq::pipe_t::set_identity (const blob_t &identity_)
129 {
130  identity = identity_;
131 }
132 
134 {
135  return identity;
136 }
137 
139 {
140  return credential;
141 }
142 
144 {
145  if (unlikely (!in_active))
146  return false;
148  return false;
149 
150  // Check if there's an item in the pipe.
151  if (!inpipe->check_read ()) {
152  in_active = false;
153  return false;
154  }
155 
156  // If the next item in the pipe is message delimiter,
157  // initiate termination process.
158  if (inpipe->probe (is_delimiter)) {
159  msg_t msg;
160  bool ok = inpipe->read (&msg);
161  zmq_assert (ok);
163  return false;
164  }
165 
166  return true;
167 }
168 
170 {
171  if (unlikely (!in_active))
172  return false;
174  return false;
175 
176 read_message:
177  if (!inpipe->read (msg_)) {
178  in_active = false;
179  return false;
180  }
181 
182  // If this is a credential, save a copy and receive next message.
183  if (unlikely (msg_->is_credential ())) {
184  const unsigned char *data = static_cast <const unsigned char *> (msg_->data ());
185  credential = blob_t (data, msg_->size ());
186  const int rc = msg_->close ();
187  zmq_assert (rc == 0);
188  goto read_message;
189  }
190 
191  // If delimiter was read, start termination process of the pipe.
192  if (msg_->is_delimiter ()) {
194  return false;
195  }
196 
197  if (!(msg_->flags () & msg_t::more) && !msg_->is_identity ())
198  msgs_read++;
199 
200  if (lwm > 0 && msgs_read % lwm == 0)
202 
203  return true;
204 }
205 
207 {
208  if (unlikely (!out_active || state != active))
209  return false;
210 
211  bool full = !check_hwm();
212 
213  if (unlikely (full)) {
214  out_active = false;
215  return false;
216  }
217 
218  return true;
219 }
220 
222 {
223  if (unlikely (!check_write ()))
224  return false;
225 
226  bool more = msg_->flags () & msg_t::more ? true : false;
227  const bool is_identity = msg_->is_identity ();
228  outpipe->write (*msg_, more);
229  if (!more && !is_identity)
230  msgs_written++;
231 
232  return true;
233 }
234 
236 {
237  // Remove incomplete message from the outbound pipe.
238  msg_t msg;
239  if (outpipe) {
240  while (outpipe->unwrite (&msg)) {
241  zmq_assert (msg.flags () & msg_t::more);
242  int rc = msg.close ();
243  errno_assert (rc == 0);
244  }
245  }
246 }
247 
249 {
250  // The peer does not exist anymore at this point.
251  if (state == term_ack_sent)
252  return;
253 
254  if (outpipe && !outpipe->flush ())
256 }
257 
259 {
260  if (!in_active && (state == active || state == waiting_for_delimiter)) {
261  in_active = true;
262  sink->read_activated (this);
263  }
264 }
265 
266 void zmq::pipe_t::process_activate_write (uint64_t msgs_read_)
267 {
268  // Remember the peer's message sequence number.
269  peers_msgs_read = msgs_read_;
270 
271  if (!out_active && state == active) {
272  out_active = true;
273  sink->write_activated (this);
274  }
275 }
276 
277 void zmq::pipe_t::process_hiccup (void *pipe_)
278 {
279  // Destroy old outpipe. Note that the read end of the pipe was already
280  // migrated to this thread.
282  outpipe->flush ();
283  msg_t msg;
284  while (outpipe->read (&msg)) {
285  if (!(msg.flags () & msg_t::more))
286  msgs_written--;
287  int rc = msg.close ();
288  errno_assert (rc == 0);
289  }
291 
292  // Plug in the new outpipe.
293  zmq_assert (pipe_);
294  outpipe = (upipe_t*) pipe_;
295  out_active = true;
296 
297  // If appropriate, notify the user about the hiccup.
298  if (state == active)
299  sink->hiccuped (this);
300 }
301 
303 {
306  || state == term_req_sent1);
307 
308  // This is the simple case of peer-induced termination. If there are no
309  // more pending messages to read, or if the pipe was configured to drop
310  // pending messages, we can move directly to the term_ack_sent state.
311  // Otherwise we'll hang up in waiting_for_delimiter state till all
312  // pending messages are read.
313  if (state == active) {
314  if (delay)
316  else {
318  outpipe = NULL;
320  }
321  }
322 
323  // Delimiter happened to arrive before the term command. Now we have the
324  // term command as well, so we can move straight to term_ack_sent state.
325  else
326  if (state == delimiter_received) {
328  outpipe = NULL;
330  }
331 
332  // This is the case where both ends of the pipe are closed in parallel.
333  // We simply reply to the request by ack and continue waiting for our
334  // own ack.
335  else
336  if (state == term_req_sent1) {
338  outpipe = NULL;
340  }
341 }
342 
344 {
345  // Notify the user that all the references to the pipe should be dropped.
346  zmq_assert (sink);
347  sink->pipe_terminated (this);
348 
349  // In term_ack_sent and term_req_sent2 states there's nothing to do.
350  // Simply deallocate the pipe. In term_req_sent1 state we have to ack
351  // the peer before deallocating this side of the pipe.
352  // All the other states are invalid.
353  if (state == term_req_sent1) {
354  outpipe = NULL;
356  }
357  else
359 
360  // We'll deallocate the inbound pipe, the peer will deallocate the outbound
361  // pipe (which is an inbound pipe from its point of view).
362  // First, delete all the unread messages in the pipe. We have to do it by
363  // hand because msg_t doesn't have automatic destructor. Then deallocate
364  // the ypipe itself.
365 
366  if (!conflate) {
367  msg_t msg;
368  while (inpipe->read (&msg)) {
369  int rc = msg.close ();
370  errno_assert (rc == 0);
371  }
372  }
373 
375 
376  // Deallocate the pipe object
377  delete this;
378 }
379 
381 {
382  this->delay = false;
383 }
384 
385 void zmq::pipe_t::terminate (bool delay_)
386 {
387  // Overload the value specified at pipe creation.
388  delay = delay_;
389 
390  // If terminate was already called, we can ignore the duplicate invocation.
391  if (state == term_req_sent1 || state == term_req_sent2) {
392  return;
393  }
394  // If the pipe is in the final phase of async termination, it's going to
395  // closed anyway. No need to do anything special here.
396  else if (state == term_ack_sent) {
397  return;
398  }
399  // The simple sync termination case. Ask the peer to terminate and wait
400  // for the ack.
401  else if (state == active) {
404  }
405  // There are still pending messages available, but the user calls
406  // 'terminate'. We can act as if all the pending messages were read.
407  else if (state == waiting_for_delimiter && !delay) {
408  outpipe = NULL;
411  }
412  // If there are pending messages still available, do nothing.
413  else if (state == waiting_for_delimiter) {
414  }
415  // We've already got delimiter, but not term command yet. We can ignore
416  // the delimiter and ack synchronously terminate as if we were in
417  // active state.
418  else if (state == delimiter_received) {
421  }
422  // There are no other states.
423  else {
424  zmq_assert (false);
425  }
426 
427  // Stop outbound flow of messages.
428  out_active = false;
429 
430  if (outpipe) {
431 
432  // Drop any unfinished outbound messages.
433  rollback ();
434 
435  // Write the delimiter into the pipe. Note that watermarks are not
436  // checked; thus the delimiter can be written even when the pipe is full.
437  msg_t msg;
438  msg.init_delimiter ();
439  outpipe->write (msg, false);
440  flush ();
441  }
442 }
443 
445 {
446  return msg_.is_delimiter ();
447 }
448 
450 {
451  // Compute the low water mark. The following points should be taken
452  // into consideration:
453  //
454  // 1. LWM has to be less than HWM.
455  // 2. LWM cannot be set to very low value (such as zero) as after filling
456  // the queue it would start to refill only after all the messages are
457  // read from it and thus unnecessarily hold the progress back.
458  // 3. LWM cannot be set to very high value (such as HWM-1) as it would
459  // result in lock-step filling of the queue - if a single message is
460  // read from a full queue, writer thread is resumed to write exactly one
461  // message to the queue and go back to sleep immediately. This would
462  // result in low performance.
463  //
464  // Given the 3. it would be good to keep HWM and LWM as far apart as
465  // possible to reduce the thread switching overhead to almost zero.
466  // Let's make LWM 1/2 of HWM.
467  int result = (hwm_ + 1) / 2;
468 
469  return result;
470 }
471 
473 {
476 
477  if (state == active)
479  else {
480  outpipe = NULL;
483  }
484 }
485 
487 {
488  // If termination is already under way do nothing.
489  if (state != active)
490  return;
491 
492  // We'll drop the pointer to the inpipe. From now on, the peer is
493  // responsible for deallocating it.
494  inpipe = NULL;
495 
496  // Create new inpipe.
497  if (conflate)
498  inpipe = new (std::nothrow)ypipe_conflate_t <msg_t>();
499  else
501 
503  in_active = true;
504 
505  // Notify the peer about the hiccup.
506  send_hiccup (peer, (void*) inpipe);
507 }
508 
509 void zmq::pipe_t::set_hwms (int inhwm_, int outhwm_)
510 {
511  int in = inhwm_ + inhwmboost;
512  int out = outhwm_ + outhwmboost;
513 
514  // if either send or recv side has hwm <= 0 it means infinite so we should set hwms infinite
515  if (inhwm_ <= 0 || inhwmboost <= 0)
516  in = 0;
517 
518  if (outhwm_ <= 0 || outhwmboost <= 0)
519  out = 0;
520 
521  lwm = compute_lwm(in);
522  hwm = out;
523 }
524 
525 void zmq::pipe_t::set_hwms_boost(int inhwmboost_, int outhwmboost_)
526 {
527  inhwmboost = inhwmboost_;
528  outhwmboost = outhwmboost_;
529 }
530 
532 {
533  bool full = hwm > 0 && msgs_written - peers_msgs_read >= uint64_t (hwm);
534  return( !full );
535 }
virtual bool unwrite(T *value_)=0
bool check_write()
Definition: pipe.cpp:206
bool read(msg_t *msg_)
Definition: pipe.cpp:169
#define LIBZMQ_DELETE(p_object)
Definition: macros.hpp:7
int lwm
Definition: pipe.hpp:180
virtual bool read(T *value_)=0
upipe_t * inpipe
Definition: pipe.hpp:169
int close()
Definition: msg.cpp:217
virtual void hiccuped(zmq::pipe_t *pipe_)=0
bool write(msg_t *msg_)
Definition: pipe.cpp:221
virtual bool check_read()=0
int pipepair(zmq::object_t *parents_[2], zmq::pipe_t *pipes_[2], int hwms_[2], bool conflate_[2])
Definition: pipe.cpp:41
#define zmq_assert(x)
Definition: err.hpp:119
void set_hwms(int inhwm_, int outhwm_)
Definition: pipe.cpp:509
void set_nodelay()
Definition: pipe.cpp:380
bool is_identity() const
Definition: msg.cpp:417
const bool conflate
Definition: pipe.hpp:240
blob_t credential
Definition: pipe.hpp:232
virtual bool flush()=0
uint64_t msgs_read
Definition: pipe.hpp:187
blob_t get_credential() const
Definition: pipe.cpp:138
void process_activate_read()
Definition: pipe.cpp:258
void send_pipe_term_ack(zmq::pipe_t *destination_)
Definition: object.cpp:286
blob_t identity
Definition: pipe.hpp:226
unsigned char size
Definition: msg.hpp:188
int routing_id
Definition: pipe.hpp:229
void process_pipe_term()
Definition: pipe.cpp:302
void set_routing_id(uint32_t routing_id_)
Definition: pipe.cpp:118
void rollback()
Definition: pipe.cpp:235
uint32_t get_routing_id()
Definition: pipe.cpp:123
int outhwmboost
Definition: pipe.hpp:184
pipe_t(object_t *parent_, upipe_t *inpipe_, upipe_t *outpipe_, int inhwm_, int outhwm_, bool conflate_)
Definition: pipe.cpp:77
void set_hwms_boost(int inhwmboost_, int outhwmboost_)
Definition: pipe.cpp:525
i_pipe_events * sink
Definition: pipe.hpp:198
bool delay
Definition: pipe.hpp:223
#define unlikely(x)
Definition: likely.hpp:38
void process_hiccup(void *pipe_)
Definition: pipe.cpp:277
void process_activate_write(uint64_t msgs_read_)
Definition: pipe.cpp:266
virtual void write_activated(zmq::pipe_t *pipe_)=0
std::basic_string< unsigned char > blob_t
Definition: blob.hpp:134
void send_activate_write(zmq::pipe_t *destination_, uint64_t msgs_read_)
Definition: object.cpp:259
void flush()
Definition: pipe.cpp:248
virtual void pipe_terminated(zmq::pipe_t *pipe_)=0
static int compute_lwm(int hwm_)
Definition: pipe.cpp:449
void set_peer(pipe_t *pipe_)
Definition: pipe.cpp:104
int init_delimiter()
Definition: msg.cpp:187
bool check_hwm() const
Definition: pipe.cpp:531
bool in_active
Definition: pipe.hpp:173
void set_event_sink(i_pipe_events *sink_)
Definition: pipe.cpp:111
uint64_t msgs_written
Definition: pipe.hpp:188
virtual void read_activated(zmq::pipe_t *pipe_)=0
uint64_t peers_msgs_read
Definition: pipe.hpp:192
static bool is_delimiter(const msg_t &msg_)
Definition: pipe.cpp:444
void terminate(bool delay_)
Definition: pipe.cpp:385
bool out_active
Definition: pipe.hpp:174
upipe_t * outpipe
Definition: pipe.hpp:170
#define alloc_assert(x)
Definition: err.hpp:159
#define errno_assert(x)
Definition: err.hpp:129
void hiccup()
Definition: pipe.cpp:486
int hwm
Definition: pipe.hpp:177
virtual bool probe(bool(*fn)(const T &))=0
virtual void write(const T &value_, bool incomplete_)=0
void send_activate_read(zmq::pipe_t *destination_)
Definition: object.cpp:251
unsigned char data[max_vsm_size]
Definition: msg.hpp:187
void set_identity(const blob_t &identity_)
Definition: pipe.cpp:128
int inhwmboost
Definition: pipe.hpp:183
bool check_read()
Definition: pipe.cpp:143
bool is_delimiter() const
Definition: msg.cpp:427
void send_pipe_term(zmq::pipe_t *destination_)
Definition: object.cpp:278
enum zmq::pipe_t::@49 state
void send_hiccup(zmq::pipe_t *destination_, void *pipe_)
Definition: object.cpp:269
void process_pipe_term_ack()
Definition: pipe.cpp:343
pipe_t * peer
Definition: pipe.hpp:195
bool is_credential() const
Definition: msg.cpp:422
unsigned char flags
Definition: msg.hpp:181
void process_delimiter()
Definition: pipe.cpp:472
blob_t get_identity()
Definition: pipe.cpp:133