Line data Source code
1 : /*
2 : Copyright (c) 2007-2016 Contributors as noted in the AUTHORS file
3 :
4 : This file is part of libzmq, the ZeroMQ core engine in C++.
5 :
6 : libzmq is free software; you can redistribute it and/or modify it under
7 : the terms of the GNU Lesser General Public License (LGPL) as published
8 : by the Free Software Foundation; either version 3 of the License, or
9 : (at your option) any later version.
10 :
11 : As a special exception, the Contributors give you permission to link
12 : this library with independent modules to produce an executable,
13 : regardless of the license terms of these independent modules, and to
14 : copy and distribute the resulting executable under terms of your choice,
15 : provided that you also meet, for each linked independent module, the
16 : terms and conditions of the license of that module. An independent
17 : module is a module which is not derived from or based on this library.
18 : If you modify this library, you must extend this exception to your
19 : version of the library.
20 :
21 : libzmq is distributed in the hope that it will be useful, but WITHOUT
22 : ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
23 : FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public
24 : License for more details.
25 :
26 : You should have received a copy of the GNU Lesser General Public License
27 : along with this program. If not, see <http://www.gnu.org/licenses/>.
28 : */
29 :
30 : #include "precompiled.hpp"
31 : #include "fq.hpp"
32 : #include "pipe.hpp"
33 : #include "err.hpp"
34 : #include "msg.hpp"
35 :
36 4398 : zmq::fq_t::fq_t () :
37 : active (0),
38 : last_in (NULL),
39 : current (0),
40 8796 : more (false)
41 : {
42 4398 : }
43 :
44 13194 : zmq::fq_t::~fq_t ()
45 : {
46 8796 : zmq_assert (pipes.empty ());
47 4398 : }
48 :
49 5001 : void zmq::fq_t::attach (pipe_t *pipe_)
50 : {
51 5001 : pipes.push_back (pipe_);
52 10002 : pipes.swap (active, pipes.size () - 1);
53 5001 : active++;
54 5001 : }
55 :
56 5001 : void zmq::fq_t::pipe_terminated (pipe_t *pipe_)
57 : {
58 10002 : const pipes_t::size_type index = pipes.index (pipe_);
59 :
60 : // Remove the pipe from the list; adjust number of active pipes
61 : // accordingly.
62 5001 : if (index < active) {
63 4626 : active--;
64 4626 : pipes.swap (index, active);
65 4626 : if (current == active)
66 4208 : current = 0;
67 : }
68 5001 : pipes.erase (pipe_);
69 :
70 5001 : if (last_in == pipe_) {
71 1978 : saved_credential = last_in->get_credential ();
72 989 : last_in = NULL;
73 : }
74 5001 : }
75 :
76 1129 : void zmq::fq_t::activated (pipe_t *pipe_)
77 : {
78 : // Move the pipe to the list of active pipes.
79 2258 : pipes.swap (pipes.index (pipe_), active);
80 1129 : active++;
81 1129 : }
82 :
83 3683632 : int zmq::fq_t::recv (msg_t *msg_)
84 : {
85 3683632 : return recvpipe (msg_, NULL);
86 : }
87 :
88 7000717 : int zmq::fq_t::recvpipe (msg_t *msg_, pipe_t **pipe_)
89 : {
90 : // Deallocate old content of the message.
91 7000717 : int rc = msg_->close ();
92 6999129 : errno_assert (rc == 0);
93 :
94 : // Round-robin over the pipes to get the next message.
95 7024328 : while (active > 0) {
96 :
97 : // Try to fetch new message. If we've already read part of the message
98 : // subsequent part should be immediately available.
99 1706814 : bool fetched = pipes [current]->read (msg_);
100 :
101 : // Note that when message is not fetched, current pipe is deactivated
102 : // and replaced by another active pipe. Thus we don't have to increase
103 : // the 'current' pointer.
104 853407 : if (fetched) {
105 852000 : if (pipe_)
106 1202274 : *pipe_ = pipes [current];
107 852000 : more = msg_->flags () & msg_t::more? true: false;
108 852000 : if (!more) {
109 1702248 : last_in = pipes [current];
110 851124 : current = (current + 1) % active;
111 : }
112 : return 0;
113 : }
114 :
115 : // Check the atomicity of the message.
116 : // If we've already received the first part of the message
117 : // we should get the remaining parts without blocking.
118 1407 : zmq_assert (!more);
119 :
120 1407 : active--;
121 1407 : pipes.swap (current, active);
122 6362 : if (current == active)
123 1226 : current = 0;
124 : }
125 :
126 : // No message is available. Initialise the output parameter
127 : // to be a 0-byte message.
128 6170921 : rc = msg_->init ();
129 6169440 : errno_assert (rc == 0);
130 6169440 : errno = EAGAIN;
131 6169319 : return -1;
132 : }
133 :
134 1124 : bool zmq::fq_t::has_in ()
135 : {
136 : // There are subsequent parts of the partly-read message available.
137 1124 : if (more)
138 : return true;
139 :
140 : // Note that messing with current doesn't break the fairness of fair
141 : // queueing algorithm. If there are no messages available current will
142 : // get back to its original value. Otherwise it'll point to the first
143 : // pipe holding messages, skipping only pipes with no messages available.
144 1221 : while (active > 0) {
145 386 : if (pipes [current]->check_read ())
146 : return true;
147 :
148 : // Deactivate the pipe.
149 97 : active--;
150 97 : pipes.swap (current, active);
151 97 : if (current == active)
152 95 : current = 0;
153 : }
154 :
155 : return false;
156 : }
157 :
158 0 : zmq::blob_t zmq::fq_t::get_credential () const
159 : {
160 : return last_in?
161 0 : last_in->get_credential (): saved_credential;
162 : }
163 :
|