libzmq  master
ZeroMQ C++ Core Engine (LIBZMQ)
radio.cpp
Go to the documentation of this file.
1 /*
2  Copyright (c) 2007-2016 Contributors as noted in the AUTHORS file
3 
4  This file is part of libzmq, the ZeroMQ core engine in C++.
5 
6  libzmq is free software; you can redistribute it and/or modify it under
7  the terms of the GNU Lesser General Public License (LGPL) as published
8  by the Free Software Foundation; either version 3 of the License, or
9  (at your option) any later version.
10 
11  As a special exception, the Contributors give you permission to link
12  this library with independent modules to produce an executable,
13  regardless of the license terms of these independent modules, and to
14  copy and distribute the resulting executable under terms of your choice,
15  provided that you also meet, for each linked independent module, the
16  terms and conditions of the license of that module. An independent
17  module is a module which is not derived from or based on this library.
18  If you modify this library, you must extend this exception to your
19  version of the library.
20 
21  libzmq is distributed in the hope that it will be useful, but WITHOUT
22  ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
23  FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public
24  License for more details.
25 
26  You should have received a copy of the GNU Lesser General Public License
27  along with this program. If not, see <http://www.gnu.org/licenses/>.
28 */
29 
30 #include "precompiled.hpp"
31 #include <string.h>
32 
33 #include "radio.hpp"
34 #include "macros.hpp"
35 #include "pipe.hpp"
36 #include "err.hpp"
37 #include "msg.hpp"
38 
39 zmq::radio_t::radio_t (class ctx_t *parent_, uint32_t tid_, int sid_) :
40  socket_base_t (parent_, tid_, sid_, true)
41 {
43 }
44 
46 {
47 }
48 
49 void zmq::radio_t::xattach_pipe (pipe_t *pipe_, bool subscribe_to_all_)
50 {
51  LIBZMQ_UNUSED (subscribe_to_all_);
52 
53  zmq_assert (pipe_);
54 
55  // Don't delay pipe termination as there is no one
56  // to receive the delimiter.
57  pipe_->set_nodelay ();
58 
59  dist.attach (pipe_);
60 
61  if (subscribe_to_all_)
62  udp_pipes.push_back (pipe_);
63  // The pipe is active when attached. Let's read the subscriptions from
64  // it, if any.
65  else
66  xread_activated (pipe_);
67 }
68 
70 {
71  // There are some subscriptions waiting. Let's process them.
72  msg_t msg;
73  while (pipe_->read (&msg)) {
74  // Apply the subscription to the trie
75  if (msg.is_join () || msg.is_leave ()) {
76  std::string group = std::string (msg.group ());
77 
78  if (msg.is_join ())
79  subscriptions.insert (subscriptions_t::value_type (group, pipe_));
80  else {
81  std::pair<subscriptions_t::iterator, subscriptions_t::iterator> range =
82  subscriptions.equal_range (group);
83 
84  for (subscriptions_t::iterator it = range.first; it != range.second; ++it) {
85  if (it->second == pipe_) {
86  subscriptions.erase (it);
87  break;
88  }
89  }
90  }
91  }
92  msg.close ();
93  }
94 }
95 
97 {
98  dist.activated (pipe_);
99 }
100 
102 {
103  for (subscriptions_t::iterator it = subscriptions.begin (); it != subscriptions.end (); ) {
104  if (it->second == pipe_) {
105  subscriptions.erase (it++);
106  } else {
107  ++it;
108  }
109  }
110 
111  udp_pipes_t::iterator it = std::find(udp_pipes.begin(),
112  udp_pipes.end (), pipe_);
113  if (it != udp_pipes.end ())
114  udp_pipes.erase (it);
115 
116  dist.pipe_terminated (pipe_);
117 }
118 
120 {
121  // Radio sockets do not allow multipart data (ZMQ_SNDMORE)
122  if (msg_->flags () & msg_t::more) {
123  errno = EINVAL;
124  return -1;
125  }
126 
127  dist.unmatch ();
128 
129  std::pair<subscriptions_t::iterator, subscriptions_t::iterator> range =
130  subscriptions.equal_range (std::string(msg_->group ()));
131 
132  for (subscriptions_t::iterator it = range.first; it != range.second; ++it)
133  dist.match (it-> second);
134 
135  for (udp_pipes_t::iterator it = udp_pipes.begin (); it != udp_pipes.end (); ++it)
136  dist.match (*it);
137 
138  int rc = dist.send_to_matching (msg_);
139 
140  return rc;
141 }
142 
144 {
145  return dist.has_out ();
146 }
147 
149 {
150  // Messages cannot be received from PUB socket.
151  LIBZMQ_UNUSED (msg_);
152  errno = ENOTSUP;
153  return -1;
154 }
155 
157 {
158  return false;
159 }
160 
162  socket_base_t *socket_, const options_t &options_,
163  address_t *addr_) :
164  session_base_t (io_thread_, connect_, socket_, options_, addr_),
165  state (group)
166 {
167 }
168 
170 {
171 }
172 
174 {
175  if (msg_->flags() & msg_t::command) {
176  char *command_data =
177  static_cast <char *> (msg_->data ());
178  const size_t data_size = msg_->size ();
179 
180  int group_length;
181  char * group;
182 
183  msg_t join_leave_msg;
184  int rc;
185 
186  // Set the msg type to either JOIN or LEAVE
187  if (data_size >= 5 && memcmp (command_data, "\4JOIN", 5) == 0) {
188  group_length = (int) data_size - 5;
189  group = command_data + 5;
190  rc = join_leave_msg.init_join ();
191  }
192  else if (data_size >= 6 && memcmp (command_data, "\5LEAVE", 6) == 0) {
193  group_length = (int) data_size - 6;
194  group = command_data + 6;
195  rc = join_leave_msg.init_leave ();
196  }
197  // If it is not a JOIN or LEAVE just push the message
198  else
199  return session_base_t::push_msg (msg_);
200 
201  errno_assert (rc == 0);
202 
203  // Set the group
204  rc = join_leave_msg.set_group (group, group_length);
205  errno_assert (rc == 0);
206 
207  // Close the current command
208  rc = msg_->close ();
209  errno_assert (rc == 0);
210 
211  // Push the join or leave command
212  *msg_ = join_leave_msg;
213  return session_base_t::push_msg (msg_);
214  }
215  else
216  return session_base_t::push_msg (msg_);
217 }
218 
220 {
221  if (state == group) {
223  if (rc != 0)
224  return rc;
225 
226  const char *group = pending_msg.group ();
227  int length = (int) strlen (group);
228 
229  // First frame is the group
230  rc = msg_->init_size (length);
231  errno_assert(rc == 0);
232  msg_->set_flags(msg_t::more);
233  memcpy (msg_->data (), group, length);
234 
235  // Next status is the body
236  state = body;
237  return 0;
238  }
239  else {
240  *msg_ = pending_msg;
241  state = group;
242  return 0;
243  }
244 }
245 
247 {
249  state = group;
250 }
bool read(msg_t *msg_)
Definition: pipe.cpp:169
int close()
Definition: msg.cpp:217
void xread_activated(zmq::pipe_t *pipe_)
Definition: radio.cpp:69
int set_group(const char *group_)
Definition: msg.cpp:548
void xwrite_activated(zmq::pipe_t *pipe_)
Definition: radio.cpp:96
#define ZMQ_RADIO
#define zmq_assert(x)
Definition: err.hpp:119
virtual int push_msg(msg_t *msg_)
radio_session_t(zmq::io_thread_t *io_thread_, bool connect_, zmq::socket_base_t *socket_, const options_t &options_, address_t *addr_)
Definition: radio.cpp:161
void activated(zmq::pipe_t *pipe_)
Definition: dist.cpp:124
#define ENOTSUP
Definition: zmq.h:112
void set_nodelay()
Definition: pipe.cpp:380
int xrecv(zmq::msg_t *msg_)
Definition: radio.cpp:148
udp_pipes_t udp_pipes
Definition: radio.hpp:76
enum zmq::radio_session_t::@50 state
void match(zmq::pipe_t *pipe_)
Definition: dist.cpp:68
int xsend(zmq::msg_t *msg_)
Definition: radio.cpp:119
unsigned char size
Definition: msg.hpp:188
char group[16]
Definition: msg.hpp:182
int init_size(size_t size_)
Definition: msg.cpp:93
radio_t(zmq::ctx_t *parent_, uint32_t tid_, int sid_)
Definition: radio.cpp:39
bool is_join() const
Definition: msg.cpp:447
#define LIBZMQ_UNUSED(object)
Definition: macros.hpp:6
int push_msg(msg_t *msg_)
Definition: radio.cpp:173
int init_leave()
Definition: msg.cpp:207
int send_to_matching(zmq::msg_t *msg_)
Definition: dist.cpp:146
void attach(zmq::pipe_t *pipe_)
Definition: dist.cpp:50
virtual int pull_msg(msg_t *msg_)
bool has_out()
Definition: dist.cpp:205
void xattach_pipe(zmq::pipe_t *pipe_, bool subscribe_to_all_=false)
Definition: radio.cpp:49
int init_join()
Definition: msg.cpp:197
bool xhas_out()
Definition: radio.cpp:143
bool xhas_in()
Definition: radio.cpp:156
void set_flags(unsigned char flags_)
Definition: msg.cpp:384
#define errno_assert(x)
Definition: err.hpp:129
void xpipe_terminated(zmq::pipe_t *pipe_)
Definition: radio.cpp:101
void unmatch()
Definition: dist.cpp:99
bool is_leave() const
Definition: msg.cpp:452
unsigned char data[max_vsm_size]
Definition: msg.hpp:187
void pipe_terminated(zmq::pipe_t *pipe_)
Definition: dist.cpp:104
options_t options
Definition: own.hpp:109
dist_t dist
Definition: radio.hpp:79
int pull_msg(msg_t *msg_)
Definition: radio.cpp:219
unsigned char flags
Definition: msg.hpp:181
virtual void reset()
subscriptions_t subscriptions
Definition: radio.hpp:72