Line data Source code
1 : /*
2 : Copyright (c) 2007-2016 Contributors as noted in the AUTHORS file
3 :
4 : This file is part of libzmq, the ZeroMQ core engine in C++.
5 :
6 : libzmq is free software; you can redistribute it and/or modify it under
7 : the terms of the GNU Lesser General Public License (LGPL) as published
8 : by the Free Software Foundation; either version 3 of the License, or
9 : (at your option) any later version.
10 :
11 : As a special exception, the Contributors give you permission to link
12 : this library with independent modules to produce an executable,
13 : regardless of the license terms of these independent modules, and to
14 : copy and distribute the resulting executable under terms of your choice,
15 : provided that you also meet, for each linked independent module, the
16 : terms and conditions of the license of that module. An independent
17 : module is a module which is not derived from or based on this library.
18 : If you modify this library, you must extend this exception to your
19 : version of the library.
20 :
21 : libzmq is distributed in the hope that it will be useful, but WITHOUT
22 : ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
23 : FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public
24 : License for more details.
25 :
26 : You should have received a copy of the GNU Lesser General Public License
27 : along with this program. If not, see <http://www.gnu.org/licenses/>.
28 : */
29 :
30 : #include "precompiled.hpp"
31 : #include <string.h>
32 :
33 : #include "macros.hpp"
34 : #include "xsub.hpp"
35 : #include "err.hpp"
36 :
37 3261 : zmq::xsub_t::xsub_t (class ctx_t *parent_, uint32_t tid_, int sid_) :
38 : socket_base_t (parent_, tid_, sid_),
39 : has_message (false),
40 3261 : more (false)
41 : {
42 3261 : options.type = ZMQ_XSUB;
43 :
44 : // When socket is being closed down we don't want to wait till pending
45 : // subscription commands are sent to the wire.
46 3261 : options.linger = 0;
47 :
48 3261 : int rc = message.init ();
49 3261 : errno_assert (rc == 0);
50 3261 : }
51 :
52 6540 : zmq::xsub_t::~xsub_t ()
53 : {
54 3261 : int rc = message.close ();
55 3261 : errno_assert (rc == 0);
56 3279 : }
57 :
58 3260 : void zmq::xsub_t::xattach_pipe (pipe_t *pipe_, bool subscribe_to_all_)
59 : {
60 : LIBZMQ_UNUSED (subscribe_to_all_);
61 :
62 3260 : zmq_assert (pipe_);
63 3260 : fq.attach (pipe_);
64 3261 : dist.attach (pipe_);
65 :
66 : // Send all the cached subscriptions to the new upstream peer.
67 3261 : subscriptions.apply (send_subscription, pipe_);
68 3261 : pipe_->flush ();
69 3261 : }
70 :
71 60 : void zmq::xsub_t::xread_activated (pipe_t *pipe_)
72 : {
73 60 : fq.activated (pipe_);
74 60 : }
75 :
76 0 : void zmq::xsub_t::xwrite_activated (pipe_t *pipe_)
77 : {
78 0 : dist.activated (pipe_);
79 0 : }
80 :
81 3261 : void zmq::xsub_t::xpipe_terminated (pipe_t *pipe_)
82 : {
83 3261 : fq.pipe_terminated (pipe_);
84 3261 : dist.pipe_terminated (pipe_);
85 3261 : }
86 :
87 0 : void zmq::xsub_t::xhiccuped (pipe_t *pipe_)
88 : {
89 : // Send all the cached subscriptions to the hiccuped pipe.
90 0 : subscriptions.apply (send_subscription, pipe_);
91 0 : pipe_->flush ();
92 0 : }
93 :
94 132 : int zmq::xsub_t::xsend (msg_t *msg_)
95 : {
96 132 : size_t size = msg_->size ();
97 132 : unsigned char *data = (unsigned char *) msg_->data ();
98 :
99 132 : if (size > 0 && *data == 1) {
100 : // Process subscribe message
101 : // This used to filter out duplicate subscriptions,
102 : // however this is alread done on the XPUB side and
103 : // doing it here as well breaks ZMQ_XPUB_VERBOSE
104 : // when there are forwarding devices involved.
105 120 : subscriptions.add (data + 1, size - 1);
106 120 : return dist.send_to_all (msg_);
107 : }
108 : else
109 12 : if (size > 0 && *data == 0) {
110 : // Process unsubscribe message
111 12 : if (subscriptions.rm (data + 1, size - 1))
112 6 : return dist.send_to_all (msg_);
113 : }
114 : else
115 : // User message sent upstream to XPUB socket
116 0 : return dist.send_to_all (msg_);
117 :
118 6 : int rc = msg_->close ();
119 6 : errno_assert (rc == 0);
120 6 : rc = msg_->init ();
121 6 : errno_assert (rc == 0);
122 :
123 : return 0;
124 : }
125 :
126 0 : bool zmq::xsub_t::xhas_out ()
127 : {
128 : // Subscription can be added/removed anytime.
129 0 : return true;
130 : }
131 :
132 3230792 : int zmq::xsub_t::xrecv (msg_t *msg_)
133 : {
134 : // If there's already a message prepared by a previous call to zmq_poll,
135 : // return it straight ahead.
136 3230792 : if (has_message) {
137 27 : int rc = msg_->move (message);
138 27 : errno_assert (rc == 0);
139 27 : has_message = false;
140 27 : more = msg_->flags () & msg_t::more ? true : false;
141 27 : return 0;
142 : }
143 :
144 : // TODO: This can result in infinite loop in the case of continuous
145 : // stream of non-matching messages which breaks the non-blocking recv
146 : // semantics.
147 : while (true) {
148 :
149 : // Get a message using fair queueing algorithm.
150 3230765 : int rc = fq.recv (msg_);
151 :
152 : // If there's no message available, return immediately.
153 : // The same when error occurs.
154 3229620 : if (rc != 0)
155 : return -1;
156 :
157 : // Check whether the message matches at least one subscription.
158 : // Non-initial parts of the message are passed
159 63432 : if (more || !options.filter || match (msg_)) {
160 63432 : more = msg_->flags () & msg_t::more ? true : false;
161 63432 : return 0;
162 : }
163 :
164 : // Message doesn't match. Pop any remaining parts of the message
165 : // from the pipe.
166 0 : while (msg_->flags () & msg_t::more) {
167 0 : rc = fq.recv (msg_);
168 0 : errno_assert (rc == 0);
169 : }
170 : }
171 : }
172 :
173 795892 : bool zmq::xsub_t::xhas_in ()
174 : {
175 : // There are subsequent parts of the partly-read message available.
176 795892 : if (more)
177 : return true;
178 :
179 : // If there's already a message prepared by a previous call to zmq_poll,
180 : // return straight ahead.
181 795892 : if (has_message)
182 : return true;
183 :
184 : // TODO: This can result in infinite loop in the case of continuous
185 : // stream of non-matching messages.
186 : while (true) {
187 :
188 : // Get a message using fair queueing algorithm.
189 266051 : int rc = fq.recv (&message);
190 :
191 : // If there's no message available, return immediately.
192 : // The same when error occurs.
193 266051 : if (rc != 0) {
194 266021 : errno_assert (errno == EAGAIN);
195 : return false;
196 : }
197 :
198 : // Check whether the message matches at least one subscription.
199 30 : if (!options.filter || match (&message)) {
200 30 : has_message = true;
201 30 : return true;
202 : }
203 :
204 : // Message doesn't match. Pop any remaining parts of the message
205 : // from the pipe.
206 0 : while (message.flags () & msg_t::more) {
207 0 : rc = fq.recv (&message);
208 0 : errno_assert (rc == 0);
209 : }
210 : }
211 : }
212 :
213 0 : zmq::blob_t zmq::xsub_t::get_credential () const
214 : {
215 0 : return fq.get_credential ();
216 : }
217 :
218 63387 : bool zmq::xsub_t::match (msg_t *msg_)
219 : {
220 63387 : bool matching = subscriptions.check ((unsigned char*) msg_->data (), msg_->size ());
221 :
222 63387 : return matching ^ options.invert_matching;
223 : }
224 :
225 45 : void zmq::xsub_t::send_subscription (unsigned char *data_, size_t size_,
226 : void *arg_)
227 : {
228 45 : pipe_t *pipe = (pipe_t*) arg_;
229 :
230 : // Create the subscription message.
231 : msg_t msg;
232 45 : int rc = msg.init_size (size_ + 1);
233 45 : errno_assert (rc == 0);
234 45 : unsigned char *data = (unsigned char*) msg.data ();
235 45 : data [0] = 1;
236 :
237 : // We explicitly allow a NULL subscription with size zero
238 45 : if (size_) {
239 : assert (data_);
240 6 : memcpy (data + 1, data_, size_);
241 : }
242 :
243 : // Send it to the pipe.
244 45 : bool sent = pipe->write (&msg);
245 : // If we reached the SNDHWM, and thus cannot send the subscription, drop
246 : // the subscription message instead. This matches the behaviour of
247 : // zmq_setsockopt(ZMQ_SUBSCRIBE, ...), which also drops subscriptions
248 : // when the SNDHWM is reached.
249 45 : if (!sent)
250 0 : msg.close ();
251 45 : }
|