Line data Source code
1 : /*
2 : Copyright (c) 2007-2016 Contributors as noted in the AUTHORS file
3 :
4 : This file is part of libzmq, the ZeroMQ core engine in C++.
5 :
6 : libzmq is free software; you can redistribute it and/or modify it under
7 : the terms of the GNU Lesser General Public License (LGPL) as published
8 : by the Free Software Foundation; either version 3 of the License, or
9 : (at your option) any later version.
10 :
11 : As a special exception, the Contributors give you permission to link
12 : this library with independent modules to produce an executable,
13 : regardless of the license terms of these independent modules, and to
14 : copy and distribute the resulting executable under terms of your choice,
15 : provided that you also meet, for each linked independent module, the
16 : terms and conditions of the license of that module. An independent
17 : module is a module which is not derived from or based on this library.
18 : If you modify this library, you must extend this exception to your
19 : version of the library.
20 :
21 : libzmq is distributed in the hope that it will be useful, but WITHOUT
22 : ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
23 : FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public
24 : License for more details.
25 :
26 : You should have received a copy of the GNU Lesser General Public License
27 : along with this program. If not, see <http://www.gnu.org/licenses/>.
28 : */
29 :
30 : #include "precompiled.hpp"
31 : #include <string.h>
32 :
33 : #include "radio.hpp"
34 : #include "macros.hpp"
35 : #include "pipe.hpp"
36 : #include "err.hpp"
37 : #include "msg.hpp"
38 :
39 6 : zmq::radio_t::radio_t (class ctx_t *parent_, uint32_t tid_, int sid_) :
40 18 : socket_base_t (parent_, tid_, sid_, true)
41 : {
42 6 : options.type = ZMQ_RADIO;
43 6 : }
44 :
45 24 : zmq::radio_t::~radio_t ()
46 : {
47 12 : }
48 :
49 6 : void zmq::radio_t::xattach_pipe (pipe_t *pipe_, bool subscribe_to_all_)
50 : {
51 : LIBZMQ_UNUSED (subscribe_to_all_);
52 :
53 6 : zmq_assert (pipe_);
54 :
55 : // Don't delay pipe termination as there is no one
56 : // to receive the delimiter.
57 6 : pipe_->set_nodelay ();
58 :
59 6 : dist.attach (pipe_);
60 :
61 6 : if (subscribe_to_all_)
62 3 : udp_pipes.push_back (pipe_);
63 : // The pipe is active when attached. Let's read the subscriptions from
64 : // it, if any.
65 : else
66 3 : xread_activated (pipe_);
67 6 : }
68 :
69 9 : void zmq::radio_t::xread_activated (pipe_t *pipe_)
70 : {
71 : // There are some subscriptions waiting. Let's process them.
72 : msg_t msg;
73 27 : while (pipe_->read (&msg)) {
74 : // Apply the subscription to the trie
75 9 : if (msg.is_join () || msg.is_leave ()) {
76 9 : std::string group = std::string (msg.group ());
77 :
78 9 : if (msg.is_join ())
79 12 : subscriptions.insert (subscriptions_t::value_type (group, pipe_));
80 : else {
81 : std::pair<subscriptions_t::iterator, subscriptions_t::iterator> range =
82 6 : subscriptions.equal_range (group);
83 :
84 6 : for (subscriptions_t::iterator it = range.first; it != range.second; ++it) {
85 3 : if (it->second == pipe_) {
86 3 : subscriptions.erase (it);
87 : break;
88 : }
89 : }
90 : }
91 : }
92 9 : msg.close ();
93 : }
94 9 : }
95 :
96 0 : void zmq::radio_t::xwrite_activated (pipe_t *pipe_)
97 : {
98 0 : dist.activated (pipe_);
99 0 : }
100 :
101 6 : void zmq::radio_t::xpipe_terminated (pipe_t *pipe_)
102 : {
103 30 : for (subscriptions_t::iterator it = subscriptions.begin (); it != subscriptions.end (); ) {
104 3 : if (it->second == pipe_) {
105 6 : subscriptions.erase (it++);
106 : } else {
107 : ++it;
108 : }
109 : }
110 :
111 : udp_pipes_t::iterator it = std::find(udp_pipes.begin(),
112 18 : udp_pipes.end (), pipe_);
113 18 : if (it != udp_pipes.end ())
114 3 : udp_pipes.erase (it);
115 :
116 6 : dist.pipe_terminated (pipe_);
117 6 : }
118 :
119 18 : int zmq::radio_t::xsend (msg_t *msg_)
120 : {
121 : // Radio sockets do not allow multipart data (ZMQ_SNDMORE)
122 18 : if (msg_->flags () & msg_t::more) {
123 0 : errno = EINVAL;
124 0 : return -1;
125 : }
126 :
127 18 : dist.unmatch ();
128 :
129 : std::pair<subscriptions_t::iterator, subscriptions_t::iterator> range =
130 54 : subscriptions.equal_range (std::string(msg_->group ()));
131 :
132 45 : for (subscriptions_t::iterator it = range.first; it != range.second; ++it)
133 9 : dist.match (it-> second);
134 :
135 99 : for (udp_pipes_t::iterator it = udp_pipes.begin (); it != udp_pipes.end (); ++it)
136 3 : dist.match (*it);
137 :
138 18 : int rc = dist.send_to_matching (msg_);
139 :
140 : return rc;
141 : }
142 :
143 0 : bool zmq::radio_t::xhas_out ()
144 : {
145 0 : return dist.has_out ();
146 : }
147 :
148 0 : int zmq::radio_t::xrecv (msg_t *msg_)
149 : {
150 : // Messages cannot be received from PUB socket.
151 : LIBZMQ_UNUSED (msg_);
152 0 : errno = ENOTSUP;
153 0 : return -1;
154 : }
155 :
156 0 : bool zmq::radio_t::xhas_in ()
157 : {
158 0 : return false;
159 : }
160 :
161 6 : zmq::radio_session_t::radio_session_t (io_thread_t *io_thread_, bool connect_,
162 : socket_base_t *socket_, const options_t &options_,
163 : address_t *addr_) :
164 : session_base_t (io_thread_, connect_, socket_, options_, addr_),
165 6 : state (group)
166 : {
167 6 : }
168 :
169 6 : zmq::radio_session_t::~radio_session_t ()
170 : {
171 6 : }
172 :
173 9 : int zmq::radio_session_t::push_msg (msg_t *msg_)
174 : {
175 9 : if (msg_->flags() & msg_t::command) {
176 : char *command_data =
177 9 : static_cast <char *> (msg_->data ());
178 9 : const size_t data_size = msg_->size ();
179 :
180 : int group_length;
181 : char * group;
182 :
183 : msg_t join_leave_msg;
184 : int rc;
185 :
186 : // Set the msg type to either JOIN or LEAVE
187 9 : if (data_size >= 5 && memcmp (command_data, "\4JOIN", 5) == 0) {
188 6 : group_length = (int) data_size - 5;
189 6 : group = command_data + 5;
190 6 : rc = join_leave_msg.init_join ();
191 : }
192 3 : else if (data_size >= 6 && memcmp (command_data, "\5LEAVE", 6) == 0) {
193 3 : group_length = (int) data_size - 6;
194 3 : group = command_data + 6;
195 3 : rc = join_leave_msg.init_leave ();
196 : }
197 : // If it is not a JOIN or LEAVE just push the message
198 : else
199 0 : return session_base_t::push_msg (msg_);
200 :
201 9 : errno_assert (rc == 0);
202 :
203 : // Set the group
204 9 : rc = join_leave_msg.set_group (group, group_length);
205 9 : errno_assert (rc == 0);
206 :
207 : // Close the current command
208 9 : rc = msg_->close ();
209 9 : errno_assert (rc == 0);
210 :
211 : // Push the join or leave command
212 9 : *msg_ = join_leave_msg;
213 9 : return session_base_t::push_msg (msg_);
214 : }
215 : else
216 0 : return session_base_t::push_msg (msg_);
217 : }
218 :
219 57 : int zmq::radio_session_t::pull_msg (msg_t *msg_)
220 : {
221 57 : if (state == group) {
222 45 : int rc = session_base_t::pull_msg (&pending_msg);
223 45 : if (rc != 0)
224 : return rc;
225 :
226 12 : const char *group = pending_msg.group ();
227 12 : int length = (int) strlen (group);
228 :
229 : // First frame is the group
230 12 : rc = msg_->init_size (length);
231 12 : errno_assert(rc == 0);
232 12 : msg_->set_flags(msg_t::more);
233 12 : memcpy (msg_->data (), group, length);
234 :
235 : // Next status is the body
236 12 : state = body;
237 12 : return 0;
238 : }
239 : else {
240 12 : *msg_ = pending_msg;
241 12 : state = group;
242 12 : return 0;
243 : }
244 : }
245 :
246 0 : void zmq::radio_session_t::reset ()
247 : {
248 0 : session_base_t::reset ();
249 0 : state = group;
250 0 : }
|