LCOV - code coverage report
Current view: top level - home/h/core/forks/m4-libzmq/src - socket_poller.cpp (source / functions) Hit Total Coverage
Test: zeromq-4.2.0 Code Coverage Lines: 147 178 82.6 %
Date: 2016-05-09 Functions: 10 11 90.9 %
Legend: Lines: hit not hit

          Line data    Source code
       1             : /*
       2             :     Copyright (c) 2007-2016 Contributors as noted in the AUTHORS file
       3             : 
       4             :     This file is part of libzmq, the ZeroMQ core engine in C++.
       5             : 
       6             :     libzmq is free software; you can redistribute it and/or modify it under
       7             :     the terms of the GNU Lesser General Public License (LGPL) as published
       8             :     by the Free Software Foundation; either version 3 of the License, or
       9             :     (at your option) any later version.
      10             : 
      11             :     As a special exception, the Contributors give you permission to link
      12             :     this library with independent modules to produce an executable,
      13             :     regardless of the license terms of these independent modules, and to
      14             :     copy and distribute the resulting executable under terms of your choice,
      15             :     provided that you also meet, for each linked independent module, the
      16             :     terms and conditions of the license of that module. An independent
      17             :     module is a module which is not derived from or based on this library.
      18             :     If you modify this library, you must extend this exception to your
      19             :     version of the library.
      20             : 
      21             :     libzmq is distributed in the hope that it will be useful, but WITHOUT
      22             :     ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
      23             :     FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public
      24             :     License for more details.
      25             : 
      26             :     You should have received a copy of the GNU Lesser General Public License
      27             :     along with this program.  If not, see <http://www.gnu.org/licenses/>.
      28             : */
      29             : 
      30             : #include "precompiled.hpp"
      31             : #include "socket_poller.hpp"
      32             : #include "err.hpp"
      33             : 
      34           3 : zmq::socket_poller_t::socket_poller_t () :
      35             :     tag (0xCAFEBABE),
      36             :     need_rebuild (true),
      37             :     use_signaler (false),
      38             :     poll_size(0)
      39             : #if defined ZMQ_POLL_BASED_ON_POLL
      40             :     ,
      41           6 :     pollfds (NULL)
      42             : #elif defined ZMQ_POLL_BASED_ON_SELECT
      43             :     ,
      44             :     maxfd(0)
      45             : #endif
      46             : {
      47             : #if defined ZMQ_POLL_BASED_ON_SELECT
      48             :     memset(&pollset_in, 0, sizeof(pollset_in));
      49             :     memset(&pollset_out, 0, sizeof(pollset_in));
      50             :     memset(&pollset_err, 0, sizeof(pollset_in));
      51             : #endif
      52           3 : }
      53             : 
      54           9 : zmq::socket_poller_t::~socket_poller_t ()
      55             : {
      56             :     //  Mark the socket_poller as dead
      57           3 :     tag = 0xdeadbeef;
      58             : 
      59          24 :     for (items_t::iterator it = items.begin(); it != items.end(); ++it) {
      60           3 :         if (it->socket && it->socket->check_tag()) {
      61             :             int thread_safe;
      62           0 :             size_t thread_safe_size = sizeof(int);
      63             : 
      64           0 :             if (it->socket->getsockopt (ZMQ_THREAD_SAFE, &thread_safe, &thread_safe_size) == 0 && thread_safe)
      65           0 :                 it->socket->remove_signaler (&signaler);
      66             :         }
      67             :     }
      68             : 
      69             : #if defined ZMQ_POLL_BASED_ON_POLL
      70           3 :     if (pollfds) {
      71           3 :         free (pollfds);
      72           3 :         pollfds = NULL;
      73             :     }
      74             : #endif
      75           3 : }
      76             : 
      77          39 : bool zmq::socket_poller_t::check_tag ()
      78             : {
      79          39 :     return tag == 0xCAFEBABE;
      80             : }
      81             : 
      82           6 : int zmq::socket_poller_t::add (socket_base_t *socket_, void* user_data_, short events_)
      83             : {
      84          30 :     for (items_t::iterator it = items.begin (); it != items.end (); ++it) {
      85           0 :         if (it->socket == socket_) {
      86           0 :             errno = EINVAL;
      87             :             return -1;
      88             :         }
      89             :     }
      90             : 
      91             :     int thread_safe;
      92           6 :     size_t thread_safe_size = sizeof(int);
      93             : 
      94           6 :     if (socket_->getsockopt (ZMQ_THREAD_SAFE, &thread_safe, &thread_safe_size) == -1)
      95             :         return -1;
      96             : 
      97           6 :     if (thread_safe) {
      98           3 :         if (socket_->add_signaler (&signaler) == -1)
      99             :            return -1;
     100             :     }
     101             : 
     102             :     item_t item = {socket_, 0, user_data_, events_
     103             : #if defined ZMQ_POLL_BASED_ON_POLL
     104             :                    ,-1
     105             : #endif
     106           6 :     };
     107           6 :     items.push_back (item);
     108           6 :     need_rebuild = true;
     109             : 
     110           6 :     return 0;
     111             : }
     112             : 
     113           3 : int zmq::socket_poller_t::add_fd (fd_t fd_, void *user_data_, short events_)
     114             : {
     115          15 :    for (items_t::iterator it = items.begin (); it != items.end (); ++it) {
     116           0 :         if (!it->socket && it->fd == fd_) {
     117           0 :             errno = EINVAL;
     118             :             return -1;
     119             :         }
     120             :     }
     121             : 
     122             :     item_t item = {NULL, fd_, user_data_, events_
     123             : #if defined ZMQ_POLL_BASED_ON_POLL
     124             :                    ,-1
     125             : #endif
     126           3 :                    };
     127           3 :     items.push_back (item);
     128           3 :     need_rebuild = true;
     129             : 
     130           3 :     return 0;
     131             : }
     132             : 
     133           3 : int zmq::socket_poller_t::modify (socket_base_t  *socket_, short events_)
     134             : {
     135           3 :     items_t::iterator it;
     136             : 
     137          15 :     for (it = items.begin (); it != items.end (); ++it) {
     138           3 :         if (it->socket == socket_)
     139             :             break;
     140             :     }
     141             : 
     142           9 :     if (it == items.end()) {
     143           0 :         errno = EINVAL;
     144           0 :         return -1;
     145             :     }
     146             : 
     147           3 :     it->events = events_;
     148           3 :     need_rebuild = true;
     149             : 
     150           3 :     return 0;
     151             : }
     152             : 
     153             : 
     154           0 : int zmq::socket_poller_t::modify_fd (fd_t fd_, short events_)
     155             : {
     156           0 :     items_t::iterator it;
     157             : 
     158           0 :     for (it = items.begin (); it != items.end (); ++it) {
     159           0 :         if (!it->socket && it->fd == fd_)
     160             :             break;
     161             :     }
     162             : 
     163           0 :     if (it == items.end()) {
     164           0 :         errno = EINVAL;
     165           0 :         return -1;
     166             :     }
     167             : 
     168           0 :     it->events = events_;
     169           0 :     need_rebuild = true;
     170             : 
     171           0 :     return 0;
     172             : }
     173             : 
     174             : 
     175           3 : int zmq::socket_poller_t::remove (socket_base_t *socket_)
     176             : {
     177           3 :     items_t::iterator it;
     178             : 
     179          15 :     for (it = items.begin (); it != items.end (); ++it) {
     180           3 :         if (it->socket == socket_)
     181             :             break;
     182             :     }
     183             : 
     184           9 :     if (it == items.end()) {
     185           0 :         errno = EINVAL;
     186           0 :         return -1;
     187             :     }
     188             : 
     189           3 :     items.erase(it);
     190           3 :     need_rebuild = true;
     191             : 
     192             :     int thread_safe;
     193           3 :     size_t thread_safe_size = sizeof(int);
     194             : 
     195           3 :     if (socket_->getsockopt (ZMQ_THREAD_SAFE, &thread_safe, &thread_safe_size) == 0 && thread_safe)
     196           0 :         socket_->remove_signaler (&signaler);
     197             : 
     198             :     return 0;
     199             : }
     200             : 
     201           3 : int zmq::socket_poller_t::remove_fd (fd_t fd_)
     202             : {
     203           3 :     items_t::iterator it;
     204             : 
     205          15 :     for (it = items.begin (); it != items.end (); ++it) {
     206           3 :         if (!it->socket && it->fd == fd_)
     207             :             break;
     208             :     }
     209             : 
     210           9 :     if (it == items.end()) {
     211           0 :         errno = EINVAL;
     212           0 :         return -1;
     213             :     }
     214             : 
     215           3 :     items.erase (it);
     216           3 :     need_rebuild = true;
     217             : 
     218           3 :     return 0;
     219             : }
     220             : 
     221          15 : int zmq::socket_poller_t::rebuild ()
     222             : {
     223             : #if defined ZMQ_POLL_BASED_ON_POLL
     224             : 
     225          15 :     if (pollfds) {
     226           9 :         free (pollfds);
     227           9 :         pollfds = NULL;
     228             :     }
     229             : 
     230          15 :     use_signaler = false;
     231             : 
     232          15 :     poll_size = 0;
     233             : 
     234         111 :     for (items_t::iterator it = items.begin (); it != items.end (); ++it) {
     235          12 :         if (it->events) {
     236          12 :             if (it->socket) {
     237             :                 int thread_safe;
     238           9 :                 size_t thread_safe_size = sizeof(int);
     239             : 
     240           9 :                 if (it->socket->getsockopt (ZMQ_THREAD_SAFE, &thread_safe, &thread_safe_size) == -1)
     241           0 :                     return -1;
     242             : 
     243           9 :                 if (thread_safe) {
     244           6 :                     if (!use_signaler) {
     245           6 :                         use_signaler = true;
     246           6 :                         poll_size++;
     247             :                     }
     248             :                 }
     249             :                 else
     250           3 :                     poll_size++;
     251             :             }
     252             :             else
     253           3 :                 poll_size++;
     254             :         }
     255             :     }
     256             : 
     257          15 :     if (poll_size == 0)
     258             :         return 0;
     259             : 
     260          12 :     pollfds = (pollfd*) malloc (poll_size * sizeof (pollfd));
     261          12 :     alloc_assert (pollfds);
     262             : 
     263          12 :     int item_nbr = 0;
     264             : 
     265          12 :     if (use_signaler) {
     266           6 :         item_nbr = 1;
     267           6 :         pollfds[0].fd = signaler.get_fd();
     268           6 :         pollfds[0].events = POLLIN;
     269             :     }
     270             : 
     271          96 :     for (items_t::iterator it = items.begin (); it != items.end (); ++it) {
     272          12 :         if (it->events) {
     273          12 :             if (it->socket) {
     274             :                 int thread_safe;
     275           9 :                 size_t thread_safe_size = sizeof(int);
     276             : 
     277           9 :                 if (it->socket->getsockopt (ZMQ_THREAD_SAFE, &thread_safe, &thread_safe_size) == -1)
     278           0 :                     return -1;
     279             : 
     280           9 :                 if (!thread_safe) {
     281           3 :                     size_t fd_size = sizeof (zmq::fd_t);
     282           3 :                     if (it->socket->getsockopt (ZMQ_FD, &pollfds [item_nbr].fd, &fd_size) == -1) {
     283           0 :                         return -1;
     284             :                     }
     285             : 
     286           3 :                     pollfds [item_nbr].events = POLLIN;
     287           3 :                     item_nbr++;
     288             :                 }
     289             :             }
     290             :             else {
     291           3 :                 pollfds [item_nbr].fd = it->fd;
     292           3 :                 pollfds [item_nbr].events =
     293             :                     (it->events & ZMQ_POLLIN ? POLLIN : 0) |
     294           3 :                     (it->events & ZMQ_POLLOUT ? POLLOUT : 0) |
     295           6 :                     (it->events & ZMQ_POLLPRI ? POLLPRI : 0);
     296           3 :                 it->pollfd_index = item_nbr;
     297           3 :                 item_nbr++;
     298             :             }
     299             :         }
     300             :     }
     301             : 
     302             :  #elif defined ZMQ_POLL_BASED_ON_SELECT
     303             : 
     304             :     FD_ZERO (&pollset_in);
     305             :     FD_ZERO (&pollset_out);
     306             :     FD_ZERO (&pollset_err);
     307             : 
     308             :     //  Ensure we do not attempt to select () on more than FD_SETSIZE
     309             :     //  file descriptors.
     310             :     zmq_assert (items.size () <= FD_SETSIZE);
     311             : 
     312             :     poll_size = 0;
     313             : 
     314             :     use_signaler = false;
     315             : 
     316             :     for (items_t::iterator it = items.begin (); it != items.end (); ++it) {
     317             :         if (it->socket) {
     318             :             int thread_safe;
     319             :             size_t thread_safe_size = sizeof(int);
     320             : 
     321             :             if (it->socket->getsockopt (ZMQ_THREAD_SAFE, &thread_safe, &thread_safe_size) == -1)
     322             :                 return -1;
     323             : 
     324             :             if (thread_safe && it->events) {
     325             :                 use_signaler = true;
     326             :                 FD_SET (signaler.get_fd (), &pollset_in);
     327             :                 poll_size = 1;
     328             :                 break;
     329             :             }
     330             :         }
     331             :     }
     332             : 
     333             :     maxfd = 0;
     334             : 
     335             :     //  Build the fd_sets for passing to select ().
     336             :     for (items_t::iterator it = items.begin (); it != items.end (); ++it) {
     337             :         if (it->events) {
     338             :             //  If the poll item is a 0MQ socket we are interested in input on the
     339             :             //  notification file descriptor retrieved by the ZMQ_FD socket option.
     340             :             if (it->socket) {
     341             :                 int thread_safe;
     342             :                 size_t thread_safe_size = sizeof(int);
     343             : 
     344             :                 if (it->socket->getsockopt (ZMQ_THREAD_SAFE, &thread_safe, &thread_safe_size) == -1)
     345             :                     return -1;
     346             : 
     347             :                 if (!thread_safe) {
     348             :                     zmq::fd_t notify_fd;
     349             :                     size_t fd_size = sizeof (zmq::fd_t);
     350             :                     if (it->socket->getsockopt (ZMQ_FD, &notify_fd, &fd_size) == -1)
     351             :                         return -1;
     352             : 
     353             :                     FD_SET (notify_fd, &pollset_in);
     354             :                     if (maxfd < notify_fd)
     355             :                         maxfd = notify_fd;
     356             : 
     357             :                     poll_size++;
     358             :                 }
     359             :             }
     360             :             //  Else, the poll item is a raw file descriptor. Convert the poll item
     361             :             //  events to the appropriate fd_sets.
     362             :             else {
     363             :                 if (it->events & ZMQ_POLLIN)
     364             :                     FD_SET (it->fd, &pollset_in);
     365             :                 if (it->events & ZMQ_POLLOUT)
     366             :                     FD_SET (it->fd, &pollset_out);
     367             :                 if (it->events & ZMQ_POLLERR)
     368             :                     FD_SET (it->fd, &pollset_err);
     369             :                 if (maxfd < it->fd)
     370             :                     maxfd = it->fd;
     371             : 
     372             :                 poll_size++;
     373             :             }
     374             :         }
     375             :     }
     376             : 
     377             : #endif
     378             : 
     379          12 :     need_rebuild = false;
     380          12 :     return 0;
     381             : }
     382             : 
     383          18 : int zmq::socket_poller_t::wait (zmq::socket_poller_t::event_t *event_, long timeout_)
     384             : {
     385          18 :     if (need_rebuild)
     386          15 :         if (rebuild () == -1)
     387             :             return -1;
     388             : 
     389             : #if defined ZMQ_POLL_BASED_ON_POLL
     390          18 :     if (unlikely (poll_size == 0)) {
     391             :         // We'll report an error (timed out) as if the list was non-empty and
      392             :         // no event occurred within the specified timeout. Otherwise the caller
     393             :         // needs to check the return value AND the event to avoid using the
     394             :         // nullified event data.
     395           3 :         errno = ETIMEDOUT;
     396           3 :         if (timeout_ == 0)
     397             :             return -1;
     398             : #if defined ZMQ_HAVE_WINDOWS
     399             :         Sleep (timeout_ > 0 ? timeout_ : INFINITE);
     400             :         return -1;
     401             : #elif defined ZMQ_HAVE_ANDROID
     402             :         usleep (timeout_ * 1000);
     403             :         return -1;
     404             : #else
     405           0 :         usleep (timeout_ * 1000);
     406             :         return -1;
     407             : #endif
     408             :     }
     409             : 
     410          15 :     zmq::clock_t clock;
     411             :     uint64_t now = 0;
     412             :     uint64_t end = 0;
     413             : 
     414             :     bool first_pass = true;
     415             : 
     416             :     while (true) {
     417             :         //  Compute the timeout for the subsequent poll.
     418             :         int timeout;
     419          23 :         if (first_pass)
     420             :             timeout = 0;
     421             :         else
     422           8 :         if (timeout_ < 0)
     423             :             timeout = -1;
     424             :         else
     425           5 :             timeout = end - now;
     426             : 
     427             :         //  Wait for events.
     428             :         while (true) {
     429          46 :             int rc = poll (pollfds, poll_size, timeout);
     430          23 :             if (rc == -1 && errno == EINTR) {
     431             :                 return -1;
     432             :             }
     433          23 :             errno_assert (rc >= 0);
     434             :             break;
     435             :         }
     436             : 
     437             :         //  Receive the signal from pollfd
     438          23 :         if (use_signaler && pollfds[0].revents & POLLIN)
     439           6 :             signaler.recv ();
     440             : 
     441             :         //  Check for the events.
     442         148 :         for (items_t::iterator it = items.begin (); it != items.end (); ++it) {
     443             : 
     444             :             //  The poll item is a 0MQ socket. Retrieve pending events
     445             :             //  using the ZMQ_EVENTS socket option.
     446          23 :             if (it->socket) {
     447          20 :                 size_t events_size = sizeof (uint32_t);
     448             :                 uint32_t events;
     449          20 :                 if (it->socket->getsockopt (ZMQ_EVENTS, &events, &events_size) == -1) {
     450           9 :                     return -1;
     451             :                 }
     452             : 
     453          20 :                 if (it->events & events) {
     454           9 :                     event_->socket = it->socket;
     455           9 :                     event_->user_data = it->user_data;
     456           9 :                     event_->events = it->events & events;
     457             : 
      458             :                     //  If there is an event to return, we can exit immediately.
     459           9 :                     return 0;
     460             :                 }
     461             :             }
     462             :             //  Else, the poll item is a raw file descriptor, simply convert
     463             :             //  the events to zmq_pollitem_t-style format.
     464             :             else {
     465           3 :                 short revents = pollfds [it->pollfd_index].revents;
     466           3 :                 short events = 0;
     467             : 
     468           3 :                 if (revents & POLLIN)
     469           3 :                     events |= ZMQ_POLLIN;
     470           3 :                 if (revents & POLLOUT)
     471           0 :                     events |= ZMQ_POLLOUT;
     472           3 :                 if (revents & POLLPRI)
     473           0 :                     events |= ZMQ_POLLPRI;
     474           3 :                 if (revents & ~(POLLIN | POLLOUT | POLLPRI))
     475           0 :                     events |= ZMQ_POLLERR;
     476             : 
     477           3 :                 if (events) {
     478           3 :                     event_->socket = NULL;
     479           3 :                     event_->user_data = it->user_data;
     480           3 :                     event_->fd = it->fd;
     481           3 :                     event_->events = events;
     482             : 
      483             :                     //  If there is an event to return, we can exit immediately.
     484           3 :                     return 0;
     485             :                 }
     486             :             }
     487             :         }
     488             : 
     489             :         //  If timeout is zero, exit immediately whether there are events or not.
     490          11 :         if (timeout_ == 0)
     491             :             break;
     492             : 
     493             :         //  At this point we are meant to wait for events but there are none.
     494             :         //  If timeout is infinite we can just loop until we get some events.
     495           8 :         if (timeout_ < 0) {
     496           3 :             if (first_pass)
     497           3 :                 first_pass = false;
     498             :             continue;
     499             :         }
     500             : 
     501             :         //  The timeout is finite and there are no events. In the first pass
      502             :         //  we get a timestamp of when the polling has begun. (We assume that
      503             :         //  the first pass has taken negligible time). We also compute the time
     504             :         //  when the polling should time out.
     505           5 :         if (first_pass) {
     506           3 :             now = clock.now_ms ();
     507           3 :             end = now + timeout_;
     508           3 :             if (now == end)
     509             :                 break;
     510             :             first_pass = false;
     511             :             continue;
     512             :         }
     513             : 
      514             :         //  Find out whether the timeout has expired.
     515           2 :         now = clock.now_ms ();
     516           2 :         if (now >= end)
     517             :             break;
     518             :     }
     519             : 
     520           3 :     errno = ETIMEDOUT;
     521           3 :     return -1;
     522             : 
     523             : #elif defined ZMQ_POLL_BASED_ON_SELECT
     524             : 
     525             :     if (unlikely (poll_size == 0)) {
     526             :         // We'll report an error (timed out) as if the list was non-empty and
      527             :         // no event occurred within the specified timeout. Otherwise the caller
     528             :         // needs to check the return value AND the event to avoid using the
     529             :         // nullified event data.
     530             :         errno = ETIMEDOUT;
     531             :         if (timeout_ == 0)
     532             :             return -1;
     533             : #if defined ZMQ_HAVE_WINDOWS
     534             :         Sleep (timeout_ > 0 ? timeout_ : INFINITE);
     535             :         return -1;
     536             : #else
     537             :         usleep (timeout_ * 1000);
     538             :         return -1;
     539             : #endif
     540             :     }
     541             :     zmq::clock_t clock;
     542             :     uint64_t now = 0;
     543             :     uint64_t end = 0;
     544             : 
     545             :     bool first_pass = true;
     546             :     fd_set inset, outset, errset;
     547             : 
     548             :     while (true) {
     549             : 
     550             :         //  Compute the timeout for the subsequent poll.
     551             :         timeval timeout;
     552             :         timeval *ptimeout;
     553             :         if (first_pass) {
     554             :             timeout.tv_sec = 0;
     555             :             timeout.tv_usec = 0;
     556             :             ptimeout = &timeout;
     557             :         }
     558             :         else
     559             :         if (timeout_ < 0)
     560             :             ptimeout = NULL;
     561             :         else {
     562             :             timeout.tv_sec = (long) ((end - now) / 1000);
     563             :             timeout.tv_usec = (long) ((end - now) % 1000 * 1000);
     564             :             ptimeout = &timeout;
     565             :         }
     566             : 
      567             :         //  Wait for events. A failed or interrupted select () makes wait () return -1.
     568             :         while (true) {
     569             :             memcpy (&inset, &pollset_in, sizeof (fd_set));
     570             :             memcpy (&outset, &pollset_out, sizeof (fd_set));
     571             :             memcpy (&errset, &pollset_err, sizeof (fd_set));
     572             : #if defined ZMQ_HAVE_WINDOWS
     573             :             int rc = select (0, &inset, &outset, &errset, ptimeout);
     574             :             if (unlikely (rc == SOCKET_ERROR)) {
     575             :                 errno = zmq::wsa_error_to_errno (WSAGetLastError ());
     576             :                 wsa_assert (errno == ENOTSOCK);
     577             :                 return -1;
     578             :             }
     579             : #else
     580             :             int rc = select (maxfd + 1, &inset, &outset, &errset, ptimeout);
     581             :             if (unlikely (rc == -1)) {
     582             :                 errno_assert (errno == EINTR || errno == EBADF);
     583             :                 return -1;
     584             :             }
     585             : #endif
     586             :             break;
     587             :         }
     588             : 
     589             :         if (use_signaler && FD_ISSET (signaler.get_fd (), &inset))
     590             :             signaler.recv ();
     591             : 
     592             :         //  Check for the events.
     593             :         for (items_t::iterator it = items.begin (); it != items.end (); ++it) {
     594             : 
     595             :             //  The poll item is a 0MQ socket. Retrieve pending events
     596             :             //  using the ZMQ_EVENTS socket option.
     597             :             if (it->socket) {
     598             :                 size_t events_size = sizeof (uint32_t);
     599             :                 uint32_t events;
     600             :                 if (it->socket->getsockopt (ZMQ_EVENTS, &events, &events_size) == -1)
     601             :                     return -1;
     602             : 
     603             :                 if (it->events & events) {
     604             :                     event_->socket = it->socket;
     605             :                     event_->user_data = it->user_data;
     606             :                     event_->events = it->events & events;
     607             : 
      608             :                     //  If there is an event to return, we can exit immediately.
     609             :                     return 0;
     610             :                 }
     611             :             }
     612             :             //  Else, the poll item is a raw file descriptor, simply convert
     613             :             //  the events to zmq_pollitem_t-style format.
     614             :             else {
     615             :                 short events = 0;
     616             : 
     617             :                 if (FD_ISSET (it->fd, &inset))
     618             :                     events |= ZMQ_POLLIN;
     619             :                 if (FD_ISSET (it->fd, &outset))
     620             :                     events |= ZMQ_POLLOUT;
     621             :                 if (FD_ISSET (it->fd, &errset))
     622             :                     events |= ZMQ_POLLERR;
     623             : 
     624             :                 if (events) {
     625             :                     event_->socket = NULL;
     626             :                     event_->user_data = it->user_data;
     627             :                     event_->fd = it->fd;
     628             :                     event_->events = events;
     629             : 
      630             :                     //  If there is an event to return, we can exit immediately.
     631             :                     return 0;
     632             :                 }
     633             :             }
     634             :         }
     635             : 
     636             :         //  If timeout is zero, exit immediately whether there are events or not.
     637             :         if (timeout_ == 0)
     638             :             break;
     639             : 
     640             :         //  At this point we are meant to wait for events but there are none.
     641             :         //  If timeout is infinite we can just loop until we get some events.
     642             :         if (timeout_ < 0) {
     643             :             if (first_pass)
     644             :                 first_pass = false;
     645             :             continue;
     646             :         }
     647             : 
     648             :         //  The timeout is finite and there are no events. In the first pass
      649             :         //  we get a timestamp of when the polling has begun. (We assume that
      650             :         //  the first pass has taken negligible time). We also compute the time
     651             :         //  when the polling should time out.
     652             :         if (first_pass) {
     653             :             now = clock.now_ms ();
     654             :             end = now + timeout_;
     655             :             if (now == end)
     656             :                 break;
     657             :             first_pass = false;
     658             :             continue;
     659             :         }
     660             : 
      661             :         //  Find out whether the timeout has expired.
     662             :         now = clock.now_ms ();
     663             :         if (now >= end)
     664             :             break;
     665             :     }
     666             : 
     667             :     errno = ETIMEDOUT;
     668             :     return -1;
     669             : 
     670             : #else
     671             :     //  Exotic platforms that support neither poll() nor select().
     672             :     errno = ENOTSUP;
     673             :     return -1;
     674             : #endif
     675             : }

Generated by: LCOV version 1.10