libzmq  master
ZeroMQ C++ Core Engine (LIBZMQ)
dish.cpp
Go to the documentation of this file.
1 /*
2  Copyright (c) 2007-2016 Contributors as noted in the AUTHORS file
3 
4  This file is part of libzmq, the ZeroMQ core engine in C++.
5 
6  libzmq is free software; you can redistribute it and/or modify it under
7  the terms of the GNU Lesser General Public License (LGPL) as published
8  by the Free Software Foundation; either version 3 of the License, or
9  (at your option) any later version.
10 
11  As a special exception, the Contributors give you permission to link
12  this library with independent modules to produce an executable,
13  regardless of the license terms of these independent modules, and to
14  copy and distribute the resulting executable under terms of your choice,
15  provided that you also meet, for each linked independent module, the
16  terms and conditions of the license of that module. An independent
17  module is a module which is not derived from or based on this library.
18  If you modify this library, you must extend this exception to your
19  version of the library.
20 
21  libzmq is distributed in the hope that it will be useful, but WITHOUT
22  ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
23  FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public
24  License for more details.
25 
26  You should have received a copy of the GNU Lesser General Public License
27  along with this program. If not, see <http://www.gnu.org/licenses/>.
28 */
29 
30 #include "precompiled.hpp"
31 #include <string.h>
32 
33 #include "platform.hpp"
34 
35 #ifdef ZMQ_HAVE_WINDOWS
36 #include "windows.hpp"
37 #endif
38 
39 #include "../include/zmq.h"
40 #include "macros.hpp"
41 #include "dish.hpp"
42 #include "err.hpp"
43 
// Constructs a DISH socket. The base socket_base_t receives `true` as its
// fourth argument (presumably the thread-safe flag -- confirm against
// socket_base_t), and the socket starts with no message cached.
44 zmq::dish_t::dish_t (class ctx_t *parent_, uint32_t tid_, int sid_) :
45  socket_base_t (parent_, tid_, sid_, true),
46  has_message (false)
47 {
49 
50  // When socket is being closed down we don't want to wait till pending
51  // subscription commands are sent to the wire.
52  options.linger = 0;
53 
    // Initialise the cached message; xhas_in () receives into it and the
    // destructor closes it.
54  int rc = message.init ();
55  errno_assert (rc == 0);
56 }
57 
// dish_t destructor body: closes the message cached for xhas_in ()/xrecv ()
// and asserts that the close succeeded.
59 {
60  int rc = message.close ();
61  errno_assert (rc == 0);
62 }
63 
64 void zmq::dish_t::xattach_pipe (pipe_t *pipe_, bool subscribe_to_all_)
65 {
66  LIBZMQ_UNUSED (subscribe_to_all_);
67 
68  zmq_assert (pipe_);
69  fq.attach (pipe_);
70  dist.attach (pipe_);
71 
72  // Send all the cached subscriptions to the new upstream peer.
73  send_subscriptions (pipe_);
74 }
75 
// xread_activated body: forwards the activation of `pipe_` to the
// fair-queue used for receiving.
77 {
78  fq.activated (pipe_);
79 }
80 
// xwrite_activated body: forwards the activation of `pipe_` to the
// distributor used for sending join/leave messages.
82 {
83  dist.activated (pipe_);
84 }
85 
// xpipe_terminated body: the pipe was attached to both fq and dist in
// xattach_pipe (), so both are told about its termination.
87 {
88  fq.pipe_terminated (pipe_);
89  dist.pipe_terminated (pipe_);
90 }
91 
// xhiccuped body: replays the cached subscriptions on `pipe_`, exactly as
// xattach_pipe () does for a new pipe.
93 {
94  // Send all the cached subscriptions to the hiccuped pipe.
95  send_subscriptions (pipe_);
96 }
97 
98 int zmq::dish_t::xjoin (const char* group_)
99 {
100  std::string group = std::string (group_);
101 
102  if (group.length () > ZMQ_GROUP_MAX_LENGTH) {
103  errno = EINVAL;
104  return -1;
105  }
106 
107  subscriptions_t::iterator it = subscriptions.find (group);
108 
109  // User cannot join same group twice
110  if (it != subscriptions.end ()) {
111  errno = EINVAL;
112  return -1;
113  }
114 
115  subscriptions.insert (group);
116 
117  msg_t msg;
118  int rc = msg.init_join ();
119  errno_assert (rc == 0);
120 
121  rc = msg.set_group (group_);
122  errno_assert (rc == 0);
123 
124  int err = 0;
125  rc = dist.send_to_all (&msg);
126  if (rc != 0)
127  err = errno;
128  int rc2 = msg.close ();
129  errno_assert (rc2 == 0);
130  if (rc != 0)
131  errno = err;
132  return rc;
133 }
134 
135 int zmq::dish_t::xleave (const char* group_)
136 {
137  std::string group = std::string (group_);
138 
139  if (group.length () > ZMQ_GROUP_MAX_LENGTH) {
140  errno = EINVAL;
141  return -1;
142  }
143 
144  subscriptions_t::iterator it = std::find (subscriptions.begin (), subscriptions.end (), group);
145 
146  if (it == subscriptions.end ()) {
147  errno = EINVAL;
148  return -1;
149  }
150 
151  subscriptions.erase (it);
152 
153  msg_t msg;
154  int rc = msg.init_leave ();
155  errno_assert (rc == 0);
156 
157  rc = msg.set_group (group_);
158  errno_assert (rc == 0);
159 
160  int err = 0;
161  rc = dist.send_to_all (&msg);
162  if (rc != 0)
163  err = errno;
164  int rc2 = msg.close ();
165  errno_assert (rc2 == 0);
166  if (rc != 0)
167  errno = err;
168  return rc;
169 }
170 
// xsend body: sending is not supported on this socket type; always fails
// with -1 and errno set to ENOTSUP. `msg_` is left untouched.
172 {
173  LIBZMQ_UNUSED (msg_);
174  errno = ENOTSUP;
175  return -1;
176 }
177 
// xhas_out body: unconditionally reports that output is possible.
179 {
180  // Subscription can be added/removed anytime.
181  return true;
182 }
183 
// xrecv body: delivers the next message whose group is in `subscriptions`.
// Returns 0 with the message in *msg_, or -1 as soon as fq.recv () returns
// non-zero (errno is not modified here on that path).
185 {
186  // If there's already a message prepared by a previous call to zmq_poll,
187  // return it straight ahead.
188  if (has_message) {
189  int rc = msg_->move (message);
190  errno_assert (rc == 0);
191  has_message = false;
192  return 0;
193  }
194 
195  while (true) {
196 
197  // Get a message using fair queueing algorithm.
198  int rc = fq.recv (msg_);
199 
200  // If there's no message available, return immediately.
201  // The same when error occurs.
202  if (rc != 0)
203  return -1;
204 
205  // Filtering non matching messages
    // A match ends the loop; a message for a group we have not joined is
    // skipped and the next one is received into the same msg_.
206  subscriptions_t::iterator it = subscriptions.find (std::string(msg_->group ()));
207  if (it != subscriptions.end ())
208  return 0;
209  }
210 }
211 
// xhas_in body: reports whether a message for a joined group is available.
// To find out, it actually receives from fq into the member `message`; a
// matching message is kept there and flagged via has_message so that the
// next xrecv () can hand it over.
213 {
214  // If there's already a message prepared by a previous call to zmq_poll,
215  // return straight ahead.
216  if (has_message)
217  return true;
218 
219  while (true) {
220  // Get a message using fair queueing algorithm.
221  int rc = fq.recv (&message);
222 
223  // If there's no message available, return immediately.
224  // The same when error occurs.
225  if (rc != 0) {
    // EAGAIN is the only failure tolerated here; any other errno trips
    // the assertion.
226  errno_assert (errno == EAGAIN);
227  return false;
228  }
229 
230  // Filtering non matching messages
231  subscriptions_t::iterator it = subscriptions.find (std::string(message.group ()));
232  if (it != subscriptions.end ()) {
233  has_message = true;
234  return true;
235  }
236  }
237 }
238 
// get_credential body: returns whatever credential the fair-queue reports.
240 {
241  return fq.get_credential ();
242 }
243 
// send_subscriptions body: writes one join message per entry of
// `subscriptions` to `pipe_`, then flushes the pipe once at the end.
// NOTE(review): the bool result of pipe_->write () and the int result of
// msg.close () are both ignored here, unlike the checked calls elsewhere
// in this file -- confirm that this is intentional.
245 {
246  for (subscriptions_t::iterator it = subscriptions.begin (); it != subscriptions.end (); ++it) {
247  msg_t msg;
248  int rc = msg.init_join ();
249  errno_assert (rc == 0);
250 
251  rc = msg.set_group (it->c_str());
252  errno_assert (rc == 0);
253 
254  // Send it to the pipe.
255  pipe_->write (&msg);
256  msg.close ();
257  }
258 
259  pipe_->flush ();
260 }
261 
263  socket_base_t *socket_, const options_t &options_,
264  address_t *addr_) :
265  session_base_t (io_thread_, connect_, socket_, options_, addr_),
    // A new session starts out expecting the group frame (see the
    // `state == group` branch of push_msg).
266  state (group)
267 {
    // Nothing else to do: all arguments are forwarded to session_base_t.
268 }
269 
271 {
    // Empty body -- presumably the dish_session_t destructor (its signature
    // line is missing from this listing; confirm against dish.hpp).
272 }
273 
// dish_session_t::push_msg body: two-step state machine that folds a
// (group frame, body frame) pair into one message carrying the group.
//   state == group: validates and stores the group frame, returns 0.
//   state == body : stamps the stored group onto *msg_ and forwards it via
//                   session_base_t::push_msg, whose result is returned.
// Malformed input yields -1 with errno == EFAULT.
275 {
276  if (state == group) {
    // The group frame must announce a following frame...
277  if ((msg_->flags() & msg_t::more) != msg_t::more) {
278  errno = EFAULT;
279  return -1;
280  }
281 
    // ...and must not exceed the maximum group name length.
282  if (msg_->size() > ZMQ_GROUP_MAX_LENGTH) {
283  errno = EFAULT;
284  return -1;
285  }
286 
    // Keep the frame in group_msg and re-initialise *msg_ afterwards
    // (presumably a hand-over of ownership -- confirm msg_t copy semantics).
287  group_msg = *msg_;
288  state = body;
289 
290  int rc = msg_->init ();
291  errno_assert (rc == 0);
292  return 0;
293  }
294  else {
295  // Set the message group
296  int rc = msg_->set_group ((char*)group_msg.data (), group_msg. size());
297  errno_assert (rc == 0);
298 
299  // We set the group, so we don't need the group_msg anymore
    // NOTE(review): group_msg is closed before the two failure paths
    // below. Both leave state == body, so a repeated call would reach
    // group_msg.data () on an already closed message -- confirm whether
    // callers retry push_msg after a -1.
300  rc = group_msg.close ();
301  errno_assert (rc == 0);
302 
303  // Thread safe socket doesn't support multipart messages
304  if ((msg_->flags() & msg_t::more) == msg_t::more) {
305  errno = EFAULT;
306  return -1;
307  }
308 
309  // Push message to dish socket
310  rc = session_base_t::push_msg (msg_);
311 
    // Only a successful push returns to expecting a group frame.
312  if (rc == 0)
313  state = group;
314 
315  return rc;
316  }
317 }
318 
// dish_session_t::pull_msg body: pulls the next message from the base
// session. Join/leave messages are replaced by a command frame made of
// "\4JOIN" or "\5LEAVE" followed by the group name bytes; every other
// message (and any non-zero result) is passed through unchanged.
320 {
321  int rc = session_base_t::pull_msg (msg_);
322 
323  if (rc != 0)
324  return rc;
325 
326  if (!msg_->is_join () && !msg_->is_leave ())
327  return rc;
328  else {
    // Length is taken with strlen, so the terminating NUL is not copied.
329  int group_length = (int) strlen (msg_->group ());
330 
331  msg_t command;
332  int offset;
333 
    // `offset` is the size of the literal prefix: one length byte plus
    // the command name (5 for JOIN, 6 for LEAVE).
334  if (msg_->is_join ()) {
335  rc = command.init_size (group_length + 5);
336  errno_assert(rc == 0);
337  offset = 5;
338  memcpy (command.data (), "\4JOIN", 5);
339  }
340  else {
341  rc = command.init_size (group_length + 6);
342  errno_assert(rc == 0);
343  offset = 6;
344  memcpy (command.data (), "\5LEAVE", 6);
345  }
346 
347  command.set_flags (msg_t::command);
348  char* command_data = (char*)command.data ();
349 
350  // Copy the group
351  memcpy (command_data + offset, msg_->group (), group_length);
352 
353  // Close the join message
354  rc = msg_->close ();
355  errno_assert (rc == 0);
356 
    // Hand the freshly built command back to the caller in place of the
    // original join/leave message.
357  *msg_ = command;
358 
359  return 0;
360  }
361 }
362 
// Presumably the body of dish_session_t::reset (signature line and listing
// line 365 are missing from this extract -- confirm against the real file).
// What is visible: the frame state machine goes back to expecting a group
// frame.
364 {
366  state = group;
367 }
#define size
void xpipe_terminated(zmq::pipe_t *pipe_)
Definition: dish.cpp:86
int close()
Definition: msg.cpp:217
int set_group(const char *group_)
Definition: msg.cpp:548
blob_t get_credential() const
Definition: fq.cpp:158
bool write(msg_t *msg_)
Definition: pipe.cpp:221
int send_to_all(zmq::msg_t *msg_)
Definition: dist.cpp:140
#define ZMQ_DISH
#define zmq_assert(x)
Definition: err.hpp:119
virtual int push_msg(msg_t *msg_)
int move(msg_t &src_)
Definition: msg.cpp:274
void activated(zmq::pipe_t *pipe_)
Definition: dist.cpp:124
#define ENOTSUP
Definition: zmq.h:112
subscriptions_t subscriptions
Definition: dish.hpp:85
~dish_t()
Definition: dish.cpp:58
void xread_activated(zmq::pipe_t *pipe_)
Definition: dish.cpp:76
#define ZMQ_GROUP_MAX_LENGTH
Definition: zmq.h:355
int xleave(const char *group_)
Definition: dish.cpp:135
void xwrite_activated(zmq::pipe_t *pipe_)
Definition: dish.cpp:81
void pipe_terminated(pipe_t *pipe_)
Definition: fq.cpp:56
void xhiccuped(pipe_t *pipe_)
Definition: dish.cpp:92
int pull_msg(msg_t *msg_)
Definition: dish.cpp:319
int xrecv(zmq::msg_t *msg_)
Definition: dish.cpp:184
unsigned char size
Definition: msg.hpp:188
char group[16]
Definition: msg.hpp:182
int xsend(zmq::msg_t *msg_)
Definition: dish.cpp:171
int init_size(size_t size_)
Definition: msg.cpp:93
bool xhas_out()
Definition: dish.cpp:178
bool is_join() const
Definition: msg.cpp:447
dist_t dist
Definition: dish.hpp:81
#define LIBZMQ_UNUSED(object)
Definition: macros.hpp:6
bool xhas_in()
Definition: dish.cpp:212
int init_leave()
Definition: msg.cpp:207
void attach(zmq::pipe_t *pipe_)
Definition: dist.cpp:50
int init()
Definition: msg.cpp:82
void attach(pipe_t *pipe_)
Definition: fq.cpp:49
std::basic_string< unsigned char > blob_t
Definition: blob.hpp:134
fq_t fq
Definition: dish.hpp:78
virtual int pull_msg(msg_t *msg_)
void flush()
Definition: pipe.cpp:248
int init_join()
Definition: msg.cpp:197
bool has_message
Definition: dish.hpp:89
blob_t get_credential() const
Definition: dish.cpp:239
void set_flags(unsigned char flags_)
Definition: msg.cpp:384
int recv(msg_t *msg_)
Definition: fq.cpp:83
void activated(pipe_t *pipe_)
Definition: fq.cpp:76
#define errno_assert(x)
Definition: err.hpp:129
int push_msg(msg_t *msg_)
Definition: dish.cpp:274
void send_subscriptions(pipe_t *pipe_)
Definition: dish.cpp:244
void xattach_pipe(zmq::pipe_t *pipe_, bool subscribe_to_all_)
Definition: dish.cpp:64
bool is_leave() const
Definition: msg.cpp:452
unsigned char data[max_vsm_size]
Definition: msg.hpp:187
void pipe_terminated(zmq::pipe_t *pipe_)
Definition: dist.cpp:104
dish_session_t(zmq::io_thread_t *io_thread_, bool connect_, zmq::socket_base_t *socket_, const options_t &options_, address_t *addr_)
Definition: dish.cpp:262
dish_t(zmq::ctx_t *parent_, uint32_t tid_, int sid_)
Definition: dish.cpp:44
options_t options
Definition: own.hpp:109
int xjoin(const char *group_)
Definition: dish.cpp:98
msg_t message
Definition: dish.hpp:90
unsigned char flags
Definition: msg.hpp:181
enum zmq::dish_session_t::@35 state
virtual void reset()