LCOV - code coverage report
Current view: top level - home/h/core/forks/m4-libzmq/src - zmq.cpp (source / functions) Hit Total Coverage
Test: zeromq-4.2.0 Code Coverage Lines: 359 503 71.4 %
Date: 2016-05-09 Functions: 65 71 91.5 %
Legend: Lines: hit not hit

          Line data    Source code
       1             : /*
       2             :     Copyright (c) 2007-2016 Contributors as noted in the AUTHORS file
       3             : 
       4             :     This file is part of libzmq, the ZeroMQ core engine in C++.
       5             : 
       6             :     libzmq is free software; you can redistribute it and/or modify it under
       7             :     the terms of the GNU Lesser General Public License (LGPL) as published
       8             :     by the Free Software Foundation; either version 3 of the License, or
       9             :     (at your option) any later version.
      10             : 
      11             :     As a special exception, the Contributors give you permission to link
      12             :     this library with independent modules to produce an executable,
      13             :     regardless of the license terms of these independent modules, and to
      14             :     copy and distribute the resulting executable under terms of your choice,
      15             :     provided that you also meet, for each linked independent module, the
      16             :     terms and conditions of the license of that module. An independent
      17             :     module is a module which is not derived from or based on this library.
      18             :     If you modify this library, you must extend this exception to your
      19             :     version of the library.
      20             : 
      21             :     libzmq is distributed in the hope that it will be useful, but WITHOUT
      22             :     ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
      23             :     FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public
      24             :     License for more details.
      25             : 
      26             :     You should have received a copy of the GNU Lesser General Public License
      27             :     along with this program.  If not, see <http://www.gnu.org/licenses/>.
      28             : */
      29             : #include "precompiled.hpp"
      30             : #define ZMQ_TYPE_UNSAFE
      31             : 
      32             : #include "macros.hpp"
      33             : #include "poller.hpp"
      34             : 
      35             : //  On AIX platform, poll.h has to be included first to get consistent
      36             : //  definition of pollfd structure (AIX uses 'reqevents' and 'retnevents'
      37             : //  instead of 'events' and 'revents' and defines macros to map from POSIX-y
      38             : //  names to AIX-specific names).
      39             : #if defined ZMQ_POLL_BASED_ON_POLL
      40             : #include <poll.h>
      41             : #endif
      42             : 
      43             : // zmq.h must be included *after* poll.h for AIX to build properly
      44             : #include "../include/zmq.h"
      45             : 
      46             : #if defined ZMQ_HAVE_WINDOWS
      47             : #include "windows.hpp"
      48             : #else
      49             : #include <unistd.h>
      50             : #endif
      51             : 
      52             : 
      53             : // XSI vector I/O
      54             : #if defined ZMQ_HAVE_UIO
      55             : #include <sys/uio.h>
      56             : #else
      57             : struct iovec {
      58             :     void *iov_base;
      59             :     size_t iov_len;
      60             : };
      61             : #endif
      62             : 
      63             : 
      64             : #include <string.h>
      65             : #include <stdlib.h>
      66             : #include <new>
      67             : #include <climits>
      68             : 
      69             : #include "proxy.hpp"
      70             : #include "socket_base.hpp"
      71             : #include "stdint.hpp"
      72             : #include "config.hpp"
      73             : #include "likely.hpp"
      74             : #include "clock.hpp"
      75             : #include "ctx.hpp"
      76             : #include "err.hpp"
      77             : #include "msg.hpp"
      78             : #include "fd.hpp"
      79             : #include "metadata.hpp"
      80             : #include "signaler.hpp"
      81             : #include "socket_poller.hpp"
      82             : #include "timers.hpp"
      83             : 
      84             : #if defined ZMQ_HAVE_OPENPGM
      85             : #define __PGM_WININT_H__
      86             : #include <pgm/pgm.h>
      87             : #endif
      88             : 
      89             : //  Compile time check whether msg_t fits into zmq_msg_t.
      90             : typedef char check_msg_t_size
      91             :     [sizeof (zmq::msg_t) ==  sizeof (zmq_msg_t) ? 1 : -1];
      92             : 
      93             : 
      94           3 : void zmq_version (int *major_, int *minor_, int *patch_)
      95             : {
      96           3 :     *major_ = ZMQ_VERSION_MAJOR;
      97           3 :     *minor_ = ZMQ_VERSION_MINOR;
      98           3 :     *patch_ = ZMQ_VERSION_PATCH;
      99           3 : }
     100             : 
     101             : 
     102           3 : const char *zmq_strerror (int errnum_)
     103             : {
     104           3 :     return zmq::errno_to_string (errnum_);
     105             : }
     106             : 
     107        3930 : int zmq_errno (void)
     108             : {
     109        3930 :     return errno;
     110             : }
     111             : 
     112             : 
     113             : //  New context API
     114             : 
     115         423 : void *zmq_ctx_new (void)
     116             : {
     117             : #if defined ZMQ_HAVE_OPENPGM
     118             : 
     119             :     //  Init PGM transport. Ensure threading and timer are enabled. Find PGM
     120             :     //  protocol ID. Note that if you want to use gettimeofday and sleep for
     121             :     //  openPGM timing, set environment variables PGM_TIMER to "GTOD" and
     122             :     //  PGM_SLEEP to "USLEEP".
     123             :     pgm_error_t *pgm_error = NULL;
     124             :     const bool ok = pgm_init (&pgm_error);
     125             :     if (ok != TRUE) {
     126             : 
     127             :         //  Invalid parameters don't set pgm_error_t
     128             :         zmq_assert (pgm_error != NULL);
     129             :         if (pgm_error->domain == PGM_ERROR_DOMAIN_TIME && (
     130             :               pgm_error->code == PGM_ERROR_FAILED)) {
     131             : 
     132             :             //  Failed to access RTC or HPET device.
     133             :             pgm_error_free (pgm_error);
     134             :             errno = EINVAL;
     135             :             return NULL;
     136             :         }
     137             : 
     138             :         //  PGM_ERROR_DOMAIN_ENGINE: WSAStartup errors or missing WSARecvMsg.
     139             :         zmq_assert (false);
     140             :     }
     141             : #endif
     142             : 
     143             : #ifdef ZMQ_HAVE_WINDOWS
     144             :     //  Intialise Windows sockets. Note that WSAStartup can be called multiple
     145             :     //  times given that WSACleanup will be called for each WSAStartup.
     146             :    //  We do this before the ctx constructor since its embedded mailbox_t
     147             :    //  object needs Winsock to be up and running.
     148             :     WORD version_requested = MAKEWORD (2, 2);
     149             :     WSADATA wsa_data;
     150             :     int rc = WSAStartup (version_requested, &wsa_data);
     151             :     zmq_assert (rc == 0);
     152             :     zmq_assert (LOBYTE (wsa_data.wVersion) == 2 &&
     153             :         HIBYTE (wsa_data.wVersion) == 2);
     154             : #endif
     155             : 
     156             :     //  Create 0MQ context.
     157         423 :     zmq::ctx_t *ctx = new (std::nothrow) zmq::ctx_t;
     158         423 :     alloc_assert (ctx);
     159         423 :     return ctx;
     160             : }
     161             : 
     162         423 : int zmq_ctx_term (void *ctx_)
     163             : {
     164         423 :     if (!ctx_ || !((zmq::ctx_t *) ctx_)->check_tag ()) {
     165           0 :         errno = EFAULT;
     166           0 :         return -1;
     167             :     }
     168             : 
     169         423 :     int rc = ((zmq::ctx_t *) ctx_)->terminate ();
     170         423 :     int en = errno;
     171             : 
     172             :     //  Shut down only if termination was not interrupted by a signal.
     173             :     if (!rc || en != EINTR) {
     174             : #ifdef ZMQ_HAVE_WINDOWS
     175             :         //  On Windows, uninitialise socket layer.
     176             :         rc = WSACleanup ();
     177             :         wsa_assert (rc != SOCKET_ERROR);
     178             : #endif
     179             : 
     180             : #if defined ZMQ_HAVE_OPENPGM
     181             :         //  Shut down the OpenPGM library.
     182             :         if (pgm_shutdown () != TRUE)
     183             :             zmq_assert (false);
     184             : #endif
     185             :     }
     186             : 
     187             :     errno = en;
     188         423 :     return rc;
     189             : }
     190             : 
     191           6 : int zmq_ctx_shutdown (void *ctx_)
     192             : {
     193           6 :     if (!ctx_ || !((zmq::ctx_t *) ctx_)->check_tag ()) {
     194           0 :         errno = EFAULT;
     195           0 :         return -1;
     196             :     }
     197           6 :     return ((zmq::ctx_t *) ctx_)->shutdown ();
     198             : }
     199             : 
     200          42 : int zmq_ctx_set (void *ctx_, int option_, int optval_)
     201             : {
     202          42 :     if (!ctx_ || !((zmq::ctx_t *) ctx_)->check_tag ()) {
     203           0 :         errno = EFAULT;
     204           0 :         return -1;
     205             :     }
     206          42 :     return ((zmq::ctx_t *) ctx_)->set (option_, optval_);
     207             : }
     208             : 
     209          18 : int zmq_ctx_get (void *ctx_, int option_)
     210             : {
     211          18 :     if (!ctx_ || !((zmq::ctx_t *) ctx_)->check_tag ()) {
     212           0 :         errno = EFAULT;
     213           0 :         return -1;
     214             :     }
     215          18 :     return ((zmq::ctx_t *) ctx_)->get (option_);
     216             : }
     217             : 
     218             : //  Stable/legacy context API
     219             : 
     220           3 : void *zmq_init (int io_threads_)
     221             : {
     222           3 :     if (io_threads_ >= 0) {
     223           3 :         void *ctx = zmq_ctx_new ();
     224           3 :         zmq_ctx_set (ctx, ZMQ_IO_THREADS, io_threads_);
     225           3 :         return ctx;
     226             :     }
     227           0 :     errno = EINVAL;
     228           0 :     return NULL;
     229             : }
     230             : 
     231           0 : int zmq_term (void *ctx_)
     232             : {
     233           0 :     return zmq_ctx_term (ctx_);
     234             : }
     235             : 
     236          30 : int zmq_ctx_destroy (void *ctx_)
     237             : {
     238          30 :     return zmq_ctx_term (ctx_);
     239             : }
     240             : 
     241             : 
     242             : // Sockets
     243             : 
     244       11181 : void *zmq_socket (void *ctx_, int type_)
     245             : {
     246       11181 :     if (!ctx_ || !((zmq::ctx_t *) ctx_)->check_tag ()) {
     247           0 :         errno = EFAULT;
     248           0 :         return NULL;
     249             :     }
     250       11181 :     zmq::ctx_t *ctx = (zmq::ctx_t *) ctx_;
     251       11181 :     zmq::socket_base_t *s = ctx->create_socket (type_);
     252       11181 :     return (void *) s;
     253             : }
     254             : 
     255       11115 : int zmq_close (void *s_)
     256             : {
     257       11115 :     if (!s_ || !((zmq::socket_base_t*) s_)->check_tag ()) {
     258           0 :         errno = ENOTSOCK;
     259           0 :         return -1;
     260             :     }
     261       11115 :     ((zmq::socket_base_t*) s_)->close ();
     262       11115 :     return 0;
     263             : }
     264             : 
     265        2613 : int zmq_setsockopt (void *s_, int option_, const void *optval_,
     266             :     size_t optvallen_)
     267             : {
     268        2613 :     if (!s_ || !((zmq::socket_base_t*) s_)->check_tag ()) {
     269           0 :         errno = ENOTSOCK;
     270           0 :         return -1;
     271             :     }
     272        2613 :     zmq::socket_base_t *s = (zmq::socket_base_t *) s_;
     273        2613 :     int result = s->setsockopt (option_, optval_, optvallen_);
     274        2613 :     return result;
     275             : }
     276             : 
     277     2654034 : int zmq_getsockopt (void *s_, int option_, void *optval_, size_t *optvallen_)
     278             : {
     279     2654034 :     if (!s_ || !((zmq::socket_base_t*) s_)->check_tag ()) {
     280           0 :         errno = ENOTSOCK;
     281           0 :         return -1;
     282             :     }
     283     2654034 :     zmq::socket_base_t *s = (zmq::socket_base_t *) s_;
     284     2654034 :     int result = s->getsockopt (option_, optval_, optvallen_);
     285     2654028 :     return result;
     286             : }
     287             : 
     288          27 : int zmq_socket_monitor (void *s_, const char *addr_, int events_)
     289             : {
     290          27 :     if (!s_ || !((zmq::socket_base_t*) s_)->check_tag ()) {
     291           0 :         errno = ENOTSOCK;
     292           0 :         return -1;
     293             :     }
     294          27 :     zmq::socket_base_t *s = (zmq::socket_base_t *) s_;
     295          27 :     int result = s->monitor (addr_, events_);
     296          27 :     return result;
     297             : }
     298             : 
     299          15 : int zmq_join (void *s_, const char* group_)
     300             : {
     301          15 :     if (!s_ || !((zmq::socket_base_t*) s_)->check_tag ()) {
     302           0 :         errno = ENOTSOCK;
     303           0 :         return -1;
     304             :     }
     305          15 :     zmq::socket_base_t *s = (zmq::socket_base_t *) s_;
     306          15 :     int result = s->join (group_);
     307          15 :     return result;
     308             : }
     309             : 
     310           6 : int zmq_leave (void *s_, const char* group_)
     311             : {
     312           6 :     if (!s_ || !((zmq::socket_base_t*) s_)->check_tag ()) {
     313           0 :         errno = ENOTSOCK;
     314           0 :         return -1;
     315             :     }
     316           6 :     zmq::socket_base_t *s = (zmq::socket_base_t *) s_;
     317           6 :     int result = s->leave (group_);
     318           6 :     return result;
     319             : }
     320             : 
     321         687 : int zmq_bind (void *s_, const char *addr_)
     322             : {
     323         687 :     if (!s_ || !((zmq::socket_base_t*) s_)->check_tag ()) {
     324           0 :         errno = ENOTSOCK;
     325           0 :         return -1;
     326             :     }
     327         687 :     zmq::socket_base_t *s = (zmq::socket_base_t *) s_;
     328         687 :     int result = s->bind (addr_);
     329         687 :     return result;
     330             : }
     331             : 
     332        4326 : int zmq_connect (void *s_, const char *addr_)
     333             : {
     334        4326 :     if (!s_ || !((zmq::socket_base_t*) s_)->check_tag ()) {
     335           0 :         errno = ENOTSOCK;
     336           0 :         return -1;
     337             :     }
     338        4326 :     zmq::socket_base_t *s = (zmq::socket_base_t *) s_;
     339        4326 :     int result = s->connect (addr_);
     340        4326 :     return result;
     341             : }
     342             : 
     343          54 : int zmq_unbind (void *s_, const char *addr_)
     344             : {
     345          54 :     if (!s_ || !((zmq::socket_base_t*) s_)->check_tag ()) {
     346           0 :         errno = ENOTSOCK;
     347           0 :         return -1;
     348             :     }
     349          54 :     zmq::socket_base_t *s = (zmq::socket_base_t *) s_;
     350          54 :     return s->term_endpoint (addr_);
     351             : }
     352             : 
     353          30 : int zmq_disconnect (void *s_, const char *addr_)
     354             : {
     355          30 :     if (!s_ || !((zmq::socket_base_t*) s_)->check_tag ()) {
     356           0 :         errno = ENOTSOCK;
     357           0 :         return -1;
     358             :     }
     359          30 :     zmq::socket_base_t *s = (zmq::socket_base_t *) s_;
     360          30 :     return s->term_endpoint (addr_);
     361             : }
     362             : 
     363             : // Sending functions.
     364             : 
     365             : static inline int
     366      864412 : s_sendmsg (zmq::socket_base_t *s_, zmq_msg_t *msg_, int flags_)
     367             : {
     368      864412 :     size_t sz = zmq_msg_size (msg_);
     369      864412 :     int rc = s_->send ((zmq::msg_t *) msg_, flags_);
     370      864413 :     if (unlikely (rc < 0))
     371             :         return -1;
     372             : 
     373             :     //  This is what I'd like to do, my C++ fu is too weak -- PH 2016/02/09
     374             :     //  int max_msgsz = s_->parent->get (ZMQ_MAX_MSGSZ);
     375      852466 :     size_t max_msgsz = INT_MAX;
     376             : 
     377             :     //  Truncate returned size to INT_MAX to avoid overflow to negative values
     378      852466 :     return (int) (sz < max_msgsz? sz: max_msgsz);
     379             : }
     380             : 
     381             : /*  To be deprecated once zmq_msg_send() is stable                           */
     382         108 : int zmq_sendmsg (void *s_, zmq_msg_t *msg_, int flags_)
     383             : {
     384         108 :     return zmq_msg_send (msg_, s_, flags_);
     385             : }
     386             : 
     387      863879 : int zmq_send (void *s_, const void *buf_, size_t len_, int flags_)
     388             : {
     389      863879 :     if (!s_ || !((zmq::socket_base_t*) s_)->check_tag ()) {
     390           0 :         errno = ENOTSOCK;
     391           0 :         return -1;
     392             :     }
     393             :     zmq_msg_t msg;
     394      863879 :     if (zmq_msg_init_size (&msg, len_))
     395             :         return -1;
     396             : 
     397             :     //  We explicitly allow a send from NULL, size zero
     398      863878 :     if (len_) {
     399             :         assert (buf_);
     400      602558 :         memcpy (zmq_msg_data (&msg), buf_, len_);
     401             :     }
     402      863875 :     zmq::socket_base_t *s = (zmq::socket_base_t *) s_;
     403      863875 :     int rc = s_sendmsg (s, &msg, flags_);
     404      863878 :     if (unlikely (rc < 0)) {
     405       11931 :         int err = errno;
     406       11931 :         int rc2 = zmq_msg_close (&msg);
     407       11931 :         errno_assert (rc2 == 0);
     408       11931 :         errno = err;
     409       11931 :         return -1;
     410             :     }
     411             :     //  Note the optimisation here. We don't close the msg object as it is
     412             :     //  empty anyway. This may change when implementation of zmq_msg_t changes.
     413             :     return rc;
     414             : }
     415             : 
     416         174 : int zmq_send_const (void *s_, const void *buf_, size_t len_, int flags_)
     417             : {
     418         174 :     if (!s_ || !((zmq::socket_base_t*) s_)->check_tag ()) {
     419           0 :         errno = ENOTSOCK;
     420           0 :         return -1;
     421             :     }
     422             :     zmq_msg_t msg;
     423         174 :     int rc = zmq_msg_init_data (&msg, (void *)buf_, len_, NULL, NULL);
     424         174 :     if (rc != 0)
     425             :         return -1;
     426             : 
     427         174 :     zmq::socket_base_t *s = (zmq::socket_base_t *) s_;
     428         174 :     rc = s_sendmsg (s, &msg, flags_);
     429         174 :     if (unlikely (rc < 0)) {
     430           0 :         int err = errno;
     431           0 :         int rc2 = zmq_msg_close (&msg);
     432           0 :         errno_assert (rc2 == 0);
     433           0 :         errno = err;
     434           0 :         return -1;
     435             :     }
     436             :     //  Note the optimisation here. We don't close the msg object as it is
     437             :     //  empty anyway. This may change when implementation of zmq_msg_t changes.
     438             :     return rc;
     439             : }
     440             : 
     441             : 
     442             : // Send multiple messages.
     443             : // TODO: this function has no man page
     444             : //
     445             : // If flag bit ZMQ_SNDMORE is set the vector is treated as
     446             : // a single multi-part message, i.e. the last message has
     447             : // ZMQ_SNDMORE bit switched off.
     448             : //
     449          24 : int zmq_sendiov (void *s_, iovec *a_, size_t count_, int flags_)
     450             : {
     451          24 :     if (!s_ || !((zmq::socket_base_t*) s_)->check_tag ()) {
     452           6 :         errno = ENOTSOCK;
     453           6 :         return -1;
     454             :     }
     455          18 :     if (unlikely (count_ <= 0 || !a_)) {
     456          12 :         errno = EINVAL;
     457          12 :         return -1;
     458             :     }
     459             : 
     460             :     int rc = 0;
     461             :     zmq_msg_t msg;
     462             :     zmq::socket_base_t *s = (zmq::socket_base_t *) s_;
     463             : 
     464          60 :     for (size_t i = 0; i < count_; ++i) {
     465          60 :         rc = zmq_msg_init_size (&msg, a_[i].iov_len);
     466          60 :         if (rc != 0) {
     467             :             rc = -1;
     468             :             break;
     469             :         }
     470          60 :         memcpy (zmq_msg_data (&msg), a_[i].iov_base, a_[i].iov_len);
     471          60 :         if (i == count_ - 1)
     472           6 :             flags_ = flags_ & ~ZMQ_SNDMORE;
     473          60 :         rc = s_sendmsg (s, &msg, flags_);
     474          60 :         if (unlikely (rc < 0)) {
     475           0 :            int err = errno;
     476           0 :            int rc2 = zmq_msg_close (&msg);
     477           0 :            errno_assert (rc2 == 0);
     478           0 :            errno = err;
     479           0 :            rc = -1;
     480           0 :            break;
     481             :         }
     482             :     }
     483           6 :     return rc;
     484             : }
     485             : 
     486             : // Receiving functions.
     487             : 
     488             : static int
     489     3971497 : s_recvmsg (zmq::socket_base_t *s_, zmq_msg_t *msg_, int flags_)
     490             : {
     491     3971497 :     int rc = s_->recv ((zmq::msg_t *) msg_, flags_);
     492     3975513 :     if (unlikely (rc < 0))
     493             :         return -1;
     494             : 
     495             :     //  Truncate returned size to INT_MAX to avoid overflow to negative values
     496      851946 :     size_t sz = zmq_msg_size (msg_);
     497      851946 :     return (int) (sz < INT_MAX? sz: INT_MAX);
     498             : }
     499             : 
     500             : /*  To be deprecated once zmq_msg_recv() is stable                           */
     501           3 : int zmq_recvmsg (void *s_, zmq_msg_t *msg_, int flags_)
     502             : {
     503           3 :     return zmq_msg_recv (msg_, s_, flags_);
     504             : }
     505             : 
     506             : 
     507     3976632 : int zmq_recv (void *s_, void *buf_, size_t len_, int flags_)
     508             : {
     509     3976632 :     if (!s_ || !((zmq::socket_base_t*) s_)->check_tag ()) {
     510           0 :         errno = ENOTSOCK;
     511           0 :         return -1;
     512             :     }
     513             :     zmq_msg_t msg;
     514     3976106 :     int rc = zmq_msg_init (&msg);
     515     3970666 :     errno_assert (rc == 0);
     516             : 
     517     3970666 :     zmq::socket_base_t *s = (zmq::socket_base_t *) s_;
     518     3970666 :     int nbytes = s_recvmsg (s, &msg, flags_);
     519     3975277 :     if (unlikely (nbytes < 0)) {
     520     3124309 :         int err = errno;
     521     3126191 :         rc = zmq_msg_close (&msg);
     522     3125390 :         errno_assert (rc == 0);
     523     3125225 :         errno = err;
     524     3125225 :         return -1;
     525             :     }
     526             : 
     527             :     //  An oversized message is silently truncated.
     528      850968 :     size_t to_copy = size_t (nbytes) < len_ ? size_t (nbytes) : len_;
     529             : 
     530             :     //  We explicitly allow a null buffer argument if len is zero
     531      850968 :     if (to_copy) {
     532             :         assert (buf_);
     533      601578 :         memcpy (buf_, zmq_msg_data (&msg), to_copy);
     534             :     }
     535      850968 :     rc = zmq_msg_close (&msg);
     536      850968 :     errno_assert (rc == 0);
     537             : 
     538      850968 :     return nbytes;
     539             : }
     540             : 
     541             : // Receive a multi-part message
     542             : //
     543             : // Receives up to *count_ parts of a multi-part message.
     544             : // Sets *count_ to the actual number of parts read.
     545             : // ZMQ_RCVMORE is set to indicate if a complete multi-part message was read.
     546             : // Returns number of message parts read, or -1 on error.
     547             : //
     548             : // Note: even if -1 is returned, some parts of the message
     549             : // may have been read. Therefore the client must consult
     550             : // *count_ to retrieve message parts successfully read,
     551             : // even if -1 is returned.
     552             : //
     553             : // The iov_base* buffers of each iovec *a_ filled in by this
     554             : // function may be freed using free().
     555             : // TODO: this function has no man page
     556             : //
     557          30 : int zmq_recviov (void *s_, iovec *a_, size_t *count_, int flags_)
     558             : {
     559          30 :     if (!s_ || !((zmq::socket_base_t*) s_)->check_tag ()) {
     560           6 :         errno = ENOTSOCK;
     561           6 :         return -1;
     562             :     }
     563          24 :     if (unlikely (!count_ || *count_ <= 0 || !a_)) {
     564          18 :         errno = EINVAL;
     565          18 :         return -1;
     566             :     }
     567             : 
     568           6 :     zmq::socket_base_t *s = (zmq::socket_base_t *) s_;
     569             : 
     570           6 :     size_t count = *count_;
     571           6 :     int nread = 0;
     572           6 :     bool recvmore = true;
     573             : 
     574           6 :     *count_ = 0;
     575             : 
     576          66 :     for (size_t i = 0; recvmore && i < count; ++i) {
     577             : 
     578             :         zmq_msg_t msg;
     579          60 :         int rc = zmq_msg_init (&msg);
     580          60 :         errno_assert (rc == 0);
     581             : 
     582          60 :         int nbytes = s_recvmsg (s, &msg, flags_);
     583          60 :         if (unlikely (nbytes < 0)) {
     584           0 :             int err = errno;
     585           0 :             rc = zmq_msg_close (&msg);
     586           0 :             errno_assert (rc == 0);
     587           0 :             errno = err;
     588           0 :             nread = -1;
     589           0 :             break;
     590             :         }
     591             : 
     592          60 :         a_[i].iov_len = zmq_msg_size (&msg);
     593          60 :         a_[i].iov_base = static_cast<char *> (malloc(a_[i].iov_len));
     594          60 :         if (unlikely (!a_[i].iov_base)) {
     595           0 :             errno = ENOMEM;
     596           0 :             return -1;
     597             :         }
     598          60 :         memcpy(a_[i].iov_base,static_cast<char *> (zmq_msg_data (&msg)),
     599          60 :                a_[i].iov_len);
     600             :         // Assume zmq_socket ZMQ_RVCMORE is properly set.
     601          60 :         zmq::msg_t* p_msg = reinterpret_cast<zmq::msg_t*>(&msg);
     602          60 :         recvmore = p_msg->flags() & zmq::msg_t::more;
     603          60 :         rc = zmq_msg_close(&msg);
     604          60 :         errno_assert (rc == 0);
     605          60 :         ++*count_;
     606          60 :         ++nread;
     607             :     }
     608           6 :     return nread;
     609             : }
     610             : 
     611             : // Message manipulators.
     612             : 
     613     3976978 : int zmq_msg_init (zmq_msg_t *msg_)
     614             : {
     615     3976978 :     return ((zmq::msg_t*) msg_)->init ();
     616             : }
     617             : 
     618      864134 : int zmq_msg_init_size (zmq_msg_t *msg_, size_t size_)
     619             : {
     620      864134 :     return ((zmq::msg_t*) msg_)->init_size (size_);
     621             : }
     622             : 
     623         195 : int zmq_msg_init_data (zmq_msg_t *msg_, void *data_, size_t size_,
     624             :     zmq_free_fn *ffn_, void *hint_)
     625             : {
     626         195 :     return ((zmq::msg_t*) msg_)->init_data (data_, size_, ffn_, hint_);
     627             : }
     628             : 
     629         303 : int zmq_msg_send (zmq_msg_t *msg_, void *s_, int flags_)
     630             : {
     631         303 :     if (!s_ || !((zmq::socket_base_t*) s_)->check_tag ()) {
     632           0 :         errno = ENOTSOCK;
     633           0 :         return -1;
     634             :     }
     635         303 :     zmq::socket_base_t *s = (zmq::socket_base_t *) s_;
     636         303 :     int result = s_sendmsg (s, msg_, flags_);
     637         303 :     return result;
     638             : }
     639             : 
     640         936 : int zmq_msg_recv (zmq_msg_t *msg_, void *s_, int flags_)
     641             : {
     642         936 :     if (!s_ || !((zmq::socket_base_t*) s_)->check_tag ()) {
     643           0 :         errno = ENOTSOCK;
     644           0 :         return -1;
     645             :     }
     646         936 :     zmq::socket_base_t *s = (zmq::socket_base_t *) s_;
     647         936 :     int result = s_recvmsg (s, msg_, flags_);
     648         936 :     return result;
     649             : }
     650             : 
     651     3989718 : int zmq_msg_close (zmq_msg_t *msg_)
     652             : {
     653     3989718 :     return ((zmq::msg_t*) msg_)->close ();
     654             : }
     655             : 
     656           0 : int zmq_msg_move (zmq_msg_t *dest_, zmq_msg_t *src_)
     657             : {
     658           0 :     return ((zmq::msg_t*) dest_)->move (*(zmq::msg_t*) src_);
     659             : }
     660             : 
     661          54 : int zmq_msg_copy (zmq_msg_t *dest_, zmq_msg_t *src_)
     662             : {
     663          54 :     return ((zmq::msg_t*) dest_)->copy (*(zmq::msg_t*) src_);
     664             : }
     665             : 
     666     1204561 : void *zmq_msg_data (zmq_msg_t *msg_)
     667             : {
     668     1204561 :     return ((zmq::msg_t*) msg_)->data ();
     669             : }
     670             : 
     671     1715379 : size_t zmq_msg_size (zmq_msg_t *msg_)
     672             : {
     673     1715379 :     return ((zmq::msg_t*) msg_)->size ();
     674             : }
     675             : 
     676         144 : int zmq_msg_more (zmq_msg_t *msg_)
     677             : {
     678         144 :     return zmq_msg_get (msg_, ZMQ_MORE);
     679             : }
     680             : 
     681         156 : int zmq_msg_get (zmq_msg_t *msg_, int property_)
     682             : {
     683             :     const char* fd_string;
     684             : 
     685         156 :     switch (property_) {
     686             :         case ZMQ_MORE:
     687         144 :             return (((zmq::msg_t*) msg_)->flags () & zmq::msg_t::more)? 1: 0;
     688             :         case ZMQ_SRCFD:
     689           3 :             fd_string = zmq_msg_gets(msg_, "__fd");
     690           3 :             if (fd_string == NULL)
     691             :                 return (int)-1;
     692             : 
     693           3 :             return atoi(fd_string);
     694             :         case ZMQ_SHARED:
     695          15 :             return (((zmq::msg_t*) msg_)->is_cmsg ()) ||
     696          18 :                    (((zmq::msg_t*) msg_)->flags () & zmq::msg_t::shared)? 1: 0;
     697             :         default:
     698           0 :             errno = EINVAL;
     699           0 :             return -1;
     700             :     }
     701             : }
     702             : 
     703           0 : int zmq_msg_set (zmq_msg_t *, int, int)
     704             : {
     705             :     //  No properties supported at present
     706           0 :     errno = EINVAL;
     707           0 :     return -1;
     708             : }
     709             : 
     710           9 : int zmq_msg_set_routing_id (zmq_msg_t *msg_, uint32_t routing_id_)
     711             : {
     712           9 :     return ((zmq::msg_t *) msg_)->set_routing_id (routing_id_);
     713             : }
     714             : 
     715          18 : uint32_t zmq_msg_routing_id (zmq_msg_t *msg_)
     716             : {
     717          18 :     return ((zmq::msg_t *) msg_)->get_routing_id ();
     718             : }
     719             : 
     720          18 : int zmq_msg_set_group (zmq_msg_t *msg_, const char *group_)
     721             : {
     722          18 :     return ((zmq::msg_t *) msg_)->set_group (group_);
     723             : }
     724             : 
     725          12 : const char *zmq_msg_group (zmq_msg_t *msg_)
     726             : {
     727          12 :     return ((zmq::msg_t *) msg_)->group ();
     728             : }
     729             : 
     730             : //  Get message metadata string
     731             : 
     732          27 : const char *zmq_msg_gets (zmq_msg_t *msg_, const char *property_)
     733             : {
     734          27 :     zmq::metadata_t *metadata = ((zmq::msg_t *) msg_)->metadata ();
     735          27 :     const char *value = NULL;
     736          27 :     if (metadata)
     737          54 :         value = metadata->get (std::string (property_));
     738          27 :     if (value)
     739             :         return value;
     740             :     else {
     741           3 :         errno = EINVAL;
     742           3 :         return NULL;
     743             :     }
     744             : }
     745             : 
     746             : // Polling.
     747             : 
     748      530747 : int zmq_poll (zmq_pollitem_t *items_, int nitems_, long timeout_)
     749             : {
     750             :     //  TODO: the function implementation can just call zmq_pollfd_poll with
     751             :     //  pollfd as NULL, however pollfd is not yet stable.
     752             : #if defined ZMQ_POLL_BASED_ON_POLL
     753      530747 :     if (unlikely (nitems_ < 0)) {
     754           0 :         errno = EINVAL;
     755           0 :         return -1;
     756             :     }
     757      530747 :     if (unlikely (nitems_ == 0)) {
     758         301 :         if (timeout_ == 0)
     759             :             return 0;
     760             : #if defined ZMQ_HAVE_WINDOWS
     761             :         Sleep (timeout_ > 0 ? timeout_ : INFINITE);
     762             :         return 0;
     763             : #elif defined ZMQ_HAVE_ANDROID
     764             :         usleep (timeout_ * 1000);
     765             :         return 0;
     766             : #else
     767         301 :         return usleep (timeout_ * 1000);
     768             : #endif
     769             :     }
     770             : 
     771      530446 :     if (!items_) {
     772           0 :         errno = EFAULT;
     773           0 :         return -1;
     774             :     }
     775             : 
     776      530446 :     zmq::clock_t clock;
     777      530446 :     uint64_t now = 0;
     778      530446 :     uint64_t end = 0;
     779             :     pollfd spollfds[ZMQ_POLLITEMS_DFLT];
     780      530446 :     pollfd *pollfds = spollfds;
     781             : 
     782      530446 :     if (nitems_ > ZMQ_POLLITEMS_DFLT) {
     783           0 :         pollfds = (pollfd*) malloc (nitems_ * sizeof (pollfd));
     784           0 :         alloc_assert (pollfds);
     785             :     }
     786             : 
     787             :     //  Build pollset for poll () system call.
     788     1856319 :     for (int i = 0; i != nitems_; i++) {
     789             : 
     790             :         //  If the poll item is a 0MQ socket, we poll on the file descriptor
     791             :         //  retrieved by the ZMQ_FD socket option.
     792     1325873 :         if (items_ [i].socket) {
     793     1325873 :             size_t zmq_fd_size = sizeof (zmq::fd_t);
     794     2651746 :             if (zmq_getsockopt (items_ [i].socket, ZMQ_FD, &pollfds [i].fd,
     795     1325873 :                 &zmq_fd_size) == -1) {
     796           0 :                 if (pollfds != spollfds)
     797           0 :                     free (pollfds);
     798           0 :                 return -1;
     799             :             }
     800     1325873 :             pollfds [i].events = items_ [i].events ? POLLIN : 0;
     801             :         }
     802             :         //  Else, the poll item is a raw file descriptor. Just convert the
     803             :         //  events to normal POLLIN/POLLOUT for poll ().
     804             :         else {
     805           0 :             pollfds [i].fd = items_ [i].fd;
     806             :             pollfds [i].events =
     807             :                 (items_ [i].events & ZMQ_POLLIN ? POLLIN : 0) |
     808           0 :                 (items_ [i].events & ZMQ_POLLOUT ? POLLOUT : 0) |
     809           0 :                 (items_ [i].events & ZMQ_POLLPRI ? POLLPRI : 0);
     810             :         }
     811             :     }
     812             : 
     813             :     bool first_pass = true;
     814             :     int nevents = 0;
     815             : 
     816             :     while (true) {
     817             :         //  Compute the timeout for the subsequent poll.
     818             :         int timeout;
     819      531035 :         if (first_pass)
     820             :             timeout = 0;
     821             :         else
     822         589 :         if (timeout_ < 0)
     823             :             timeout = -1;
     824             :         else
     825         487 :             timeout = end - now;
     826             : 
     827             :         //  Wait for events.
     828             :         while (true) {
     829     1062072 :             int rc = poll (pollfds, nitems_, timeout);
     830      531037 :             if (rc == -1 && errno == EINTR) {
     831           0 :                 if (pollfds != spollfds)
     832           0 :                     free (pollfds);
     833             :                 return -1;
     834             :             }
     835      531037 :             errno_assert (rc >= 0);
     836             :             break;
     837             :         }
     838             :         //  Check for the events.
     839     1327151 :         for (int i = 0; i != nitems_; i++) {
     840             : 
     841     1327153 :             items_ [i].revents = 0;
     842             : 
     843             :             //  The poll item is a 0MQ socket. Retrieve pending events
     844             :             //  using the ZMQ_EVENTS socket option.
     845     1327153 :             if (items_ [i].socket) {
     846     1327153 :                 size_t zmq_events_size = sizeof (uint32_t);
     847             :                 uint32_t zmq_events;
     848     1327151 :                 if (zmq_getsockopt (items_ [i].socket, ZMQ_EVENTS, &zmq_events,
     849     1327153 :                     &zmq_events_size) == -1) {
     850           0 :                     if (pollfds != spollfds)
     851           0 :                         free (pollfds);
     852           0 :                     return -1;
     853             :                 }
     854     1857095 :                 if ((items_ [i].events & ZMQ_POLLOUT) &&
     855      529944 :                       (zmq_events & ZMQ_POLLOUT))
     856         100 :                     items_ [i].revents |= ZMQ_POLLOUT;
     857     2124358 :                 if ((items_ [i].events & ZMQ_POLLIN) &&
     858      797207 :                       (zmq_events & ZMQ_POLLIN))
     859      265054 :                     items_ [i].revents |= ZMQ_POLLIN;
     860             :             }
     861             :             //  Else, the poll item is a raw file descriptor, simply convert
     862             :             //  the events to zmq_pollitem_t-style format.
     863             :             else {
     864           0 :                 if (pollfds [i].revents & POLLIN)
     865           0 :                     items_ [i].revents |= ZMQ_POLLIN;
     866           0 :                 if (pollfds [i].revents & POLLOUT)
     867           0 :                     items_ [i].revents |= ZMQ_POLLOUT;
     868           0 :                 if (pollfds [i].revents & POLLPRI)
     869           0 :                    items_ [i].revents |= ZMQ_POLLPRI;
     870           0 :                 if (pollfds [i].revents & ~(POLLIN | POLLOUT | POLLPRI))
     871           0 :                     items_ [i].revents |= ZMQ_POLLERR;
     872             :             }
     873             : 
     874     1327151 :             if (items_ [i].revents)
     875      265154 :                 nevents++;
     876             :         }
     877             : 
     878             :         //  If timeout is zero, exit immediately whether there are events or not.
     879      531035 :         if (timeout_ == 0)
     880             :             break;
     881             : 
     882             :         //  If there are events to return, we can exit immediately.
     883      266063 :         if (nevents)
     884             :             break;
     885             : 
     886             :         //  At this point we are meant to wait for events but there are none.
     887             :         //  If timeout is infinite we can just loop until we get some events.
     888        1022 :         if (timeout_ < 0) {
     889         102 :             if (first_pass)
     890          58 :                 first_pass = false;
     891             :             continue;
     892             :         }
     893             : 
     894             :         //  The timeout is finite and there are no events. In the first pass
     895             :         //  we get a timestamp of when the polling have begun. (We assume that
     896             :         //  first pass have taken negligible time). We also compute the time
     897             :         //  when the polling should time out.
     898         920 :         if (first_pass) {
     899         484 :             now = clock.now_ms ();
     900         484 :             end = now + timeout_;
     901         484 :             if (now == end)
     902             :                 break;
     903             :             first_pass = false;
     904             :             continue;
     905             :         }
     906             : 
     907             :         //  Find out whether timeout have expired.
     908         436 :         now = clock.now_ms ();
     909         436 :         if (now >= end)
     910             :             break;
     911             :     }
     912             : 
     913      530446 :     if (pollfds != spollfds)
     914           0 :         free (pollfds);
     915      530446 :     return nevents;
     916             : 
     917             : #elif defined ZMQ_POLL_BASED_ON_SELECT
     918             : 
     919             :     if (unlikely (nitems_ < 0)) {
     920             :         errno = EINVAL;
     921             :         return -1;
     922             :     }
     923             :     if (unlikely (nitems_ == 0)) {
     924             :         if (timeout_ == 0)
     925             :             return 0;
     926             : #if defined ZMQ_HAVE_WINDOWS
     927             :         Sleep (timeout_ > 0 ? timeout_ : INFINITE);
     928             :         return 0;
     929             : #else
     930             :         return usleep (timeout_ * 1000);
     931             : #endif
     932             :     }
     933             :     zmq::clock_t clock;
     934             :     uint64_t now = 0;
     935             :     uint64_t end = 0;
     936             : 
     937             :     //  Ensure we do not attempt to select () on more than FD_SETSIZE
     938             :     //  file descriptors.
     939             :     zmq_assert (nitems_ <= FD_SETSIZE);
     940             : 
     941             :     fd_set pollset_in  = { 0 };
     942             :     fd_set pollset_out = { 0 };
     943             :     fd_set pollset_err = { 0 };
     944             : 
     945             :     zmq::fd_t maxfd = 0;
     946             : 
     947             :     //  Build the fd_sets for passing to select ().
     948             :     for (int i = 0; i != nitems_; i++) {
     949             : 
     950             :         //  If the poll item is a 0MQ socket we are interested in input on the
     951             :         //  notification file descriptor retrieved by the ZMQ_FD socket option.
     952             :         if (items_ [i].socket) {
     953             :             size_t zmq_fd_size = sizeof (zmq::fd_t);
     954             :             zmq::fd_t notify_fd;
     955             :             if (zmq_getsockopt (items_ [i].socket, ZMQ_FD, &notify_fd,
     956             :                 &zmq_fd_size) == -1)
     957             :                 return -1;
     958             :             if (items_ [i].events) {
     959             :                 FD_SET (notify_fd, &pollset_in);
     960             :                 if (maxfd < notify_fd)
     961             :                     maxfd = notify_fd;
     962             :             }
     963             :         }
     964             :         //  Else, the poll item is a raw file descriptor. Convert the poll item
     965             :         //  events to the appropriate fd_sets.
     966             :         else {
     967             :             if (items_ [i].events & ZMQ_POLLIN)
     968             :                 FD_SET (items_ [i].fd, &pollset_in);
     969             :             if (items_ [i].events & ZMQ_POLLOUT)
     970             :                 FD_SET (items_ [i].fd, &pollset_out);
     971             :             if (items_ [i].events & ZMQ_POLLERR)
     972             :                 FD_SET (items_ [i].fd, &pollset_err);
     973             :             if (maxfd < items_ [i].fd)
     974             :                 maxfd = items_ [i].fd;
     975             :         }
     976             :     }
     977             : 
     978             :     bool first_pass = true;
     979             :     int nevents = 0;
     980             :     fd_set inset, outset, errset;
     981             : 
     982             :     while (true) {
     983             : 
     984             :         //  Compute the timeout for the subsequent poll.
     985             :         timeval timeout;
     986             :         timeval *ptimeout;
     987             :         if (first_pass) {
     988             :             timeout.tv_sec = 0;
     989             :             timeout.tv_usec = 0;
     990             :             ptimeout = &timeout;
     991             :         }
     992             :         else
     993             :         if (timeout_ < 0)
     994             :             ptimeout = NULL;
     995             :         else {
     996             :             timeout.tv_sec = (long) ((end - now) / 1000);
     997             :             timeout.tv_usec = (long) ((end - now) % 1000 * 1000);
     998             :             ptimeout = &timeout;
     999             :         }
    1000             : 
    1001             :         //  Wait for events. Ignore interrupts if there's infinite timeout.
    1002             :         while (true) {
    1003             :             memcpy (&inset, &pollset_in, sizeof (fd_set));
    1004             :             memcpy (&outset, &pollset_out, sizeof (fd_set));
    1005             :             memcpy (&errset, &pollset_err, sizeof (fd_set));
    1006             : #if defined ZMQ_HAVE_WINDOWS
    1007             :             int rc = select (0, &inset, &outset, &errset, ptimeout);
    1008             :             if (unlikely (rc == SOCKET_ERROR)) {
    1009             :                 errno = zmq::wsa_error_to_errno (WSAGetLastError ());
    1010             :                 wsa_assert (errno == ENOTSOCK);
    1011             :                 return -1;
    1012             :             }
    1013             : #else
    1014             :             int rc = select (maxfd + 1, &inset, &outset, &errset, ptimeout);
    1015             :             if (unlikely (rc == -1)) {
    1016             :                 errno_assert (errno == EINTR || errno == EBADF);
    1017             :                 return -1;
    1018             :             }
    1019             : #endif
    1020             :             break;
    1021             :         }
    1022             : 
    1023             :         //  Check for the events.
    1024             :         for (int i = 0; i != nitems_; i++) {
    1025             : 
    1026             :             items_ [i].revents = 0;
    1027             : 
    1028             :             //  The poll item is a 0MQ socket. Retrieve pending events
    1029             :             //  using the ZMQ_EVENTS socket option.
    1030             :             if (items_ [i].socket) {
    1031             :                 size_t zmq_events_size = sizeof (uint32_t);
    1032             :                 uint32_t zmq_events;
    1033             :                 if (zmq_getsockopt (items_ [i].socket, ZMQ_EVENTS, &zmq_events,
    1034             :                       &zmq_events_size) == -1)
    1035             :                     return -1;
    1036             :                 if ((items_ [i].events & ZMQ_POLLOUT) &&
    1037             :                       (zmq_events & ZMQ_POLLOUT))
    1038             :                     items_ [i].revents |= ZMQ_POLLOUT;
    1039             :                 if ((items_ [i].events & ZMQ_POLLIN) &&
    1040             :                       (zmq_events & ZMQ_POLLIN))
    1041             :                     items_ [i].revents |= ZMQ_POLLIN;
    1042             :             }
    1043             :             //  Else, the poll item is a raw file descriptor, simply convert
    1044             :             //  the events to zmq_pollitem_t-style format.
    1045             :             else {
    1046             :                 if (FD_ISSET (items_ [i].fd, &inset))
    1047             :                     items_ [i].revents |= ZMQ_POLLIN;
    1048             :                 if (FD_ISSET (items_ [i].fd, &outset))
    1049             :                     items_ [i].revents |= ZMQ_POLLOUT;
    1050             :                 if (FD_ISSET (items_ [i].fd, &errset))
    1051             :                     items_ [i].revents |= ZMQ_POLLERR;
    1052             :             }
    1053             : 
    1054             :             if (items_ [i].revents)
    1055             :                 nevents++;
    1056             :         }
    1057             : 
    1058             :         //  If timeout is zero, exit immediately whether there are events or not.
    1059             :         if (timeout_ == 0)
    1060             :             break;
    1061             : 
    1062             :         //  If there are events to return, we can exit immediately.
    1063             :         if (nevents)
    1064             :             break;
    1065             : 
    1066             :         //  At this point we are meant to wait for events but there are none.
    1067             :         //  If timeout is infinite we can just loop until we get some events.
    1068             :         if (timeout_ < 0) {
    1069             :             if (first_pass)
    1070             :                 first_pass = false;
    1071             :             continue;
    1072             :         }
    1073             : 
    1074             :         //  The timeout is finite and there are no events. In the first pass
    1075             :         //  we get a timestamp of when the polling have begun. (We assume that
    1076             :         //  first pass have taken negligible time). We also compute the time
    1077             :         //  when the polling should time out.
    1078             :         if (first_pass) {
    1079             :             now = clock.now_ms ();
    1080             :             end = now + timeout_;
    1081             :             if (now == end)
    1082             :                 break;
    1083             :             first_pass = false;
    1084             :             continue;
    1085             :         }
    1086             : 
    1087             :         //  Find out whether timeout have expired.
    1088             :         now = clock.now_ms ();
    1089             :         if (now >= end)
    1090             :           break;
    1091             :     }
    1092             : 
    1093             :     return nevents;
    1094             : 
    1095             : #else
    1096             :     //  Exotic platforms that support neither poll() nor select().
    1097             :     errno = ENOTSUP;
    1098             :     return -1;
    1099             : #endif
    1100             : }
    1101             : 
    1102             : //  The poller functionality
    1103             : 
    1104           3 : void *zmq_poller_new (void)
    1105             : {
    1106           3 :     zmq::socket_poller_t *poller = new (std::nothrow) zmq::socket_poller_t;
    1107           3 :     alloc_assert (poller);
    1108           3 :     return poller;
    1109             : }
    1110             : 
    1111           3 : int zmq_poller_destroy (void **poller_p_)
    1112             : {
    1113           3 :     void *poller = *poller_p_;
    1114           3 :     if (!poller || !((zmq::socket_poller_t*) poller)->check_tag ()) {
    1115           0 :         errno = EFAULT;
    1116           0 :         return -1;
    1117             :     }
    1118             : 
    1119           3 :     delete ((zmq::socket_poller_t*) poller);
    1120           3 :     *poller_p_ = NULL;
    1121           3 :     return 0;
    1122             : }
    1123             : 
    1124           6 : int zmq_poller_add (void *poller_, void *s_, void *user_data_, short events_)
    1125             : {
    1126           6 :     if (!poller_ || !((zmq::socket_poller_t*)poller_)->check_tag ()) {
    1127           0 :         errno = EFAULT;
    1128           0 :         return -1;
    1129             :     }
    1130             : 
    1131           6 :     if (!s_ || !((zmq::socket_base_t*)s_)->check_tag ()) {
    1132           0 :         errno = ENOTSOCK;
    1133           0 :         return -1;
    1134             :     }
    1135           6 :     zmq::socket_base_t *socket = (zmq::socket_base_t*)s_;
    1136             : 
    1137           6 :     return ((zmq::socket_poller_t*)poller_)->add (socket, user_data_, events_);
    1138             : }
    1139             : 
    1140             : #if defined _WIN32
    1141             : int zmq_poller_add_fd (void *poller_, SOCKET fd_, void *user_data_, short events_)
    1142             : #else
    1143           3 : int zmq_poller_add_fd (void *poller_, int fd_, void *user_data_, short events_)
    1144             : #endif
    1145             : {
    1146           3 :     if (!poller_ || !((zmq::socket_poller_t*)poller_)->check_tag ()) {
    1147           0 :         errno = EFAULT;
    1148           0 :         return -1;
    1149             :     }
    1150             : 
    1151           3 :     return ((zmq::socket_poller_t*)poller_)->add_fd (fd_, user_data_, events_);
    1152             : }
    1153             : 
    1154             : 
    1155           3 : int zmq_poller_modify (void *poller_, void *s_, short events_)
    1156             : {
    1157           3 :     if (!poller_ || !((zmq::socket_poller_t*)poller_)->check_tag ()) {
    1158           0 :         errno = EFAULT;
    1159           0 :         return -1;
    1160             :     }
    1161             : 
    1162           3 :     if (!s_ || !((zmq::socket_base_t*)s_)->check_tag ()) {
    1163           0 :         errno = ENOTSOCK;
    1164           0 :         return -1;
    1165             :     }
    1166           3 :     zmq::socket_base_t *socket = (zmq::socket_base_t*)s_;
    1167             : 
    1168           3 :     return ((zmq::socket_poller_t*)poller_)->modify (socket, events_);
    1169             : }
    1170             : 
    1171             : 
    1172             : #if defined _WIN32
    1173             : int zmq_poller_modify_fd (void *poller_, SOCKET fd_, short events_)
    1174             : #else
    1175           0 : int zmq_poller_modify_fd (void *poller_, int fd_, short events_)
    1176             : #endif
    1177             : {
    1178           0 :     if (!poller_ || !((zmq::socket_poller_t*)poller_)->check_tag ()) {
    1179           0 :         errno = EFAULT;
    1180           0 :         return -1;
    1181             :     }
    1182             : 
    1183           0 :     return ((zmq::socket_poller_t*)poller_)->modify_fd (fd_, events_);
    1184             : }
    1185             : 
    1186             : 
    1187           3 : int zmq_poller_remove (void *poller_, void *s_)
    1188             : {
    1189           3 :     if (!poller_ || !((zmq::socket_poller_t*)poller_)->check_tag ()) {
    1190           0 :         errno = EFAULT;
    1191           0 :         return -1;
    1192             :     }
    1193             : 
    1194           3 :     if (!s_ || !((zmq::socket_base_t*)s_)->check_tag ()) {
    1195           0 :         errno = ENOTSOCK;
    1196           0 :         return -1;
    1197             :     }
    1198           3 :     zmq::socket_base_t *socket = (zmq::socket_base_t*)s_;
    1199             : 
    1200           3 :     return ((zmq::socket_poller_t*)poller_)->remove (socket);
    1201             : }
    1202             : 
    1203             : #if defined _WIN32
    1204             : int zmq_poller_remove_fd (void *poller_, SOCKET fd_)
    1205             : #else
    1206           3 : int zmq_poller_remove_fd (void *poller_, int fd_)
    1207             : #endif
    1208             : {
    1209           3 :     if (!poller_ || !((zmq::socket_poller_t*)poller_)->check_tag ()) {
    1210           0 :         errno = EFAULT;
    1211           0 :         return -1;
    1212             :     }
    1213             : 
    1214           3 :     return ((zmq::socket_poller_t*)poller_)->remove_fd (fd_);
    1215             : }
    1216             : 
    1217             : 
    1218          18 : int zmq_poller_wait (void *poller_, zmq_poller_event_t *event, long timeout_)
    1219             : {
    1220          18 :     if (!poller_ || !((zmq::socket_poller_t*)poller_)->check_tag ()) {
    1221           0 :         errno = EFAULT;
    1222           0 :         return -1;
    1223             :     }
    1224             : 
    1225          18 :     zmq_assert (event != NULL);
    1226             : 
    1227             :     zmq::socket_poller_t::event_t e;
    1228             :     memset (&e, 0, sizeof (e));
    1229             : 
    1230          18 :     int rc = ((zmq::socket_poller_t*)poller_)->wait (&e, timeout_);
    1231             : 
    1232          18 :     event->socket = e.socket;
    1233          18 :     event->fd = e.fd;
    1234          18 :     event->user_data = e.user_data;
    1235          18 :     event->events = e.events;
    1236             : 
    1237          18 :     return rc;
    1238             : }
    1239             : 
    1240             : //  Timers
    1241             : 
    1242           3 : void *zmq_timers_new (void)
    1243             : {
    1244           3 :     zmq::timers_t *timers = new (std::nothrow) zmq::timers_t;
    1245           3 :     alloc_assert (timers);
    1246           3 :     return timers;
    1247             : }
    1248             : 
    1249           3 : int zmq_timers_destroy (void **timers_p_)
    1250             : {
    1251           3 :     void *timers = *timers_p_;
    1252           3 :     if (!timers || !((zmq::timers_t *) timers)->check_tag ()) {
    1253           0 :         errno = EFAULT;
    1254           0 :         return -1;
    1255             :     }
    1256           3 :     delete ((zmq::timers_t *) timers);
    1257           3 :     *timers_p_ = NULL;
    1258           3 :     return 0;
    1259             : }
    1260             : 
    1261           3 : int zmq_timers_add (void *timers_, size_t interval_, zmq_timer_fn handler_, void *arg_)
    1262             : {
    1263           3 :     if (!timers_ || !((zmq::timers_t*)timers_)->check_tag ()) {
    1264           0 :         errno = EFAULT;
    1265           0 :         return -1;
    1266             :     }
    1267             : 
    1268           3 :     return ((zmq::timers_t*)timers_)->add (interval_, handler_, arg_);
    1269             : }
    1270             : 
    1271           3 : int zmq_timers_cancel (void *timers_, int timer_id_)
    1272             : {
    1273           3 :     if (!timers_ || !((zmq::timers_t*)timers_)->check_tag ()) {
    1274           0 :         errno = EFAULT;
    1275           0 :         return -1;
    1276             :     }
    1277             : 
    1278           3 :     return ((zmq::timers_t*)timers_)->cancel (timer_id_);
    1279             : }
    1280             : 
    1281           3 : int zmq_timers_set_interval (void *timers_, int timer_id_, size_t interval_)
    1282             : {
    1283           3 :     if (!timers_ || !((zmq::timers_t*)timers_)->check_tag ()) {
    1284           0 :         errno = EFAULT;
    1285           0 :         return -1;
    1286             :     }
    1287             : 
    1288           3 :     return ((zmq::timers_t*)timers_)->set_interval (timer_id_, interval_);
    1289             : }
    1290             : 
    1291           3 : int zmq_timers_reset (void *timers_, int timer_id_)
    1292             : {
    1293           3 :     if (!timers_ || !((zmq::timers_t*)timers_)->check_tag ()) {
    1294           0 :         errno = EFAULT;
    1295           0 :         return -1;
    1296             :     }
    1297             : 
    1298           3 :     return ((zmq::timers_t*)timers_)->reset (timer_id_);
    1299             : }
    1300             : 
    1301          27 : long zmq_timers_timeout (void *timers_)
    1302             : {
    1303          27 :     if (!timers_ || !((zmq::timers_t*)timers_)->check_tag ()) {
    1304           0 :         errno = EFAULT;
    1305           0 :         return -1;
    1306             :     }
    1307             : 
    1308          27 :     return ((zmq::timers_t*)timers_)->timeout ();
    1309             : }
    1310             : 
    1311          24 : int zmq_timers_execute (void *timers_)
    1312             : {
    1313          24 :     if (!timers_ || !((zmq::timers_t*)timers_)->check_tag ()) {
    1314           0 :         errno = EFAULT;
    1315           0 :         return -1;
    1316             :     }
    1317             : 
    1318          24 :     return ((zmq::timers_t*)timers_)->execute ();
    1319             : }
    1320             : 
    1321             : //  The proxy functionality
    1322             : 
    1323           0 : int zmq_proxy (void *frontend_, void *backend_, void *capture_)
    1324             : {
    1325           0 :     if (!frontend_ || !backend_) {
    1326           0 :         errno = EFAULT;
    1327           0 :         return -1;
    1328             :     }
    1329             :     return zmq::proxy (
    1330             :         (zmq::socket_base_t*) frontend_,
    1331             :         (zmq::socket_base_t*) backend_,
    1332           0 :         (zmq::socket_base_t*) capture_);
    1333             : }
    1334             : 
    1335           9 : int zmq_proxy_steerable (void *frontend_, void *backend_, void *capture_, void *control_)
    1336             : {
    1337           9 :     if (!frontend_ || !backend_) {
    1338           0 :         errno = EFAULT;
    1339           0 :         return -1;
    1340             :     }
    1341             :     return zmq::proxy (
    1342             :         (zmq::socket_base_t*) frontend_,
    1343             :         (zmq::socket_base_t*) backend_,
    1344             :         (zmq::socket_base_t*) capture_,
    1345           9 :         (zmq::socket_base_t*) control_);
    1346             : }
    1347             : 
    1348             : //  The deprecated device functionality
    1349             : 
    1350           0 : int zmq_device (int /* type */, void *frontend_, void *backend_)
    1351             : {
    1352             :     return zmq::proxy (
    1353             :         (zmq::socket_base_t*) frontend_,
    1354           0 :         (zmq::socket_base_t*) backend_, NULL);
    1355             : }
    1356             : 
    1357             : //  Probe library capabilities; for now, reports on transport and security
    1358             : 
    1359          21 : int zmq_has (const char *capability)
    1360             : {
    1361             : #if !defined (ZMQ_HAVE_WINDOWS) && !defined (ZMQ_HAVE_OPENVMS)
    1362          21 :     if (strcmp (capability, "ipc") == 0)
    1363             :         return true;
    1364             : #endif
    1365             : #if defined (ZMQ_HAVE_OPENPGM)
    1366             :     if (strcmp (capability, "pgm") == 0)
    1367             :         return true;
    1368             : #endif
    1369             : #if defined (ZMQ_HAVE_TIPC)
    1370             :     if (strcmp (capability, "tipc") == 0)
    1371             :         return true;
    1372             : #endif
    1373             : #if defined (ZMQ_HAVE_NORM)
    1374             :     if (strcmp (capability, "norm") == 0)
    1375             :         return true;
    1376             : #endif
    1377             : #if defined (ZMQ_HAVE_CURVE)
    1378          18 :     if (strcmp (capability, "curve") == 0)
    1379             :         return true;
    1380             : #endif
    1381             : #if defined (HAVE_LIBGSSAPI_KRB5)
    1382             :     if (strcmp (capability, "gssapi") == 0)
    1383             :         return true;
    1384             : #endif
    1385             : #if defined (ZMQ_HAVE_VMCI)
    1386             :     if (strcmp (capability, "vmci") == 0)
    1387             :         return true;
    1388             : #endif
    1389             :     //  Whatever the application asked for, we don't have
    1390          15 :     return false;
    1391             : }

Generated by: LCOV version 1.10