31 #include "platform.hpp"    33 #ifdef ZMQ_HAVE_OPENPGM    35 #ifdef ZMQ_HAVE_WINDOWS    55 #define MSG_ERRQUEUE 0x2000    58 zmq::pgm_socket_t::pgm_socket_t (
bool receiver_, 
const options_t &options_) :
    //  Constructor excerpt: takes a receiver flag and a reference to the
    //  socket options. Only one member initializer survives in this listing;
    //  it starts the processed-message counter at zero.
    66     pgm_msgv_processed (0)
    //  Splits `network_` at its last ':' into a network part and a port,
    //  stores the port through `port_number` and resolves the network part
    //  with pgm_getaddrinfo, which fills `res`. Returns an int status
    //  (the caller in this file treats a negative value as failure).
    75 int zmq::pgm_socket_t::init_address (
const char *network_,
    76     struct pgm_addrinfo_t **res, uint16_t *port_number)
    79     const char *port_delim = strrchr (network_, 
':');
    //  Everything after the last ':' is parsed as the port. atoi yields 0 for
    //  non-numeric text and the int result is narrowed to uint16_t; no range
    //  validation is visible in this excerpt.
    85     *port_number = atoi (port_delim + 1);
    //  Bounds check: the text before ':' must fit in the local `network`
    //  buffer with room for a terminator (the handling of an oversized
    //  prefix is not visible here).
    88     if (port_delim - network_ >= (
int) 
sizeof (network) - 1) {
    //  Zero-fill first so the copied prefix ends up NUL-terminated.
    92     memset (network, 
'\0', 
sizeof (network));
    93     memcpy (network, network_, port_delim - network_);
    95     pgm_error_t *pgm_error = NULL;
    //  NOTE(review): `hints` is zeroed and given AF_UNSPEC, but the visible
    //  pgm_getaddrinfo call below passes NULL as its second argument, so
    //  `hints` looks unused -- confirm whether &hints was intended.
    96     struct pgm_addrinfo_t hints;
    98     memset (&hints, 0, 
sizeof (hints));
    99     hints.ai_family = AF_UNSPEC;
   100     if (!pgm_getaddrinfo (network, NULL, res, &pgm_error)) {
    //  Resolution failed. Interface-domain errors whose code is neither
    //  PGM_ERROR_SERVICE nor PGM_ERROR_SOCKTNOSUPPORT take this branch, which
    //  releases the error object; the rest of the handling is elided.
   104         if (pgm_error->domain == PGM_ERROR_DOMAIN_IF &&
   107             ( pgm_error->code != PGM_ERROR_SERVICE &&
   108               pgm_error->code != PGM_ERROR_SOCKTNOSUPPORT)) {
   111             pgm_error_free (pgm_error);
   //  Creates, configures, binds and connects the PGM socket described by
   //  `network_`. With `udp_encapsulation_` set the socket is opened over
   //  IPPROTO_UDP, otherwise over IPPROTO_PGM. Returns an int status.
   //  Many lines (braces, error branches, returns) are elided in this listing.
   123 int zmq::pgm_socket_t::init (
bool udp_encapsulation_, 
const char *network_)
   //  Reset receive-side progress counters.
   131     nbytes_processed = 0;
   132     pgm_msgv_processed = 0;
   134     uint16_t port_number;
   135     struct pgm_addrinfo_t *res = NULL;
   136     sa_family_t sa_family;
   138     pgm_error_t *pgm_error = NULL;
   //  Resolve the endpoint; a negative result means resolution failed.
   140     if (init_address(network_, &res, &port_number) < 0) {
   //  The address family is taken from the first send address.
   147     sa_family = res->ai_send_addrs[0].gsr_group.ss_family;
   150     if (udp_encapsulation_) {
   //  UDP-encapsulated transport.
   151         if (!pgm_socket (&sock, sa_family, SOCK_SEQPACKET, IPPROTO_UDP,
   //  Socket-domain errors outside BADF/FAULT/NOPROTOOPT/FAILED are singled
   //  out here; what is done with them is not visible.
   156             if (pgm_error->domain == PGM_ERROR_DOMAIN_SOCKET && (
   157                   pgm_error->code != PGM_ERROR_BADF &&
   158                   pgm_error->code != PGM_ERROR_FAULT &&
   159                   pgm_error->code != PGM_ERROR_NOPROTOOPT &&
   160                   pgm_error->code != PGM_ERROR_FAILED))
   //  Both the unicast and the multicast encapsulation port are set to the
   //  port parsed from the endpoint.
   170         const int encapsulation_port = port_number;
   171         if (!pgm_setsockopt (sock, IPPROTO_PGM, PGM_UDP_ENCAP_UCAST_PORT,
   172                 &encapsulation_port, 
sizeof (encapsulation_port)))
   174         if (!pgm_setsockopt (sock, IPPROTO_PGM, PGM_UDP_ENCAP_MCAST_PORT,
   175                 &encapsulation_port, 
sizeof (encapsulation_port)))
   //  Native PGM transport; same error classification as the UDP case.
   179         if (!pgm_socket (&sock, sa_family, SOCK_SEQPACKET, IPPROTO_PGM,
   184             if (pgm_error->domain == PGM_ERROR_DOMAIN_SOCKET && (
   185                   pgm_error->code != PGM_ERROR_BADF &&
   186                   pgm_error->code != PGM_ERROR_FAULT &&
   187                   pgm_error->code != PGM_ERROR_NOPROTOOPT &&
   188                   pgm_error->code != PGM_ERROR_FAILED))
   //  Kernel buffer sizes and the transport MTU.
   201             if (!pgm_setsockopt (sock, SOL_SOCKET, SO_RCVBUF, &rcvbuf,
   208             if (!pgm_setsockopt (sock, SOL_SOCKET, SO_SNDBUF, &sndbuf,
   214         if (!pgm_setsockopt (sock, IPPROTO_PGM, PGM_MTU, &max_tpdu,
   //  Receive-only configuration: receive window size, peer/SPMR expiry,
   //  NAK timing intervals and NAK retry limits.
   220         const int recv_only        = 1,
   222                   rxw_sqns         = compute_sqns (rxw_max_tpdu),
   223                   peer_expiry      = pgm_secs (300),
   224                   spmr_expiry      = pgm_msecs (25),
   225                   nak_bo_ivl       = pgm_msecs (50),
   226                   nak_rpt_ivl      = pgm_msecs (200),
   227                   nak_rdata_ivl    = pgm_msecs (200),
   228                   nak_data_retries = 50,
   229                   nak_ncf_retries  = 50;
   //  The options are applied as one short-circuiting chain: the first
   //  failing pgm_setsockopt makes the whole condition true.
   231         if (!pgm_setsockopt (sock, IPPROTO_PGM, PGM_RECV_ONLY, &recv_only,
   232                 sizeof (recv_only)) ||
   233             !pgm_setsockopt (sock, IPPROTO_PGM, PGM_RXW_SQNS, &rxw_sqns,
   234                 sizeof (rxw_sqns)) ||
   235             !pgm_setsockopt (sock, IPPROTO_PGM, PGM_PEER_EXPIRY, &peer_expiry,
   236                 sizeof (peer_expiry)) ||
   237             !pgm_setsockopt (sock, IPPROTO_PGM, PGM_SPMR_EXPIRY, &spmr_expiry,
   238                 sizeof (spmr_expiry)) ||
   239             !pgm_setsockopt (sock, IPPROTO_PGM, PGM_NAK_BO_IVL, &nak_bo_ivl,
   240                 sizeof (nak_bo_ivl)) ||
   241             !pgm_setsockopt (sock, IPPROTO_PGM, PGM_NAK_RPT_IVL, &nak_rpt_ivl,
   242                 sizeof (nak_rpt_ivl)) ||
   243             !pgm_setsockopt (sock, IPPROTO_PGM, PGM_NAK_RDATA_IVL,
   244                 &nak_rdata_ivl, 
sizeof (nak_rdata_ivl)) ||
   245             !pgm_setsockopt (sock, IPPROTO_PGM, PGM_NAK_DATA_RETRIES,
   246                 &nak_data_retries, 
sizeof (nak_data_retries)) ||
   247             !pgm_setsockopt (sock, IPPROTO_PGM, PGM_NAK_NCF_RETRIES,
   248                 &nak_ncf_retries, 
sizeof (nak_ncf_retries)))
   //  Send-only configuration: rate limit, transmit window size and the
   //  ambient/heartbeat SPM intervals.
   252         const int send_only        = 1,
   255                   txw_sqns         = compute_sqns (txw_max_tpdu),
   256                   ambient_spm      = pgm_secs (30),
   257                   heartbeat_spm[]  = { pgm_msecs (100),
   267         if (!pgm_setsockopt (sock, IPPROTO_PGM, PGM_SEND_ONLY,
   268                 &send_only, 
sizeof (send_only)) ||
   269             !pgm_setsockopt (sock, IPPROTO_PGM, PGM_ODATA_MAX_RTE,
   270                 &max_rte, 
sizeof (max_rte)) ||
   271             !pgm_setsockopt (sock, IPPROTO_PGM, PGM_TXW_SQNS,
   272                 &txw_sqns, 
sizeof (txw_sqns)) ||
   273             !pgm_setsockopt (sock, IPPROTO_PGM, PGM_AMBIENT_SPM,
   274                 &ambient_spm, 
sizeof (ambient_spm)) ||
   275             !pgm_setsockopt (sock, IPPROTO_PGM, PGM_HEARTBEAT_SPM,
   276                 &heartbeat_spm, 
sizeof (heartbeat_spm)))
   //  Bind address: destination port from the endpoint, the default data
   //  source port, and a GSI derived from 8 bytes of `buf` (how `buf` is
   //  filled is not visible in this listing).
   281     struct pgm_sockaddr_t addr;
   283     memset (&addr, 0, 
sizeof(addr));
   284     addr.sa_port = port_number;
   285     addr.sa_addr.sport = DEFAULT_DATA_SOURCE_PORT;
   291     if (!pgm_gsi_create_from_data (&addr.sa_addr.gsi, (uint8_t*) buf, 8))
   //  Interface request built from the first receive address. For IPv6 the
   //  scope id is copied out of the group address; otherwise it stays 0.
   296     struct pgm_interface_req_t if_req;
   297     memset (&if_req, 0, 
sizeof(if_req));
   298     if_req.ir_interface = res->ai_recv_addrs[0].gsr_interface;
   299     if_req.ir_scope_id  = 0;
   300     if (AF_INET6 == sa_family) {
   301         struct sockaddr_in6 sa6;
   302         memcpy (&sa6, &res->ai_recv_addrs[0].gsr_group, sizeof (sa6));
   303         if_req.ir_scope_id = sa6.sin6_scope_id;
   //  The same if_req is passed for both interface arguments of pgm_bind3.
   305     if (!pgm_bind3 (sock, &addr, 
sizeof (addr), &if_req, 
sizeof (if_req),
   306           &if_req, 
sizeof (if_req), &pgm_error)) {
   //  Socket- or interface-domain bind errors other than INVAL/BADF/FAULT
   //  are singled out; the handling itself is elided.
   310         if ((pgm_error->domain == PGM_ERROR_DOMAIN_SOCKET ||
   311              pgm_error->domain == PGM_ERROR_DOMAIN_IF) && (
   312              pgm_error->code != PGM_ERROR_INVAL &&
   313              pgm_error->code != PGM_ERROR_BADF &&
   314              pgm_error->code != PGM_ERROR_FAULT))
   //  Join every resolved receive group, then select the first send address
   //  as the send group.
   324     for (
unsigned i = 0; i < res->ai_recv_addrs_len; i++) {
   325         if (!pgm_setsockopt (sock, IPPROTO_PGM, PGM_JOIN_GROUP,
   326               &res->ai_recv_addrs [i], sizeof (
struct group_req)))
   329     if (!pgm_setsockopt (sock, IPPROTO_PGM, PGM_SEND_GROUP,
   330           &res->ai_send_addrs [0], sizeof (
struct group_req)))
   //  The address info is released once the groups are configured.
   //  NOTE(review): `res` is passed to pgm_freeaddrinfo again further down;
   //  confirm that an elided line resets it to NULL (or that the later call
   //  is guarded) so it cannot be freed twice.
   333     pgm_freeaddrinfo (res);
   //  Multicast loopback is switched off and the hop limit is applied.
   339         const int multicast_loop = 0;
   340         if (!pgm_setsockopt (sock, IPPROTO_PGM, PGM_MULTICAST_LOOP,
   341               &multicast_loop, 
sizeof (multicast_loop)))
   345         if (!pgm_setsockopt (sock, IPPROTO_PGM, PGM_MULTICAST_HOPS,
   346                 &multicast_hops, 
sizeof (multicast_hops)))
   //  TOS is set from DSCP value 0x2e shifted left by two, for non-IPv6
   //  sockets only. Unlike the other options, the result of this call is
   //  not checked in the visible code.
   351         const int dscp = 0x2e << 2;
   352         if (AF_INET6 != sa_family)
   353             pgm_setsockopt (sock, IPPROTO_PGM, PGM_TOS,
   354                &dscp, 
sizeof (dscp));
   //  Put the socket into non-blocking mode.
   356         const int nonblocking = 1;
   357         if (!pgm_setsockopt (sock, IPPROTO_PGM, PGM_NOBLOCK,
   358               &nonblocking, 
sizeof (nonblocking)))
   363     if (!pgm_connect (sock, &pgm_error)) {
   //  The receive message vector is allocated with malloc; how pgm_msgv_len
   //  is derived from max_tsdu_size is not visible in this listing.
   373         size_t max_tsdu_size = get_max_tsdu_size ();
   379         pgm_msgv = (pgm_msgv_t*) malloc (
sizeof (pgm_msgv_t) * pgm_msgv_len);
   //  Presumably the shared failure path (TODO confirm against the full
   //  source): close the socket with FALSE as the second argument, release
   //  the address info and any pending error object.
   387         pgm_close (sock, FALSE);
   391         pgm_freeaddrinfo (res);
   394     if (pgm_error != NULL) {
   395         pgm_error_free (pgm_error);
   //  Destructor excerpt: closes the PGM socket, passing TRUE as the second
   //  argument of pgm_close. Any guard around the call is not visible here.
   402 zmq::pgm_socket_t::~pgm_socket_t ()
   407         pgm_close (sock, TRUE);
   //  Retrieves the descriptors a receiver has to poll: the receive socket
   //  (PGM_RECV_SOCK) into `receive_fd_` and the pending-notification
   //  descriptor (PGM_PENDING_SOCK) into `waiting_pipe_fd_`.
   412 void zmq::pgm_socket_t::get_receiver_fds (
fd_t *receive_fd_,
   413     fd_t *waiting_pipe_fd_)
   //  `socklen` is primed with the size of the output slot before each query
   //  and must be unchanged afterwards.
   421     socklen = 
sizeof (*receive_fd_);
   422     rc = pgm_getsockopt (sock, IPPROTO_PGM, PGM_RECV_SOCK, receive_fd_,
   425     zmq_assert (socklen == 
sizeof (*receive_fd_));
   427     socklen = 
sizeof (*waiting_pipe_fd_);
   428     rc = pgm_getsockopt (sock, IPPROTO_PGM, PGM_PENDING_SOCK, waiting_pipe_fd_,
   431     zmq_assert (socklen == 
sizeof (*waiting_pipe_fd_));
   //  Retrieves the four descriptors a sender has to poll:
   //    send_fd_            <- PGM_SEND_SOCK
   //    receive_fd_         <- PGM_RECV_SOCK
   //    rdata_notify_fd_    <- PGM_REPAIR_SOCK
   //    pending_notify_fd_  <- PGM_PENDING_SOCK
   //  Before each query `socklen` is primed with the size of the matching
   //  output slot; afterwards it is asserted to be unchanged.
   439 void zmq::pgm_socket_t::get_sender_fds (
fd_t *send_fd_, 
fd_t *receive_fd_,
   440     fd_t *rdata_notify_fd_, 
fd_t *pending_notify_fd_)
   450     socklen = 
sizeof (*send_fd_);
   451     rc = pgm_getsockopt (sock, IPPROTO_PGM, PGM_SEND_SOCK, send_fd_, &socklen);
   //  Check against the slot that was actually queried (send_fd_). The old
   //  code compared with *receive_fd_, which only worked because both are
   //  fd_t.
   453     zmq_assert (socklen == 
sizeof (*send_fd_));
   455     socklen = 
sizeof (*receive_fd_);
   456     rc = pgm_getsockopt (sock, IPPROTO_PGM, PGM_RECV_SOCK, receive_fd_,
   459     zmq_assert (socklen == 
sizeof (*receive_fd_));
   461     socklen = 
sizeof (*rdata_notify_fd_);
   462     rc = pgm_getsockopt (sock, IPPROTO_PGM, PGM_REPAIR_SOCK, rdata_notify_fd_,
   465     zmq_assert (socklen == 
sizeof (*rdata_notify_fd_));
   467     socklen = 
sizeof (*pending_notify_fd_);
   468     rc = pgm_getsockopt (sock, IPPROTO_PGM, PGM_PENDING_SOCK,
   469         pending_notify_fd_, &socklen);
   471     zmq_assert (socklen == 
sizeof (*pending_notify_fd_));
   //  Hands `data_len_` bytes at `data_` to pgm_send and returns a size_t
   //  (the byte count is reported by pgm_send through `nbytes`).
   476 size_t zmq::pgm_socket_t::send (
unsigned char *data_, 
size_t data_len_)
   480     const int status = pgm_send (sock, data_, data_len_, &nbytes);
   //  On this path only back-pressure outcomes are accepted: rate limiting
   //  or would-block. Anything else trips the assertion.
   488         zmq_assert (status == PGM_IO_STATUS_RATE_LIMITED ||
   489             status == PGM_IO_STATUS_WOULD_BLOCK);
   491         if (status == PGM_IO_STATUS_RATE_LIMITED)
   //  Remember the outcome; get_tx_timeout () consults last_tx_status.
   498     last_tx_status = status;
   //  Reports how long the receive side still has to wait, based on the
   //  status stored in last_rx_status.
   503 long zmq::pgm_socket_t::get_rx_timeout ()
   //  Only rate-limited and timer-pending states carry a wait time; for any
   //  other status the function leaves early (the returned value is elided).
   505     if (last_rx_status != PGM_IO_STATUS_RATE_LIMITED &&
   506           last_rx_status != PGM_IO_STATUS_TIMER_PENDING)
   510     socklen_t optlen = 
sizeof (tv);
   //  Rate-limited -> PGM_RATE_REMAIN, timer-pending -> PGM_TIME_REMAIN.
   511     const bool rc = pgm_getsockopt (sock, IPPROTO_PGM,
   512         last_rx_status == PGM_IO_STATUS_RATE_LIMITED ? PGM_RATE_REMAIN :
   513         PGM_TIME_REMAIN, &tv, &optlen);
   //  Seconds plus microseconds folded into milliseconds; the
   //  sub-millisecond remainder is truncated by the integer division.
   516     const long timeout = (tv.tv_sec * 1000) + (tv.tv_usec / 1000);
   //  Reports how long the send side still has to wait before the rate
   //  limiter lets more data through.
   521 long zmq::pgm_socket_t::get_tx_timeout ()
   //  Unless the last send was rate limited the function leaves early
   //  (the returned value is elided in this listing).
   523     if (last_tx_status != PGM_IO_STATUS_RATE_LIMITED)
   527     socklen_t optlen = 
sizeof (tv);
   528     const bool rc = pgm_getsockopt (sock, IPPROTO_PGM, PGM_RATE_REMAIN, &tv,
   //  Seconds plus microseconds folded into milliseconds (truncating).
   532     const long timeout = (tv.tv_sec * 1000) + (tv.tv_usec / 1000);
   //  Queries the socket's PGM_MSS option and returns the value as size_t.
   538 size_t zmq::pgm_socket_t::get_max_tsdu_size ()
   541     socklen_t optlen = 
sizeof (max_tsdu);
   543     bool rc = pgm_getsockopt (sock, IPPROTO_PGM, PGM_MSS, &max_tsdu, &optlen);
   546     return (
size_t) max_tsdu;
   //  Receive-path body excerpt (the function signature is not part of this
   //  listing). It hands out one received skb per call through `raw_data_`
   //  and refills the message vector from the socket when the previous batch
   //  is exhausted.
   553     size_t raw_data_len = 0;
   //  A non-empty batch that has been fully consumed resets the progress
   //  counters.
   557     if (nbytes_rec == nbytes_processed && nbytes_rec > 0) {
   561         nbytes_processed = 0;
   562         pgm_msgv_processed = 0;
   //  Nothing left to hand out: pull a new batch with pgm_recvmsgv.
   569     if (nbytes_rec == nbytes_processed) {
   578         pgm_error_t *pgm_error = NULL;
   580         const int status = pgm_recvmsgv (sock, pgm_msgv,
   581             pgm_msgv_len, MSG_ERRQUEUE, &nbytes_rec, &pgm_error);
   //  Remember the outcome; get_rx_timeout () consults last_rx_status.
   586         last_rx_status = status;
   //  Each non-data status gets its own branch; the branch bodies are elided.
   590         if (status == PGM_IO_STATUS_TIMER_PENDING) {
   602         if (status == PGM_IO_STATUS_RATE_LIMITED) {
   613         if (status == PGM_IO_STATUS_WOULD_BLOCK) {
   624         if (status == PGM_IO_STATUS_RESET) {
   //  On reset the first skb of the first vector entry is inspected.
   626             struct pgm_sk_buff_t* skb = pgm_msgv [0].msgv_skb [0];
   642         zmq_assert (pgm_msgv_processed <= pgm_msgv_len);
   //  Each vector entry is expected to carry exactly one skb.
   649     zmq_assert (pgm_msgv [pgm_msgv_processed].msgv_len == 1);
   651     struct pgm_sk_buff_t* skb =
   652         pgm_msgv [pgm_msgv_processed].msgv_skb [0];
   //  Expose the skb payload by pointer (no copy is made here) and take its
   //  length.
   655     *raw_data_ = skb->data;
   656     raw_data_len = skb->len;
   //  Advance to the next entry and account for the bytes handed out.
   662     pgm_msgv_processed++;
   663     zmq_assert (pgm_msgv_processed <= pgm_msgv_len);
   664     nbytes_processed +=raw_data_len;
   //  Runs one pgm_recvmsgv call with a throwaway one-entry vector. No
   //  payload is expected from it: the assertion below requires zero bytes
   //  and a timer-pending, rate-limited or would-block status.
   669 void zmq::pgm_socket_t::process_upstream ()
   671     pgm_msgv_t dummy_msg;
   673     size_t dummy_bytes = 0;
   674     pgm_error_t *pgm_error = NULL;
   676     const int status = pgm_recvmsgv (sock, &dummy_msg,
   677         1, MSG_ERRQUEUE, &dummy_bytes, &pgm_error);
   683     zmq_assert (dummy_bytes == 0 && (status == PGM_IO_STATUS_TIMER_PENDING ||
   684         status == PGM_IO_STATUS_RATE_LIMITED ||
   685         status == PGM_IO_STATUS_WOULD_BLOCK));
   //  Remember the outcome; get_rx_timeout () consults last_rx_status.
   687     last_rx_status = status;
   //  The per-status follow-up statements are elided in this listing.
   689     if (status == PGM_IO_STATUS_TIMER_PENDING)
   692     if (status == PGM_IO_STATUS_RATE_LIMITED)
   //  Derives a window size in sequence numbers for a given TPDU size and
   //  returns it as int. How `size` is obtained is not visible here.
   698 int zmq::pgm_socket_t::compute_sqns (
int tpdu_)
   //  Integer division held in 64 bits.
   //  NOTE(review): no guard against tpdu_ == 0 is visible in this excerpt --
   //  confirm that callers always pass a positive TPDU size.
   707     uint64_t sqns = size / tpdu_;
 
uint32_t generate_random()
int receive(void *socket)
static void receiver(void *socket)