31 #include "platform.hpp" 33 #ifdef ZMQ_HAVE_OPENPGM 35 #ifdef ZMQ_HAVE_WINDOWS 55 #define MSG_ERRQUEUE 0x2000 58 zmq::pgm_socket_t::pgm_socket_t (
bool receiver_,
const options_t &options_) :
66 pgm_msgv_processed (0)
75 int zmq::pgm_socket_t::init_address (
const char *network_,
76 struct pgm_addrinfo_t **res, uint16_t *port_number)
79 const char *port_delim = strrchr (network_,
':');
85 *port_number = atoi (port_delim + 1);
88 if (port_delim - network_ >= (
int)
sizeof (network) - 1) {
92 memset (network,
'\0',
sizeof (network));
93 memcpy (network, network_, port_delim - network_);
95 pgm_error_t *pgm_error = NULL;
96 struct pgm_addrinfo_t hints;
98 memset (&hints, 0,
sizeof (hints));
99 hints.ai_family = AF_UNSPEC;
100 if (!pgm_getaddrinfo (network, NULL, res, &pgm_error)) {
104 if (pgm_error->domain == PGM_ERROR_DOMAIN_IF &&
107 ( pgm_error->code != PGM_ERROR_SERVICE &&
108 pgm_error->code != PGM_ERROR_SOCKTNOSUPPORT)) {
111 pgm_error_free (pgm_error);
123 int zmq::pgm_socket_t::init (
bool udp_encapsulation_,
const char *network_)
131 nbytes_processed = 0;
132 pgm_msgv_processed = 0;
134 uint16_t port_number;
135 struct pgm_addrinfo_t *res = NULL;
136 sa_family_t sa_family;
138 pgm_error_t *pgm_error = NULL;
140 if (init_address(network_, &res, &port_number) < 0) {
147 sa_family = res->ai_send_addrs[0].gsr_group.ss_family;
150 if (udp_encapsulation_) {
151 if (!pgm_socket (&sock, sa_family, SOCK_SEQPACKET, IPPROTO_UDP,
156 if (pgm_error->domain == PGM_ERROR_DOMAIN_SOCKET && (
157 pgm_error->code != PGM_ERROR_BADF &&
158 pgm_error->code != PGM_ERROR_FAULT &&
159 pgm_error->code != PGM_ERROR_NOPROTOOPT &&
160 pgm_error->code != PGM_ERROR_FAILED))
170 const int encapsulation_port = port_number;
171 if (!pgm_setsockopt (sock, IPPROTO_PGM, PGM_UDP_ENCAP_UCAST_PORT,
172 &encapsulation_port,
sizeof (encapsulation_port)))
174 if (!pgm_setsockopt (sock, IPPROTO_PGM, PGM_UDP_ENCAP_MCAST_PORT,
175 &encapsulation_port,
sizeof (encapsulation_port)))
179 if (!pgm_socket (&sock, sa_family, SOCK_SEQPACKET, IPPROTO_PGM,
184 if (pgm_error->domain == PGM_ERROR_DOMAIN_SOCKET && (
185 pgm_error->code != PGM_ERROR_BADF &&
186 pgm_error->code != PGM_ERROR_FAULT &&
187 pgm_error->code != PGM_ERROR_NOPROTOOPT &&
188 pgm_error->code != PGM_ERROR_FAILED))
201 if (!pgm_setsockopt (sock, SOL_SOCKET, SO_RCVBUF, &rcvbuf,
208 if (!pgm_setsockopt (sock, SOL_SOCKET, SO_SNDBUF, &sndbuf,
214 if (!pgm_setsockopt (sock, IPPROTO_PGM, PGM_MTU, &max_tpdu,
220 const int recv_only = 1,
222 rxw_sqns = compute_sqns (rxw_max_tpdu),
223 peer_expiry = pgm_secs (300),
224 spmr_expiry = pgm_msecs (25),
225 nak_bo_ivl = pgm_msecs (50),
226 nak_rpt_ivl = pgm_msecs (200),
227 nak_rdata_ivl = pgm_msecs (200),
228 nak_data_retries = 50,
229 nak_ncf_retries = 50;
231 if (!pgm_setsockopt (sock, IPPROTO_PGM, PGM_RECV_ONLY, &recv_only,
232 sizeof (recv_only)) ||
233 !pgm_setsockopt (sock, IPPROTO_PGM, PGM_RXW_SQNS, &rxw_sqns,
234 sizeof (rxw_sqns)) ||
235 !pgm_setsockopt (sock, IPPROTO_PGM, PGM_PEER_EXPIRY, &peer_expiry,
236 sizeof (peer_expiry)) ||
237 !pgm_setsockopt (sock, IPPROTO_PGM, PGM_SPMR_EXPIRY, &spmr_expiry,
238 sizeof (spmr_expiry)) ||
239 !pgm_setsockopt (sock, IPPROTO_PGM, PGM_NAK_BO_IVL, &nak_bo_ivl,
240 sizeof (nak_bo_ivl)) ||
241 !pgm_setsockopt (sock, IPPROTO_PGM, PGM_NAK_RPT_IVL, &nak_rpt_ivl,
242 sizeof (nak_rpt_ivl)) ||
243 !pgm_setsockopt (sock, IPPROTO_PGM, PGM_NAK_RDATA_IVL,
244 &nak_rdata_ivl,
sizeof (nak_rdata_ivl)) ||
245 !pgm_setsockopt (sock, IPPROTO_PGM, PGM_NAK_DATA_RETRIES,
246 &nak_data_retries,
sizeof (nak_data_retries)) ||
247 !pgm_setsockopt (sock, IPPROTO_PGM, PGM_NAK_NCF_RETRIES,
248 &nak_ncf_retries,
sizeof (nak_ncf_retries)))
252 const int send_only = 1,
255 txw_sqns = compute_sqns (txw_max_tpdu),
256 ambient_spm = pgm_secs (30),
257 heartbeat_spm[] = { pgm_msecs (100),
267 if (!pgm_setsockopt (sock, IPPROTO_PGM, PGM_SEND_ONLY,
268 &send_only,
sizeof (send_only)) ||
269 !pgm_setsockopt (sock, IPPROTO_PGM, PGM_ODATA_MAX_RTE,
270 &max_rte,
sizeof (max_rte)) ||
271 !pgm_setsockopt (sock, IPPROTO_PGM, PGM_TXW_SQNS,
272 &txw_sqns,
sizeof (txw_sqns)) ||
273 !pgm_setsockopt (sock, IPPROTO_PGM, PGM_AMBIENT_SPM,
274 &ambient_spm,
sizeof (ambient_spm)) ||
275 !pgm_setsockopt (sock, IPPROTO_PGM, PGM_HEARTBEAT_SPM,
276 &heartbeat_spm,
sizeof (heartbeat_spm)))
281 struct pgm_sockaddr_t addr;
283 memset (&addr, 0,
sizeof(addr));
284 addr.sa_port = port_number;
285 addr.sa_addr.sport = DEFAULT_DATA_SOURCE_PORT;
291 if (!pgm_gsi_create_from_data (&addr.sa_addr.gsi, (uint8_t*) buf, 8))
296 struct pgm_interface_req_t if_req;
297 memset (&if_req, 0,
sizeof(if_req));
298 if_req.ir_interface = res->ai_recv_addrs[0].gsr_interface;
299 if_req.ir_scope_id = 0;
300 if (AF_INET6 == sa_family) {
301 struct sockaddr_in6 sa6;
302 memcpy (&sa6, &res->ai_recv_addrs[0].gsr_group, sizeof (sa6));
303 if_req.ir_scope_id = sa6.sin6_scope_id;
305 if (!pgm_bind3 (sock, &addr,
sizeof (addr), &if_req,
sizeof (if_req),
306 &if_req,
sizeof (if_req), &pgm_error)) {
310 if ((pgm_error->domain == PGM_ERROR_DOMAIN_SOCKET ||
311 pgm_error->domain == PGM_ERROR_DOMAIN_IF) && (
312 pgm_error->code != PGM_ERROR_INVAL &&
313 pgm_error->code != PGM_ERROR_BADF &&
314 pgm_error->code != PGM_ERROR_FAULT))
324 for (
unsigned i = 0; i < res->ai_recv_addrs_len; i++) {
325 if (!pgm_setsockopt (sock, IPPROTO_PGM, PGM_JOIN_GROUP,
326 &res->ai_recv_addrs [i], sizeof (
struct group_req)))
329 if (!pgm_setsockopt (sock, IPPROTO_PGM, PGM_SEND_GROUP,
330 &res->ai_send_addrs [0], sizeof (
struct group_req)))
333 pgm_freeaddrinfo (res);
339 const int multicast_loop = 0;
340 if (!pgm_setsockopt (sock, IPPROTO_PGM, PGM_MULTICAST_LOOP,
341 &multicast_loop,
sizeof (multicast_loop)))
345 if (!pgm_setsockopt (sock, IPPROTO_PGM, PGM_MULTICAST_HOPS,
346 &multicast_hops,
sizeof (multicast_hops)))
351 const int dscp = 0x2e << 2;
352 if (AF_INET6 != sa_family)
353 pgm_setsockopt (sock, IPPROTO_PGM, PGM_TOS,
354 &dscp,
sizeof (dscp));
356 const int nonblocking = 1;
357 if (!pgm_setsockopt (sock, IPPROTO_PGM, PGM_NOBLOCK,
358 &nonblocking,
sizeof (nonblocking)))
363 if (!pgm_connect (sock, &pgm_error)) {
373 size_t max_tsdu_size = get_max_tsdu_size ();
379 pgm_msgv = (pgm_msgv_t*) malloc (
sizeof (pgm_msgv_t) * pgm_msgv_len);
387 pgm_close (sock, FALSE);
391 pgm_freeaddrinfo (res);
394 if (pgm_error != NULL) {
395 pgm_error_free (pgm_error);
402 zmq::pgm_socket_t::~pgm_socket_t ()
407 pgm_close (sock, TRUE);
412 void zmq::pgm_socket_t::get_receiver_fds (
fd_t *receive_fd_,
413 fd_t *waiting_pipe_fd_)
421 socklen =
sizeof (*receive_fd_);
422 rc = pgm_getsockopt (sock, IPPROTO_PGM, PGM_RECV_SOCK, receive_fd_,
425 zmq_assert (socklen ==
sizeof (*receive_fd_));
427 socklen =
sizeof (*waiting_pipe_fd_);
428 rc = pgm_getsockopt (sock, IPPROTO_PGM, PGM_PENDING_SOCK, waiting_pipe_fd_,
431 zmq_assert (socklen ==
sizeof (*waiting_pipe_fd_));
439 void zmq::pgm_socket_t::get_sender_fds (
fd_t *send_fd_,
fd_t *receive_fd_,
440 fd_t *rdata_notify_fd_,
fd_t *pending_notify_fd_)
450 socklen =
sizeof (*send_fd_);
451 rc = pgm_getsockopt (sock, IPPROTO_PGM, PGM_SEND_SOCK, send_fd_, &socklen);
453 zmq_assert (socklen ==
sizeof (*receive_fd_));
455 socklen =
sizeof (*receive_fd_);
456 rc = pgm_getsockopt (sock, IPPROTO_PGM, PGM_RECV_SOCK, receive_fd_,
459 zmq_assert (socklen ==
sizeof (*receive_fd_));
461 socklen =
sizeof (*rdata_notify_fd_);
462 rc = pgm_getsockopt (sock, IPPROTO_PGM, PGM_REPAIR_SOCK, rdata_notify_fd_,
465 zmq_assert (socklen ==
sizeof (*rdata_notify_fd_));
467 socklen =
sizeof (*pending_notify_fd_);
468 rc = pgm_getsockopt (sock, IPPROTO_PGM, PGM_PENDING_SOCK,
469 pending_notify_fd_, &socklen);
471 zmq_assert (socklen ==
sizeof (*pending_notify_fd_));
476 size_t zmq::pgm_socket_t::send (
unsigned char *data_,
size_t data_len_)
480 const int status = pgm_send (sock, data_, data_len_, &nbytes);
488 zmq_assert (status == PGM_IO_STATUS_RATE_LIMITED ||
489 status == PGM_IO_STATUS_WOULD_BLOCK);
491 if (status == PGM_IO_STATUS_RATE_LIMITED)
498 last_tx_status = status;
503 long zmq::pgm_socket_t::get_rx_timeout ()
505 if (last_rx_status != PGM_IO_STATUS_RATE_LIMITED &&
506 last_rx_status != PGM_IO_STATUS_TIMER_PENDING)
510 socklen_t optlen =
sizeof (tv);
511 const bool rc = pgm_getsockopt (sock, IPPROTO_PGM,
512 last_rx_status == PGM_IO_STATUS_RATE_LIMITED ? PGM_RATE_REMAIN :
513 PGM_TIME_REMAIN, &tv, &optlen);
516 const long timeout = (tv.tv_sec * 1000) + (tv.tv_usec / 1000);
521 long zmq::pgm_socket_t::get_tx_timeout ()
523 if (last_tx_status != PGM_IO_STATUS_RATE_LIMITED)
527 socklen_t optlen =
sizeof (tv);
528 const bool rc = pgm_getsockopt (sock, IPPROTO_PGM, PGM_RATE_REMAIN, &tv,
532 const long timeout = (tv.tv_sec * 1000) + (tv.tv_usec / 1000);
538 size_t zmq::pgm_socket_t::get_max_tsdu_size ()
541 socklen_t optlen =
sizeof (max_tsdu);
543 bool rc = pgm_getsockopt (sock, IPPROTO_PGM, PGM_MSS, &max_tsdu, &optlen);
546 return (
size_t) max_tsdu;
553 size_t raw_data_len = 0;
557 if (nbytes_rec == nbytes_processed && nbytes_rec > 0) {
561 nbytes_processed = 0;
562 pgm_msgv_processed = 0;
569 if (nbytes_rec == nbytes_processed) {
578 pgm_error_t *pgm_error = NULL;
580 const int status = pgm_recvmsgv (sock, pgm_msgv,
581 pgm_msgv_len, MSG_ERRQUEUE, &nbytes_rec, &pgm_error);
586 last_rx_status = status;
590 if (status == PGM_IO_STATUS_TIMER_PENDING) {
602 if (status == PGM_IO_STATUS_RATE_LIMITED) {
613 if (status == PGM_IO_STATUS_WOULD_BLOCK) {
624 if (status == PGM_IO_STATUS_RESET) {
626 struct pgm_sk_buff_t* skb = pgm_msgv [0].msgv_skb [0];
642 zmq_assert (pgm_msgv_processed <= pgm_msgv_len);
649 zmq_assert (pgm_msgv [pgm_msgv_processed].msgv_len == 1);
651 struct pgm_sk_buff_t* skb =
652 pgm_msgv [pgm_msgv_processed].msgv_skb [0];
655 *raw_data_ = skb->data;
656 raw_data_len = skb->len;
662 pgm_msgv_processed++;
663 zmq_assert (pgm_msgv_processed <= pgm_msgv_len);
664 nbytes_processed +=raw_data_len;
669 void zmq::pgm_socket_t::process_upstream ()
671 pgm_msgv_t dummy_msg;
673 size_t dummy_bytes = 0;
674 pgm_error_t *pgm_error = NULL;
676 const int status = pgm_recvmsgv (sock, &dummy_msg,
677 1, MSG_ERRQUEUE, &dummy_bytes, &pgm_error);
683 zmq_assert (dummy_bytes == 0 && (status == PGM_IO_STATUS_TIMER_PENDING ||
684 status == PGM_IO_STATUS_RATE_LIMITED ||
685 status == PGM_IO_STATUS_WOULD_BLOCK));
687 last_rx_status = status;
689 if (status == PGM_IO_STATUS_TIMER_PENDING)
692 if (status == PGM_IO_STATUS_RATE_LIMITED)
698 int zmq::pgm_socket_t::compute_sqns (
int tpdu_)
707 uint64_t sqns = size / tpdu_;
uint32_t generate_random()
int receive(void *socket)
static void receiver(void *socket)