// The first physical line below merges three preprocessor directives (the
// platform.hpp include and the ZMQ_HAVE_OPENPGM / ZMQ_HAVE_WINDOWS
// conditionals) with the start of the pgm_sender_t constructor.
//
// Constructor (excerpt): forwards parent_ to the io_object_t base and builds
// the pgm_socket member from the literal `false` plus options_.
// NOTE(review): `false` presumably selects sender (non-receiver) mode --
// confirm against the pgm_socket_t constructor. The remaining member
// initialisers and the constructor body are not visible in this excerpt.
31 #include "platform.hpp" 33 #if defined ZMQ_HAVE_OPENPGM 35 #ifdef ZMQ_HAVE_WINDOWS 48 zmq::pgm_sender_t::pgm_sender_t (io_thread_t *parent_,
49 const options_t &options_) :
50 io_object_t (parent_),
56 pgm_socket (false, options_),
// Initialises the underlying PGM socket and allocates the output buffer.
//   udp_encapsulation_, network_ -- forwarded unchanged to pgm_socket.init ().
// Returns an int; how rc is checked and returned is not visible in this
// excerpt.
66 int zmq::pgm_sender_t::init (
bool udp_encapsulation_,
const char *network_)
68 int rc = pgm_socket.init (udp_encapsulation_, network_);
// The output buffer is sized to whatever get_max_tsdu_size () reports.
72 out_buffer_size = pgm_socket.get_max_tsdu_size ();
73 out_buffer = (
unsigned char*) malloc (out_buffer_size);
// NOTE(review): no check of the malloc result is visible here -- confirm one
// exists in the omitted lines.
//
// NOTE(review): the embedded source line numbers jump from 73 to 90, so the
// statements below may belong to a different function whose signature is
// missing from this excerpt -- confirm before relying on their placement.
//
// Obtain the four descriptors used by the sender from the PGM socket.
90 pgm_socket.get_sender_fds (&downlink_socket_fd, &uplink_socket_fd,
91 &rdata_notify_fd, &pending_notify_fd);
// Register each descriptor via add_fd and keep the returned handle; the
// handles are used later by set_pollin and rm_fd.
93 handle = add_fd (downlink_socket_fd);
94 uplink_handle = add_fd (uplink_socket_fd);
95 rdata_notify_handle = add_fd (rdata_notify_fd);
96 pending_notify_handle = add_fd (pending_notify_fd);
// Input polling is enabled for the uplink and the two notification handles.
// No set_pollin for `handle` (the downlink descriptor) appears in the visible
// lines.
100 set_pollin (uplink_handle);
101 set_pollin (rdata_notify_handle);
102 set_pollin (pending_notify_handle);
// Cancels the rx/tx timers, clears their flags and removes the registered
// descriptors.
// NOTE(review): any conditions guarding the cancel_timer calls (for example on
// has_rx_timer / has_tx_timer) are not visible in this excerpt.
108 void zmq::pgm_sender_t::unplug ()
// Stop the rx timer and record that it is no longer armed.
111 cancel_timer (rx_timer_id);
112 has_rx_timer =
false;
// Stop the tx timer and record that it is no longer armed.
116 cancel_timer (tx_timer_id);
117 has_tx_timer =
false;
// Remove the handles that were obtained from add_fd.
// NOTE(review): no rm_fd for `handle` is visible (embedded source line 120 is
// absent from this excerpt) -- confirm the downlink handle is removed too.
121 rm_fd (uplink_handle);
122 rm_fd (rdata_notify_handle);
123 rm_fd (pending_notify_handle);
// terminate (): takes no arguments and returns nothing. Only the signature is
// present in this excerpt; the body is not visible.
127 void zmq::pgm_sender_t::terminate ()
// restart_output (): takes no arguments and returns nothing. Only the
// signature is present in this excerpt; the body is not visible.
133 void zmq::pgm_sender_t::restart_output ()
// restart_input (): takes no arguments and returns nothing. Only the
// signature is present in this excerpt; the body is not visible.
139 void zmq::pgm_sender_t::restart_input ()
// Destructor (excerpt): closes the member message `msg` and captures the
// result in rc.
// NOTE(review): how rc is checked, and whether out_buffer is released, is not
// visible in this excerpt -- confirm in the full source.
144 zmq::pgm_sender_t::~pgm_sender_t ()
146 int rc = msg.close ();
// in_event handler: cancels the rx timer, lets the PGM socket process upstream
// traffic, and re-arms the rx timer when errno reports ENOMEM or EBUSY.
155 void zmq::pgm_sender_t::in_event ()
// Drop the rx timer and clear its flag before processing.
// NOTE(review): any guard around this cancel is not visible in this excerpt.
158 cancel_timer (rx_timer_id);
159 has_rx_timer =
false;
163 pgm_socket.process_upstream ();
// NOTE(review): errno is inspected without a visible check of a result from
// process_upstream (); confirm that call reliably sets or clears errno,
// otherwise a stale value could arm the timer.
164 if (errno == ENOMEM || errno == EBUSY) {
// Schedule a timer under rx_timer_id using the socket-provided rx timeout.
// Setting has_rx_timer back to true is not visible in this excerpt.
165 const long timeout = pgm_socket.get_rx_timeout ();
166 add_timer (timeout, rx_timer_id);
// out_event handler: when no data is pending (write_size == 0) it fills
// out_buffer with encoded message data, then tries to send out_buffer through
// the PGM socket and arms the tx timer when errno is ENOMEM.
// NOTE(review): braces and several statements are missing from this excerpt,
// so the exact nesting of the steps below cannot be confirmed from here.
171 void zmq::pgm_sender_t::out_event ()
175 if (write_size == 0) {
// The first sizeof (uint16_t) bytes of out_buffer are skipped; bf and bfsz
// describe the remaining space that the encoder may fill.
// NOTE(review): the skipped bytes presumably hold `offset` as a header --
// the write itself is not visible here.
180 unsigned char *bf = out_buffer +
sizeof (uint16_t);
181 size_t bfsz = out_buffer_size -
sizeof (uint16_t);
// 0xffff is the initial value of offset and is tested below to tell whether
// offset has been set yet.
182 uint16_t offset = 0xffff;
184 size_t bytes =
encoder.encode (&bf, bfsz);
// Keep pulling messages and encoding until the usable space is full.
// Loop exit on a failed pull_msg is not visible in this excerpt.
185 while (bytes < bfsz) {
// Record the current byte count as offset once, and only while more_flag is
// false.
186 if (!more_flag && offset == 0xffff)
187 offset = static_cast <uint16_t> (bytes);
188 int rc = session->pull_msg (&msg);
// Point bf at the first unused byte after the header and the data encoded so
// far, then encode into the space that is left.
193 bf = out_buffer +
sizeof (uint16_t) + bytes;
194 bytes +=
encoder.encode (&bf, bfsz - bytes);
// Total bytes to send: header plus encoded data.
203 write_size =
sizeof (uint16_t) + bytes;
// Drop the tx timer and clear its flag before attempting the send.
// NOTE(review): any guard around this cancel is not visible in this excerpt.
210 cancel_timer (tx_timer_id);
212 has_tx_timer =
false;
216 size_t nbytes = pgm_socket.send (out_buffer, write_size);
// A return value equal to write_size is the case tested here; what happens in
// that branch is not visible in this excerpt.
219 if (nbytes == write_size)
// On ENOMEM, schedule a timer under tx_timer_id using the socket-provided tx
// timeout.
224 if (errno == ENOMEM) {
226 const long timeout = pgm_socket.get_tx_timeout ();
227 add_timer (timeout, tx_timer_id);
// Timer callback: compares token with rx_timer_id and tx_timer_id and clears
// the matching has_*_timer flag.
//   token -- identifier of the timer being reported.
// NOTE(review): any further action taken in each branch is not visible in
// this excerpt.
236 void zmq::pgm_sender_t::timer_event (
int token)
239 if (token == rx_timer_id) {
240 has_rx_timer =
false;
244 if (token == tx_timer_id) {
246 has_tx_timer =
false;
// put_uint16: takes a byte-buffer pointer and a 16-bit value and returns
// nothing. Only the signature is present in this excerpt.
// NOTE(review): presumably stores `value` into buffer_; the byte order is not
// visible here -- confirm against the definition.
void put_uint16(unsigned char *buffer_, uint16_t value)
// Static array of 85+1 (86) chars named `encoder`; its initialiser is not
// visible in this excerpt.
// NOTE(review): out_event calls encoder.encode (...), which a char array
// cannot provide, so this declaration likely comes from a different source
// file -- confirm.
static char encoder[85+1]
// Declaration of `handle` with type poller_t::handle_t. In the code above,
// `handle` receives the result of add_fd (downlink_socket_fd).
poller_t::handle_t handle