Line data Source code
1 : /*
2 : Copyright (c) 2007-2016 Contributors as noted in the AUTHORS file
3 :
4 : This file is part of libzmq, the ZeroMQ core engine in C++.
5 :
6 : libzmq is free software; you can redistribute it and/or modify it under
7 : the terms of the GNU Lesser General Public License (LGPL) as published
8 : by the Free Software Foundation; either version 3 of the License, or
9 : (at your option) any later version.
10 :
11 : As a special exception, the Contributors give you permission to link
12 : this library with independent modules to produce an executable,
13 : regardless of the license terms of these independent modules, and to
14 : copy and distribute the resulting executable under terms of your choice,
15 : provided that you also meet, for each linked independent module, the
16 : terms and conditions of the license of that module. An independent
17 : module is a module which is not derived from or based on this library.
18 : If you modify this library, you must extend this exception to your
19 : version of the library.
20 :
21 : libzmq is distributed in the hope that it will be useful, but WITHOUT
22 : ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
23 : FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public
24 : License for more details.
25 :
26 : You should have received a copy of the GNU Lesser General Public License
27 : along with this program. If not, see <http://www.gnu.org/licenses/>.
28 : */
29 :
30 : #include "precompiled.hpp"
31 : #include "dist.hpp"
32 : #include "pipe.hpp"
33 : #include "err.hpp"
34 : #include "msg.hpp"
35 : #include "likely.hpp"
36 :
37 3531 : zmq::dist_t::dist_t () :
38 : matching (0),
39 : active (0),
40 : eligible (0),
41 7062 : more (false)
42 : {
43 3531 : }
44 :
45 7062 : zmq::dist_t::~dist_t ()
46 : {
47 7062 : zmq_assert (pipes.empty ());
48 3531 : }
49 :
50 6301 : void zmq::dist_t::attach (pipe_t *pipe_)
51 : {
52 : // If we are in the middle of sending a message, we'll add new pipe
53 : // into the list of eligible pipes. Otherwise we add it to the list
54 : // of active pipes.
55 6301 : if (more) {
56 0 : pipes.push_back (pipe_);
57 0 : pipes.swap (eligible, pipes.size () - 1);
58 0 : eligible++;
59 : }
60 : else {
61 6301 : pipes.push_back (pipe_);
62 12602 : pipes.swap (active, pipes.size () - 1);
63 6301 : active++;
64 6301 : eligible++;
65 : }
66 6301 : }
67 :
68 75230 : void zmq::dist_t::match (pipe_t *pipe_)
69 : {
70 : // If pipe is already matching do nothing.
71 150460 : if (pipes.index (pipe_) < matching)
72 : return;
73 :
74 : // If the pipe isn't eligible, ignore it.
75 126864 : if (pipes.index (pipe_) >= eligible)
76 : return;
77 :
78 : // Mark the pipe as matching.
79 126864 : pipes.swap (pipes.index (pipe_), matching);
80 63432 : matching++;
81 : }
82 :
83 6 : void zmq::dist_t::reverse_match ()
84 : {
85 6 : pipes_t::size_type prev_matching = matching;
86 :
87 : // Reset matching to 0
88 : unmatch();
89 :
90 : // Mark all matching pipes as not matching and vice-versa.
91 : // To do this, push all pipes that are eligible but not
92 : // matched - i.e. between "matching" and "eligible" -
93 : // to the beginning of the queue.
94 12 : for (pipes_t::size_type i = prev_matching; i < eligible; ++i) {
95 6 : pipes.swap(i, matching++);
96 : }
97 6 : }
98 :
99 63423 : void zmq::dist_t::unmatch ()
100 : {
101 63429 : matching = 0;
102 63423 : }
103 :
104 6301 : void zmq::dist_t::pipe_terminated (pipe_t *pipe_)
105 : {
106 : // Remove the pipe from the list; adjust number of matching, active and/or
107 : // eligible pipes accordingly.
108 12602 : if (pipes.index (pipe_) < matching) {
109 156 : pipes.swap (pipes.index (pipe_), matching - 1);
110 78 : matching--;
111 : }
112 12602 : if (pipes.index (pipe_) < active) {
113 12602 : pipes.swap (pipes.index (pipe_), active - 1);
114 6301 : active--;
115 : }
116 12602 : if (pipes.index (pipe_) < eligible) {
117 12602 : pipes.swap (pipes.index (pipe_), eligible - 1);
118 6301 : eligible--;
119 : }
120 :
121 6301 : pipes.erase (pipe_);
122 6301 : }
123 :
124 0 : void zmq::dist_t::activated (pipe_t *pipe_)
125 : {
126 : // Move the pipe from passive to eligible state.
127 0 : if (eligible < pipes.size ()) {
128 0 : pipes.swap (pipes.index (pipe_), eligible);
129 0 : eligible++;
130 : }
131 :
132 : // If there's no message being sent at the moment, move it to
133 : // the active state.
134 0 : if (!more && active < pipes.size ()) {
135 0 : pipes.swap (eligible - 1, active);
136 0 : active++;
137 : }
138 0 : }
139 :
140 138 : int zmq::dist_t::send_to_all (msg_t *msg_)
141 : {
142 138 : matching = active;
143 138 : return send_to_matching (msg_);
144 : }
145 :
146 63621 : int zmq::dist_t::send_to_matching (msg_t *msg_)
147 : {
148 : // Is this end of a multipart message?
149 63621 : bool msg_more = msg_->flags () & msg_t::more ? true : false;
150 :
151 : // Push the message to matching pipes.
152 63621 : distribute (msg_);
153 :
154 : // If multipart message is fully sent, activate all the eligible pipes.
155 63621 : if (!msg_more)
156 63561 : active = eligible;
157 :
158 63621 : more = msg_more;
159 :
160 63621 : return 0;
161 : }
162 :
163 63621 : void zmq::dist_t::distribute (msg_t *msg_)
164 : {
165 : // If there are no matching pipes available, simply drop the message.
166 63621 : if (matching == 0) {
167 84 : int rc = msg_->close ();
168 84 : errno_assert (rc == 0);
169 84 : rc = msg_->init ();
170 84 : errno_assert (rc == 0);
171 : return;
172 : }
173 :
174 63537 : if (msg_->is_vsm ()) {
175 63543 : for (pipes_t::size_type i = 0; i < matching; ++i)
176 127086 : if(!write (pipes [i], msg_))
177 0 : --i; // Retry last write because index will have been swapped
178 63507 : int rc = msg_->close();
179 63507 : errno_assert (rc == 0);
180 63507 : rc = msg_->init ();
181 63507 : errno_assert (rc == 0);
182 : return;
183 : }
184 :
185 : // Add matching-1 references to the message. We already hold one reference,
186 : // that's why -1.
187 30 : msg_->add_refs ((int) matching - 1);
188 :
189 : // Push copy of the message to each matching pipe.
190 30 : int failed = 0;
191 60 : for (pipes_t::size_type i = 0; i < matching; ++i)
192 60 : if (!write (pipes [i], msg_)) {
193 0 : ++failed;
194 0 : --i; // Retry last write because index will have been swapped
195 : }
196 30 : if (unlikely (failed))
197 0 : msg_->rm_refs (failed);
198 :
199 : // Detach the original message from the data buffer. Note that we don't
200 : // close the message. That's because we've already used all the references.
201 30 : int rc = msg_->init ();
202 30 : errno_assert (rc == 0);
203 : }
204 :
205 33 : bool zmq::dist_t::has_out ()
206 : {
207 33 : return true;
208 : }
209 :
210 63573 : bool zmq::dist_t::write (pipe_t *pipe_, msg_t *msg_)
211 : {
212 63573 : if (!pipe_->write (msg_)) {
213 0 : pipes.swap (pipes.index (pipe_), matching - 1);
214 0 : matching--;
215 0 : pipes.swap (pipes.index (pipe_), active - 1);
216 0 : active--;
217 0 : pipes.swap (active, eligible - 1);
218 0 : eligible--;
219 0 : return false;
220 : }
221 63573 : if (!(msg_->flags () & msg_t::more))
222 63519 : pipe_->flush ();
223 : return true;
224 : }
225 :
226 38801 : bool zmq::dist_t::check_hwm ()
227 : {
228 65801 : for (pipes_t::size_type i = 0; i < matching; ++i)
229 77602 : if (!pipes [i]->check_hwm ())
230 : return false;
231 :
232 : return true;
233 : }
234 :
235 :
|