libzmq  master
ZeroMQ C++ Core Engine (LIBZMQ)
xpub.cpp
Go to the documentation of this file.
1 /*
2  Copyright (c) 2007-2016 Contributors as noted in the AUTHORS file
3 
4  This file is part of libzmq, the ZeroMQ core engine in C++.
5 
6  libzmq is free software; you can redistribute it and/or modify it under
7  the terms of the GNU Lesser General Public License (LGPL) as published
8  by the Free Software Foundation; either version 3 of the License, or
9  (at your option) any later version.
10 
11  As a special exception, the Contributors give you permission to link
12  this library with independent modules to produce an executable,
13  regardless of the license terms of these independent modules, and to
14  copy and distribute the resulting executable under terms of your choice,
15  provided that you also meet, for each linked independent module, the
16  terms and conditions of the license of that module. An independent
17  module is a module which is not derived from or based on this library.
18  If you modify this library, you must extend this exception to your
19  version of the library.
20 
21  libzmq is distributed in the hope that it will be useful, but WITHOUT
22  ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
23  FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public
24  License for more details.
25 
26  You should have received a copy of the GNU Lesser General Public License
27  along with this program. If not, see <http://www.gnu.org/licenses/>.
28 */
29 
30 #include "precompiled.hpp"
31 #include <string.h>
32 
33 #include "xpub.hpp"
34 #include "pipe.hpp"
35 #include "err.hpp"
36 #include "msg.hpp"
37 
38 zmq::xpub_t::xpub_t (class ctx_t *parent_, uint32_t tid_, int sid_) :
39  socket_base_t (parent_, tid_, sid_),
40  verbose_subs (false),
41  verbose_unsubs (false),
42  more (false),
43  lossy (true),
44  manual(false),
45  pending_pipes (),
46  welcome_msg ()
47 {
48  last_pipe = NULL;
50  welcome_msg.init();
51 }
52 
54 {
56 }
57 
58 void zmq::xpub_t::xattach_pipe (pipe_t *pipe_, bool subscribe_to_all_)
59 {
60  zmq_assert (pipe_);
61  dist.attach (pipe_);
62 
63  // If subscribe_to_all_ is specified, the caller would like to subscribe
64  // to all data on this pipe, implicitly.
65  if (subscribe_to_all_)
66  subscriptions.add (NULL, 0, pipe_);
67 
68  // if welcome message exist
69  if (welcome_msg.size() > 0)
70  {
71  msg_t copy;
72  copy.init();
73  copy.copy(welcome_msg);
74 
75  pipe_->write(&copy);
76  pipe_->flush();
77  }
78 
79  // The pipe is active when attached. Let's read the subscriptions from
80  // it, if any.
81  xread_activated (pipe_);
82 }
83 
85 {
86  // There are some subscriptions waiting. Let's process them.
87  msg_t sub;
88  while (pipe_->read (&sub)) {
89  // Apply the subscription to the trie
90  unsigned char *const data = (unsigned char *) sub.data ();
91  const size_t size = sub.size ();
92  if (size > 0 && (*data == 0 || *data == 1)) {
93  if (manual)
94  {
95  pending_pipes.push_back(pipe_);
96  pending_data.push_back(blob_t(data, size));
97  pending_metadata.push_back(sub.metadata());
98  pending_flags.push_back(0);
99  }
100  else
101  {
102  bool unique;
103  if (*data == 0)
104  unique = subscriptions.rm(data + 1, size - 1, pipe_);
105  else
106  unique = subscriptions.add(data + 1, size - 1, pipe_);
107 
108  // If the (un)subscription is not a duplicate store it so that it can be
109  // passed to the user on next recv call unless verbose mode is enabled
110  // which makes to pass always these messages.
111  if (options.type == ZMQ_XPUB && (unique || (*data == 1 && verbose_subs) ||
112  (*data == 0 && verbose_unsubs && verbose_subs))) {
113  pending_data.push_back(blob_t(data, size));
114  pending_metadata.push_back(sub.metadata());
115  pending_flags.push_back(0);
116  }
117  }
118  }
119  else {
120  // Process user message coming upstream from xsub socket
121  pending_data.push_back (blob_t (data, size));
122  pending_metadata.push_back (sub.metadata ());
123  pending_flags.push_back (sub.flags ());
124  }
125  sub.close ();
126  }
127 }
128 
130 {
131  dist.activated (pipe_);
132 }
133 
134 int zmq::xpub_t::xsetsockopt (int option_, const void *optval_,
135  size_t optvallen_)
136 {
137  if (option_ == ZMQ_XPUB_VERBOSE
138  || option_ == ZMQ_XPUB_VERBOSER
139  || option_ == ZMQ_XPUB_NODROP
140  || option_ == ZMQ_XPUB_MANUAL) {
141  if (optvallen_ != sizeof(int) || *static_cast <const int*> (optval_) < 0) {
142  errno = EINVAL;
143  return -1;
144  }
145  if (option_ == ZMQ_XPUB_VERBOSE) {
146  verbose_subs = (*static_cast <const int*> (optval_) != 0);
147  verbose_unsubs = 0;
148  }
149  else
150  if (option_ == ZMQ_XPUB_VERBOSER) {
151  verbose_subs = (*static_cast <const int*> (optval_) != 0);
153  }
154  else
155  if (option_ == ZMQ_XPUB_NODROP)
156  lossy = (*static_cast <const int*> (optval_) == 0);
157  else
158  if (option_ == ZMQ_XPUB_MANUAL)
159  manual = (*static_cast <const int*> (optval_) != 0);
160  }
161  else
162  if (option_ == ZMQ_SUBSCRIBE && manual) {
163  if (last_pipe != NULL)
164  subscriptions.add ((unsigned char *)optval_, optvallen_, last_pipe);
165  }
166  else
167  if (option_ == ZMQ_UNSUBSCRIBE && manual) {
168  if (last_pipe != NULL)
169  subscriptions.rm ((unsigned char *)optval_, optvallen_, last_pipe);
170  }
171  else
172  if (option_ == ZMQ_XPUB_WELCOME_MSG) {
173  welcome_msg.close();
174 
175  if (optvallen_ > 0) {
176  int rc = welcome_msg.init_size(optvallen_);
177  errno_assert(rc == 0);
178 
179  unsigned char *data = (unsigned char*)welcome_msg.data();
180  memcpy(data, optval_, optvallen_);
181  }
182  else
183  welcome_msg.init();
184  }
185  else {
186  errno = EINVAL;
187  return -1;
188  }
189  return 0;
190 }
191 
193 {
194  // Remove the pipe from the trie. If there are topics that nobody
195  // is interested in anymore, send corresponding unsubscriptions
196  // upstream.
198 
199  dist.pipe_terminated (pipe_);
200 }
201 
202 void zmq::xpub_t::mark_as_matching (pipe_t *pipe_, void *arg_)
203 {
204  xpub_t *self = (xpub_t*) arg_;
205  self->dist.match (pipe_);
206 }
207 
209 {
210  bool msg_more = msg_->flags () & msg_t::more ? true : false;
211 
212  // For the first part of multi-part message, find the matching pipes.
213  if (!more) {
214  subscriptions.match ((unsigned char*) msg_->data (), msg_->size (),
215  mark_as_matching, this);
216  // If inverted matching is used, reverse the selection now
217  if (options.invert_matching) {
219  }
220  }
221 
222  int rc = -1; // Assume we fail
223  if (lossy || dist.check_hwm ()) {
224  if (dist.send_to_matching (msg_) == 0) {
225  // If we are at the end of multi-part message we can mark
226  // all the pipes as non-matching.
227  if (!msg_more)
228  dist.unmatch ();
229  more = msg_more;
230  rc = 0; // Yay, sent successfully
231  }
232  }
233  else
234  errno = EAGAIN;
235  return rc;
236 }
237 
239 {
240  return dist.has_out ();
241 }
242 
244 {
245  // If there is at least one
246  if (pending_data.empty ()) {
247  errno = EAGAIN;
248  return -1;
249  }
250 
251  // User is reading a message, set last_pipe and remove it from the deque
252  if (manual && !pending_pipes.empty ()) {
253  last_pipe = pending_pipes.front ();
254  pending_pipes.pop_front ();
255  }
256 
257  int rc = msg_->close ();
258  errno_assert (rc == 0);
259  rc = msg_->init_size (pending_data.front ().size ());
260  errno_assert (rc == 0);
261  memcpy (msg_->data (),
262  pending_data.front ().data (),
263  pending_data.front ().size ());
264 
265  // set metadata only if there is some
266  if (metadata_t* metadata = pending_metadata.front ()) {
267  msg_->set_metadata (metadata);
268  }
269 
270  msg_->set_flags (pending_flags.front ());
271  pending_data.pop_front ();
272  pending_metadata.pop_front ();
273  pending_flags.pop_front ();
274  return 0;
275 }
276 
278 {
279  return !pending_data.empty ();
280 }
281 
282 void zmq::xpub_t::send_unsubscription (unsigned char *data_, size_t size_,
283  void *arg_)
284 {
285  xpub_t *self = (xpub_t*) arg_;
286 
287  if (self->options.type != ZMQ_PUB) {
288  // Place the unsubscription to the queue of pending (un)subscriptions
289  // to be retrieved by the user later on.
290  blob_t unsub (size_ + 1, 0);
291  unsub [0] = 0;
292  if (size_ > 0)
293  memcpy (&unsub [1], data_, size_);
294  self->pending_data.push_back (unsub);
295  self->pending_metadata.push_back (NULL);
296  self->pending_flags.push_back (0);
297 
298  if (self->manual) {
299  self->last_pipe = NULL;
300  self->pending_pipes.push_back (NULL);
301  }
302  }
303 }
bool xhas_out()
Definition: xpub.cpp:238
#define ZMQ_XPUB_VERBOSER
Definition: zmq.h:329
#define size
void match(unsigned char *data_, size_t size_, void(*func_)(zmq::pipe_t *pipe_, void *arg_), void *arg_)
Definition: mtrie.cpp:395
bool read(msg_t *msg_)
Definition: pipe.cpp:169
bool invert_matching
Definition: options.hpp:143
bool more
Definition: xpub.hpp:94
int close()
Definition: msg.cpp:217
mtrie_t subscriptions
Definition: xpub.hpp:80
bool write(msg_t *msg_)
Definition: pipe.cpp:221
bool manual
Definition: xpub.hpp:100
#define zmq_assert(x)
Definition: err.hpp:119
std::deque< pipe_t * > pending_pipes
Definition: xpub.hpp:106
static void send_unsubscription(unsigned char *data_, size_t size_, void *arg_)
Definition: xpub.cpp:282
void xattach_pipe(zmq::pipe_t *pipe_, bool subscribe_to_all_=false)
Definition: xpub.cpp:58
#define ZMQ_XPUB
Definition: zmq.h:255
void activated(zmq::pipe_t *pipe_)
Definition: dist.cpp:124
#define ZMQ_SUBSCRIBE
Definition: zmq.h:266
std::deque< metadata_t * > pending_metadata
Definition: xpub.hpp:115
pipe_t * last_pipe
Definition: xpub.hpp:103
~xpub_t()
Definition: xpub.cpp:53
int xsend(zmq::msg_t *msg_)
Definition: xpub.cpp:208
#define ZMQ_PUB
Definition: zmq.h:247
dist_t dist
Definition: xpub.hpp:83
std::deque< blob_t > pending_data
Definition: xpub.hpp:114
std::deque< unsigned char > pending_flags
Definition: xpub.hpp:116
void match(zmq::pipe_t *pipe_)
Definition: dist.cpp:68
metadata_t * metadata
Definition: msg.hpp:175
unsigned char size
Definition: msg.hpp:188
bool xhas_in()
Definition: xpub.cpp:277
#define ZMQ_XPUB_WELCOME_MSG
Definition: zmq.h:323
int init_size(size_t size_)
Definition: msg.cpp:93
xpub_t(zmq::ctx_t *parent_, uint32_t tid_, int sid_)
Definition: xpub.cpp:38
void xpipe_terminated(zmq::pipe_t *pipe_)
Definition: xpub.cpp:192
bool check_hwm()
Definition: dist.cpp:226
bool verbose_subs
Definition: xpub.hpp:87
#define ZMQ_UNSUBSCRIBE
Definition: zmq.h:267
int send_to_matching(zmq::msg_t *msg_)
Definition: dist.cpp:146
void attach(zmq::pipe_t *pipe_)
Definition: dist.cpp:50
int init()
Definition: msg.cpp:82
int xsetsockopt(int option_, const void *optval_, size_t optvallen_)
Definition: xpub.cpp:134
void flush()
Definition: pipe.cpp:248
bool has_out()
Definition: dist.cpp:205
std::basic_string< unsigned char > blob_t
Definition: xpub.hpp:113
static void mark_as_matching(zmq::pipe_t *pipe_, void *arg_)
Definition: xpub.cpp:202
bool add(unsigned char *prefix_, size_t size_, zmq::pipe_t *pipe_)
Definition: mtrie.cpp:72
int copy(msg_t &src_)
Definition: msg.cpp:295
void set_flags(unsigned char flags_)
Definition: msg.cpp:384
bool lossy
Definition: xpub.hpp:97
#define errno_assert(x)
Definition: err.hpp:129
msg_t welcome_msg
Definition: xpub.hpp:109
void unmatch()
Definition: dist.cpp:99
#define ZMQ_XPUB_VERBOSE
Definition: zmq.h:293
unsigned char data[max_vsm_size]
Definition: msg.hpp:187
void pipe_terminated(zmq::pipe_t *pipe_)
Definition: dist.cpp:104
void reverse_match()
Definition: dist.cpp:83
options_t options
Definition: own.hpp:109
void rm(zmq::pipe_t *pipe_, void(*func_)(unsigned char *data_, size_t size_, void *arg_), void *arg_, bool call_on_uniq_)
Definition: mtrie.cpp:160
void xread_activated(zmq::pipe_t *pipe_)
Definition: xpub.cpp:84
void xwrite_activated(zmq::pipe_t *pipe_)
Definition: xpub.cpp:129
bool verbose_unsubs
Definition: xpub.hpp:91
unsigned char flags
Definition: msg.hpp:181
#define ZMQ_XPUB_NODROP
Definition: zmq.h:318
int xrecv(zmq::msg_t *msg_)
Definition: xpub.cpp:243
#define ZMQ_XPUB_MANUAL
Definition: zmq.h:322
void set_metadata(metadata_t *metadata_)
Definition: msg.cpp:399