libzmq  master
ZeroMQ C++ Core Engine (LIBZMQ)
xsub.cpp
Go to the documentation of this file.
1 /*
2  Copyright (c) 2007-2016 Contributors as noted in the AUTHORS file
3 
4  This file is part of libzmq, the ZeroMQ core engine in C++.
5 
6  libzmq is free software; you can redistribute it and/or modify it under
7  the terms of the GNU Lesser General Public License (LGPL) as published
8  by the Free Software Foundation; either version 3 of the License, or
9  (at your option) any later version.
10 
11  As a special exception, the Contributors give you permission to link
12  this library with independent modules to produce an executable,
13  regardless of the license terms of these independent modules, and to
14  copy and distribute the resulting executable under terms of your choice,
15  provided that you also meet, for each linked independent module, the
16  terms and conditions of the license of that module. An independent
17  module is a module which is not derived from or based on this library.
18  If you modify this library, you must extend this exception to your
19  version of the library.
20 
21  libzmq is distributed in the hope that it will be useful, but WITHOUT
22  ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
23  FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public
24  License for more details.
25 
26  You should have received a copy of the GNU Lesser General Public License
27  along with this program. If not, see <http://www.gnu.org/licenses/>.
28 */
29 
30 #include "precompiled.hpp"
31 #include <string.h>
32 
33 #include "macros.hpp"
34 #include "xsub.hpp"
35 #include "err.hpp"
36 
37 zmq::xsub_t::xsub_t (class ctx_t *parent_, uint32_t tid_, int sid_) :
38  socket_base_t (parent_, tid_, sid_),
39  has_message (false),
40  more (false)
41 {
43 
44  // When socket is being closed down we don't want to wait till pending
45  // subscription commands are sent to the wire.
46  options.linger = 0;
47 
48  int rc = message.init ();
49  errno_assert (rc == 0);
50 }
51 
53 {
54  int rc = message.close ();
55  errno_assert (rc == 0);
56 }
57 
58 void zmq::xsub_t::xattach_pipe (pipe_t *pipe_, bool subscribe_to_all_)
59 {
60  LIBZMQ_UNUSED (subscribe_to_all_);
61 
62  zmq_assert (pipe_);
63  fq.attach (pipe_);
64  dist.attach (pipe_);
65 
66  // Send all the cached subscriptions to the new upstream peer.
68  pipe_->flush ();
69 }
70 
72 {
73  fq.activated (pipe_);
74 }
75 
77 {
78  dist.activated (pipe_);
79 }
80 
82 {
83  fq.pipe_terminated (pipe_);
84  dist.pipe_terminated (pipe_);
85 }
86 
88 {
89  // Send all the cached subscriptions to the hiccuped pipe.
91  pipe_->flush ();
92 }
93 
95 {
96  size_t size = msg_->size ();
97  unsigned char *data = (unsigned char *) msg_->data ();
98 
99  if (size > 0 && *data == 1) {
100  // Process subscribe message
101  // This used to filter out duplicate subscriptions,
102  // however this is already done on the XPUB side and
103  // doing it here as well breaks ZMQ_XPUB_VERBOSE
104  // when there are forwarding devices involved.
105  subscriptions.add (data + 1, size - 1);
106  return dist.send_to_all (msg_);
107  }
108  else
109  if (size > 0 && *data == 0) {
110  // Process unsubscribe message
111  if (subscriptions.rm (data + 1, size - 1))
112  return dist.send_to_all (msg_);
113  }
114  else
115  // User message sent upstream to XPUB socket
116  return dist.send_to_all (msg_);
117 
118  int rc = msg_->close ();
119  errno_assert (rc == 0);
120  rc = msg_->init ();
121  errno_assert (rc == 0);
122 
123  return 0;
124 }
125 
127 {
128  // Subscription can be added/removed anytime.
129  return true;
130 }
131 
133 {
134  // If there's already a message prepared by a previous call to zmq_poll,
135  // return it straight ahead.
136  if (has_message) {
137  int rc = msg_->move (message);
138  errno_assert (rc == 0);
139  has_message = false;
140  more = msg_->flags () & msg_t::more ? true : false;
141  return 0;
142  }
143 
144  // TODO: This can result in infinite loop in the case of continuous
145  // stream of non-matching messages which breaks the non-blocking recv
146  // semantics.
147  while (true) {
148 
149  // Get a message using fair queueing algorithm.
150  int rc = fq.recv (msg_);
151 
152  // If there's no message available, return immediately.
153  // The same when error occurs.
154  if (rc != 0)
155  return -1;
156 
157  // Check whether the message matches at least one subscription.
158  // Non-initial parts of the message are passed through unfiltered.
159  if (more || !options.filter || match (msg_)) {
160  more = msg_->flags () & msg_t::more ? true : false;
161  return 0;
162  }
163 
164  // Message doesn't match. Pop any remaining parts of the message
165  // from the pipe.
166  while (msg_->flags () & msg_t::more) {
167  rc = fq.recv (msg_);
168  errno_assert (rc == 0);
169  }
170  }
171 }
172 
174 {
175  // There are subsequent parts of the partly-read message available.
176  if (more)
177  return true;
178 
179  // If there's already a message prepared by a previous call to zmq_poll,
180  // return straight ahead.
181  if (has_message)
182  return true;
183 
184  // TODO: This can result in infinite loop in the case of continuous
185  // stream of non-matching messages.
186  while (true) {
187 
188  // Get a message using fair queueing algorithm.
189  int rc = fq.recv (&message);
190 
191  // If there's no message available, return immediately.
192  // The same when error occurs.
193  if (rc != 0) {
194  errno_assert (errno == EAGAIN);
195  return false;
196  }
197 
198  // Check whether the message matches at least one subscription.
199  if (!options.filter || match (&message)) {
200  has_message = true;
201  return true;
202  }
203 
204  // Message doesn't match. Pop any remaining parts of the message
205  // from the pipe.
206  while (message.flags () & msg_t::more) {
207  rc = fq.recv (&message);
208  errno_assert (rc == 0);
209  }
210  }
211 }
212 
214 {
215  return fq.get_credential ();
216 }
217 
219 {
220  bool matching = subscriptions.check ((unsigned char*) msg_->data (), msg_->size ());
221 
222  return matching ^ options.invert_matching;
223 }
224 
225 void zmq::xsub_t::send_subscription (unsigned char *data_, size_t size_,
226  void *arg_)
227 {
228  pipe_t *pipe = (pipe_t*) arg_;
229 
230  // Create the subscription message.
231  msg_t msg;
232  int rc = msg.init_size (size_ + 1);
233  errno_assert (rc == 0);
234  unsigned char *data = (unsigned char*) msg.data ();
235  data [0] = 1;
236 
237  // We explicitly allow a NULL subscription with size zero
238  if (size_) {
239  assert (data_);
240  memcpy (data + 1, data_, size_);
241  }
242 
243  // Send it to the pipe.
244  bool sent = pipe->write (&msg);
245  // If we reached the SNDHWM, and thus cannot send the subscription, drop
246  // the subscription message instead. This matches the behaviour of
247  // zmq_setsockopt(ZMQ_SUBSCRIBE, ...), which also drops subscriptions
248  // when the SNDHWM is reached.
249  if (!sent)
250  msg.close ();
251 }
#define size
bool xhas_out()
Definition: xsub.cpp:126
bool invert_matching
Definition: options.hpp:143
int close()
Definition: msg.cpp:217
void xread_activated(zmq::pipe_t *pipe_)
Definition: xsub.cpp:71
blob_t get_credential() const
Definition: fq.cpp:158
trie_t subscriptions
Definition: xsub.hpp:85
bool write(msg_t *msg_)
Definition: pipe.cpp:221
int send_to_all(zmq::msg_t *msg_)
Definition: dist.cpp:140
void xwrite_activated(zmq::pipe_t *pipe_)
Definition: xsub.cpp:76
#define zmq_assert(x)
Definition: err.hpp:119
bool has_message
Definition: xsub.hpp:89
int move(msg_t &src_)
Definition: msg.cpp:274
void activated(zmq::pipe_t *pipe_)
Definition: dist.cpp:124
~xsub_t()
Definition: xsub.cpp:52
bool more
Definition: xsub.hpp:94
#define ZMQ_XSUB
Definition: zmq.h:256
void pipe_terminated(pipe_t *pipe_)
Definition: fq.cpp:56
static void send_subscription(unsigned char *data_, size_t size_, void *arg_)
Definition: xsub.cpp:225
unsigned char size
Definition: msg.hpp:188
int init_size(size_t size_)
Definition: msg.cpp:93
fq_t fq
Definition: xsub.hpp:79
void xattach_pipe(zmq::pipe_t *pipe_, bool subscribe_to_all_)
Definition: xsub.cpp:58
void xpipe_terminated(zmq::pipe_t *pipe_)
Definition: xsub.cpp:81
bool match(zmq::msg_t *msg_)
Definition: xsub.cpp:218
#define LIBZMQ_UNUSED(object)
Definition: macros.hpp:6
void attach(zmq::pipe_t *pipe_)
Definition: dist.cpp:50
blob_t get_credential() const
Definition: xsub.cpp:213
int init()
Definition: msg.cpp:82
void attach(pipe_t *pipe_)
Definition: fq.cpp:49
std::basic_string< unsigned char > blob_t
Definition: blob.hpp:134
void flush()
Definition: pipe.cpp:248
bool add(unsigned char *prefix_, size_t size_)
Definition: trie.cpp:67
int xsend(zmq::msg_t *msg_)
Definition: xsub.cpp:94
xsub_t(zmq::ctx_t *parent_, uint32_t tid_, int sid_)
Definition: xsub.cpp:37
int recv(msg_t *msg_)
Definition: fq.cpp:83
void xhiccuped(pipe_t *pipe_)
Definition: xsub.cpp:87
void activated(pipe_t *pipe_)
Definition: fq.cpp:76
#define errno_assert(x)
Definition: err.hpp:129
int xrecv(zmq::msg_t *msg_)
Definition: xsub.cpp:132
unsigned char data[max_vsm_size]
Definition: msg.hpp:187
void pipe_terminated(zmq::pipe_t *pipe_)
Definition: dist.cpp:104
bool xhas_in()
Definition: xsub.cpp:173
options_t options
Definition: own.hpp:109
msg_t message
Definition: xsub.hpp:90
unsigned char flags
Definition: msg.hpp:181
bool check(unsigned char *data_, size_t size_)
Definition: trie.cpp:264
void apply(void(*func_)(unsigned char *data_, size_t size_, void *arg_), void *arg_)
Definition: trie.cpp:298
dist_t dist
Definition: xsub.hpp:82
bool rm(unsigned char *prefix_, size_t size_)
Definition: trie.cpp:146