libzmq  master
ZeroMQ C++ Core Engine (LIBZMQ)
zmq.cpp
Go to the documentation of this file.
1 /*
2  Copyright (c) 2007-2016 Contributors as noted in the AUTHORS file
3 
4  This file is part of libzmq, the ZeroMQ core engine in C++.
5 
6  libzmq is free software; you can redistribute it and/or modify it under
7  the terms of the GNU Lesser General Public License (LGPL) as published
8  by the Free Software Foundation; either version 3 of the License, or
9  (at your option) any later version.
10 
11  As a special exception, the Contributors give you permission to link
12  this library with independent modules to produce an executable,
13  regardless of the license terms of these independent modules, and to
14  copy and distribute the resulting executable under terms of your choice,
15  provided that you also meet, for each linked independent module, the
16  terms and conditions of the license of that module. An independent
17  module is a module which is not derived from or based on this library.
18  If you modify this library, you must extend this exception to your
19  version of the library.
20 
21  libzmq is distributed in the hope that it will be useful, but WITHOUT
22  ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
23  FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public
24  License for more details.
25 
26  You should have received a copy of the GNU Lesser General Public License
27  along with this program. If not, see <http://www.gnu.org/licenses/>.
28 */
29 #include "precompiled.hpp"
30 #define ZMQ_TYPE_UNSAFE
31 
32 #include "macros.hpp"
33 #include "poller.hpp"
34 
35 // On AIX platform, poll.h has to be included first to get consistent
36 // definition of pollfd structure (AIX uses 'reqevents' and 'retnevents'
37 // instead of 'events' and 'revents' and defines macros to map from POSIX-y
38 // names to AIX-specific names).
39 #if defined ZMQ_POLL_BASED_ON_POLL
40 #include <poll.h>
41 #endif
42 
43 // zmq.h must be included *after* poll.h for AIX to build properly
44 #include "../include/zmq.h"
45 
46 #if defined ZMQ_HAVE_WINDOWS
47 #include "windows.hpp"
48 #else
49 #include <unistd.h>
50 #endif
51 
52 
53 // XSI vector I/O
54 #if defined ZMQ_HAVE_UIO
55 #include <sys/uio.h>
56 #else
57 struct iovec {
58  void *iov_base;
59  size_t iov_len;
60 };
61 #endif
62 
63 
64 #include <string.h>
65 #include <stdlib.h>
66 #include <new>
67 #include <climits>
68 
69 #include "proxy.hpp"
70 #include "socket_base.hpp"
71 #include "stdint.hpp"
72 #include "config.hpp"
73 #include "likely.hpp"
74 #include "clock.hpp"
75 #include "ctx.hpp"
76 #include "err.hpp"
77 #include "msg.hpp"
78 #include "fd.hpp"
79 #include "metadata.hpp"
80 #include "signaler.hpp"
81 #include "socket_poller.hpp"
82 #include "timers.hpp"
83 
84 #if defined ZMQ_HAVE_OPENPGM
85 #define __PGM_WININT_H__
86 #include <pgm/pgm.h>
87 #endif
88 
89 // Compile time check whether msg_t fits into zmq_msg_t.
90 typedef char check_msg_t_size
91  [sizeof (zmq::msg_t) == sizeof (zmq_msg_t) ? 1 : -1];
92 
93 
94 void zmq_version (int *major_, int *minor_, int *patch_)
95 {
96  *major_ = ZMQ_VERSION_MAJOR;
97  *minor_ = ZMQ_VERSION_MINOR;
98  *patch_ = ZMQ_VERSION_PATCH;
99 }
100 
101 
102 const char *zmq_strerror (int errnum_)
103 {
104  return zmq::errno_to_string (errnum_);
105 }
106 
107 int zmq_errno (void)
108 {
109  return errno;
110 }
111 
112 
113 // New context API
114 
115 void *zmq_ctx_new (void)
116 {
117 #if defined ZMQ_HAVE_OPENPGM
118 
119  // Init PGM transport. Ensure threading and timer are enabled. Find PGM
120  // protocol ID. Note that if you want to use gettimeofday and sleep for
121  // openPGM timing, set environment variables PGM_TIMER to "GTOD" and
122  // PGM_SLEEP to "USLEEP".
123  pgm_error_t *pgm_error = NULL;
124  const bool ok = pgm_init (&pgm_error);
125  if (ok != TRUE) {
126 
127  // Invalid parameters don't set pgm_error_t
128  zmq_assert (pgm_error != NULL);
129  if (pgm_error->domain == PGM_ERROR_DOMAIN_TIME && (
130  pgm_error->code == PGM_ERROR_FAILED)) {
131 
132  // Failed to access RTC or HPET device.
133  pgm_error_free (pgm_error);
134  errno = EINVAL;
135  return NULL;
136  }
137 
138  // PGM_ERROR_DOMAIN_ENGINE: WSAStartup errors or missing WSARecvMsg.
139  zmq_assert (false);
140  }
141 #endif
142 
143 #ifdef ZMQ_HAVE_WINDOWS
144  // Intialise Windows sockets. Note that WSAStartup can be called multiple
145  // times given that WSACleanup will be called for each WSAStartup.
146  // We do this before the ctx constructor since its embedded mailbox_t
147  // object needs Winsock to be up and running.
148  WORD version_requested = MAKEWORD (2, 2);
149  WSADATA wsa_data;
150  int rc = WSAStartup (version_requested, &wsa_data);
151  zmq_assert (rc == 0);
152  zmq_assert (LOBYTE (wsa_data.wVersion) == 2 &&
153  HIBYTE (wsa_data.wVersion) == 2);
154 #endif
155 
156  // Create 0MQ context.
157  zmq::ctx_t *ctx = new (std::nothrow) zmq::ctx_t;
158  alloc_assert (ctx);
159  return ctx;
160 }
161 
162 int zmq_ctx_term (void *ctx_)
163 {
164  if (!ctx_ || !((zmq::ctx_t *) ctx_)->check_tag ()) {
165  errno = EFAULT;
166  return -1;
167  }
168 
169  int rc = ((zmq::ctx_t *) ctx_)->terminate ();
170  int en = errno;
171 
172  // Shut down only if termination was not interrupted by a signal.
173  if (!rc || en != EINTR) {
174 #ifdef ZMQ_HAVE_WINDOWS
175  // On Windows, uninitialise socket layer.
176  rc = WSACleanup ();
177  wsa_assert (rc != SOCKET_ERROR);
178 #endif
179 
180 #if defined ZMQ_HAVE_OPENPGM
181  // Shut down the OpenPGM library.
182  if (pgm_shutdown () != TRUE)
183  zmq_assert (false);
184 #endif
185  }
186 
187  errno = en;
188  return rc;
189 }
190 
191 int zmq_ctx_shutdown (void *ctx_)
192 {
193  if (!ctx_ || !((zmq::ctx_t *) ctx_)->check_tag ()) {
194  errno = EFAULT;
195  return -1;
196  }
197  return ((zmq::ctx_t *) ctx_)->shutdown ();
198 }
199 
200 int zmq_ctx_set (void *ctx_, int option_, int optval_)
201 {
202  if (!ctx_ || !((zmq::ctx_t *) ctx_)->check_tag ()) {
203  errno = EFAULT;
204  return -1;
205  }
206  return ((zmq::ctx_t *) ctx_)->set (option_, optval_);
207 }
208 
209 int zmq_ctx_get (void *ctx_, int option_)
210 {
211  if (!ctx_ || !((zmq::ctx_t *) ctx_)->check_tag ()) {
212  errno = EFAULT;
213  return -1;
214  }
215  return ((zmq::ctx_t *) ctx_)->get (option_);
216 }
217 
218 // Stable/legacy context API
219 
220 void *zmq_init (int io_threads_)
221 {
222  if (io_threads_ >= 0) {
223  void *ctx = zmq_ctx_new ();
224  zmq_ctx_set (ctx, ZMQ_IO_THREADS, io_threads_);
225  return ctx;
226  }
227  errno = EINVAL;
228  return NULL;
229 }
230 
231 int zmq_term (void *ctx_)
232 {
233  return zmq_ctx_term (ctx_);
234 }
235 
236 int zmq_ctx_destroy (void *ctx_)
237 {
238  return zmq_ctx_term (ctx_);
239 }
240 
241 
242 // Sockets
243 
244 void *zmq_socket (void *ctx_, int type_)
245 {
246  if (!ctx_ || !((zmq::ctx_t *) ctx_)->check_tag ()) {
247  errno = EFAULT;
248  return NULL;
249  }
250  zmq::ctx_t *ctx = (zmq::ctx_t *) ctx_;
251  zmq::socket_base_t *s = ctx->create_socket (type_);
252  return (void *) s;
253 }
254 
255 int zmq_close (void *s_)
256 {
257  if (!s_ || !((zmq::socket_base_t*) s_)->check_tag ()) {
258  errno = ENOTSOCK;
259  return -1;
260  }
261  ((zmq::socket_base_t*) s_)->close ();
262  return 0;
263 }
264 
265 int zmq_setsockopt (void *s_, int option_, const void *optval_,
266  size_t optvallen_)
267 {
268  if (!s_ || !((zmq::socket_base_t*) s_)->check_tag ()) {
269  errno = ENOTSOCK;
270  return -1;
271  }
273  int result = s->setsockopt (option_, optval_, optvallen_);
274  return result;
275 }
276 
277 int zmq_getsockopt (void *s_, int option_, void *optval_, size_t *optvallen_)
278 {
279  if (!s_ || !((zmq::socket_base_t*) s_)->check_tag ()) {
280  errno = ENOTSOCK;
281  return -1;
282  }
284  int result = s->getsockopt (option_, optval_, optvallen_);
285  return result;
286 }
287 
288 int zmq_socket_monitor (void *s_, const char *addr_, int events_)
289 {
290  if (!s_ || !((zmq::socket_base_t*) s_)->check_tag ()) {
291  errno = ENOTSOCK;
292  return -1;
293  }
295  int result = s->monitor (addr_, events_);
296  return result;
297 }
298 
299 int zmq_join (void *s_, const char* group_)
300 {
301  if (!s_ || !((zmq::socket_base_t*) s_)->check_tag ()) {
302  errno = ENOTSOCK;
303  return -1;
304  }
306  int result = s->join (group_);
307  return result;
308 }
309 
310 int zmq_leave (void *s_, const char* group_)
311 {
312  if (!s_ || !((zmq::socket_base_t*) s_)->check_tag ()) {
313  errno = ENOTSOCK;
314  return -1;
315  }
317  int result = s->leave (group_);
318  return result;
319 }
320 
321 int zmq_bind (void *s_, const char *addr_)
322 {
323  if (!s_ || !((zmq::socket_base_t*) s_)->check_tag ()) {
324  errno = ENOTSOCK;
325  return -1;
326  }
328  int result = s->bind (addr_);
329  return result;
330 }
331 
332 int zmq_connect (void *s_, const char *addr_)
333 {
334  if (!s_ || !((zmq::socket_base_t*) s_)->check_tag ()) {
335  errno = ENOTSOCK;
336  return -1;
337  }
339  int result = s->connect (addr_);
340  return result;
341 }
342 
343 int zmq_unbind (void *s_, const char *addr_)
344 {
345  if (!s_ || !((zmq::socket_base_t*) s_)->check_tag ()) {
346  errno = ENOTSOCK;
347  return -1;
348  }
350  return s->term_endpoint (addr_);
351 }
352 
353 int zmq_disconnect (void *s_, const char *addr_)
354 {
355  if (!s_ || !((zmq::socket_base_t*) s_)->check_tag ()) {
356  errno = ENOTSOCK;
357  return -1;
358  }
360  return s->term_endpoint (addr_);
361 }
362 
363 // Sending functions.
364 
365 static inline int
366 s_sendmsg (zmq::socket_base_t *s_, zmq_msg_t *msg_, int flags_)
367 {
368  size_t sz = zmq_msg_size (msg_);
369  int rc = s_->send ((zmq::msg_t *) msg_, flags_);
370  if (unlikely (rc < 0))
371  return -1;
372 
373  // This is what I'd like to do, my C++ fu is too weak -- PH 2016/02/09
374  // int max_msgsz = s_->parent->get (ZMQ_MAX_MSGSZ);
375  size_t max_msgsz = INT_MAX;
376 
377  // Truncate returned size to INT_MAX to avoid overflow to negative values
378  return (int) (sz < max_msgsz? sz: max_msgsz);
379 }
380 
381 /* To be deprecated once zmq_msg_send() is stable */
382 int zmq_sendmsg (void *s_, zmq_msg_t *msg_, int flags_)
383 {
384  return zmq_msg_send (msg_, s_, flags_);
385 }
386 
387 int zmq_send (void *s_, const void *buf_, size_t len_, int flags_)
388 {
389  if (!s_ || !((zmq::socket_base_t*) s_)->check_tag ()) {
390  errno = ENOTSOCK;
391  return -1;
392  }
393  zmq_msg_t msg;
394  if (zmq_msg_init_size (&msg, len_))
395  return -1;
396 
397  // We explicitly allow a send from NULL, size zero
398  if (len_) {
399  assert (buf_);
400  memcpy (zmq_msg_data (&msg), buf_, len_);
401  }
403  int rc = s_sendmsg (s, &msg, flags_);
404  if (unlikely (rc < 0)) {
405  int err = errno;
406  int rc2 = zmq_msg_close (&msg);
407  errno_assert (rc2 == 0);
408  errno = err;
409  return -1;
410  }
411  // Note the optimisation here. We don't close the msg object as it is
412  // empty anyway. This may change when implementation of zmq_msg_t changes.
413  return rc;
414 }
415 
416 int zmq_send_const (void *s_, const void *buf_, size_t len_, int flags_)
417 {
418  if (!s_ || !((zmq::socket_base_t*) s_)->check_tag ()) {
419  errno = ENOTSOCK;
420  return -1;
421  }
422  zmq_msg_t msg;
423  int rc = zmq_msg_init_data (&msg, (void *)buf_, len_, NULL, NULL);
424  if (rc != 0)
425  return -1;
426 
428  rc = s_sendmsg (s, &msg, flags_);
429  if (unlikely (rc < 0)) {
430  int err = errno;
431  int rc2 = zmq_msg_close (&msg);
432  errno_assert (rc2 == 0);
433  errno = err;
434  return -1;
435  }
436  // Note the optimisation here. We don't close the msg object as it is
437  // empty anyway. This may change when implementation of zmq_msg_t changes.
438  return rc;
439 }
440 
441 
442 // Send multiple messages.
443 // TODO: this function has no man page
444 //
445 // If flag bit ZMQ_SNDMORE is set the vector is treated as
446 // a single multi-part message, i.e. the last message has
447 // ZMQ_SNDMORE bit switched off.
448 //
449 int zmq_sendiov (void *s_, iovec *a_, size_t count_, int flags_)
450 {
451  if (!s_ || !((zmq::socket_base_t*) s_)->check_tag ()) {
452  errno = ENOTSOCK;
453  return -1;
454  }
455  if (unlikely (count_ <= 0 || !a_)) {
456  errno = EINVAL;
457  return -1;
458  }
459 
460  int rc = 0;
461  zmq_msg_t msg;
463 
464  for (size_t i = 0; i < count_; ++i) {
465  rc = zmq_msg_init_size (&msg, a_[i].iov_len);
466  if (rc != 0) {
467  rc = -1;
468  break;
469  }
470  memcpy (zmq_msg_data (&msg), a_[i].iov_base, a_[i].iov_len);
471  if (i == count_ - 1)
472  flags_ = flags_ & ~ZMQ_SNDMORE;
473  rc = s_sendmsg (s, &msg, flags_);
474  if (unlikely (rc < 0)) {
475  int err = errno;
476  int rc2 = zmq_msg_close (&msg);
477  errno_assert (rc2 == 0);
478  errno = err;
479  rc = -1;
480  break;
481  }
482  }
483  return rc;
484 }
485 
486 // Receiving functions.
487 
488 static int
489 s_recvmsg (zmq::socket_base_t *s_, zmq_msg_t *msg_, int flags_)
490 {
491  int rc = s_->recv ((zmq::msg_t *) msg_, flags_);
492  if (unlikely (rc < 0))
493  return -1;
494 
495  // Truncate returned size to INT_MAX to avoid overflow to negative values
496  size_t sz = zmq_msg_size (msg_);
497  return (int) (sz < INT_MAX? sz: INT_MAX);
498 }
499 
500 /* To be deprecated once zmq_msg_recv() is stable */
501 int zmq_recvmsg (void *s_, zmq_msg_t *msg_, int flags_)
502 {
503  return zmq_msg_recv (msg_, s_, flags_);
504 }
505 
506 
507 int zmq_recv (void *s_, void *buf_, size_t len_, int flags_)
508 {
509  if (!s_ || !((zmq::socket_base_t*) s_)->check_tag ()) {
510  errno = ENOTSOCK;
511  return -1;
512  }
513  zmq_msg_t msg;
514  int rc = zmq_msg_init (&msg);
515  errno_assert (rc == 0);
516 
518  int nbytes = s_recvmsg (s, &msg, flags_);
519  if (unlikely (nbytes < 0)) {
520  int err = errno;
521  rc = zmq_msg_close (&msg);
522  errno_assert (rc == 0);
523  errno = err;
524  return -1;
525  }
526 
527  // An oversized message is silently truncated.
528  size_t to_copy = size_t (nbytes) < len_ ? size_t (nbytes) : len_;
529 
530  // We explicitly allow a null buffer argument if len is zero
531  if (to_copy) {
532  assert (buf_);
533  memcpy (buf_, zmq_msg_data (&msg), to_copy);
534  }
535  rc = zmq_msg_close (&msg);
536  errno_assert (rc == 0);
537 
538  return nbytes;
539 }
540 
541 // Receive a multi-part message
542 //
543 // Receives up to *count_ parts of a multi-part message.
544 // Sets *count_ to the actual number of parts read.
545 // ZMQ_RCVMORE is set to indicate if a complete multi-part message was read.
546 // Returns number of message parts read, or -1 on error.
547 //
548 // Note: even if -1 is returned, some parts of the message
549 // may have been read. Therefore the client must consult
550 // *count_ to retrieve message parts successfully read,
551 // even if -1 is returned.
552 //
553 // The iov_base* buffers of each iovec *a_ filled in by this
554 // function may be freed using free().
555 // TODO: this function has no man page
556 //
557 int zmq_recviov (void *s_, iovec *a_, size_t *count_, int flags_)
558 {
559  if (!s_ || !((zmq::socket_base_t*) s_)->check_tag ()) {
560  errno = ENOTSOCK;
561  return -1;
562  }
563  if (unlikely (!count_ || *count_ <= 0 || !a_)) {
564  errno = EINVAL;
565  return -1;
566  }
567 
569 
570  size_t count = *count_;
571  int nread = 0;
572  bool recvmore = true;
573 
574  *count_ = 0;
575 
576  for (size_t i = 0; recvmore && i < count; ++i) {
577 
578  zmq_msg_t msg;
579  int rc = zmq_msg_init (&msg);
580  errno_assert (rc == 0);
581 
582  int nbytes = s_recvmsg (s, &msg, flags_);
583  if (unlikely (nbytes < 0)) {
584  int err = errno;
585  rc = zmq_msg_close (&msg);
586  errno_assert (rc == 0);
587  errno = err;
588  nread = -1;
589  break;
590  }
591 
592  a_[i].iov_len = zmq_msg_size (&msg);
593  a_[i].iov_base = static_cast<char *> (malloc(a_[i].iov_len));
594  if (unlikely (!a_[i].iov_base)) {
595  errno = ENOMEM;
596  return -1;
597  }
598  memcpy(a_[i].iov_base,static_cast<char *> (zmq_msg_data (&msg)),
599  a_[i].iov_len);
600  // Assume zmq_socket ZMQ_RVCMORE is properly set.
601  zmq::msg_t* p_msg = reinterpret_cast<zmq::msg_t*>(&msg);
602  recvmore = p_msg->flags() & zmq::msg_t::more;
603  rc = zmq_msg_close(&msg);
604  errno_assert (rc == 0);
605  ++*count_;
606  ++nread;
607  }
608  return nread;
609 }
610 
611 // Message manipulators.
612 
614 {
615  return ((zmq::msg_t*) msg_)->init ();
616 }
617 
618 int zmq_msg_init_size (zmq_msg_t *msg_, size_t size_)
619 {
620  return ((zmq::msg_t*) msg_)->init_size (size_);
621 }
622 
623 int zmq_msg_init_data (zmq_msg_t *msg_, void *data_, size_t size_,
624  zmq_free_fn *ffn_, void *hint_)
625 {
626  return ((zmq::msg_t*) msg_)->init_data (data_, size_, ffn_, hint_);
627 }
628 
629 int zmq_msg_send (zmq_msg_t *msg_, void *s_, int flags_)
630 {
631  if (!s_ || !((zmq::socket_base_t*) s_)->check_tag ()) {
632  errno = ENOTSOCK;
633  return -1;
634  }
636  int result = s_sendmsg (s, msg_, flags_);
637  return result;
638 }
639 
640 int zmq_msg_recv (zmq_msg_t *msg_, void *s_, int flags_)
641 {
642  if (!s_ || !((zmq::socket_base_t*) s_)->check_tag ()) {
643  errno = ENOTSOCK;
644  return -1;
645  }
647  int result = s_recvmsg (s, msg_, flags_);
648  return result;
649 }
650 
652 {
653  return ((zmq::msg_t*) msg_)->close ();
654 }
655 
656 int zmq_msg_move (zmq_msg_t *dest_, zmq_msg_t *src_)
657 {
658  return ((zmq::msg_t*) dest_)->move (*(zmq::msg_t*) src_);
659 }
660 
661 int zmq_msg_copy (zmq_msg_t *dest_, zmq_msg_t *src_)
662 {
663  return ((zmq::msg_t*) dest_)->copy (*(zmq::msg_t*) src_);
664 }
665 
666 void *zmq_msg_data (zmq_msg_t *msg_)
667 {
668  return ((zmq::msg_t*) msg_)->data ();
669 }
670 
671 size_t zmq_msg_size (zmq_msg_t *msg_)
672 {
673  return ((zmq::msg_t*) msg_)->size ();
674 }
675 
677 {
678  return zmq_msg_get (msg_, ZMQ_MORE);
679 }
680 
681 int zmq_msg_get (zmq_msg_t *msg_, int property_)
682 {
683  const char* fd_string;
684 
685  switch (property_) {
686  case ZMQ_MORE:
687  return (((zmq::msg_t*) msg_)->flags () & zmq::msg_t::more)? 1: 0;
688  case ZMQ_SRCFD:
689  fd_string = zmq_msg_gets(msg_, "__fd");
690  if (fd_string == NULL)
691  return (int)-1;
692 
693  return atoi(fd_string);
694  case ZMQ_SHARED:
695  return (((zmq::msg_t*) msg_)->is_cmsg ()) ||
696  (((zmq::msg_t*) msg_)->flags () & zmq::msg_t::shared)? 1: 0;
697  default:
698  errno = EINVAL;
699  return -1;
700  }
701 }
702 
703 int zmq_msg_set (zmq_msg_t *, int, int)
704 {
705  // No properties supported at present
706  errno = EINVAL;
707  return -1;
708 }
709 
710 int zmq_msg_set_routing_id (zmq_msg_t *msg_, uint32_t routing_id_)
711 {
712  return ((zmq::msg_t *) msg_)->set_routing_id (routing_id_);
713 }
714 
716 {
717  return ((zmq::msg_t *) msg_)->get_routing_id ();
718 }
719 
720 int zmq_msg_set_group (zmq_msg_t *msg_, const char *group_)
721 {
722  return ((zmq::msg_t *) msg_)->set_group (group_);
723 }
724 
725 const char *zmq_msg_group (zmq_msg_t *msg_)
726 {
727  return ((zmq::msg_t *) msg_)->group ();
728 }
729 
730 // Get message metadata string
731 
732 const char *zmq_msg_gets (zmq_msg_t *msg_, const char *property_)
733 {
734  zmq::metadata_t *metadata = ((zmq::msg_t *) msg_)->metadata ();
735  const char *value = NULL;
736  if (metadata)
737  value = metadata->get (std::string (property_));
738  if (value)
739  return value;
740  else {
741  errno = EINVAL;
742  return NULL;
743  }
744 }
745 
746 // Polling.
747 
748 int zmq_poll (zmq_pollitem_t *items_, int nitems_, long timeout_)
749 {
750  // TODO: the function implementation can just call zmq_pollfd_poll with
751  // pollfd as NULL, however pollfd is not yet stable.
752 #if defined ZMQ_POLL_BASED_ON_POLL
753  if (unlikely (nitems_ < 0)) {
754  errno = EINVAL;
755  return -1;
756  }
757  if (unlikely (nitems_ == 0)) {
758  if (timeout_ == 0)
759  return 0;
760 #if defined ZMQ_HAVE_WINDOWS
761  Sleep (timeout_ > 0 ? timeout_ : INFINITE);
762  return 0;
763 #elif defined ZMQ_HAVE_ANDROID
764  usleep (timeout_ * 1000);
765  return 0;
766 #else
767  return usleep (timeout_ * 1000);
768 #endif
769  }
770 
771  if (!items_) {
772  errno = EFAULT;
773  return -1;
774  }
775 
776  zmq::clock_t clock;
777  uint64_t now = 0;
778  uint64_t end = 0;
779  pollfd spollfds[ZMQ_POLLITEMS_DFLT];
780  pollfd *pollfds = spollfds;
781 
782  if (nitems_ > ZMQ_POLLITEMS_DFLT) {
783  pollfds = (pollfd*) malloc (nitems_ * sizeof (pollfd));
784  alloc_assert (pollfds);
785  }
786 
787  // Build pollset for poll () system call.
788  for (int i = 0; i != nitems_; i++) {
789 
790  // If the poll item is a 0MQ socket, we poll on the file descriptor
791  // retrieved by the ZMQ_FD socket option.
792  if (items_ [i].socket) {
793  size_t zmq_fd_size = sizeof (zmq::fd_t);
794  if (zmq_getsockopt (items_ [i].socket, ZMQ_FD, &pollfds [i].fd,
795  &zmq_fd_size) == -1) {
796  if (pollfds != spollfds)
797  free (pollfds);
798  return -1;
799  }
800  pollfds [i].events = items_ [i].events ? POLLIN : 0;
801  }
802  // Else, the poll item is a raw file descriptor. Just convert the
803  // events to normal POLLIN/POLLOUT for poll ().
804  else {
805  pollfds [i].fd = items_ [i].fd;
806  pollfds [i].events =
807  (items_ [i].events & ZMQ_POLLIN ? POLLIN : 0) |
808  (items_ [i].events & ZMQ_POLLOUT ? POLLOUT : 0) |
809  (items_ [i].events & ZMQ_POLLPRI ? POLLPRI : 0);
810  }
811  }
812 
813  bool first_pass = true;
814  int nevents = 0;
815 
816  while (true) {
817  // Compute the timeout for the subsequent poll.
818  int timeout;
819  if (first_pass)
820  timeout = 0;
821  else
822  if (timeout_ < 0)
823  timeout = -1;
824  else
825  timeout = end - now;
826 
827  // Wait for events.
828  while (true) {
829  int rc = poll (pollfds, nitems_, timeout);
830  if (rc == -1 && errno == EINTR) {
831  if (pollfds != spollfds)
832  free (pollfds);
833  return -1;
834  }
835  errno_assert (rc >= 0);
836  break;
837  }
838  // Check for the events.
839  for (int i = 0; i != nitems_; i++) {
840 
841  items_ [i].revents = 0;
842 
843  // The poll item is a 0MQ socket. Retrieve pending events
844  // using the ZMQ_EVENTS socket option.
845  if (items_ [i].socket) {
846  size_t zmq_events_size = sizeof (uint32_t);
847  uint32_t zmq_events;
848  if (zmq_getsockopt (items_ [i].socket, ZMQ_EVENTS, &zmq_events,
849  &zmq_events_size) == -1) {
850  if (pollfds != spollfds)
851  free (pollfds);
852  return -1;
853  }
854  if ((items_ [i].events & ZMQ_POLLOUT) &&
855  (zmq_events & ZMQ_POLLOUT))
856  items_ [i].revents |= ZMQ_POLLOUT;
857  if ((items_ [i].events & ZMQ_POLLIN) &&
858  (zmq_events & ZMQ_POLLIN))
859  items_ [i].revents |= ZMQ_POLLIN;
860  }
861  // Else, the poll item is a raw file descriptor, simply convert
862  // the events to zmq_pollitem_t-style format.
863  else {
864  if (pollfds [i].revents & POLLIN)
865  items_ [i].revents |= ZMQ_POLLIN;
866  if (pollfds [i].revents & POLLOUT)
867  items_ [i].revents |= ZMQ_POLLOUT;
868  if (pollfds [i].revents & POLLPRI)
869  items_ [i].revents |= ZMQ_POLLPRI;
870  if (pollfds [i].revents & ~(POLLIN | POLLOUT | POLLPRI))
871  items_ [i].revents |= ZMQ_POLLERR;
872  }
873 
874  if (items_ [i].revents)
875  nevents++;
876  }
877 
878  // If timeout is zero, exit immediately whether there are events or not.
879  if (timeout_ == 0)
880  break;
881 
882  // If there are events to return, we can exit immediately.
883  if (nevents)
884  break;
885 
886  // At this point we are meant to wait for events but there are none.
887  // If timeout is infinite we can just loop until we get some events.
888  if (timeout_ < 0) {
889  if (first_pass)
890  first_pass = false;
891  continue;
892  }
893 
894  // The timeout is finite and there are no events. In the first pass
895  // we get a timestamp of when the polling have begun. (We assume that
896  // first pass have taken negligible time). We also compute the time
897  // when the polling should time out.
898  if (first_pass) {
899  now = clock.now_ms ();
900  end = now + timeout_;
901  if (now == end)
902  break;
903  first_pass = false;
904  continue;
905  }
906 
907  // Find out whether timeout have expired.
908  now = clock.now_ms ();
909  if (now >= end)
910  break;
911  }
912 
913  if (pollfds != spollfds)
914  free (pollfds);
915  return nevents;
916 
917 #elif defined ZMQ_POLL_BASED_ON_SELECT
918 
919  if (unlikely (nitems_ < 0)) {
920  errno = EINVAL;
921  return -1;
922  }
923  if (unlikely (nitems_ == 0)) {
924  if (timeout_ == 0)
925  return 0;
926 #if defined ZMQ_HAVE_WINDOWS
927  Sleep (timeout_ > 0 ? timeout_ : INFINITE);
928  return 0;
929 #else
930  return usleep (timeout_ * 1000);
931 #endif
932  }
933  zmq::clock_t clock;
934  uint64_t now = 0;
935  uint64_t end = 0;
936 
937  // Ensure we do not attempt to select () on more than FD_SETSIZE
938  // file descriptors.
939  zmq_assert (nitems_ <= FD_SETSIZE);
940 
941  fd_set pollset_in = { 0 };
942  fd_set pollset_out = { 0 };
943  fd_set pollset_err = { 0 };
944 
945  zmq::fd_t maxfd = 0;
946 
947  // Build the fd_sets for passing to select ().
948  for (int i = 0; i != nitems_; i++) {
949 
950  // If the poll item is a 0MQ socket we are interested in input on the
951  // notification file descriptor retrieved by the ZMQ_FD socket option.
952  if (items_ [i].socket) {
953  size_t zmq_fd_size = sizeof (zmq::fd_t);
954  zmq::fd_t notify_fd;
955  if (zmq_getsockopt (items_ [i].socket, ZMQ_FD, &notify_fd,
956  &zmq_fd_size) == -1)
957  return -1;
958  if (items_ [i].events) {
959  FD_SET (notify_fd, &pollset_in);
960  if (maxfd < notify_fd)
961  maxfd = notify_fd;
962  }
963  }
964  // Else, the poll item is a raw file descriptor. Convert the poll item
965  // events to the appropriate fd_sets.
966  else {
967  if (items_ [i].events & ZMQ_POLLIN)
968  FD_SET (items_ [i].fd, &pollset_in);
969  if (items_ [i].events & ZMQ_POLLOUT)
970  FD_SET (items_ [i].fd, &pollset_out);
971  if (items_ [i].events & ZMQ_POLLERR)
972  FD_SET (items_ [i].fd, &pollset_err);
973  if (maxfd < items_ [i].fd)
974  maxfd = items_ [i].fd;
975  }
976  }
977 
978  bool first_pass = true;
979  int nevents = 0;
980  fd_set inset, outset, errset;
981 
982  while (true) {
983 
984  // Compute the timeout for the subsequent poll.
985  timeval timeout;
986  timeval *ptimeout;
987  if (first_pass) {
988  timeout.tv_sec = 0;
989  timeout.tv_usec = 0;
990  ptimeout = &timeout;
991  }
992  else
993  if (timeout_ < 0)
994  ptimeout = NULL;
995  else {
996  timeout.tv_sec = (long) ((end - now) / 1000);
997  timeout.tv_usec = (long) ((end - now) % 1000 * 1000);
998  ptimeout = &timeout;
999  }
1000 
1001  // Wait for events. Ignore interrupts if there's infinite timeout.
1002  while (true) {
1003  memcpy (&inset, &pollset_in, sizeof (fd_set));
1004  memcpy (&outset, &pollset_out, sizeof (fd_set));
1005  memcpy (&errset, &pollset_err, sizeof (fd_set));
1006 #if defined ZMQ_HAVE_WINDOWS
1007  int rc = select (0, &inset, &outset, &errset, ptimeout);
1008  if (unlikely (rc == SOCKET_ERROR)) {
1009  errno = zmq::wsa_error_to_errno (WSAGetLastError ());
1010  wsa_assert (errno == ENOTSOCK);
1011  return -1;
1012  }
1013 #else
1014  int rc = select (maxfd + 1, &inset, &outset, &errset, ptimeout);
1015  if (unlikely (rc == -1)) {
1016  errno_assert (errno == EINTR || errno == EBADF);
1017  return -1;
1018  }
1019 #endif
1020  break;
1021  }
1022 
1023  // Check for the events.
1024  for (int i = 0; i != nitems_; i++) {
1025 
1026  items_ [i].revents = 0;
1027 
1028  // The poll item is a 0MQ socket. Retrieve pending events
1029  // using the ZMQ_EVENTS socket option.
1030  if (items_ [i].socket) {
1031  size_t zmq_events_size = sizeof (uint32_t);
1032  uint32_t zmq_events;
1033  if (zmq_getsockopt (items_ [i].socket, ZMQ_EVENTS, &zmq_events,
1034  &zmq_events_size) == -1)
1035  return -1;
1036  if ((items_ [i].events & ZMQ_POLLOUT) &&
1037  (zmq_events & ZMQ_POLLOUT))
1038  items_ [i].revents |= ZMQ_POLLOUT;
1039  if ((items_ [i].events & ZMQ_POLLIN) &&
1040  (zmq_events & ZMQ_POLLIN))
1041  items_ [i].revents |= ZMQ_POLLIN;
1042  }
1043  // Else, the poll item is a raw file descriptor, simply convert
1044  // the events to zmq_pollitem_t-style format.
1045  else {
1046  if (FD_ISSET (items_ [i].fd, &inset))
1047  items_ [i].revents |= ZMQ_POLLIN;
1048  if (FD_ISSET (items_ [i].fd, &outset))
1049  items_ [i].revents |= ZMQ_POLLOUT;
1050  if (FD_ISSET (items_ [i].fd, &errset))
1051  items_ [i].revents |= ZMQ_POLLERR;
1052  }
1053 
1054  if (items_ [i].revents)
1055  nevents++;
1056  }
1057 
1058  // If timeout is zero, exit immediately whether there are events or not.
1059  if (timeout_ == 0)
1060  break;
1061 
1062  // If there are events to return, we can exit immediately.
1063  if (nevents)
1064  break;
1065 
1066  // At this point we are meant to wait for events but there are none.
1067  // If timeout is infinite we can just loop until we get some events.
1068  if (timeout_ < 0) {
1069  if (first_pass)
1070  first_pass = false;
1071  continue;
1072  }
1073 
1074  // The timeout is finite and there are no events. In the first pass
1075  // we get a timestamp of when the polling have begun. (We assume that
1076  // first pass have taken negligible time). We also compute the time
1077  // when the polling should time out.
1078  if (first_pass) {
1079  now = clock.now_ms ();
1080  end = now + timeout_;
1081  if (now == end)
1082  break;
1083  first_pass = false;
1084  continue;
1085  }
1086 
1087  // Find out whether timeout have expired.
1088  now = clock.now_ms ();
1089  if (now >= end)
1090  break;
1091  }
1092 
1093  return nevents;
1094 
1095 #else
1096  // Exotic platforms that support neither poll() nor select().
1097  errno = ENOTSUP;
1098  return -1;
1099 #endif
1100 }
1101 
1102 // The poller functionality
1103 
1104 void *zmq_poller_new (void)
1105 {
1106  zmq::socket_poller_t *poller = new (std::nothrow) zmq::socket_poller_t;
1107  alloc_assert (poller);
1108  return poller;
1109 }
1110 
1111 int zmq_poller_destroy (void **poller_p_)
1112 {
1113  void *poller = *poller_p_;
1114  if (!poller || !((zmq::socket_poller_t*) poller)->check_tag ()) {
1115  errno = EFAULT;
1116  return -1;
1117  }
1118 
1119  delete ((zmq::socket_poller_t*) poller);
1120  *poller_p_ = NULL;
1121  return 0;
1122 }
1123 
1124 int zmq_poller_add (void *poller_, void *s_, void *user_data_, short events_)
1125 {
1126  if (!poller_ || !((zmq::socket_poller_t*)poller_)->check_tag ()) {
1127  errno = EFAULT;
1128  return -1;
1129  }
1130 
1131  if (!s_ || !((zmq::socket_base_t*)s_)->check_tag ()) {
1132  errno = ENOTSOCK;
1133  return -1;
1134  }
1135  zmq::socket_base_t *socket = (zmq::socket_base_t*)s_;
1136 
1137  return ((zmq::socket_poller_t*)poller_)->add (socket, user_data_, events_);
1138 }
1139 
1140 #if defined _WIN32
1141 int zmq_poller_add_fd (void *poller_, SOCKET fd_, void *user_data_, short events_)
1142 #else
1143 int zmq_poller_add_fd (void *poller_, int fd_, void *user_data_, short events_)
1144 #endif
1145 {
1146  if (!poller_ || !((zmq::socket_poller_t*)poller_)->check_tag ()) {
1147  errno = EFAULT;
1148  return -1;
1149  }
1150 
1151  return ((zmq::socket_poller_t*)poller_)->add_fd (fd_, user_data_, events_);
1152 }
1153 
1154 
1155 int zmq_poller_modify (void *poller_, void *s_, short events_)
1156 {
1157  if (!poller_ || !((zmq::socket_poller_t*)poller_)->check_tag ()) {
1158  errno = EFAULT;
1159  return -1;
1160  }
1161 
1162  if (!s_ || !((zmq::socket_base_t*)s_)->check_tag ()) {
1163  errno = ENOTSOCK;
1164  return -1;
1165  }
1166  zmq::socket_base_t *socket = (zmq::socket_base_t*)s_;
1167 
1168  return ((zmq::socket_poller_t*)poller_)->modify (socket, events_);
1169 }
1170 
1171 
1172 #if defined _WIN32
1173 int zmq_poller_modify_fd (void *poller_, SOCKET fd_, short events_)
1174 #else
1175 int zmq_poller_modify_fd (void *poller_, int fd_, short events_)
1176 #endif
1177 {
1178  if (!poller_ || !((zmq::socket_poller_t*)poller_)->check_tag ()) {
1179  errno = EFAULT;
1180  return -1;
1181  }
1182 
1183  return ((zmq::socket_poller_t*)poller_)->modify_fd (fd_, events_);
1184 }
1185 
1186 
1187 int zmq_poller_remove (void *poller_, void *s_)
1188 {
1189  if (!poller_ || !((zmq::socket_poller_t*)poller_)->check_tag ()) {
1190  errno = EFAULT;
1191  return -1;
1192  }
1193 
1194  if (!s_ || !((zmq::socket_base_t*)s_)->check_tag ()) {
1195  errno = ENOTSOCK;
1196  return -1;
1197  }
1198  zmq::socket_base_t *socket = (zmq::socket_base_t*)s_;
1199 
1200  return ((zmq::socket_poller_t*)poller_)->remove (socket);
1201 }
1202 
1203 #if defined _WIN32
1204 int zmq_poller_remove_fd (void *poller_, SOCKET fd_)
1205 #else
1206 int zmq_poller_remove_fd (void *poller_, int fd_)
1207 #endif
1208 {
1209  if (!poller_ || !((zmq::socket_poller_t*)poller_)->check_tag ()) {
1210  errno = EFAULT;
1211  return -1;
1212  }
1213 
1214  return ((zmq::socket_poller_t*)poller_)->remove_fd (fd_);
1215 }
1216 
1217 
1218 int zmq_poller_wait (void *poller_, zmq_poller_event_t *event, long timeout_)
1219 {
1220  if (!poller_ || !((zmq::socket_poller_t*)poller_)->check_tag ()) {
1221  errno = EFAULT;
1222  return -1;
1223  }
1224 
1225  zmq_assert (event != NULL);
1226 
1228  memset (&e, 0, sizeof (e));
1229 
1230  int rc = ((zmq::socket_poller_t*)poller_)->wait (&e, timeout_);
1231 
1232  event->socket = e.socket;
1233  event->fd = e.fd;
1234  event->user_data = e.user_data;
1235  event->events = e.events;
1236 
1237  return rc;
1238 }
1239 
1240 // Timers
1241 
1242 void *zmq_timers_new (void)
1243 {
1244  zmq::timers_t *timers = new (std::nothrow) zmq::timers_t;
1245  alloc_assert (timers);
1246  return timers;
1247 }
1248 
1249 int zmq_timers_destroy (void **timers_p_)
1250 {
1251  void *timers = *timers_p_;
1252  if (!timers || !((zmq::timers_t *) timers)->check_tag ()) {
1253  errno = EFAULT;
1254  return -1;
1255  }
1256  delete ((zmq::timers_t *) timers);
1257  *timers_p_ = NULL;
1258  return 0;
1259 }
1260 
1261 int zmq_timers_add (void *timers_, size_t interval_, zmq_timer_fn handler_, void *arg_)
1262 {
1263  if (!timers_ || !((zmq::timers_t*)timers_)->check_tag ()) {
1264  errno = EFAULT;
1265  return -1;
1266  }
1267 
1268  return ((zmq::timers_t*)timers_)->add (interval_, handler_, arg_);
1269 }
1270 
1271 int zmq_timers_cancel (void *timers_, int timer_id_)
1272 {
1273  if (!timers_ || !((zmq::timers_t*)timers_)->check_tag ()) {
1274  errno = EFAULT;
1275  return -1;
1276  }
1277 
1278  return ((zmq::timers_t*)timers_)->cancel (timer_id_);
1279 }
1280 
1281 int zmq_timers_set_interval (void *timers_, int timer_id_, size_t interval_)
1282 {
1283  if (!timers_ || !((zmq::timers_t*)timers_)->check_tag ()) {
1284  errno = EFAULT;
1285  return -1;
1286  }
1287 
1288  return ((zmq::timers_t*)timers_)->set_interval (timer_id_, interval_);
1289 }
1290 
1291 int zmq_timers_reset (void *timers_, int timer_id_)
1292 {
1293  if (!timers_ || !((zmq::timers_t*)timers_)->check_tag ()) {
1294  errno = EFAULT;
1295  return -1;
1296  }
1297 
1298  return ((zmq::timers_t*)timers_)->reset (timer_id_);
1299 }
1300 
1301 long zmq_timers_timeout (void *timers_)
1302 {
1303  if (!timers_ || !((zmq::timers_t*)timers_)->check_tag ()) {
1304  errno = EFAULT;
1305  return -1;
1306  }
1307 
1308  return ((zmq::timers_t*)timers_)->timeout ();
1309 }
1310 
1311 int zmq_timers_execute (void *timers_)
1312 {
1313  if (!timers_ || !((zmq::timers_t*)timers_)->check_tag ()) {
1314  errno = EFAULT;
1315  return -1;
1316  }
1317 
1318  return ((zmq::timers_t*)timers_)->execute ();
1319 }
1320 
1321 // The proxy functionality
1322 
1323 int zmq_proxy (void *frontend_, void *backend_, void *capture_)
1324 {
1325  if (!frontend_ || !backend_) {
1326  errno = EFAULT;
1327  return -1;
1328  }
1329  return zmq::proxy (
1330  (zmq::socket_base_t*) frontend_,
1331  (zmq::socket_base_t*) backend_,
1332  (zmq::socket_base_t*) capture_);
1333 }
1334 
1335 int zmq_proxy_steerable (void *frontend_, void *backend_, void *capture_, void *control_)
1336 {
1337  if (!frontend_ || !backend_) {
1338  errno = EFAULT;
1339  return -1;
1340  }
1341  return zmq::proxy (
1342  (zmq::socket_base_t*) frontend_,
1343  (zmq::socket_base_t*) backend_,
1344  (zmq::socket_base_t*) capture_,
1345  (zmq::socket_base_t*) control_);
1346 }
1347 
1348 // The deprecated device functionality
1349 
1350 int zmq_device (int /* type */, void *frontend_, void *backend_)
1351 {
1352  return zmq::proxy (
1353  (zmq::socket_base_t*) frontend_,
1354  (zmq::socket_base_t*) backend_, NULL);
1355 }
1356 
1357 // Probe library capabilities; for now, reports on transport and security
1358 
1359 int zmq_has (const char *capability)
1360 {
1361 #if !defined (ZMQ_HAVE_WINDOWS) && !defined (ZMQ_HAVE_OPENVMS)
1362  if (strcmp (capability, "ipc") == 0)
1363  return true;
1364 #endif
1365 #if defined (ZMQ_HAVE_OPENPGM)
1366  if (strcmp (capability, "pgm") == 0)
1367  return true;
1368 #endif
1369 #if defined (ZMQ_HAVE_TIPC)
1370  if (strcmp (capability, "tipc") == 0)
1371  return true;
1372 #endif
1373 #if defined (ZMQ_HAVE_NORM)
1374  if (strcmp (capability, "norm") == 0)
1375  return true;
1376 #endif
1377 #if defined (ZMQ_HAVE_CURVE)
1378  if (strcmp (capability, "curve") == 0)
1379  return true;
1380 #endif
1381 #if defined (HAVE_LIBGSSAPI_KRB5)
1382  if (strcmp (capability, "gssapi") == 0)
1383  return true;
1384 #endif
1385 #if defined (ZMQ_HAVE_VMCI)
1386  if (strcmp (capability, "vmci") == 0)
1387  return true;
1388 #endif
1389  // Whatever the application asked for, we don't have
1390  return false;
1391 }
#define ZMQ_POLLITEMS_DFLT
Definition: zmq.h:427
size_t iov_len
Definition: zmq.cpp:59
int zmq_socket_monitor(void *s_, const char *addr_, int events_)
Definition: zmq.cpp:288
int zmq_msg_get(zmq_msg_t *msg_, int property_)
Definition: zmq.cpp:681
int zmq_msg_copy(zmq_msg_t *dest_, zmq_msg_t *src_)
Definition: zmq.cpp:661
void * zmq_timers_new(void)
Definition: zmq.cpp:1242
short events
Definition: zmq.h:423
int zmq_msg_set_routing_id(zmq_msg_t *msg_, uint32_t routing_id_)
Definition: zmq.cpp:710
int zmq_ctx_get(void *ctx_, int option_)
Definition: zmq.cpp:209
int fd_t
Definition: fd.hpp:50
int zmq_msg_init_data(zmq_msg_t *msg_, void *data_, size_t size_, zmq_free_fn *ffn_, void *hint_)
Definition: zmq.cpp:623
#define ZMQ_SNDMORE
Definition: zmq.h:346
int join(const char *group)
uint32_t zmq_msg_routing_id(zmq_msg_t *msg_)
Definition: zmq.cpp:715
#define ZMQ_VERSION_PATCH
Definition: zmq.h:44
int zmq_device(int, void *frontend_, void *backend_)
Definition: zmq.cpp:1350
#define zmq_assert(x)
Definition: err.hpp:119
int zmq_poller_modify(void *poller_, void *s_, short events_)
Definition: zmq.cpp:1155
uint64_t now_ms()
Definition: clock.cpp:182
#define ZMQ_SRCFD
Definition: zmq.h:369
int zmq_msg_init_size(zmq_msg_t *msg_, size_t size_)
Definition: zmq.cpp:618
#define ENOTSUP
Definition: zmq.h:112
int zmq_msg_recv(zmq_msg_t *msg_, void *s_, int flags_)
Definition: zmq.cpp:640
int setsockopt(int option_, const void *optval_, size_t optvallen_)
int zmq_ctx_shutdown(void *ctx_)
Definition: zmq.cpp:191
int zmq_msg_init(zmq_msg_t *msg_)
Definition: zmq.cpp:613
int zmq_poller_remove(void *poller_, void *s_)
Definition: zmq.cpp:1187
char check_msg_t_size[sizeof(zmq::msg_t)==sizeof(zmq_msg_t)?1:-1]
Definition: zmq.cpp:91
int zmq_setsockopt(void *s_, int option_, const void *optval_, size_t optvallen_)
Definition: zmq.cpp:265
int zmq_recviov(void *s_, iovec *a_, size_t *count_, int flags_)
Definition: zmq.cpp:557
int zmq_has(const char *capability)
Definition: zmq.cpp:1359
int zmq_msg_move(zmq_msg_t *dest_, zmq_msg_t *src_)
Definition: zmq.cpp:656
int bind(const char *addr_)
int zmq_ctx_set(void *ctx_, int option_, int optval_)
Definition: zmq.cpp:200
int zmq_timers_destroy(void **timers_p_)
Definition: zmq.cpp:1249
static int s_recvmsg(zmq::socket_base_t *s_, zmq_msg_t *msg_, int flags_)
Definition: zmq.cpp:489
int term_endpoint(const char *addr_)
#define ZMQ_VERSION_MINOR
Definition: zmq.h:43
int zmq_leave(void *s_, const char *group_)
Definition: zmq.cpp:310
zmq::socket_base_t * create_socket(int type_)
Definition: ctx.cpp:320
int zmq_poller_add_fd(void *poller_, int fd_, void *user_data_, short events_)
Definition: zmq.cpp:1143
#define ZMQ_POLLPRI
Definition: zmq.h:413
void * zmq_socket(void *ctx_, int type_)
Definition: zmq.cpp:244
void * zmq_ctx_new(void)
Definition: zmq.cpp:115
#define ZMQ_SHARED
Definition: zmq.h:342
int zmq_recvmsg(void *s_, zmq_msg_t *msg_, int flags_)
Definition: zmq.cpp:501
const char * zmq_strerror(int errnum_)
Definition: zmq.cpp:102
short revents
Definition: zmq.h:424
int zmq_unbind(void *s_, const char *addr_)
Definition: zmq.cpp:343
void * iov_base
Definition: zmq.cpp:58
#define ZMQ_FD
Definition: zmq.h:273
void * zmq_poller_new(void)
Definition: zmq.cpp:1104
const char * zmq_msg_group(zmq_msg_t *msg_)
Definition: zmq.cpp:725
int fd
Definition: zmq.h:421
const char * get(const std::string &property) const
Definition: metadata.cpp:39
int zmq_sendiov(void *s_, iovec *a_, size_t count_, int flags_)
Definition: zmq.cpp:449
int zmq_proxy(void *frontend_, void *backend_, void *capture_)
Definition: zmq.cpp:1323
int getsockopt(int option_, void *optval_, size_t *optvallen_)
int zmq_close(void *s_)
Definition: zmq.cpp:255
#define ENOTSOCK
Definition: zmq.h:136
int send(zmq::msg_t *msg_, int flags_)
int zmq_errno(void)
Definition: zmq.cpp:107
int zmq_msg_set(zmq_msg_t *, int, int)
Definition: zmq.cpp:703
int zmq_poll(zmq_pollitem_t *items_, int nitems_, long timeout_)
Definition: zmq.cpp:748
#define unlikely(x)
Definition: likely.hpp:38
int monitor(const char *endpoint_, int events_)
#define ZMQ_MORE
Definition: zmq.h:341
#define ZMQ_POLLERR
Definition: zmq.h:412
int zmq_poller_remove_fd(void *poller_, int fd_)
Definition: zmq.cpp:1206
int zmq_getsockopt(void *s_, int option_, void *optval_, size_t *optvallen_)
Definition: zmq.cpp:277
void * zmq_init(int io_threads_)
Definition: zmq.cpp:220
int zmq_msg_set_group(zmq_msg_t *msg_, const char *group_)
Definition: zmq.cpp:720
Definition: zmq.h:221
int proxy(class socket_base_t *frontend_, class socket_base_t *backend_, class socket_base_t *capture_, class socket_base_t *control_=NULL)
Definition: proxy.cpp:105
#define ZMQ_EVENTS
Definition: zmq.h:274
Definition: zmq.cpp:57
int zmq_timers_reset(void *timers_, int timer_id_)
Definition: zmq.cpp:1291
#define ZMQ_VERSION_MAJOR
Definition: zmq.h:42
void( zmq_free_fn)(void *data, void *hint)
Definition: zmq.h:223
int zmq_timers_execute(void *timers_)
Definition: zmq.cpp:1311
int zmq_sendmsg(void *s_, zmq_msg_t *msg_, int flags_)
Definition: zmq.cpp:382
size_t zmq_msg_size(zmq_msg_t *msg_)
Definition: zmq.cpp:671
#define ZMQ_POLLOUT
Definition: zmq.h:411
const char * zmq_msg_gets(zmq_msg_t *msg_, const char *property_)
Definition: zmq.cpp:732
#define alloc_assert(x)
Definition: err.hpp:159
#define errno_assert(x)
Definition: err.hpp:129
#define ZMQ_IO_THREADS
Definition: zmq.h:189
int zmq_recv(void *s_, void *buf_, size_t len_, int flags_)
Definition: zmq.cpp:507
int zmq_msg_send(zmq_msg_t *msg_, void *s_, int flags_)
Definition: zmq.cpp:629
int zmq_timers_cancel(void *timers_, int timer_id_)
Definition: zmq.cpp:1271
long zmq_timers_timeout(void *timers_)
Definition: zmq.cpp:1301
int zmq_msg_more(zmq_msg_t *msg_)
Definition: zmq.cpp:676
int zmq_join(void *s_, const char *group_)
Definition: zmq.cpp:299
int zmq_bind(void *s_, const char *addr_)
Definition: zmq.cpp:321
int zmq_send_const(void *s_, const void *buf_, size_t len_, int flags_)
Definition: zmq.cpp:416
int zmq_timers_set_interval(void *timers_, int timer_id_, size_t interval_)
Definition: zmq.cpp:1281
int leave(const char *group)
int zmq_ctx_destroy(void *ctx_)
Definition: zmq.cpp:236
int zmq_ctx_term(void *ctx_)
Definition: zmq.cpp:162
int zmq_poller_modify_fd(void *poller_, int fd_, short events_)
Definition: zmq.cpp:1175
#define ZMQ_POLLIN
Definition: zmq.h:410
int zmq_poller_add(void *poller_, void *s_, void *user_data_, short events_)
Definition: zmq.cpp:1124
int zmq_poller_destroy(void **poller_p_)
Definition: zmq.cpp:1111
const char * errno_to_string(int errno_)
Definition: err.cpp:33
int zmq_connect(void *s_, const char *addr_)
Definition: zmq.cpp:332
static int s_sendmsg(zmq::socket_base_t *s_, zmq_msg_t *msg_, int flags_)
Definition: zmq.cpp:366
void zmq_version(int *major_, int *minor_, int *patch_)
Definition: zmq.cpp:94
int zmq_send(void *s_, const void *buf_, size_t len_, int flags_)
Definition: zmq.cpp:387
int zmq_disconnect(void *s_, const char *addr_)
Definition: zmq.cpp:353
int zmq_term(void *ctx_)
Definition: zmq.cpp:231
int zmq_timers_add(void *timers_, size_t interval_, zmq_timer_fn handler_, void *arg_)
Definition: zmq.cpp:1261
int recv(zmq::msg_t *msg_, int flags_)
unsigned char flags
Definition: msg.hpp:181
void( zmq_timer_fn)(int timer_id, void *arg)
int zmq_poller_wait(void *poller_, zmq_poller_event_t *event, long timeout_)
Definition: zmq.cpp:1218
int zmq_proxy_steerable(void *frontend_, void *backend_, void *capture_, void *control_)
Definition: zmq.cpp:1335
int connect(const char *addr_)
void * zmq_msg_data(zmq_msg_t *msg_)
Definition: zmq.cpp:666
int zmq_msg_close(zmq_msg_t *msg_)
Definition: zmq.cpp:651