Line data Source code
1 : /*
2 : Copyright (c) 2007-2016 Contributors as noted in the AUTHORS file
3 :
4 : This file is part of libzmq, the ZeroMQ core engine in C++.
5 :
6 : libzmq is free software; you can redistribute it and/or modify it under
7 : the terms of the GNU Lesser General Public License (LGPL) as published
8 : by the Free Software Foundation; either version 3 of the License, or
9 : (at your option) any later version.
10 :
11 : As a special exception, the Contributors give you permission to link
12 : this library with independent modules to produce an executable,
13 : regardless of the license terms of these independent modules, and to
14 : copy and distribute the resulting executable under terms of your choice,
15 : provided that you also meet, for each linked independent module, the
16 : terms and conditions of the license of that module. An independent
17 : module is a module which is not derived from or based on this library.
18 : If you modify this library, you must extend this exception to your
19 : version of the library.
20 :
21 : libzmq is distributed in the hope that it will be useful, but WITHOUT
22 : ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
23 : FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public
24 : License for more details.
25 :
26 : You should have received a copy of the GNU Lesser General Public License
27 : along with this program. If not, see <http://www.gnu.org/licenses/>.
28 : */
29 :
30 : #include "precompiled.hpp"
31 : #include <string.h>
32 :
33 : #include "xpub.hpp"
34 : #include "pipe.hpp"
35 : #include "err.hpp"
36 : #include "msg.hpp"
37 :
38 258 : zmq::xpub_t::xpub_t (class ctx_t *parent_, uint32_t tid_, int sid_) :
39 : socket_base_t (parent_, tid_, sid_),
40 : verbose_subs (false),
41 : verbose_unsubs (false),
42 : more (false),
43 : lossy (true),
44 : manual(false),
45 : pending_pipes (),
46 1032 : welcome_msg ()
47 : {
48 258 : last_pipe = NULL;
49 258 : options.type = ZMQ_XPUB;
50 258 : welcome_msg.init();
51 258 : }
52 :
53 1314 : zmq::xpub_t::~xpub_t ()
54 : {
55 258 : welcome_msg.close();
56 282 : }
57 :
58 3028 : void zmq::xpub_t::xattach_pipe (pipe_t *pipe_, bool subscribe_to_all_)
59 : {
60 3028 : zmq_assert (pipe_);
61 3028 : dist.attach (pipe_);
62 :
63 : // If subscribe_to_all_ is specified, the caller would like to subscribe
64 : // to all data on this pipe, implicitly.
65 3028 : if (subscribe_to_all_)
66 0 : subscriptions.add (NULL, 0, pipe_);
67 :
68 : // if welcome message exist
69 3028 : if (welcome_msg.size() > 0)
70 : {
71 : msg_t copy;
72 3 : copy.init();
73 3 : copy.copy(welcome_msg);
74 :
75 3 : pipe_->write(©);
76 3 : pipe_->flush();
77 : }
78 :
79 : // The pipe is active when attached. Let's read the subscriptions from
80 : // it, if any.
81 3028 : xread_activated (pipe_);
82 3028 : }
83 :
84 3073 : void zmq::xpub_t::xread_activated (pipe_t *pipe_)
85 : {
86 : // There are some subscriptions waiting. Let's process them.
87 : msg_t sub;
88 6272 : while (pipe_->read (&sub)) {
89 : // Apply the subscription to the trie
90 126 : unsigned char *const data = (unsigned char *) sub.data ();
91 126 : const size_t size = sub.size ();
92 126 : if (size > 0 && (*data == 0 || *data == 1)) {
93 126 : if (manual)
94 : {
95 27 : pending_pipes.push_back(pipe_);
96 54 : pending_data.push_back(blob_t(data, size));
97 54 : pending_metadata.push_back(sub.metadata());
98 54 : pending_flags.push_back(0);
99 : }
100 : else
101 : {
102 : bool unique;
103 99 : if (*data == 0)
104 6 : unique = subscriptions.rm(data + 1, size - 1, pipe_);
105 : else
106 93 : unique = subscriptions.add(data + 1, size - 1, pipe_);
107 :
108 : // If the (un)subscription is not a duplicate store it so that it can be
109 : // passed to the user on next recv call unless verbose mode is enabled
110 : // which makes to pass always these messages.
111 99 : if (options.type == ZMQ_XPUB && (unique || (*data == 1 && verbose_subs) ||
112 0 : (*data == 0 && verbose_unsubs && verbose_subs))) {
113 18 : pending_data.push_back(blob_t(data, size));
114 18 : pending_metadata.push_back(sub.metadata());
115 18 : pending_flags.push_back(0);
116 : }
117 : }
118 : }
119 : else {
120 : // Process user message coming upstream from xsub socket
121 0 : pending_data.push_back (blob_t (data, size));
122 0 : pending_metadata.push_back (sub.metadata ());
123 0 : pending_flags.push_back (sub.flags ());
124 : }
125 126 : sub.close ();
126 : }
127 3073 : }
128 :
129 0 : void zmq::xpub_t::xwrite_activated (pipe_t *pipe_)
130 : {
131 0 : dist.activated (pipe_);
132 0 : }
133 :
134 84 : int zmq::xpub_t::xsetsockopt (int option_, const void *optval_,
135 : size_t optvallen_)
136 : {
137 168 : if (option_ == ZMQ_XPUB_VERBOSE
138 84 : || option_ == ZMQ_XPUB_VERBOSER
139 84 : || option_ == ZMQ_XPUB_NODROP
140 84 : || option_ == ZMQ_XPUB_MANUAL) {
141 21 : if (optvallen_ != sizeof(int) || *static_cast <const int*> (optval_) < 0) {
142 0 : errno = EINVAL;
143 0 : return -1;
144 : }
145 21 : if (option_ == ZMQ_XPUB_VERBOSE) {
146 0 : verbose_subs = (*static_cast <const int*> (optval_) != 0);
147 0 : verbose_unsubs = 0;
148 : }
149 : else
150 21 : if (option_ == ZMQ_XPUB_VERBOSER) {
151 0 : verbose_subs = (*static_cast <const int*> (optval_) != 0);
152 0 : verbose_unsubs = verbose_subs;
153 : }
154 : else
155 21 : if (option_ == ZMQ_XPUB_NODROP)
156 6 : lossy = (*static_cast <const int*> (optval_) == 0);
157 : else
158 15 : if (option_ == ZMQ_XPUB_MANUAL)
159 15 : manual = (*static_cast <const int*> (optval_) != 0);
160 : }
161 : else
162 63 : if (option_ == ZMQ_SUBSCRIBE && manual) {
163 27 : if (last_pipe != NULL)
164 27 : subscriptions.add ((unsigned char *)optval_, optvallen_, last_pipe);
165 : }
166 : else
167 36 : if (option_ == ZMQ_UNSUBSCRIBE && manual) {
168 12 : if (last_pipe != NULL)
169 0 : subscriptions.rm ((unsigned char *)optval_, optvallen_, last_pipe);
170 : }
171 : else
172 24 : if (option_ == ZMQ_XPUB_WELCOME_MSG) {
173 3 : welcome_msg.close();
174 :
175 3 : if (optvallen_ > 0) {
176 3 : int rc = welcome_msg.init_size(optvallen_);
177 3 : errno_assert(rc == 0);
178 :
179 3 : unsigned char *data = (unsigned char*)welcome_msg.data();
180 : memcpy(data, optval_, optvallen_);
181 : }
182 : else
183 0 : welcome_msg.init();
184 : }
185 : else {
186 21 : errno = EINVAL;
187 21 : return -1;
188 : }
189 : return 0;
190 : }
191 :
192 3028 : void zmq::xpub_t::xpipe_terminated (pipe_t *pipe_)
193 : {
194 : // Remove the pipe from the trie. If there are topics that nobody
195 : // is interested in anymore, send corresponding unsubscriptions
196 : // upstream.
197 3028 : subscriptions.rm (pipe_, send_unsubscription, this, !(verbose_unsubs || manual));
198 :
199 3028 : dist.pipe_terminated (pipe_);
200 3028 : }
201 :
202 75218 : void zmq::xpub_t::mark_as_matching (pipe_t *pipe_, void *arg_)
203 : {
204 75218 : xpub_t *self = (xpub_t*) arg_;
205 75218 : self->dist.match (pipe_);
206 75218 : }
207 :
208 75266 : int zmq::xpub_t::xsend (msg_t *msg_)
209 : {
210 75266 : bool msg_more = msg_->flags () & msg_t::more ? true : false;
211 :
212 : // For the first part of multi-part message, find the matching pipes.
213 75266 : if (!more) {
214 75206 : subscriptions.match ((unsigned char*) msg_->data (), msg_->size (),
215 150412 : mark_as_matching, this);
216 : // If inverted matching is used, reverse the selection now
217 75206 : if (options.invert_matching) {
218 6 : dist.reverse_match();
219 : }
220 : }
221 :
222 75266 : int rc = -1; // Assume we fail
223 75266 : if (lossy || dist.check_hwm ()) {
224 63465 : if (dist.send_to_matching (msg_) == 0) {
225 : // If we are at the end of multi-part message we can mark
226 : // all the pipes as non-matching.
227 63465 : if (!msg_more)
228 63405 : dist.unmatch ();
229 63465 : more = msg_more;
230 63465 : rc = 0; // Yay, sent successfully
231 : }
232 : }
233 : else
234 11801 : errno = EAGAIN;
235 75266 : return rc;
236 : }
237 :
238 33 : bool zmq::xpub_t::xhas_out ()
239 : {
240 33 : return dist.has_out ();
241 : }
242 :
243 96 : int zmq::xpub_t::xrecv (msg_t *msg_)
244 : {
245 : // If there is at least one
246 192 : if (pending_data.empty ()) {
247 45 : errno = EAGAIN;
248 45 : return -1;
249 : }
250 :
251 : // User is reading a message, set last_pipe and remove it from the deque
252 90 : if (manual && !pending_pipes.empty ()) {
253 78 : last_pipe = pending_pipes.front ();
254 39 : pending_pipes.pop_front ();
255 : }
256 :
257 51 : int rc = msg_->close ();
258 51 : errno_assert (rc == 0);
259 153 : rc = msg_->init_size (pending_data.front ().size ());
260 51 : errno_assert (rc == 0);
261 : memcpy (msg_->data (),
262 102 : pending_data.front ().data (),
263 153 : pending_data.front ().size ());
264 :
265 : // set metadata only if there is some
266 102 : if (metadata_t* metadata = pending_metadata.front ()) {
267 27 : msg_->set_metadata (metadata);
268 : }
269 :
270 102 : msg_->set_flags (pending_flags.front ());
271 51 : pending_data.pop_front ();
272 51 : pending_metadata.pop_front ();
273 51 : pending_flags.pop_front ();
274 51 : return 0;
275 : }
276 :
277 33 : bool zmq::xpub_t::xhas_in ()
278 : {
279 66 : return !pending_data.empty ();
280 : }
281 :
282 84 : void zmq::xpub_t::send_unsubscription (unsigned char *data_, size_t size_,
283 : void *arg_)
284 : {
285 84 : xpub_t *self = (xpub_t*) arg_;
286 :
287 84 : if (self->options.type != ZMQ_PUB) {
288 : // Place the unsubscription to the queue of pending (un)subscriptions
289 : // to be retrieved by the user later on.
290 72 : blob_t unsub (size_ + 1, 0);
291 36 : unsub [0] = 0;
292 36 : if (size_ > 0)
293 : memcpy (&unsub [1], data_, size_);
294 36 : self->pending_data.push_back (unsub);
295 72 : self->pending_metadata.push_back (NULL);
296 72 : self->pending_flags.push_back (0);
297 :
298 36 : if (self->manual) {
299 27 : self->last_pipe = NULL;
300 54 : self->pending_pipes.push_back (NULL);
301 : }
302 : }
303 84 : }
|