61 if (subscribe_to_all_)
73 while (pipe_->
read (&msg)) {
76 std::string group = std::string (msg.
group ());
79 subscriptions.insert (subscriptions_t::value_type (group, pipe_));
81 std::pair<subscriptions_t::iterator, subscriptions_t::iterator> range =
84 for (subscriptions_t::iterator it = range.first; it != range.second; ++it) {
85 if (it->second == pipe_) {
104 if (it->second == pipe_) {
111 udp_pipes_t::iterator it = std::find(
udp_pipes.begin(),
129 std::pair<subscriptions_t::iterator, subscriptions_t::iterator> range =
132 for (subscriptions_t::iterator it = range.first; it != range.second; ++it)
177 static_cast <
char *> (msg_->
data ());
178 const size_t data_size = msg_->
size ();
183 msg_t join_leave_msg;
187 if (data_size >= 5 && memcmp (command_data,
"\4JOIN", 5) == 0) {
188 group_length = (
int) data_size - 5;
189 group = command_data + 5;
192 else if (data_size >= 6 && memcmp (command_data,
"\5LEAVE", 6) == 0) {
193 group_length = (
int) data_size - 6;
194 group = command_data + 6;
204 rc = join_leave_msg.
set_group (group, group_length);
212 *msg_ = join_leave_msg;
227 int length = (
int) strlen (group);
void xread_activated(zmq::pipe_t *pipe_)
int set_group(const char *group_)
void xwrite_activated(zmq::pipe_t *pipe_)
virtual int push_msg(msg_t *msg_)
radio_session_t(zmq::io_thread_t *io_thread_, bool connect_, zmq::socket_base_t *socket_, const options_t &options_, address_t *addr_)
void activated(zmq::pipe_t *pipe_)
int xrecv(zmq::msg_t *msg_)
enum zmq::radio_session_t::@50 state
void match(zmq::pipe_t *pipe_)
int xsend(zmq::msg_t *msg_)
int init_size(size_t size_)
radio_t(zmq::ctx_t *parent_, uint32_t tid_, int sid_)
#define LIBZMQ_UNUSED(object)
int push_msg(msg_t *msg_)
int send_to_matching(zmq::msg_t *msg_)
void attach(zmq::pipe_t *pipe_)
virtual int pull_msg(msg_t *msg_)
void xattach_pipe(zmq::pipe_t *pipe_, bool subscribe_to_all_=false)
void set_flags(unsigned char flags_)
void xpipe_terminated(zmq::pipe_t *pipe_)
unsigned char data[max_vsm_size]
void pipe_terminated(zmq::pipe_t *pipe_)
int pull_msg(msg_t *msg_)
subscriptions_t subscriptions