Line data Source code
1 : /*
2 : Copyright (c) 2007-2016 Contributors as noted in the AUTHORS file
3 :
4 : This file is part of libzmq, the ZeroMQ core engine in C++.
5 :
6 : libzmq is free software; you can redistribute it and/or modify it under
7 : the terms of the GNU Lesser General Public License (LGPL) as published
8 : by the Free Software Foundation; either version 3 of the License, or
9 : (at your option) any later version.
10 :
11 : As a special exception, the Contributors give you permission to link
12 : this library with independent modules to produce an executable,
13 : regardless of the license terms of these independent modules, and to
14 : copy and distribute the resulting executable under terms of your choice,
15 : provided that you also meet, for each linked independent module, the
16 : terms and conditions of the license of that module. An independent
17 : module is a module which is not derived from or based on this library.
18 : If you modify this library, you must extend this exception to your
19 : version of the library.
20 :
21 : libzmq is distributed in the hope that it will be useful, but WITHOUT
22 : ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
23 : FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public
24 : License for more details.
25 :
26 : You should have received a copy of the GNU Lesser General Public License
27 : along with this program. If not, see <http://www.gnu.org/licenses/>.
28 : */
29 :
30 : #include "precompiled.hpp"
31 : #include <new>
32 : #include <stddef.h>
33 :
34 : #include "macros.hpp"
35 : #include "pipe.hpp"
36 : #include "err.hpp"
37 :
38 : #include "ypipe.hpp"
39 : #include "ypipe_conflate.hpp"
40 :
41 7846 : int zmq::pipepair (class object_t *parents_ [2], class pipe_t* pipes_ [2],
42 : int hwms_ [2], bool conflate_ [2])
43 : {
44 : // Creates two pipe objects. These objects are connected by two ypipes,
45 : // each to pass messages in one direction.
46 :
47 : typedef ypipe_t <msg_t, message_pipe_granularity> upipe_normal_t;
48 : typedef ypipe_conflate_t <msg_t> upipe_conflate_t;
49 :
50 : pipe_t::upipe_t *upipe1;
51 7846 : if(conflate_ [0])
52 6 : upipe1 = new (std::nothrow) upipe_conflate_t ();
53 : else
54 7843 : upipe1 = new (std::nothrow) upipe_normal_t ();
55 7849 : alloc_assert (upipe1);
56 :
57 : pipe_t::upipe_t *upipe2;
58 7849 : if(conflate_ [1])
59 6 : upipe2 = new (std::nothrow) upipe_conflate_t ();
60 : else
61 7846 : upipe2 = new (std::nothrow) upipe_normal_t ();
62 7850 : alloc_assert (upipe2);
63 :
64 : pipes_ [0] = new (std::nothrow) pipe_t (parents_ [0], upipe1, upipe2,
65 7850 : hwms_ [1], hwms_ [0], conflate_ [0]);
66 7850 : alloc_assert (pipes_ [0]);
67 : pipes_ [1] = new (std::nothrow) pipe_t (parents_ [1], upipe2, upipe1,
68 7850 : hwms_ [0], hwms_ [1], conflate_ [1]);
69 7850 : alloc_assert (pipes_ [1]);
70 :
71 7850 : pipes_ [0]->set_peer (pipes_ [1]);
72 7850 : pipes_ [1]->set_peer (pipes_ [0]);
73 :
74 7850 : return 0;
75 : }
76 :
77 15698 : zmq::pipe_t::pipe_t (object_t *parent_, upipe_t *inpipe_, upipe_t *outpipe_,
78 : int inhwm_, int outhwm_, bool conflate_) :
79 : object_t (parent_),
80 : inpipe (inpipe_),
81 : outpipe (outpipe_),
82 : in_active (true),
83 : out_active (true),
84 : hwm (outhwm_),
85 : lwm (compute_lwm (inhwm_)),
86 : inhwmboost(0),
87 : outhwmboost(0),
88 : msgs_read (0),
89 : msgs_written (0),
90 : peers_msgs_read (0),
91 : peer (NULL),
92 : sink (NULL),
93 : state (active),
94 : delay (true),
95 : routing_id(0),
96 109892 : conflate (conflate_)
97 : {
98 15699 : }
99 :
100 109849 : zmq::pipe_t::~pipe_t ()
101 : {
102 31386 : }
103 :
104 15700 : void zmq::pipe_t::set_peer (pipe_t *peer_)
105 : {
106 : // Peer can be set once only.
107 15700 : zmq_assert (!peer);
108 15700 : peer = peer_;
109 15700 : }
110 :
111 15697 : void zmq::pipe_t::set_event_sink (i_pipe_events *sink_)
112 : {
113 : // Sink can be set once only.
114 15697 : zmq_assert (!sink);
115 15697 : sink = sink_;
116 15697 : }
117 :
118 15 : void zmq::pipe_t::set_routing_id (uint32_t routing_id_)
119 : {
120 15 : routing_id = routing_id_;
121 15 : }
122 :
123 600033 : uint32_t zmq::pipe_t::get_routing_id ()
124 : {
125 600033 : return routing_id;
126 : }
127 :
128 627 : void zmq::pipe_t::set_identity (const blob_t &identity_)
129 : {
130 627 : identity = identity_;
131 627 : }
132 :
133 996 : zmq::blob_t zmq::pipe_t::get_identity ()
134 : {
135 1992 : return identity;
136 : }
137 :
138 1064 : zmq::blob_t zmq::pipe_t::get_credential () const
139 : {
140 2128 : return credential;
141 : }
142 :
143 5915 : bool zmq::pipe_t::check_read ()
144 : {
145 5915 : if (unlikely (!in_active))
146 : return false;
147 3487 : if (unlikely (state != active && state != waiting_for_delimiter))
148 : return false;
149 :
150 : // Check if there's an item in the pipe.
151 1783 : if (!inpipe->check_read ()) {
152 1406 : in_active = false;
153 1406 : return false;
154 : }
155 :
156 : // If the next item in the pipe is message delimiter,
157 : // initiate termination process.
158 377 : if (inpipe->probe (is_delimiter)) {
159 : msg_t msg;
160 212 : bool ok = inpipe->read (&msg);
161 212 : zmq_assert (ok);
162 212 : process_delimiter ();
163 : return false;
164 : }
165 :
166 : return true;
167 : }
168 :
169 1495454 : bool zmq::pipe_t::read (msg_t *msg_)
170 : {
171 1495454 : if (unlikely (!in_active))
172 : return false;
173 1493100 : if (unlikely (state != active && state != waiting_for_delimiter))
174 : return false;
175 :
176 : read_message:
177 1490877 : if (!inpipe->read (msg_)) {
178 11591 : in_active = false;
179 11591 : return false;
180 : }
181 :
182 : // If this is a credential, save a copy and receive next message.
183 1480851 : if (unlikely (msg_->is_credential ())) {
184 12 : const unsigned char *data = static_cast <const unsigned char *> (msg_->data ());
185 36 : credential = blob_t (data, msg_->size ());
186 12 : const int rc = msg_->close ();
187 12 : zmq_assert (rc == 0);
188 : goto read_message;
189 : }
190 :
191 : // If delimiter was read, start termination process of the pipe.
192 1480819 : if (msg_->is_delimiter ()) {
193 3171 : process_delimiter ();
194 3171 : return false;
195 : }
196 :
197 1477739 : if (!(msg_->flags () & msg_t::more) && !msg_->is_identity ())
198 1475213 : msgs_read++;
199 :
200 1477725 : if (lwm > 0 && msgs_read % lwm == 0)
201 4452 : send_activate_write (peer, msgs_read);
202 :
203 : return true;
204 : }
205 :
206 1487622 : bool zmq::pipe_t::check_write ()
207 : {
208 1487622 : if (unlikely (!out_active || state != active))
209 : return false;
210 :
211 1487605 : bool full = !check_hwm();
212 :
213 1487605 : if (unlikely (full)) {
214 2089 : out_active = false;
215 2089 : return false;
216 : }
217 :
218 : return true;
219 : }
220 :
221 1485909 : bool zmq::pipe_t::write (msg_t *msg_)
222 : {
223 1485909 : if (unlikely (!check_write ()))
224 : return false;
225 :
226 1483821 : bool more = msg_->flags () & msg_t::more ? true : false;
227 1479839 : const bool is_identity = msg_->is_identity ();
228 1480369 : outpipe->write (*msg_, more);
229 1480146 : if (!more && !is_identity)
230 1477565 : msgs_written++;
231 :
232 : return true;
233 : }
234 :
235 12136 : void zmq::pipe_t::rollback ()
236 : {
237 : // Remove incomplete message from the outbound pipe.
238 : msg_t msg;
239 12136 : if (outpipe) {
240 11881 : while (outpipe->unwrite (&msg)) {
241 39 : zmq_assert (msg.flags () & msg_t::more);
242 39 : int rc = msg.close ();
243 39 : errno_assert (rc == 0);
244 : }
245 : }
246 12136 : }
247 :
248 876740 : void zmq::pipe_t::flush ()
249 : {
250 : // The peer does not exist anymore at this point.
251 876740 : if (state == term_ack_sent)
252 876773 : return;
253 :
254 874498 : if (outpipe && !outpipe->flush ())
255 11130 : send_activate_read (peer);
256 : }
257 :
258 11130 : void zmq::pipe_t::process_activate_read ()
259 : {
260 11130 : if (!in_active && (state == active || state == waiting_for_delimiter)) {
261 9418 : in_active = true;
262 9418 : sink->read_activated (this);
263 : }
264 11130 : }
265 :
266 4452 : void zmq::pipe_t::process_activate_write (uint64_t msgs_read_)
267 : {
268 : // Remember the peer's message sequence number.
269 4452 : peers_msgs_read = msgs_read_;
270 :
271 4452 : if (!out_active && state == active) {
272 2071 : out_active = true;
273 2071 : sink->write_activated (this);
274 : }
275 4452 : }
276 :
277 4 : void zmq::pipe_t::process_hiccup (void *pipe_)
278 : {
279 : // Destroy old outpipe. Note that the read end of the pipe was already
280 : // migrated to this thread.
281 4 : zmq_assert (outpipe);
282 4 : outpipe->flush ();
283 : msg_t msg;
284 5 : while (outpipe->read (&msg)) {
285 1 : if (!(msg.flags () & msg_t::more))
286 1 : msgs_written--;
287 1 : int rc = msg.close ();
288 1 : errno_assert (rc == 0);
289 : }
290 4 : LIBZMQ_DELETE(outpipe);
291 :
292 : // Plug in the new outpipe.
293 4 : zmq_assert (pipe_);
294 4 : outpipe = (upipe_t*) pipe_;
295 4 : out_active = true;
296 :
297 : // If appropriate, notify the user about the hiccup.
298 4 : if (state == active)
299 3 : sink->hiccuped (this);
300 4 : }
301 :
302 9737 : void zmq::pipe_t::process_pipe_term ()
303 : {
304 9737 : zmq_assert (state == active
305 : || state == delimiter_received
306 : || state == term_req_sent1);
307 :
308 : // This is the simple case of peer-induced termination. If there are no
309 : // more pending messages to read, or if the pipe was configured to drop
310 : // pending messages, we can move directly to the term_ack_sent state.
311 : // Otherwise we'll hang up in waiting_for_delimiter state till all
312 : // pending messages are read.
313 9738 : if (state == active) {
314 5812 : if (delay)
315 5806 : state = waiting_for_delimiter;
316 : else {
317 6 : state = term_ack_sent;
318 6 : outpipe = NULL;
319 6 : send_pipe_term_ack (peer);
320 : }
321 : }
322 :
323 : // Delimiter happened to arrive before the term command. Now we have the
324 : // term command as well, so we can move straight to term_ack_sent state.
325 : else
326 3926 : if (state == delimiter_received) {
327 139 : state = term_ack_sent;
328 139 : outpipe = NULL;
329 139 : send_pipe_term_ack (peer);
330 : }
331 :
332 : // This is the case where both ends of the pipe are closed in parallel.
333 : // We simply reply to the request by ack and continue waiting for our
334 : // own ack.
335 : else
336 3787 : if (state == term_req_sent1) {
337 3787 : state = term_req_sent2;
338 3787 : outpipe = NULL;
339 3787 : send_pipe_term_ack (peer);
340 : }
341 9747 : }
342 :
343 15683 : void zmq::pipe_t::process_pipe_term_ack ()
344 : {
345 : // Notify the user that all the references to the pipe should be dropped.
346 15683 : zmq_assert (sink);
347 15683 : sink->pipe_terminated (this);
348 :
349 : // In term_ack_sent and term_req_sent2 states there's nothing to do.
350 : // Simply deallocate the pipe. In term_req_sent1 state we have to ack
351 : // the peer before deallocating this side of the pipe.
352 : // All the other states are invalid.
353 15692 : if (state == term_req_sent1) {
354 5951 : outpipe = NULL;
355 5951 : send_pipe_term_ack (peer);
356 : }
357 : else
358 9741 : zmq_assert (state == term_ack_sent || state == term_req_sent2);
359 :
360 : // We'll deallocate the inbound pipe, the peer will deallocate the outbound
361 : // pipe (which is an inbound pipe from its point of view).
362 : // First, delete all the unread messages in the pipe. We have to do it by
363 : // hand because msg_t doesn't have automatic destructor. Then deallocate
364 : // the ypipe itself.
365 :
366 15675 : if (!conflate) {
367 : msg_t msg;
368 22303 : while (inpipe->read (&msg)) {
369 6634 : int rc = msg.close ();
370 6634 : errno_assert (rc == 0);
371 : }
372 : }
373 :
374 15689 : LIBZMQ_DELETE(inpipe);
375 :
376 : // Deallocate the pipe object
377 15695 : delete this;
378 15694 : }
379 :
380 3184 : void zmq::pipe_t::set_nodelay ()
381 : {
382 3184 : this->delay = false;
383 3184 : }
384 :
385 15177 : void zmq::pipe_t::terminate (bool delay_)
386 : {
387 : // Overload the value specified at pipe creation.
388 15177 : delay = delay_;
389 :
390 : // If terminate was already called, we can ignore the duplicate invocation.
391 15177 : if (state == term_req_sent1 || state == term_req_sent2) {
392 : return;
393 : }
394 : // If the pipe is in the final phase of async termination, it's going to
395 : // closed anyway. No need to do anything special here.
396 15149 : else if (state == term_ack_sent) {
397 : return;
398 : }
399 : // The simple sync termination case. Ask the peer to terminate and wait
400 : // for the ack.
401 13759 : else if (state == active) {
402 8312 : send_pipe_term (peer);
403 8312 : state = term_req_sent1;
404 : }
405 : // There are still pending messages available, but the user calls
406 : // 'terminate'. We can act as if all the pending messages were read.
407 5447 : else if (state == waiting_for_delimiter && !delay) {
408 3996 : outpipe = NULL;
409 3996 : send_pipe_term_ack (peer);
410 3998 : state = term_ack_sent;
411 : }
412 : // If there are pending messages still available, do nothing.
413 1451 : else if (state == waiting_for_delimiter) {
414 : }
415 : // We've already got delimiter, but not term command yet. We can ignore
416 : // the delimiter and ack synchronously terminate as if we were in
417 : // active state.
418 1436 : else if (state == delimiter_received) {
419 1436 : send_pipe_term (peer);
420 1436 : state = term_req_sent1;
421 : }
422 : // There are no other states.
423 : else {
424 0 : zmq_assert (false);
425 : }
426 :
427 : // Stop outbound flow of messages.
428 13761 : out_active = false;
429 :
430 13761 : if (outpipe) {
431 :
432 : // Drop any unfinished outbound messages.
433 9763 : rollback ();
434 :
435 : // Write the delimiter into the pipe. Note that watermarks are not
436 : // checked; thus the delimiter can be written even when the pipe is full.
437 : msg_t msg;
438 9763 : msg.init_delimiter ();
439 9763 : outpipe->write (msg, false);
440 9762 : flush ();
441 : }
442 : }
443 :
444 377 : bool zmq::pipe_t::is_delimiter (const msg_t &msg_)
445 : {
446 377 : return msg_.is_delimiter ();
447 : }
448 :
449 0 : int zmq::pipe_t::compute_lwm (int hwm_)
450 : {
451 : // Compute the low water mark. Following point should be taken
452 : // into consideration:
453 : //
454 : // 1. LWM has to be less than HWM.
455 : // 2. LWM cannot be set to very low value (such as zero) as after filling
456 : // the queue it would start to refill only after all the messages are
457 : // read from it and thus unnecessarily hold the progress back.
458 : // 3. LWM cannot be set to very high value (such as HWM-1) as it would
459 : // result in lock-step filling of the queue - if a single message is
460 : // read from a full queue, writer thread is resumed to write exactly one
461 : // message to the queue and go back to sleep immediately. This would
462 : // result in low performance.
463 : //
464 : // Given the 3. it would be good to keep HWM and LWM as far apart as
465 : // possible to reduce the thread switching overhead to almost zero.
466 : // Let's make LWM 1/2 of HWM.
467 16257 : int result = (hwm_ + 1) / 2;
468 :
469 0 : return result;
470 : }
471 :
472 3383 : void zmq::pipe_t::process_delimiter ()
473 : {
474 3383 : zmq_assert (state == active
475 : || state == waiting_for_delimiter);
476 :
477 3383 : if (state == active)
478 1575 : state = delimiter_received;
479 : else {
480 1808 : outpipe = NULL;
481 1808 : send_pipe_term_ack (peer);
482 1808 : state = term_ack_sent;
483 : }
484 3383 : }
485 :
486 115 : void zmq::pipe_t::hiccup ()
487 : {
488 : // If termination is already under way do nothing.
489 115 : if (state != active)
490 115 : return;
491 :
492 : // We'll drop the pointer to the inpipe. From now on, the peer is
493 : // responsible for deallocating it.
494 4 : inpipe = NULL;
495 :
496 : // Create new inpipe.
497 4 : if (conflate)
498 0 : inpipe = new (std::nothrow)ypipe_conflate_t <msg_t>();
499 : else
500 4 : inpipe = new (std::nothrow)ypipe_t <msg_t, message_pipe_granularity>();
501 :
502 4 : alloc_assert (inpipe);
503 4 : in_active = true;
504 :
505 : // Notify the peer about the hiccup.
506 4 : send_hiccup (peer, (void*) inpipe);
507 : }
508 :
509 558 : void zmq::pipe_t::set_hwms (int inhwm_, int outhwm_)
510 : {
511 558 : int in = inhwm_ + inhwmboost;
512 558 : int out = outhwm_ + outhwmboost;
513 :
514 : // if either send or recv side has hwm <= 0 it means infinite so we should set hwms infinite
515 558 : if (inhwm_ <= 0 || inhwmboost <= 0)
516 12 : in = 0;
517 :
518 558 : if (outhwm_ <= 0 || outhwmboost <= 0)
519 12 : out = 0;
520 :
521 558 : lwm = compute_lwm(in);
522 558 : hwm = out;
523 558 : }
524 :
525 1686 : void zmq::pipe_t::set_hwms_boost(int inhwmboost_, int outhwmboost_)
526 : {
527 1686 : inhwmboost = inhwmboost_;
528 1686 : outhwmboost = outhwmboost_;
529 1686 : }
530 :
531 38801 : bool zmq::pipe_t::check_hwm () const
532 : {
533 1526406 : bool full = hwm > 0 && msgs_written - peers_msgs_read >= uint64_t (hwm);
534 1526406 : return( !full );
535 : }
|