libzmq  master
ZeroMQ C++ Core Engine (LIBZMQ)
socket_poller.cpp
Go to the documentation of this file.
1 /*
2  Copyright (c) 2007-2016 Contributors as noted in the AUTHORS file
3 
4  This file is part of libzmq, the ZeroMQ core engine in C++.
5 
6  libzmq is free software; you can redistribute it and/or modify it under
7  the terms of the GNU Lesser General Public License (LGPL) as published
8  by the Free Software Foundation; either version 3 of the License, or
9  (at your option) any later version.
10 
11  As a special exception, the Contributors give you permission to link
12  this library with independent modules to produce an executable,
13  regardless of the license terms of these independent modules, and to
14  copy and distribute the resulting executable under terms of your choice,
15  provided that you also meet, for each linked independent module, the
16  terms and conditions of the license of that module. An independent
17  module is a module which is not derived from or based on this library.
18  If you modify this library, you must extend this exception to your
19  version of the library.
20 
21  libzmq is distributed in the hope that it will be useful, but WITHOUT
22  ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
23  FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public
24  License for more details.
25 
26  You should have received a copy of the GNU Lesser General Public License
27  along with this program. If not, see <http://www.gnu.org/licenses/>.
28 */
29 
30 #include "precompiled.hpp"
31 #include "socket_poller.hpp"
32 #include "err.hpp"
33 
35  tag (0xCAFEBABE),
36  need_rebuild (true),
37  use_signaler (false),
38  poll_size(0)
39 #if defined ZMQ_POLL_BASED_ON_POLL
40  ,
41  pollfds (NULL)
42 #elif defined ZMQ_POLL_BASED_ON_SELECT
43  ,
44  maxfd(0)
45 #endif
46 {
47 #if defined ZMQ_POLL_BASED_ON_SELECT
48  memset(&pollset_in, 0, sizeof(pollset_in));
49  memset(&pollset_out, 0, sizeof(pollset_in));
50  memset(&pollset_err, 0, sizeof(pollset_in));
51 #endif
52 }
53 
55 {
56  // Mark the socket_poller as dead
57  tag = 0xdeadbeef;
58 
59  for (items_t::iterator it = items.begin(); it != items.end(); ++it) {
60  if (it->socket && it->socket->check_tag()) {
61  int thread_safe;
62  size_t thread_safe_size = sizeof(int);
63 
64  if (it->socket->getsockopt (ZMQ_THREAD_SAFE, &thread_safe, &thread_safe_size) == 0 && thread_safe)
65  it->socket->remove_signaler (&signaler);
66  }
67  }
68 
69 #if defined ZMQ_POLL_BASED_ON_POLL
70  if (pollfds) {
71  free (pollfds);
72  pollfds = NULL;
73  }
74 #endif
75 }
76 
78 {
79  return tag == 0xCAFEBABE;
80 }
81 
82 int zmq::socket_poller_t::add (socket_base_t *socket_, void* user_data_, short events_)
83 {
84  for (items_t::iterator it = items.begin (); it != items.end (); ++it) {
85  if (it->socket == socket_) {
86  errno = EINVAL;
87  return -1;
88  }
89  }
90 
91  int thread_safe;
92  size_t thread_safe_size = sizeof(int);
93 
94  if (socket_->getsockopt (ZMQ_THREAD_SAFE, &thread_safe, &thread_safe_size) == -1)
95  return -1;
96 
97  if (thread_safe) {
98  if (socket_->add_signaler (&signaler) == -1)
99  return -1;
100  }
101 
102  item_t item = {socket_, 0, user_data_, events_
103 #if defined ZMQ_POLL_BASED_ON_POLL
104  ,-1
105 #endif
106  };
107  items.push_back (item);
108  need_rebuild = true;
109 
110  return 0;
111 }
112 
113 int zmq::socket_poller_t::add_fd (fd_t fd_, void *user_data_, short events_)
114 {
115  for (items_t::iterator it = items.begin (); it != items.end (); ++it) {
116  if (!it->socket && it->fd == fd_) {
117  errno = EINVAL;
118  return -1;
119  }
120  }
121 
122  item_t item = {NULL, fd_, user_data_, events_
123 #if defined ZMQ_POLL_BASED_ON_POLL
124  ,-1
125 #endif
126  };
127  items.push_back (item);
128  need_rebuild = true;
129 
130  return 0;
131 }
132 
133 int zmq::socket_poller_t::modify (socket_base_t *socket_, short events_)
134 {
135  items_t::iterator it;
136 
137  for (it = items.begin (); it != items.end (); ++it) {
138  if (it->socket == socket_)
139  break;
140  }
141 
142  if (it == items.end()) {
143  errno = EINVAL;
144  return -1;
145  }
146 
147  it->events = events_;
148  need_rebuild = true;
149 
150  return 0;
151 }
152 
153 
154 int zmq::socket_poller_t::modify_fd (fd_t fd_, short events_)
155 {
156  items_t::iterator it;
157 
158  for (it = items.begin (); it != items.end (); ++it) {
159  if (!it->socket && it->fd == fd_)
160  break;
161  }
162 
163  if (it == items.end()) {
164  errno = EINVAL;
165  return -1;
166  }
167 
168  it->events = events_;
169  need_rebuild = true;
170 
171  return 0;
172 }
173 
174 
176 {
177  items_t::iterator it;
178 
179  for (it = items.begin (); it != items.end (); ++it) {
180  if (it->socket == socket_)
181  break;
182  }
183 
184  if (it == items.end()) {
185  errno = EINVAL;
186  return -1;
187  }
188 
189  items.erase(it);
190  need_rebuild = true;
191 
192  int thread_safe;
193  size_t thread_safe_size = sizeof(int);
194 
195  if (socket_->getsockopt (ZMQ_THREAD_SAFE, &thread_safe, &thread_safe_size) == 0 && thread_safe)
196  socket_->remove_signaler (&signaler);
197 
198  return 0;
199 }
200 
202 {
203  items_t::iterator it;
204 
205  for (it = items.begin (); it != items.end (); ++it) {
206  if (!it->socket && it->fd == fd_)
207  break;
208  }
209 
210  if (it == items.end()) {
211  errno = EINVAL;
212  return -1;
213  }
214 
215  items.erase (it);
216  need_rebuild = true;
217 
218  return 0;
219 }
220 
222 {
223 #if defined ZMQ_POLL_BASED_ON_POLL
224 
225  if (pollfds) {
226  free (pollfds);
227  pollfds = NULL;
228  }
229 
230  use_signaler = false;
231 
232  poll_size = 0;
233 
234  for (items_t::iterator it = items.begin (); it != items.end (); ++it) {
235  if (it->events) {
236  if (it->socket) {
237  int thread_safe;
238  size_t thread_safe_size = sizeof(int);
239 
240  if (it->socket->getsockopt (ZMQ_THREAD_SAFE, &thread_safe, &thread_safe_size) == -1)
241  return -1;
242 
243  if (thread_safe) {
244  if (!use_signaler) {
245  use_signaler = true;
246  poll_size++;
247  }
248  }
249  else
250  poll_size++;
251  }
252  else
253  poll_size++;
254  }
255  }
256 
257  if (poll_size == 0)
258  return 0;
259 
260  pollfds = (pollfd*) malloc (poll_size * sizeof (pollfd));
262 
263  int item_nbr = 0;
264 
265  if (use_signaler) {
266  item_nbr = 1;
267  pollfds[0].fd = signaler.get_fd();
268  pollfds[0].events = POLLIN;
269  }
270 
271  for (items_t::iterator it = items.begin (); it != items.end (); ++it) {
272  if (it->events) {
273  if (it->socket) {
274  int thread_safe;
275  size_t thread_safe_size = sizeof(int);
276 
277  if (it->socket->getsockopt (ZMQ_THREAD_SAFE, &thread_safe, &thread_safe_size) == -1)
278  return -1;
279 
280  if (!thread_safe) {
281  size_t fd_size = sizeof (zmq::fd_t);
282  if (it->socket->getsockopt (ZMQ_FD, &pollfds [item_nbr].fd, &fd_size) == -1) {
283  return -1;
284  }
285 
286  pollfds [item_nbr].events = POLLIN;
287  item_nbr++;
288  }
289  }
290  else {
291  pollfds [item_nbr].fd = it->fd;
292  pollfds [item_nbr].events =
293  (it->events & ZMQ_POLLIN ? POLLIN : 0) |
294  (it->events & ZMQ_POLLOUT ? POLLOUT : 0) |
295  (it->events & ZMQ_POLLPRI ? POLLPRI : 0);
296  it->pollfd_index = item_nbr;
297  item_nbr++;
298  }
299  }
300  }
301 
302  #elif defined ZMQ_POLL_BASED_ON_SELECT
303 
304  FD_ZERO (&pollset_in);
305  FD_ZERO (&pollset_out);
306  FD_ZERO (&pollset_err);
307 
308  // Ensure we do not attempt to select () on more than FD_SETSIZE
309  // file descriptors.
310  zmq_assert (items.size () <= FD_SETSIZE);
311 
312  poll_size = 0;
313 
314  use_signaler = false;
315 
316  for (items_t::iterator it = items.begin (); it != items.end (); ++it) {
317  if (it->socket) {
318  int thread_safe;
319  size_t thread_safe_size = sizeof(int);
320 
321  if (it->socket->getsockopt (ZMQ_THREAD_SAFE, &thread_safe, &thread_safe_size) == -1)
322  return -1;
323 
324  if (thread_safe && it->events) {
325  use_signaler = true;
326  FD_SET (signaler.get_fd (), &pollset_in);
327  poll_size = 1;
328  break;
329  }
330  }
331  }
332 
333  maxfd = 0;
334 
335  // Build the fd_sets for passing to select ().
336  for (items_t::iterator it = items.begin (); it != items.end (); ++it) {
337  if (it->events) {
338  // If the poll item is a 0MQ socket we are interested in input on the
339  // notification file descriptor retrieved by the ZMQ_FD socket option.
340  if (it->socket) {
341  int thread_safe;
342  size_t thread_safe_size = sizeof(int);
343 
344  if (it->socket->getsockopt (ZMQ_THREAD_SAFE, &thread_safe, &thread_safe_size) == -1)
345  return -1;
346 
347  if (!thread_safe) {
348  zmq::fd_t notify_fd;
349  size_t fd_size = sizeof (zmq::fd_t);
350  if (it->socket->getsockopt (ZMQ_FD, &notify_fd, &fd_size) == -1)
351  return -1;
352 
353  FD_SET (notify_fd, &pollset_in);
354  if (maxfd < notify_fd)
355  maxfd = notify_fd;
356 
357  poll_size++;
358  }
359  }
360  // Else, the poll item is a raw file descriptor. Convert the poll item
361  // events to the appropriate fd_sets.
362  else {
363  if (it->events & ZMQ_POLLIN)
364  FD_SET (it->fd, &pollset_in);
365  if (it->events & ZMQ_POLLOUT)
366  FD_SET (it->fd, &pollset_out);
367  if (it->events & ZMQ_POLLERR)
368  FD_SET (it->fd, &pollset_err);
369  if (maxfd < it->fd)
370  maxfd = it->fd;
371 
372  poll_size++;
373  }
374  }
375  }
376 
377 #endif
378 
379  need_rebuild = false;
380  return 0;
381 }
382 
384 {
385  if (need_rebuild)
386  if (rebuild () == -1)
387  return -1;
388 
389 #if defined ZMQ_POLL_BASED_ON_POLL
390  if (unlikely (poll_size == 0)) {
391  // We'll report an error (timed out) as if the list was non-empty and
392  // no event occured within the specified timeout. Otherwise the caller
393  // needs to check the return value AND the event to avoid using the
394  // nullified event data.
395  errno = ETIMEDOUT;
396  if (timeout_ == 0)
397  return -1;
398 #if defined ZMQ_HAVE_WINDOWS
399  Sleep (timeout_ > 0 ? timeout_ : INFINITE);
400  return -1;
401 #elif defined ZMQ_HAVE_ANDROID
402  usleep (timeout_ * 1000);
403  return -1;
404 #else
405  usleep (timeout_ * 1000);
406  return -1;
407 #endif
408  }
409 
410  zmq::clock_t clock;
411  uint64_t now = 0;
412  uint64_t end = 0;
413 
414  bool first_pass = true;
415 
416  while (true) {
417  // Compute the timeout for the subsequent poll.
418  int timeout;
419  if (first_pass)
420  timeout = 0;
421  else
422  if (timeout_ < 0)
423  timeout = -1;
424  else
425  timeout = end - now;
426 
427  // Wait for events.
428  while (true) {
429  int rc = poll (pollfds, poll_size, timeout);
430  if (rc == -1 && errno == EINTR) {
431  return -1;
432  }
433  errno_assert (rc >= 0);
434  break;
435  }
436 
437  // Receive the signal from pollfd
438  if (use_signaler && pollfds[0].revents & POLLIN)
439  signaler.recv ();
440 
441  // Check for the events.
442  for (items_t::iterator it = items.begin (); it != items.end (); ++it) {
443 
444  // The poll item is a 0MQ socket. Retrieve pending events
445  // using the ZMQ_EVENTS socket option.
446  if (it->socket) {
447  size_t events_size = sizeof (uint32_t);
448  uint32_t events;
449  if (it->socket->getsockopt (ZMQ_EVENTS, &events, &events_size) == -1) {
450  return -1;
451  }
452 
453  if (it->events & events) {
454  event_->socket = it->socket;
455  event_->user_data = it->user_data;
456  event_->events = it->events & events;
457 
458  // If there is event to return, we can exit immediately.
459  return 0;
460  }
461  }
462  // Else, the poll item is a raw file descriptor, simply convert
463  // the events to zmq_pollitem_t-style format.
464  else {
465  short revents = pollfds [it->pollfd_index].revents;
466  short events = 0;
467 
468  if (revents & POLLIN)
469  events |= ZMQ_POLLIN;
470  if (revents & POLLOUT)
471  events |= ZMQ_POLLOUT;
472  if (revents & POLLPRI)
473  events |= ZMQ_POLLPRI;
474  if (revents & ~(POLLIN | POLLOUT | POLLPRI))
475  events |= ZMQ_POLLERR;
476 
477  if (events) {
478  event_->socket = NULL;
479  event_->user_data = it->user_data;
480  event_->fd = it->fd;
481  event_->events = events;
482 
483  // If there is event to return, we can exit immediately.
484  return 0;
485  }
486  }
487  }
488 
489  // If timeout is zero, exit immediately whether there are events or not.
490  if (timeout_ == 0)
491  break;
492 
493  // At this point we are meant to wait for events but there are none.
494  // If timeout is infinite we can just loop until we get some events.
495  if (timeout_ < 0) {
496  if (first_pass)
497  first_pass = false;
498  continue;
499  }
500 
501  // The timeout is finite and there are no events. In the first pass
502  // we get a timestamp of when the polling have begun. (We assume that
503  // first pass have taken negligible time). We also compute the time
504  // when the polling should time out.
505  if (first_pass) {
506  now = clock.now_ms ();
507  end = now + timeout_;
508  if (now == end)
509  break;
510  first_pass = false;
511  continue;
512  }
513 
514  // Find out whether timeout have expired.
515  now = clock.now_ms ();
516  if (now >= end)
517  break;
518  }
519 
520  errno = ETIMEDOUT;
521  return -1;
522 
523 #elif defined ZMQ_POLL_BASED_ON_SELECT
524 
525  if (unlikely (poll_size == 0)) {
526  // We'll report an error (timed out) as if the list was non-empty and
527  // no event occured within the specified timeout. Otherwise the caller
528  // needs to check the return value AND the event to avoid using the
529  // nullified event data.
530  errno = ETIMEDOUT;
531  if (timeout_ == 0)
532  return -1;
533 #if defined ZMQ_HAVE_WINDOWS
534  Sleep (timeout_ > 0 ? timeout_ : INFINITE);
535  return -1;
536 #else
537  usleep (timeout_ * 1000);
538  return -1;
539 #endif
540  }
541  zmq::clock_t clock;
542  uint64_t now = 0;
543  uint64_t end = 0;
544 
545  bool first_pass = true;
546  fd_set inset, outset, errset;
547 
548  while (true) {
549 
550  // Compute the timeout for the subsequent poll.
551  timeval timeout;
552  timeval *ptimeout;
553  if (first_pass) {
554  timeout.tv_sec = 0;
555  timeout.tv_usec = 0;
556  ptimeout = &timeout;
557  }
558  else
559  if (timeout_ < 0)
560  ptimeout = NULL;
561  else {
562  timeout.tv_sec = (long) ((end - now) / 1000);
563  timeout.tv_usec = (long) ((end - now) % 1000 * 1000);
564  ptimeout = &timeout;
565  }
566 
567  // Wait for events. Ignore interrupts if there's infinite timeout.
568  while (true) {
569  memcpy (&inset, &pollset_in, sizeof (fd_set));
570  memcpy (&outset, &pollset_out, sizeof (fd_set));
571  memcpy (&errset, &pollset_err, sizeof (fd_set));
572 #if defined ZMQ_HAVE_WINDOWS
573  int rc = select (0, &inset, &outset, &errset, ptimeout);
574  if (unlikely (rc == SOCKET_ERROR)) {
575  errno = zmq::wsa_error_to_errno (WSAGetLastError ());
576  wsa_assert (errno == ENOTSOCK);
577  return -1;
578  }
579 #else
580  int rc = select (maxfd + 1, &inset, &outset, &errset, ptimeout);
581  if (unlikely (rc == -1)) {
582  errno_assert (errno == EINTR || errno == EBADF);
583  return -1;
584  }
585 #endif
586  break;
587  }
588 
589  if (use_signaler && FD_ISSET (signaler.get_fd (), &inset))
590  signaler.recv ();
591 
592  // Check for the events.
593  for (items_t::iterator it = items.begin (); it != items.end (); ++it) {
594 
595  // The poll item is a 0MQ socket. Retrieve pending events
596  // using the ZMQ_EVENTS socket option.
597  if (it->socket) {
598  size_t events_size = sizeof (uint32_t);
599  uint32_t events;
600  if (it->socket->getsockopt (ZMQ_EVENTS, &events, &events_size) == -1)
601  return -1;
602 
603  if (it->events & events) {
604  event_->socket = it->socket;
605  event_->user_data = it->user_data;
606  event_->events = it->events & events;
607 
608  // If there is event to return, we can exit immediately.
609  return 0;
610  }
611  }
612  // Else, the poll item is a raw file descriptor, simply convert
613  // the events to zmq_pollitem_t-style format.
614  else {
615  short events = 0;
616 
617  if (FD_ISSET (it->fd, &inset))
618  events |= ZMQ_POLLIN;
619  if (FD_ISSET (it->fd, &outset))
620  events |= ZMQ_POLLOUT;
621  if (FD_ISSET (it->fd, &errset))
622  events |= ZMQ_POLLERR;
623 
624  if (events) {
625  event_->socket = NULL;
626  event_->user_data = it->user_data;
627  event_->fd = it->fd;
628  event_->events = events;
629 
630  // If there is event to return, we can exit immediately.
631  return 0;
632  }
633  }
634  }
635 
636  // If timeout is zero, exit immediately whether there are events or not.
637  if (timeout_ == 0)
638  break;
639 
640  // At this point we are meant to wait for events but there are none.
641  // If timeout is infinite we can just loop until we get some events.
642  if (timeout_ < 0) {
643  if (first_pass)
644  first_pass = false;
645  continue;
646  }
647 
648  // The timeout is finite and there are no events. In the first pass
649  // we get a timestamp of when the polling have begun. (We assume that
650  // first pass have taken negligible time). We also compute the time
651  // when the polling should time out.
652  if (first_pass) {
653  now = clock.now_ms ();
654  end = now + timeout_;
655  if (now == end)
656  break;
657  first_pass = false;
658  continue;
659  }
660 
661  // Find out whether timeout have expired.
662  now = clock.now_ms ();
663  if (now >= end)
664  break;
665  }
666 
667  errno = ETIMEDOUT;
668  return -1;
669 
670 #else
671  // Exotic platforms that support neither poll() nor select().
672  errno = ENOTSUP;
673  return -1;
674 #endif
675 }
int wait(event_t *event, long timeout)
int fd_t
Definition: fd.hpp:50
#define zmq_assert(x)
Definition: err.hpp:119
uint64_t now_ms()
Definition: clock.cpp:182
#define ENOTSUP
Definition: zmq.h:112
int add_signaler(signaler_t *s)
fd_t get_fd() const
Definition: signaler.cpp:166
#define ZMQ_POLLPRI
Definition: zmq.h:413
int modify(socket_base_t *socket, short events)
#define ZMQ_FD
Definition: zmq.h:273
int add(socket_base_t *socket, void *user_data, short events)
int getsockopt(int option_, void *optval_, size_t *optvallen_)
#define ENOTSOCK
Definition: zmq.h:136
#define ZMQ_THREAD_SAFE
Definition: zmq.h:332
#define unlikely(x)
Definition: likely.hpp:38
#define ETIMEDOUT
Definition: zmq.h:157
int modify_fd(fd_t fd, short events)
#define ZMQ_POLLERR
Definition: zmq.h:412
#define ZMQ_EVENTS
Definition: zmq.h:274
int remove(socket_base_t *socket)
#define ZMQ_POLLOUT
Definition: zmq.h:411
#define alloc_assert(x)
Definition: err.hpp:159
#define errno_assert(x)
Definition: err.hpp:129
int remove_signaler(signaler_t *s)
int add_fd(fd_t fd, void *user_data, short events)
#define ZMQ_POLLIN
Definition: zmq.h:410
#define ZMQ_POLL_BASED_ON_POLL
Definition: poller.hpp:61