32 #include "platform.hpp" 34 #if defined ZMQ_HAVE_OPENPGM 38 #ifdef ZMQ_HAVE_WINDOWS 49 zmq::pgm_receiver_t::pgm_receiver_t (
class io_thread_t *parent_,
50 const options_t &options_) :
51 io_object_t (parent_),
53 pgm_socket (true, options_),
61 zmq::pgm_receiver_t::~pgm_receiver_t ()
67 int zmq::pgm_receiver_t::init (
bool udp_encapsulation_,
const char *network_)
69 return pgm_socket.init (udp_encapsulation_, network_);
73 session_base_t *session_)
78 pgm_socket.get_receiver_fds (&socket_fd, &waiting_pipe_fd);
79 socket_handle = add_fd (socket_fd);
80 pipe_handle = add_fd (waiting_pipe_fd);
81 set_pollin (pipe_handle);
82 set_pollin (socket_handle);
87 drop_subscriptions ();
90 void zmq::pgm_receiver_t::unplug ()
93 for (peers_t::iterator it = peers.begin (); it != peers.end (); ++it) {
94 if (it->second.decoder != NULL) {
102 cancel_timer (rx_timer_id);
103 has_rx_timer =
false;
106 rm_fd (socket_handle);
112 void zmq::pgm_receiver_t::terminate ()
118 void zmq::pgm_receiver_t::restart_output ()
120 drop_subscriptions ();
123 void zmq::pgm_receiver_t::restart_input ()
128 const peers_t::iterator it = peers.find (*active_tsi);
133 int rc = session->push_msg (it->second.decoder->msg ());
137 rc = process_input (it->second.decoder);
140 if (errno == EAGAIN) {
146 it->second.joined =
false;
153 set_pollin (pipe_handle);
154 set_pollin (socket_handle);
160 void zmq::pgm_receiver_t::in_event ()
163 const pgm_tsi_t *tsi = NULL;
166 cancel_timer (rx_timer_id);
167 has_rx_timer =
false;
177 ssize_t received = pgm_socket.receive (&tmp, &tsi);
178 inpos = (
unsigned char*) tmp;
183 if (errno == ENOMEM || errno == EBUSY) {
184 const long timeout = pgm_socket.get_rx_timeout ();
185 add_timer (timeout, rx_timer_id);
192 peers_t::iterator it = peers.find (*tsi);
195 if (received == -1) {
196 if (it != peers.end ()) {
197 it->second.joined =
false;
198 if (it->second.decoder != NULL) {
206 if (it == peers.end ()) {
207 peer_info_t peer_info = {
false, NULL};
208 it = peers.insert (peers_t::value_type (*tsi, peer_info)).first;
211 insize = static_cast <
size_t> (received);
216 inpos +=
sizeof (uint16_t);
217 insize -=
sizeof (uint16_t);
220 if (!it->second.joined) {
224 if (offset == 0xffff)
235 it->second.joined =
true;
238 it->second.decoder =
new (std::nothrow)
243 int rc = process_input (it->second.decoder);
245 if (errno == EAGAIN) {
249 reset_pollin (pipe_handle);
250 reset_pollin (socket_handle);
255 it->second.joined =
false;
265 int zmq::pgm_receiver_t::process_input (v1_decoder_t *
decoder)
271 int rc = decoder->decode (inpos, insize, n);
278 rc = session->push_msg (decoder->msg ());
288 void zmq::pgm_receiver_t::timer_event (
int token)
293 has_rx_timer =
false;
297 void zmq::pgm_receiver_t::drop_subscriptions ()
301 while (session->pull_msg (&msg) == 0)
#define LIBZMQ_DELETE(p_object)
uint16_t get_uint16(const unsigned char *buffer_)
static uint8_t decoder[96]