Line data Source code
1 : /*
2 : Copyright (c) 2007-2016 Contributors as noted in the AUTHORS file
3 :
4 : This file is part of libzmq, the ZeroMQ core engine in C++.
5 :
6 : libzmq is free software; you can redistribute it and/or modify it under
7 : the terms of the GNU Lesser General Public License (LGPL) as published
8 : by the Free Software Foundation; either version 3 of the License, or
9 : (at your option) any later version.
10 :
11 : As a special exception, the Contributors give you permission to link
12 : this library with independent modules to produce an executable,
13 : regardless of the license terms of these independent modules, and to
14 : copy and distribute the resulting executable under terms of your choice,
15 : provided that you also meet, for each linked independent module, the
16 : terms and conditions of the license of that module. An independent
17 : module is a module which is not derived from or based on this library.
18 : If you modify this library, you must extend this exception to your
19 : version of the library.
20 :
21 : libzmq is distributed in the hope that it will be useful, but WITHOUT
22 : ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
23 : FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public
24 : License for more details.
25 :
26 : You should have received a copy of the GNU Lesser General Public License
27 : along with this program. If not, see <http://www.gnu.org/licenses/>.
28 : */
29 :
30 : #include "precompiled.hpp"
31 : #include <string.h>
32 :
33 : #include "platform.hpp"
34 :
35 : #ifdef ZMQ_HAVE_WINDOWS
36 : #include "windows.hpp"
37 : #endif
38 :
39 : #include "../include/zmq.h"
40 : #include "macros.hpp"
41 : #include "dish.hpp"
42 : #include "err.hpp"
43 :
44 6 : zmq::dish_t::dish_t (class ctx_t *parent_, uint32_t tid_, int sid_) :
45 : socket_base_t (parent_, tid_, sid_, true),
46 12 : has_message (false)
47 : {
48 6 : options.type = ZMQ_DISH;
49 :
50 : // When socket is being closed down we don't want to wait till pending
51 : // subscription commands are sent to the wire.
52 6 : options.linger = 0;
53 :
54 6 : int rc = message.init ();
55 6 : errno_assert (rc == 0);
56 6 : }
57 :
58 24 : zmq::dish_t::~dish_t ()
59 : {
60 6 : int rc = message.close ();
61 6 : errno_assert (rc == 0);
62 12 : }
63 :
64 6 : void zmq::dish_t::xattach_pipe (pipe_t *pipe_, bool subscribe_to_all_)
65 : {
66 : LIBZMQ_UNUSED (subscribe_to_all_);
67 :
68 6 : zmq_assert (pipe_);
69 6 : fq.attach (pipe_);
70 6 : dist.attach (pipe_);
71 :
72 : // Send all the cached subscriptions to the new upstream peer.
73 6 : send_subscriptions (pipe_);
74 6 : }
75 :
76 12 : void zmq::dish_t::xread_activated (pipe_t *pipe_)
77 : {
78 12 : fq.activated (pipe_);
79 12 : }
80 :
81 0 : void zmq::dish_t::xwrite_activated (pipe_t *pipe_)
82 : {
83 0 : dist.activated (pipe_);
84 0 : }
85 :
86 6 : void zmq::dish_t::xpipe_terminated (pipe_t *pipe_)
87 : {
88 6 : fq.pipe_terminated (pipe_);
89 6 : dist.pipe_terminated (pipe_);
90 6 : }
91 :
92 0 : void zmq::dish_t::xhiccuped (pipe_t *pipe_)
93 : {
94 : // Send all the cached subscriptions to the hiccuped pipe.
95 0 : send_subscriptions (pipe_);
96 0 : }
97 :
98 15 : int zmq::dish_t::xjoin (const char* group_)
99 : {
100 15 : std::string group = std::string (group_);
101 :
102 15 : if (group.length () > ZMQ_GROUP_MAX_LENGTH) {
103 3 : errno = EINVAL;
104 3 : return -1;
105 : }
106 :
107 24 : subscriptions_t::iterator it = subscriptions.find (group);
108 :
109 : // User cannot join same group twice
110 24 : if (it != subscriptions.end ()) {
111 3 : errno = EINVAL;
112 3 : return -1;
113 : }
114 :
115 9 : subscriptions.insert (group);
116 :
117 : msg_t msg;
118 9 : int rc = msg.init_join ();
119 9 : errno_assert (rc == 0);
120 :
121 9 : rc = msg.set_group (group_);
122 9 : errno_assert (rc == 0);
123 :
124 9 : int err = 0;
125 9 : rc = dist.send_to_all (&msg);
126 9 : if (rc != 0)
127 0 : err = errno;
128 9 : int rc2 = msg.close ();
129 9 : errno_assert (rc2 == 0);
130 9 : if (rc != 0)
131 0 : errno = err;
132 9 : return rc;
133 : }
134 :
135 6 : int zmq::dish_t::xleave (const char* group_)
136 : {
137 6 : std::string group = std::string (group_);
138 :
139 6 : if (group.length () > ZMQ_GROUP_MAX_LENGTH) {
140 0 : errno = EINVAL;
141 0 : return -1;
142 : }
143 :
144 18 : subscriptions_t::iterator it = std::find (subscriptions.begin (), subscriptions.end (), group);
145 :
146 12 : if (it == subscriptions.end ()) {
147 3 : errno = EINVAL;
148 3 : return -1;
149 : }
150 :
151 3 : subscriptions.erase (it);
152 :
153 : msg_t msg;
154 3 : int rc = msg.init_leave ();
155 3 : errno_assert (rc == 0);
156 :
157 3 : rc = msg.set_group (group_);
158 3 : errno_assert (rc == 0);
159 :
160 3 : int err = 0;
161 3 : rc = dist.send_to_all (&msg);
162 3 : if (rc != 0)
163 0 : err = errno;
164 3 : int rc2 = msg.close ();
165 3 : errno_assert (rc2 == 0);
166 3 : if (rc != 0)
167 0 : errno = err;
168 3 : return rc;
169 : }
170 :
171 0 : int zmq::dish_t::xsend (msg_t *msg_)
172 : {
173 : LIBZMQ_UNUSED (msg_);
174 0 : errno = ENOTSUP;
175 0 : return -1;
176 : }
177 :
178 0 : bool zmq::dish_t::xhas_out ()
179 : {
180 : // Subscription can be added/removed anytime.
181 0 : return true;
182 : }
183 :
184 30 : int zmq::dish_t::xrecv (msg_t *msg_)
185 : {
186 : // If there's already a message prepared by a previous call to zmq_poll,
187 : // return it straight ahead.
188 30 : if (has_message) {
189 0 : int rc = msg_->move (message);
190 0 : errno_assert (rc == 0);
191 0 : has_message = false;
192 0 : return 0;
193 : }
194 :
195 : while (true) {
196 :
197 : // Get a message using fair queueing algorithm.
198 30 : int rc = fq.recv (msg_);
199 :
200 : // If there's no message available, return immediately.
201 : // The same when error occurs.
202 30 : if (rc != 0)
203 : return -1;
204 :
205 : // Filtering non matching messages
206 36 : subscriptions_t::iterator it = subscriptions.find (std::string(msg_->group ()));
207 24 : if (it != subscriptions.end ())
208 : return 0;
209 : }
210 : }
211 :
212 0 : bool zmq::dish_t::xhas_in ()
213 : {
214 : // If there's already a message prepared by a previous call to zmq_poll,
215 : // return straight ahead.
216 0 : if (has_message)
217 : return true;
218 :
219 : while (true) {
220 : // Get a message using fair queueing algorithm.
221 0 : int rc = fq.recv (&message);
222 :
223 : // If there's no message available, return immediately.
224 : // The same when error occurs.
225 0 : if (rc != 0) {
226 0 : errno_assert (errno == EAGAIN);
227 : return false;
228 : }
229 :
230 : // Filtering non matching messages
231 0 : subscriptions_t::iterator it = subscriptions.find (std::string(message.group ()));
232 0 : if (it != subscriptions.end ()) {
233 0 : has_message = true;
234 0 : return true;
235 : }
236 : }
237 : }
238 :
239 0 : zmq::blob_t zmq::dish_t::get_credential () const
240 : {
241 0 : return fq.get_credential ();
242 : }
243 :
244 6 : void zmq::dish_t::send_subscriptions (pipe_t *pipe_)
245 : {
246 30 : for (subscriptions_t::iterator it = subscriptions.begin (); it != subscriptions.end (); ++it) {
247 : msg_t msg;
248 3 : int rc = msg.init_join ();
249 3 : errno_assert (rc == 0);
250 :
251 6 : rc = msg.set_group (it->c_str());
252 3 : errno_assert (rc == 0);
253 :
254 : // Send it to the pipe.
255 3 : pipe_->write (&msg);
256 3 : msg.close ();
257 : }
258 :
259 6 : pipe_->flush ();
260 6 : }
261 :
262 6 : zmq::dish_session_t::dish_session_t (io_thread_t *io_thread_, bool connect_,
263 : socket_base_t *socket_, const options_t &options_,
264 : address_t *addr_) :
265 : session_base_t (io_thread_, connect_, socket_, options_, addr_),
266 6 : state (group)
267 : {
268 6 : }
269 :
270 6 : zmq::dish_session_t::~dish_session_t ()
271 : {
272 6 : }
273 :
274 24 : int zmq::dish_session_t::push_msg (msg_t *msg_)
275 : {
276 24 : if (state == group) {
277 12 : if ((msg_->flags() & msg_t::more) != msg_t::more) {
278 0 : errno = EFAULT;
279 0 : return -1;
280 : }
281 :
282 12 : if (msg_->size() > ZMQ_GROUP_MAX_LENGTH) {
283 0 : errno = EFAULT;
284 0 : return -1;
285 : }
286 :
287 12 : group_msg = *msg_;
288 12 : state = body;
289 :
290 12 : int rc = msg_->init ();
291 12 : errno_assert (rc == 0);
292 : return 0;
293 : }
294 : else {
295 : // Set the message group
296 12 : int rc = msg_->set_group ((char*)group_msg.data (), group_msg. size());
297 12 : errno_assert (rc == 0);
298 :
299 : // We set the group, so we don't need the group_msg anymore
300 12 : rc = group_msg.close ();
301 12 : errno_assert (rc == 0);
302 :
303 : // Thread safe socket doesn't support multipart messages
304 12 : if ((msg_->flags() & msg_t::more) == msg_t::more) {
305 0 : errno = EFAULT;
306 0 : return -1;
307 : }
308 :
309 : // Push message to dish socket
310 12 : rc = session_base_t::push_msg (msg_);
311 :
312 12 : if (rc == 0)
313 12 : state = group;
314 :
315 12 : return rc;
316 : }
317 : }
318 :
319 42 : int zmq::dish_session_t::pull_msg (msg_t *msg_)
320 : {
321 42 : int rc = session_base_t::pull_msg (msg_);
322 :
323 42 : if (rc != 0)
324 : return rc;
325 :
326 12 : if (!msg_->is_join () && !msg_->is_leave ())
327 : return rc;
328 : else {
329 12 : int group_length = (int) strlen (msg_->group ());
330 :
331 : msg_t command;
332 : int offset;
333 :
334 12 : if (msg_->is_join ()) {
335 9 : rc = command.init_size (group_length + 5);
336 9 : errno_assert(rc == 0);
337 9 : offset = 5;
338 9 : memcpy (command.data (), "\4JOIN", 5);
339 : }
340 : else {
341 3 : rc = command.init_size (group_length + 6);
342 3 : errno_assert(rc == 0);
343 3 : offset = 6;
344 3 : memcpy (command.data (), "\5LEAVE", 6);
345 : }
346 :
347 12 : command.set_flags (msg_t::command);
348 12 : char* command_data = (char*)command.data ();
349 :
350 : // Copy the group
351 12 : memcpy (command_data + offset, msg_->group (), group_length);
352 :
353 : // Close the join message
354 12 : rc = msg_->close ();
355 12 : errno_assert (rc == 0);
356 :
357 12 : *msg_ = command;
358 :
359 : return 0;
360 : }
361 : }
362 :
363 1 : void zmq::dish_session_t::reset ()
364 : {
365 1 : session_base_t::reset ();
366 1 : state = group;
367 1 : }
|