libzmq  master
ZeroMQ C++ Core Engine (LIBZMQ)
pgm_socket.cpp
Go to the documentation of this file.
1 /*
2  Copyright (c) 2007-2016 Contributors as noted in the AUTHORS file
3 
4  This file is part of libzmq, the ZeroMQ core engine in C++.
5 
6  libzmq is free software; you can redistribute it and/or modify it under
7  the terms of the GNU Lesser General Public License (LGPL) as published
8  by the Free Software Foundation; either version 3 of the License, or
9  (at your option) any later version.
10 
11  As a special exception, the Contributors give you permission to link
12  this library with independent modules to produce an executable,
13  regardless of the license terms of these independent modules, and to
14  copy and distribute the resulting executable under terms of your choice,
15  provided that you also meet, for each linked independent module, the
16  terms and conditions of the license of that module. An independent
17  module is a module which is not derived from or based on this library.
18  If you modify this library, you must extend this exception to your
19  version of the library.
20 
21  libzmq is distributed in the hope that it will be useful, but WITHOUT
22  ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
23  FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public
24  License for more details.
25 
26  You should have received a copy of the GNU Lesser General Public License
27  along with this program. If not, see <http://www.gnu.org/licenses/>.
28 */
29 
30 #include "precompiled.hpp"
31 #include "platform.hpp"
32 
33 #ifdef ZMQ_HAVE_OPENPGM
34 
35 #ifdef ZMQ_HAVE_WINDOWS
36 #include "windows.hpp"
37 #endif
38 
39 #ifdef ZMQ_HAVE_LINUX
40 #include <poll.h>
41 #endif
42 
43 #include <stdlib.h>
44 #include <string.h>
45 #include <string>
46 
47 #include "options.hpp"
48 #include "pgm_socket.hpp"
49 #include "config.hpp"
50 #include "err.hpp"
51 #include "random.hpp"
52 #include "stdint.hpp"
53 
54 #ifndef MSG_ERRQUEUE
55 #define MSG_ERRQUEUE 0x2000
56 #endif
57 
58 zmq::pgm_socket_t::pgm_socket_t (bool receiver_, const options_t &options_) :
59  sock (NULL),
60  options (options_),
61  receiver (receiver_),
62  pgm_msgv (NULL),
63  pgm_msgv_len (0),
64  nbytes_rec (0),
65  nbytes_processed (0),
66  pgm_msgv_processed (0)
67 {
68 }
69 
// Resolve PGM socket address.
// network_ of the form <interface & multicast group decls>:<IP port>
// e.g. eth0;239.192.0.1:7500
//      link-local;224.250.0.1,224.250.0.2;224.250.0.3:8000
//      ;[fe80::1%en0]:7500
//
// On success returns 0, stores the resolved address info in *res and the
// parsed port in *port_number. On a malformed address or a resolution
// failure attributable to user/host/network configuration, returns -1 with
// errno set to EINVAL. Any other OpenPGM failure is treated as fatal.
int zmq::pgm_socket_t::init_address (const char *network_,
                                     struct pgm_addrinfo_t **res,
                                     uint16_t *port_number)
{
    // Parse port number, start from end for IPv6
    const char *port_delim = strrchr (network_, ':');
    if (!port_delim) {
        errno = EINVAL;
        return -1;
    }

    // The port must be a plain decimal number that fits into 16 bits.
    // Reject empty, signed, non-numeric or out-of-range values instead of
    // silently truncating them.
    const char *port_str = port_delim + 1;
    if (*port_str < '0' || *port_str > '9') {
        errno = EINVAL;
        return -1;
    }
    char *port_end = NULL;
    const unsigned long port = strtoul (port_str, &port_end, 10);
    if (*port_end != '\0' || port > 0xffff) {
        errno = EINVAL;
        return -1;
    }
    *port_number = (uint16_t) port;

    // Copy everything in front of the port delimiter into a NUL-terminated
    // local buffer.
    char network [256];
    const size_t network_len = (size_t) (port_delim - network_);
    if (network_len >= sizeof (network) - 1) {
        errno = EINVAL;
        return -1;
    }
    memcpy (network, network_, network_len);
    network [network_len] = '\0';

    pgm_error_t *pgm_error = NULL;
    if (!pgm_getaddrinfo (network, NULL, res, &pgm_error)) {

        // Invalid parameters don't set pgm_error_t.
        zmq_assert (pgm_error != NULL);

        // NB: cannot catch EAI_BADFLAGS.
        if (pgm_error->domain == PGM_ERROR_DOMAIN_IF
            && pgm_error->code != PGM_ERROR_SERVICE
            && pgm_error->code != PGM_ERROR_SOCKTNOSUPPORT) {

            // User, host, or network configuration or transient error.
            pgm_error_free (pgm_error);
            errno = EINVAL;
            return -1;
        }

        // Fatal OpenPGM internal error.
        zmq_assert (false);
    }
    return 0;
}
121 
122 // Create, bind and connect PGM socket.
123 int zmq::pgm_socket_t::init (bool udp_encapsulation_, const char *network_)
124 {
125  // Can not open transport before destroying old one.
126  zmq_assert (sock == NULL);
127  zmq_assert (options.rate > 0);
128 
129  // Zero counter used in msgrecv.
130  nbytes_rec = 0;
131  nbytes_processed = 0;
132  pgm_msgv_processed = 0;
133 
134  uint16_t port_number;
135  struct pgm_addrinfo_t *res = NULL;
136  sa_family_t sa_family;
137 
138  pgm_error_t *pgm_error = NULL;
139 
140  if (init_address(network_, &res, &port_number) < 0) {
141  goto err_abort;
142  }
143 
144  zmq_assert (res != NULL);
145 
146  // Pick up detected IP family.
147  sa_family = res->ai_send_addrs[0].gsr_group.ss_family;
148 
149  // Create IP/PGM or UDP/PGM socket.
150  if (udp_encapsulation_) {
151  if (!pgm_socket (&sock, sa_family, SOCK_SEQPACKET, IPPROTO_UDP,
152  &pgm_error)) {
153 
154  // Invalid parameters don't set pgm_error_t.
155  zmq_assert (pgm_error != NULL);
156  if (pgm_error->domain == PGM_ERROR_DOMAIN_SOCKET && (
157  pgm_error->code != PGM_ERROR_BADF &&
158  pgm_error->code != PGM_ERROR_FAULT &&
159  pgm_error->code != PGM_ERROR_NOPROTOOPT &&
160  pgm_error->code != PGM_ERROR_FAILED))
161 
162  // User, host, or network configuration or transient error.
163  goto err_abort;
164 
165  // Fatal OpenPGM internal error.
166  zmq_assert (false);
167  }
168 
169  // All options are of data type int
170  const int encapsulation_port = port_number;
171  if (!pgm_setsockopt (sock, IPPROTO_PGM, PGM_UDP_ENCAP_UCAST_PORT,
172  &encapsulation_port, sizeof (encapsulation_port)))
173  goto err_abort;
174  if (!pgm_setsockopt (sock, IPPROTO_PGM, PGM_UDP_ENCAP_MCAST_PORT,
175  &encapsulation_port, sizeof (encapsulation_port)))
176  goto err_abort;
177  }
178  else {
179  if (!pgm_socket (&sock, sa_family, SOCK_SEQPACKET, IPPROTO_PGM,
180  &pgm_error)) {
181 
182  // Invalid parameters don't set pgm_error_t.
183  zmq_assert (pgm_error != NULL);
184  if (pgm_error->domain == PGM_ERROR_DOMAIN_SOCKET && (
185  pgm_error->code != PGM_ERROR_BADF &&
186  pgm_error->code != PGM_ERROR_FAULT &&
187  pgm_error->code != PGM_ERROR_NOPROTOOPT &&
188  pgm_error->code != PGM_ERROR_FAILED))
189 
190  // User, host, or network configuration or transient error.
191  goto err_abort;
192 
193  // Fatal OpenPGM internal error.
194  zmq_assert (false);
195  }
196  }
197 
198  {
199  const int rcvbuf = (int) options.rcvbuf;
200  if (rcvbuf >= 0) {
201  if (!pgm_setsockopt (sock, SOL_SOCKET, SO_RCVBUF, &rcvbuf,
202  sizeof (rcvbuf)))
203  goto err_abort;
204  }
205 
206  const int sndbuf = (int) options.sndbuf;
207  if (sndbuf >= 0) {
208  if (!pgm_setsockopt (sock, SOL_SOCKET, SO_SNDBUF, &sndbuf,
209  sizeof (sndbuf)))
210  goto err_abort;
211  }
212 
213  const int max_tpdu = (int) options.multicast_maxtpdu;
214  if (!pgm_setsockopt (sock, IPPROTO_PGM, PGM_MTU, &max_tpdu,
215  sizeof (max_tpdu)))
216  goto err_abort;
217  }
218 
219  if (receiver) {
220  const int recv_only = 1,
221  rxw_max_tpdu = (int) options.multicast_maxtpdu,
222  rxw_sqns = compute_sqns (rxw_max_tpdu),
223  peer_expiry = pgm_secs (300),
224  spmr_expiry = pgm_msecs (25),
225  nak_bo_ivl = pgm_msecs (50),
226  nak_rpt_ivl = pgm_msecs (200),
227  nak_rdata_ivl = pgm_msecs (200),
228  nak_data_retries = 50,
229  nak_ncf_retries = 50;
230 
231  if (!pgm_setsockopt (sock, IPPROTO_PGM, PGM_RECV_ONLY, &recv_only,
232  sizeof (recv_only)) ||
233  !pgm_setsockopt (sock, IPPROTO_PGM, PGM_RXW_SQNS, &rxw_sqns,
234  sizeof (rxw_sqns)) ||
235  !pgm_setsockopt (sock, IPPROTO_PGM, PGM_PEER_EXPIRY, &peer_expiry,
236  sizeof (peer_expiry)) ||
237  !pgm_setsockopt (sock, IPPROTO_PGM, PGM_SPMR_EXPIRY, &spmr_expiry,
238  sizeof (spmr_expiry)) ||
239  !pgm_setsockopt (sock, IPPROTO_PGM, PGM_NAK_BO_IVL, &nak_bo_ivl,
240  sizeof (nak_bo_ivl)) ||
241  !pgm_setsockopt (sock, IPPROTO_PGM, PGM_NAK_RPT_IVL, &nak_rpt_ivl,
242  sizeof (nak_rpt_ivl)) ||
243  !pgm_setsockopt (sock, IPPROTO_PGM, PGM_NAK_RDATA_IVL,
244  &nak_rdata_ivl, sizeof (nak_rdata_ivl)) ||
245  !pgm_setsockopt (sock, IPPROTO_PGM, PGM_NAK_DATA_RETRIES,
246  &nak_data_retries, sizeof (nak_data_retries)) ||
247  !pgm_setsockopt (sock, IPPROTO_PGM, PGM_NAK_NCF_RETRIES,
248  &nak_ncf_retries, sizeof (nak_ncf_retries)))
249  goto err_abort;
250  }
251  else {
252  const int send_only = 1,
253  max_rte = (int) ((options.rate * 1000) / 8),
254  txw_max_tpdu = (int) options.multicast_maxtpdu,
255  txw_sqns = compute_sqns (txw_max_tpdu),
256  ambient_spm = pgm_secs (30),
257  heartbeat_spm[] = { pgm_msecs (100),
258  pgm_msecs (100),
259  pgm_msecs (100),
260  pgm_msecs (100),
261  pgm_msecs (1300),
262  pgm_secs (7),
263  pgm_secs (16),
264  pgm_secs (25),
265  pgm_secs (30) };
266 
267  if (!pgm_setsockopt (sock, IPPROTO_PGM, PGM_SEND_ONLY,
268  &send_only, sizeof (send_only)) ||
269  !pgm_setsockopt (sock, IPPROTO_PGM, PGM_ODATA_MAX_RTE,
270  &max_rte, sizeof (max_rte)) ||
271  !pgm_setsockopt (sock, IPPROTO_PGM, PGM_TXW_SQNS,
272  &txw_sqns, sizeof (txw_sqns)) ||
273  !pgm_setsockopt (sock, IPPROTO_PGM, PGM_AMBIENT_SPM,
274  &ambient_spm, sizeof (ambient_spm)) ||
275  !pgm_setsockopt (sock, IPPROTO_PGM, PGM_HEARTBEAT_SPM,
276  &heartbeat_spm, sizeof (heartbeat_spm)))
277  goto err_abort;
278  }
279 
280  // PGM transport GSI.
281  struct pgm_sockaddr_t addr;
282 
283  memset (&addr, 0, sizeof(addr));
284  addr.sa_port = port_number;
285  addr.sa_addr.sport = DEFAULT_DATA_SOURCE_PORT;
286 
287  // Create random GSI.
288  uint32_t buf [2];
289  buf [0] = generate_random ();
290  buf [1] = generate_random ();
291  if (!pgm_gsi_create_from_data (&addr.sa_addr.gsi, (uint8_t*) buf, 8))
292  goto err_abort;
293 
294 
295  // Bind a transport to the specified network devices.
296  struct pgm_interface_req_t if_req;
297  memset (&if_req, 0, sizeof(if_req));
298  if_req.ir_interface = res->ai_recv_addrs[0].gsr_interface;
299  if_req.ir_scope_id = 0;
300  if (AF_INET6 == sa_family) {
301  struct sockaddr_in6 sa6;
302  memcpy (&sa6, &res->ai_recv_addrs[0].gsr_group, sizeof (sa6));
303  if_req.ir_scope_id = sa6.sin6_scope_id;
304  }
305  if (!pgm_bind3 (sock, &addr, sizeof (addr), &if_req, sizeof (if_req),
306  &if_req, sizeof (if_req), &pgm_error)) {
307 
308  // Invalid parameters don't set pgm_error_t.
309  zmq_assert (pgm_error != NULL);
310  if ((pgm_error->domain == PGM_ERROR_DOMAIN_SOCKET ||
311  pgm_error->domain == PGM_ERROR_DOMAIN_IF) && (
312  pgm_error->code != PGM_ERROR_INVAL &&
313  pgm_error->code != PGM_ERROR_BADF &&
314  pgm_error->code != PGM_ERROR_FAULT))
315 
316  // User, host, or network configuration or transient error.
317  goto err_abort;
318 
319  // Fatal OpenPGM internal error.
320  zmq_assert (false);
321  }
322 
323  // Join IP multicast groups.
324  for (unsigned i = 0; i < res->ai_recv_addrs_len; i++) {
325  if (!pgm_setsockopt (sock, IPPROTO_PGM, PGM_JOIN_GROUP,
326  &res->ai_recv_addrs [i], sizeof (struct group_req)))
327  goto err_abort;
328  }
329  if (!pgm_setsockopt (sock, IPPROTO_PGM, PGM_SEND_GROUP,
330  &res->ai_send_addrs [0], sizeof (struct group_req)))
331  goto err_abort;
332 
333  pgm_freeaddrinfo (res);
334  res = NULL;
335 
336  // Set IP level parameters.
337  {
338  // Multicast loopback disabled by default
339  const int multicast_loop = 0;
340  if (!pgm_setsockopt (sock, IPPROTO_PGM, PGM_MULTICAST_LOOP,
341  &multicast_loop, sizeof (multicast_loop)))
342  goto err_abort;
343 
344  const int multicast_hops = options.multicast_hops;
345  if (!pgm_setsockopt (sock, IPPROTO_PGM, PGM_MULTICAST_HOPS,
346  &multicast_hops, sizeof (multicast_hops)))
347  goto err_abort;
348 
349  // Expedited Forwarding PHB for network elements, no ECN.
350  // Ignore return value due to varied runtime support.
351  const int dscp = 0x2e << 2;
352  if (AF_INET6 != sa_family)
353  pgm_setsockopt (sock, IPPROTO_PGM, PGM_TOS,
354  &dscp, sizeof (dscp));
355 
356  const int nonblocking = 1;
357  if (!pgm_setsockopt (sock, IPPROTO_PGM, PGM_NOBLOCK,
358  &nonblocking, sizeof (nonblocking)))
359  goto err_abort;
360  }
361 
362  // Connect PGM transport to start state machine.
363  if (!pgm_connect (sock, &pgm_error)) {
364 
365  // Invalid parameters don't set pgm_error_t.
366  zmq_assert (pgm_error != NULL);
367  goto err_abort;
368  }
369 
370  // For receiver transport preallocate pgm_msgv array.
371  if (receiver) {
373  size_t max_tsdu_size = get_max_tsdu_size ();
374  pgm_msgv_len = (int) in_batch_size / max_tsdu_size;
375  if ((int) in_batch_size % max_tsdu_size)
376  pgm_msgv_len++;
377  zmq_assert (pgm_msgv_len);
378 
379  pgm_msgv = (pgm_msgv_t*) malloc (sizeof (pgm_msgv_t) * pgm_msgv_len);
380  alloc_assert (pgm_msgv);
381  }
382 
383  return 0;
384 
385 err_abort:
386  if (sock != NULL) {
387  pgm_close (sock, FALSE);
388  sock = NULL;
389  }
390  if (res != NULL) {
391  pgm_freeaddrinfo (res);
392  res = NULL;
393  }
394  if (pgm_error != NULL) {
395  pgm_error_free (pgm_error);
396  pgm_error = NULL;
397  }
398  errno = EINVAL;
399  return -1;
400 }
401 
402 zmq::pgm_socket_t::~pgm_socket_t ()
403 {
404  if (pgm_msgv)
405  free (pgm_msgv);
406  if (sock)
407  pgm_close (sock, TRUE);
408 }
409 
410 // Get receiver fds. receive_fd_ is signaled for incoming packets,
411 // waiting_pipe_fd_ is signaled for state driven events and data.
412 void zmq::pgm_socket_t::get_receiver_fds (fd_t *receive_fd_,
413  fd_t *waiting_pipe_fd_)
414 {
415  socklen_t socklen;
416  bool rc;
417 
418  zmq_assert (receive_fd_);
419  zmq_assert (waiting_pipe_fd_);
420 
421  socklen = sizeof (*receive_fd_);
422  rc = pgm_getsockopt (sock, IPPROTO_PGM, PGM_RECV_SOCK, receive_fd_,
423  &socklen);
424  zmq_assert (rc);
425  zmq_assert (socklen == sizeof (*receive_fd_));
426 
427  socklen = sizeof (*waiting_pipe_fd_);
428  rc = pgm_getsockopt (sock, IPPROTO_PGM, PGM_PENDING_SOCK, waiting_pipe_fd_,
429  &socklen);
430  zmq_assert (rc);
431  zmq_assert (socklen == sizeof (*waiting_pipe_fd_));
432 }
433 
434 // Get fds and store them into user allocated memory.
435 // send_fd is for non-blocking send wire notifications.
436 // receive_fd_ is for incoming back-channel protocol packets.
437 // rdata_notify_fd_ is raised for waiting repair transmissions.
438 // pending_notify_fd_ is for state driven events.
439 void zmq::pgm_socket_t::get_sender_fds (fd_t *send_fd_, fd_t *receive_fd_,
440  fd_t *rdata_notify_fd_, fd_t *pending_notify_fd_)
441 {
442  socklen_t socklen;
443  bool rc;
444 
445  zmq_assert (send_fd_);
446  zmq_assert (receive_fd_);
447  zmq_assert (rdata_notify_fd_);
448  zmq_assert (pending_notify_fd_);
449 
450  socklen = sizeof (*send_fd_);
451  rc = pgm_getsockopt (sock, IPPROTO_PGM, PGM_SEND_SOCK, send_fd_, &socklen);
452  zmq_assert (rc);
453  zmq_assert (socklen == sizeof (*receive_fd_));
454 
455  socklen = sizeof (*receive_fd_);
456  rc = pgm_getsockopt (sock, IPPROTO_PGM, PGM_RECV_SOCK, receive_fd_,
457  &socklen);
458  zmq_assert (rc);
459  zmq_assert (socklen == sizeof (*receive_fd_));
460 
461  socklen = sizeof (*rdata_notify_fd_);
462  rc = pgm_getsockopt (sock, IPPROTO_PGM, PGM_REPAIR_SOCK, rdata_notify_fd_,
463  &socklen);
464  zmq_assert (rc);
465  zmq_assert (socklen == sizeof (*rdata_notify_fd_));
466 
467  socklen = sizeof (*pending_notify_fd_);
468  rc = pgm_getsockopt (sock, IPPROTO_PGM, PGM_PENDING_SOCK,
469  pending_notify_fd_, &socklen);
470  zmq_assert (rc);
471  zmq_assert (socklen == sizeof (*pending_notify_fd_));
472 }
473 
474 // Send one APDU, transmit window owned memory.
475 // data_len_ must be less than one TPDU.
476 size_t zmq::pgm_socket_t::send (unsigned char *data_, size_t data_len_)
477 {
478  size_t nbytes = 0;
479 
480  const int status = pgm_send (sock, data_, data_len_, &nbytes);
481 
482  // We have to write all data as one packet.
483  if (nbytes > 0) {
484  zmq_assert (status == PGM_IO_STATUS_NORMAL);
485  zmq_assert (nbytes == data_len_);
486  }
487  else {
488  zmq_assert (status == PGM_IO_STATUS_RATE_LIMITED ||
489  status == PGM_IO_STATUS_WOULD_BLOCK);
490 
491  if (status == PGM_IO_STATUS_RATE_LIMITED)
492  errno = ENOMEM;
493  else
494  errno = EBUSY;
495  }
496 
497  // Save return value.
498  last_tx_status = status;
499 
500  return nbytes;
501 }
502 
503 long zmq::pgm_socket_t::get_rx_timeout ()
504 {
505  if (last_rx_status != PGM_IO_STATUS_RATE_LIMITED &&
506  last_rx_status != PGM_IO_STATUS_TIMER_PENDING)
507  return -1;
508 
509  struct timeval tv;
510  socklen_t optlen = sizeof (tv);
511  const bool rc = pgm_getsockopt (sock, IPPROTO_PGM,
512  last_rx_status == PGM_IO_STATUS_RATE_LIMITED ? PGM_RATE_REMAIN :
513  PGM_TIME_REMAIN, &tv, &optlen);
514  zmq_assert (rc);
515 
516  const long timeout = (tv.tv_sec * 1000) + (tv.tv_usec / 1000);
517 
518  return timeout;
519 }
520 
521 long zmq::pgm_socket_t::get_tx_timeout ()
522 {
523  if (last_tx_status != PGM_IO_STATUS_RATE_LIMITED)
524  return -1;
525 
526  struct timeval tv;
527  socklen_t optlen = sizeof (tv);
528  const bool rc = pgm_getsockopt (sock, IPPROTO_PGM, PGM_RATE_REMAIN, &tv,
529  &optlen);
530  zmq_assert (rc);
531 
532  const long timeout = (tv.tv_sec * 1000) + (tv.tv_usec / 1000);
533 
534  return timeout;
535 }
536 
537 // Return max TSDU size without fragmentation from current PGM transport.
538 size_t zmq::pgm_socket_t::get_max_tsdu_size ()
539 {
540  int max_tsdu = 0;
541  socklen_t optlen = sizeof (max_tsdu);
542 
543  bool rc = pgm_getsockopt (sock, IPPROTO_PGM, PGM_MSS, &max_tsdu, &optlen);
544  zmq_assert (rc);
545  zmq_assert (optlen == sizeof (max_tsdu));
546  return (size_t) max_tsdu;
547 }
548 
549 // pgm_recvmsgv is called to fill the pgm_msgv array up to pgm_msgv_len.
550 // In subsequent calls data from pgm_msgv structure are returned.
551 ssize_t zmq::pgm_socket_t::receive (void **raw_data_, const pgm_tsi_t **tsi_)
552 {
553  size_t raw_data_len = 0;
554 
555  // We just sent all data from pgm_transport_recvmsgv up
556  // and have to return 0 that another engine in this thread is scheduled.
557  if (nbytes_rec == nbytes_processed && nbytes_rec > 0) {
558 
559  // Reset all the counters.
560  nbytes_rec = 0;
561  nbytes_processed = 0;
562  pgm_msgv_processed = 0;
563  errno = EAGAIN;
564  return 0;
565  }
566 
567  // If we have are going first time or if we have processed all pgm_msgv_t
568  // structure previously read from the pgm socket.
569  if (nbytes_rec == nbytes_processed) {
570 
571  // Check program flow.
572  zmq_assert (pgm_msgv_processed == 0);
573  zmq_assert (nbytes_processed == 0);
574  zmq_assert (nbytes_rec == 0);
575 
576  // Receive a vector of Application Protocol Domain Unit's (APDUs)
577  // from the transport.
578  pgm_error_t *pgm_error = NULL;
579 
580  const int status = pgm_recvmsgv (sock, pgm_msgv,
581  pgm_msgv_len, MSG_ERRQUEUE, &nbytes_rec, &pgm_error);
582 
583  // Invalid parameters.
584  zmq_assert (status != PGM_IO_STATUS_ERROR);
585 
586  last_rx_status = status;
587 
588  // In a case when no ODATA/RDATA fired POLLIN event (SPM...)
589  // pgm_recvmsg returns PGM_IO_STATUS_TIMER_PENDING.
590  if (status == PGM_IO_STATUS_TIMER_PENDING) {
591 
592  zmq_assert (nbytes_rec == 0);
593 
594  // In case if no RDATA/ODATA caused POLLIN 0 is
595  // returned.
596  nbytes_rec = 0;
597  errno = EBUSY;
598  return 0;
599  }
600 
601  // Send SPMR, NAK, ACK is rate limited.
602  if (status == PGM_IO_STATUS_RATE_LIMITED) {
603 
604  zmq_assert (nbytes_rec == 0);
605 
606  // In case if no RDATA/ODATA caused POLLIN 0 is returned.
607  nbytes_rec = 0;
608  errno = ENOMEM;
609  return 0;
610  }
611 
612  // No peers and hence no incoming packets.
613  if (status == PGM_IO_STATUS_WOULD_BLOCK) {
614 
615  zmq_assert (nbytes_rec == 0);
616 
617  // In case if no RDATA/ODATA caused POLLIN 0 is returned.
618  nbytes_rec = 0;
619  errno = EAGAIN;
620  return 0;
621  }
622 
623  // Data loss.
624  if (status == PGM_IO_STATUS_RESET) {
625 
626  struct pgm_sk_buff_t* skb = pgm_msgv [0].msgv_skb [0];
627 
628  // Save lost data TSI.
629  *tsi_ = &skb->tsi;
630  nbytes_rec = 0;
631 
632  // In case of dala loss -1 is returned.
633  errno = EINVAL;
634  pgm_free_skb (skb);
635  return -1;
636  }
637 
638  zmq_assert (status == PGM_IO_STATUS_NORMAL);
639  }
640  else
641  {
642  zmq_assert (pgm_msgv_processed <= pgm_msgv_len);
643  }
644 
645  // Zero byte payloads are valid in PGM, but not 0MQ protocol.
646  zmq_assert (nbytes_rec > 0);
647 
648  // Only one APDU per pgm_msgv_t structure is allowed.
649  zmq_assert (pgm_msgv [pgm_msgv_processed].msgv_len == 1);
650 
651  struct pgm_sk_buff_t* skb =
652  pgm_msgv [pgm_msgv_processed].msgv_skb [0];
653 
654  // Take pointers from pgm_msgv_t structure.
655  *raw_data_ = skb->data;
656  raw_data_len = skb->len;
657 
658  // Save current TSI.
659  *tsi_ = &skb->tsi;
660 
661  // Move the the next pgm_msgv_t structure.
662  pgm_msgv_processed++;
663  zmq_assert (pgm_msgv_processed <= pgm_msgv_len);
664  nbytes_processed +=raw_data_len;
665 
666  return raw_data_len;
667 }
668 
669 void zmq::pgm_socket_t::process_upstream ()
670 {
671  pgm_msgv_t dummy_msg;
672 
673  size_t dummy_bytes = 0;
674  pgm_error_t *pgm_error = NULL;
675 
676  const int status = pgm_recvmsgv (sock, &dummy_msg,
677  1, MSG_ERRQUEUE, &dummy_bytes, &pgm_error);
678 
679  // Invalid parameters.
680  zmq_assert (status != PGM_IO_STATUS_ERROR);
681 
682  // No data should be returned.
683  zmq_assert (dummy_bytes == 0 && (status == PGM_IO_STATUS_TIMER_PENDING ||
684  status == PGM_IO_STATUS_RATE_LIMITED ||
685  status == PGM_IO_STATUS_WOULD_BLOCK));
686 
687  last_rx_status = status;
688 
689  if (status == PGM_IO_STATUS_TIMER_PENDING)
690  errno = EBUSY;
691  else
692  if (status == PGM_IO_STATUS_RATE_LIMITED)
693  errno = ENOMEM;
694  else
695  errno = EAGAIN;
696 }
697 
698 int zmq::pgm_socket_t::compute_sqns (int tpdu_)
699 {
700  // Convert rate into B/ms.
701  uint64_t rate = uint64_t (options.rate) / 8;
702 
703  // Compute the size of the buffer in bytes.
704  uint64_t size = uint64_t (options.recovery_ivl) * rate;
705 
706  // Translate the size into number of packets.
707  uint64_t sqns = size / tpdu_;
708 
709  // Buffer should be able to hold at least one packet.
710  if (sqns == 0)
711  sqns = 1;
712 
713  return (int) sqns;
714 }
715 
716 #endif
717 
#define size
int multicast_hops
Definition: options.hpp:83
int fd_t
Definition: fd.hpp:50
#define zmq_assert(x)
Definition: err.hpp:119
int recovery_ivl
Definition: options.hpp:80
uint32_t generate_random()
Definition: random.cpp:54
int multicast_maxtpdu
Definition: options.hpp:87
int receive(void *socket)
static void receiver(void *socket)
#define alloc_assert(x)
Definition: err.hpp:159
options_t options
Definition: own.hpp:109