Line data Source code
1 : /*
2 : Copyright (c) 2007-2016 Contributors as noted in the AUTHORS file
3 :
4 : This file is part of libzmq, the ZeroMQ core engine in C++.
5 :
6 : libzmq is free software; you can redistribute it and/or modify it under
7 : the terms of the GNU Lesser General Public License (LGPL) as published
8 : by the Free Software Foundation; either version 3 of the License, or
9 : (at your option) any later version.
10 :
11 : As a special exception, the Contributors give you permission to link
12 : this library with independent modules to produce an executable,
13 : regardless of the license terms of these independent modules, and to
14 : copy and distribute the resulting executable under terms of your choice,
15 : provided that you also meet, for each linked independent module, the
16 : terms and conditions of the license of that module. An independent
17 : module is a module which is not derived from or based on this library.
18 : If you modify this library, you must extend this exception to your
19 : version of the library.
20 :
21 : libzmq is distributed in the hope that it will be useful, but WITHOUT
22 : ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
23 : FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public
24 : License for more details.
25 :
26 : You should have received a copy of the GNU Lesser General Public License
27 : along with this program. If not, see <http://www.gnu.org/licenses/>.
28 : */
29 :
30 : #include "precompiled.hpp"
31 : #include "socket_poller.hpp"
32 : #include "err.hpp"
33 :
34 3 : zmq::socket_poller_t::socket_poller_t () :
35 : tag (0xCAFEBABE),
36 : need_rebuild (true),
37 : use_signaler (false),
38 : poll_size(0)
39 : #if defined ZMQ_POLL_BASED_ON_POLL
40 : ,
41 6 : pollfds (NULL)
42 : #elif defined ZMQ_POLL_BASED_ON_SELECT
43 : ,
44 : maxfd(0)
45 : #endif
46 : {
47 : #if defined ZMQ_POLL_BASED_ON_SELECT
48 : memset(&pollset_in, 0, sizeof(pollset_in));
49 : memset(&pollset_out, 0, sizeof(pollset_in));
50 : memset(&pollset_err, 0, sizeof(pollset_in));
51 : #endif
52 3 : }
53 :
54 9 : zmq::socket_poller_t::~socket_poller_t ()
55 : {
56 : // Mark the socket_poller as dead
57 3 : tag = 0xdeadbeef;
58 :
59 24 : for (items_t::iterator it = items.begin(); it != items.end(); ++it) {
60 3 : if (it->socket && it->socket->check_tag()) {
61 : int thread_safe;
62 0 : size_t thread_safe_size = sizeof(int);
63 :
64 0 : if (it->socket->getsockopt (ZMQ_THREAD_SAFE, &thread_safe, &thread_safe_size) == 0 && thread_safe)
65 0 : it->socket->remove_signaler (&signaler);
66 : }
67 : }
68 :
69 : #if defined ZMQ_POLL_BASED_ON_POLL
70 3 : if (pollfds) {
71 3 : free (pollfds);
72 3 : pollfds = NULL;
73 : }
74 : #endif
75 3 : }
76 :
77 39 : bool zmq::socket_poller_t::check_tag ()
78 : {
79 39 : return tag == 0xCAFEBABE;
80 : }
81 :
82 6 : int zmq::socket_poller_t::add (socket_base_t *socket_, void* user_data_, short events_)
83 : {
84 30 : for (items_t::iterator it = items.begin (); it != items.end (); ++it) {
85 0 : if (it->socket == socket_) {
86 0 : errno = EINVAL;
87 : return -1;
88 : }
89 : }
90 :
91 : int thread_safe;
92 6 : size_t thread_safe_size = sizeof(int);
93 :
94 6 : if (socket_->getsockopt (ZMQ_THREAD_SAFE, &thread_safe, &thread_safe_size) == -1)
95 : return -1;
96 :
97 6 : if (thread_safe) {
98 3 : if (socket_->add_signaler (&signaler) == -1)
99 : return -1;
100 : }
101 :
102 : item_t item = {socket_, 0, user_data_, events_
103 : #if defined ZMQ_POLL_BASED_ON_POLL
104 : ,-1
105 : #endif
106 6 : };
107 6 : items.push_back (item);
108 6 : need_rebuild = true;
109 :
110 6 : return 0;
111 : }
112 :
113 3 : int zmq::socket_poller_t::add_fd (fd_t fd_, void *user_data_, short events_)
114 : {
115 15 : for (items_t::iterator it = items.begin (); it != items.end (); ++it) {
116 0 : if (!it->socket && it->fd == fd_) {
117 0 : errno = EINVAL;
118 : return -1;
119 : }
120 : }
121 :
122 : item_t item = {NULL, fd_, user_data_, events_
123 : #if defined ZMQ_POLL_BASED_ON_POLL
124 : ,-1
125 : #endif
126 3 : };
127 3 : items.push_back (item);
128 3 : need_rebuild = true;
129 :
130 3 : return 0;
131 : }
132 :
133 3 : int zmq::socket_poller_t::modify (socket_base_t *socket_, short events_)
134 : {
135 3 : items_t::iterator it;
136 :
137 15 : for (it = items.begin (); it != items.end (); ++it) {
138 3 : if (it->socket == socket_)
139 : break;
140 : }
141 :
142 9 : if (it == items.end()) {
143 0 : errno = EINVAL;
144 0 : return -1;
145 : }
146 :
147 3 : it->events = events_;
148 3 : need_rebuild = true;
149 :
150 3 : return 0;
151 : }
152 :
153 :
154 0 : int zmq::socket_poller_t::modify_fd (fd_t fd_, short events_)
155 : {
156 0 : items_t::iterator it;
157 :
158 0 : for (it = items.begin (); it != items.end (); ++it) {
159 0 : if (!it->socket && it->fd == fd_)
160 : break;
161 : }
162 :
163 0 : if (it == items.end()) {
164 0 : errno = EINVAL;
165 0 : return -1;
166 : }
167 :
168 0 : it->events = events_;
169 0 : need_rebuild = true;
170 :
171 0 : return 0;
172 : }
173 :
174 :
175 3 : int zmq::socket_poller_t::remove (socket_base_t *socket_)
176 : {
177 3 : items_t::iterator it;
178 :
179 15 : for (it = items.begin (); it != items.end (); ++it) {
180 3 : if (it->socket == socket_)
181 : break;
182 : }
183 :
184 9 : if (it == items.end()) {
185 0 : errno = EINVAL;
186 0 : return -1;
187 : }
188 :
189 3 : items.erase(it);
190 3 : need_rebuild = true;
191 :
192 : int thread_safe;
193 3 : size_t thread_safe_size = sizeof(int);
194 :
195 3 : if (socket_->getsockopt (ZMQ_THREAD_SAFE, &thread_safe, &thread_safe_size) == 0 && thread_safe)
196 0 : socket_->remove_signaler (&signaler);
197 :
198 : return 0;
199 : }
200 :
201 3 : int zmq::socket_poller_t::remove_fd (fd_t fd_)
202 : {
203 3 : items_t::iterator it;
204 :
205 15 : for (it = items.begin (); it != items.end (); ++it) {
206 3 : if (!it->socket && it->fd == fd_)
207 : break;
208 : }
209 :
210 9 : if (it == items.end()) {
211 0 : errno = EINVAL;
212 0 : return -1;
213 : }
214 :
215 3 : items.erase (it);
216 3 : need_rebuild = true;
217 :
218 3 : return 0;
219 : }
220 :
221 15 : int zmq::socket_poller_t::rebuild ()
222 : {
223 : #if defined ZMQ_POLL_BASED_ON_POLL
224 :
225 15 : if (pollfds) {
226 9 : free (pollfds);
227 9 : pollfds = NULL;
228 : }
229 :
230 15 : use_signaler = false;
231 :
232 15 : poll_size = 0;
233 :
234 111 : for (items_t::iterator it = items.begin (); it != items.end (); ++it) {
235 12 : if (it->events) {
236 12 : if (it->socket) {
237 : int thread_safe;
238 9 : size_t thread_safe_size = sizeof(int);
239 :
240 9 : if (it->socket->getsockopt (ZMQ_THREAD_SAFE, &thread_safe, &thread_safe_size) == -1)
241 0 : return -1;
242 :
243 9 : if (thread_safe) {
244 6 : if (!use_signaler) {
245 6 : use_signaler = true;
246 6 : poll_size++;
247 : }
248 : }
249 : else
250 3 : poll_size++;
251 : }
252 : else
253 3 : poll_size++;
254 : }
255 : }
256 :
257 15 : if (poll_size == 0)
258 : return 0;
259 :
260 12 : pollfds = (pollfd*) malloc (poll_size * sizeof (pollfd));
261 12 : alloc_assert (pollfds);
262 :
263 12 : int item_nbr = 0;
264 :
265 12 : if (use_signaler) {
266 6 : item_nbr = 1;
267 6 : pollfds[0].fd = signaler.get_fd();
268 6 : pollfds[0].events = POLLIN;
269 : }
270 :
271 96 : for (items_t::iterator it = items.begin (); it != items.end (); ++it) {
272 12 : if (it->events) {
273 12 : if (it->socket) {
274 : int thread_safe;
275 9 : size_t thread_safe_size = sizeof(int);
276 :
277 9 : if (it->socket->getsockopt (ZMQ_THREAD_SAFE, &thread_safe, &thread_safe_size) == -1)
278 0 : return -1;
279 :
280 9 : if (!thread_safe) {
281 3 : size_t fd_size = sizeof (zmq::fd_t);
282 3 : if (it->socket->getsockopt (ZMQ_FD, &pollfds [item_nbr].fd, &fd_size) == -1) {
283 0 : return -1;
284 : }
285 :
286 3 : pollfds [item_nbr].events = POLLIN;
287 3 : item_nbr++;
288 : }
289 : }
290 : else {
291 3 : pollfds [item_nbr].fd = it->fd;
292 3 : pollfds [item_nbr].events =
293 : (it->events & ZMQ_POLLIN ? POLLIN : 0) |
294 3 : (it->events & ZMQ_POLLOUT ? POLLOUT : 0) |
295 6 : (it->events & ZMQ_POLLPRI ? POLLPRI : 0);
296 3 : it->pollfd_index = item_nbr;
297 3 : item_nbr++;
298 : }
299 : }
300 : }
301 :
302 : #elif defined ZMQ_POLL_BASED_ON_SELECT
303 :
304 : FD_ZERO (&pollset_in);
305 : FD_ZERO (&pollset_out);
306 : FD_ZERO (&pollset_err);
307 :
308 : // Ensure we do not attempt to select () on more than FD_SETSIZE
309 : // file descriptors.
310 : zmq_assert (items.size () <= FD_SETSIZE);
311 :
312 : poll_size = 0;
313 :
314 : use_signaler = false;
315 :
316 : for (items_t::iterator it = items.begin (); it != items.end (); ++it) {
317 : if (it->socket) {
318 : int thread_safe;
319 : size_t thread_safe_size = sizeof(int);
320 :
321 : if (it->socket->getsockopt (ZMQ_THREAD_SAFE, &thread_safe, &thread_safe_size) == -1)
322 : return -1;
323 :
324 : if (thread_safe && it->events) {
325 : use_signaler = true;
326 : FD_SET (signaler.get_fd (), &pollset_in);
327 : poll_size = 1;
328 : break;
329 : }
330 : }
331 : }
332 :
333 : maxfd = 0;
334 :
335 : // Build the fd_sets for passing to select ().
336 : for (items_t::iterator it = items.begin (); it != items.end (); ++it) {
337 : if (it->events) {
338 : // If the poll item is a 0MQ socket we are interested in input on the
339 : // notification file descriptor retrieved by the ZMQ_FD socket option.
340 : if (it->socket) {
341 : int thread_safe;
342 : size_t thread_safe_size = sizeof(int);
343 :
344 : if (it->socket->getsockopt (ZMQ_THREAD_SAFE, &thread_safe, &thread_safe_size) == -1)
345 : return -1;
346 :
347 : if (!thread_safe) {
348 : zmq::fd_t notify_fd;
349 : size_t fd_size = sizeof (zmq::fd_t);
350 : if (it->socket->getsockopt (ZMQ_FD, ¬ify_fd, &fd_size) == -1)
351 : return -1;
352 :
353 : FD_SET (notify_fd, &pollset_in);
354 : if (maxfd < notify_fd)
355 : maxfd = notify_fd;
356 :
357 : poll_size++;
358 : }
359 : }
360 : // Else, the poll item is a raw file descriptor. Convert the poll item
361 : // events to the appropriate fd_sets.
362 : else {
363 : if (it->events & ZMQ_POLLIN)
364 : FD_SET (it->fd, &pollset_in);
365 : if (it->events & ZMQ_POLLOUT)
366 : FD_SET (it->fd, &pollset_out);
367 : if (it->events & ZMQ_POLLERR)
368 : FD_SET (it->fd, &pollset_err);
369 : if (maxfd < it->fd)
370 : maxfd = it->fd;
371 :
372 : poll_size++;
373 : }
374 : }
375 : }
376 :
377 : #endif
378 :
379 12 : need_rebuild = false;
380 12 : return 0;
381 : }
382 :
383 18 : int zmq::socket_poller_t::wait (zmq::socket_poller_t::event_t *event_, long timeout_)
384 : {
385 18 : if (need_rebuild)
386 15 : if (rebuild () == -1)
387 : return -1;
388 :
389 : #if defined ZMQ_POLL_BASED_ON_POLL
390 18 : if (unlikely (poll_size == 0)) {
391 : // We'll report an error (timed out) as if the list was non-empty and
392 : // no event occured within the specified timeout. Otherwise the caller
393 : // needs to check the return value AND the event to avoid using the
394 : // nullified event data.
395 3 : errno = ETIMEDOUT;
396 3 : if (timeout_ == 0)
397 : return -1;
398 : #if defined ZMQ_HAVE_WINDOWS
399 : Sleep (timeout_ > 0 ? timeout_ : INFINITE);
400 : return -1;
401 : #elif defined ZMQ_HAVE_ANDROID
402 : usleep (timeout_ * 1000);
403 : return -1;
404 : #else
405 0 : usleep (timeout_ * 1000);
406 : return -1;
407 : #endif
408 : }
409 :
410 15 : zmq::clock_t clock;
411 : uint64_t now = 0;
412 : uint64_t end = 0;
413 :
414 : bool first_pass = true;
415 :
416 : while (true) {
417 : // Compute the timeout for the subsequent poll.
418 : int timeout;
419 23 : if (first_pass)
420 : timeout = 0;
421 : else
422 8 : if (timeout_ < 0)
423 : timeout = -1;
424 : else
425 5 : timeout = end - now;
426 :
427 : // Wait for events.
428 : while (true) {
429 46 : int rc = poll (pollfds, poll_size, timeout);
430 23 : if (rc == -1 && errno == EINTR) {
431 : return -1;
432 : }
433 23 : errno_assert (rc >= 0);
434 : break;
435 : }
436 :
437 : // Receive the signal from pollfd
438 23 : if (use_signaler && pollfds[0].revents & POLLIN)
439 6 : signaler.recv ();
440 :
441 : // Check for the events.
442 148 : for (items_t::iterator it = items.begin (); it != items.end (); ++it) {
443 :
444 : // The poll item is a 0MQ socket. Retrieve pending events
445 : // using the ZMQ_EVENTS socket option.
446 23 : if (it->socket) {
447 20 : size_t events_size = sizeof (uint32_t);
448 : uint32_t events;
449 20 : if (it->socket->getsockopt (ZMQ_EVENTS, &events, &events_size) == -1) {
450 9 : return -1;
451 : }
452 :
453 20 : if (it->events & events) {
454 9 : event_->socket = it->socket;
455 9 : event_->user_data = it->user_data;
456 9 : event_->events = it->events & events;
457 :
458 : // If there is event to return, we can exit immediately.
459 9 : return 0;
460 : }
461 : }
462 : // Else, the poll item is a raw file descriptor, simply convert
463 : // the events to zmq_pollitem_t-style format.
464 : else {
465 3 : short revents = pollfds [it->pollfd_index].revents;
466 3 : short events = 0;
467 :
468 3 : if (revents & POLLIN)
469 3 : events |= ZMQ_POLLIN;
470 3 : if (revents & POLLOUT)
471 0 : events |= ZMQ_POLLOUT;
472 3 : if (revents & POLLPRI)
473 0 : events |= ZMQ_POLLPRI;
474 3 : if (revents & ~(POLLIN | POLLOUT | POLLPRI))
475 0 : events |= ZMQ_POLLERR;
476 :
477 3 : if (events) {
478 3 : event_->socket = NULL;
479 3 : event_->user_data = it->user_data;
480 3 : event_->fd = it->fd;
481 3 : event_->events = events;
482 :
483 : // If there is event to return, we can exit immediately.
484 3 : return 0;
485 : }
486 : }
487 : }
488 :
489 : // If timeout is zero, exit immediately whether there are events or not.
490 11 : if (timeout_ == 0)
491 : break;
492 :
493 : // At this point we are meant to wait for events but there are none.
494 : // If timeout is infinite we can just loop until we get some events.
495 8 : if (timeout_ < 0) {
496 3 : if (first_pass)
497 3 : first_pass = false;
498 : continue;
499 : }
500 :
501 : // The timeout is finite and there are no events. In the first pass
502 : // we get a timestamp of when the polling have begun. (We assume that
503 : // first pass have taken negligible time). We also compute the time
504 : // when the polling should time out.
505 5 : if (first_pass) {
506 3 : now = clock.now_ms ();
507 3 : end = now + timeout_;
508 3 : if (now == end)
509 : break;
510 : first_pass = false;
511 : continue;
512 : }
513 :
514 : // Find out whether timeout have expired.
515 2 : now = clock.now_ms ();
516 2 : if (now >= end)
517 : break;
518 : }
519 :
520 3 : errno = ETIMEDOUT;
521 3 : return -1;
522 :
523 : #elif defined ZMQ_POLL_BASED_ON_SELECT
524 :
525 : if (unlikely (poll_size == 0)) {
526 : // We'll report an error (timed out) as if the list was non-empty and
527 : // no event occured within the specified timeout. Otherwise the caller
528 : // needs to check the return value AND the event to avoid using the
529 : // nullified event data.
530 : errno = ETIMEDOUT;
531 : if (timeout_ == 0)
532 : return -1;
533 : #if defined ZMQ_HAVE_WINDOWS
534 : Sleep (timeout_ > 0 ? timeout_ : INFINITE);
535 : return -1;
536 : #else
537 : usleep (timeout_ * 1000);
538 : return -1;
539 : #endif
540 : }
541 : zmq::clock_t clock;
542 : uint64_t now = 0;
543 : uint64_t end = 0;
544 :
545 : bool first_pass = true;
546 : fd_set inset, outset, errset;
547 :
548 : while (true) {
549 :
550 : // Compute the timeout for the subsequent poll.
551 : timeval timeout;
552 : timeval *ptimeout;
553 : if (first_pass) {
554 : timeout.tv_sec = 0;
555 : timeout.tv_usec = 0;
556 : ptimeout = &timeout;
557 : }
558 : else
559 : if (timeout_ < 0)
560 : ptimeout = NULL;
561 : else {
562 : timeout.tv_sec = (long) ((end - now) / 1000);
563 : timeout.tv_usec = (long) ((end - now) % 1000 * 1000);
564 : ptimeout = &timeout;
565 : }
566 :
567 : // Wait for events. Ignore interrupts if there's infinite timeout.
568 : while (true) {
569 : memcpy (&inset, &pollset_in, sizeof (fd_set));
570 : memcpy (&outset, &pollset_out, sizeof (fd_set));
571 : memcpy (&errset, &pollset_err, sizeof (fd_set));
572 : #if defined ZMQ_HAVE_WINDOWS
573 : int rc = select (0, &inset, &outset, &errset, ptimeout);
574 : if (unlikely (rc == SOCKET_ERROR)) {
575 : errno = zmq::wsa_error_to_errno (WSAGetLastError ());
576 : wsa_assert (errno == ENOTSOCK);
577 : return -1;
578 : }
579 : #else
580 : int rc = select (maxfd + 1, &inset, &outset, &errset, ptimeout);
581 : if (unlikely (rc == -1)) {
582 : errno_assert (errno == EINTR || errno == EBADF);
583 : return -1;
584 : }
585 : #endif
586 : break;
587 : }
588 :
589 : if (use_signaler && FD_ISSET (signaler.get_fd (), &inset))
590 : signaler.recv ();
591 :
592 : // Check for the events.
593 : for (items_t::iterator it = items.begin (); it != items.end (); ++it) {
594 :
595 : // The poll item is a 0MQ socket. Retrieve pending events
596 : // using the ZMQ_EVENTS socket option.
597 : if (it->socket) {
598 : size_t events_size = sizeof (uint32_t);
599 : uint32_t events;
600 : if (it->socket->getsockopt (ZMQ_EVENTS, &events, &events_size) == -1)
601 : return -1;
602 :
603 : if (it->events & events) {
604 : event_->socket = it->socket;
605 : event_->user_data = it->user_data;
606 : event_->events = it->events & events;
607 :
608 : // If there is event to return, we can exit immediately.
609 : return 0;
610 : }
611 : }
612 : // Else, the poll item is a raw file descriptor, simply convert
613 : // the events to zmq_pollitem_t-style format.
614 : else {
615 : short events = 0;
616 :
617 : if (FD_ISSET (it->fd, &inset))
618 : events |= ZMQ_POLLIN;
619 : if (FD_ISSET (it->fd, &outset))
620 : events |= ZMQ_POLLOUT;
621 : if (FD_ISSET (it->fd, &errset))
622 : events |= ZMQ_POLLERR;
623 :
624 : if (events) {
625 : event_->socket = NULL;
626 : event_->user_data = it->user_data;
627 : event_->fd = it->fd;
628 : event_->events = events;
629 :
630 : // If there is event to return, we can exit immediately.
631 : return 0;
632 : }
633 : }
634 : }
635 :
636 : // If timeout is zero, exit immediately whether there are events or not.
637 : if (timeout_ == 0)
638 : break;
639 :
640 : // At this point we are meant to wait for events but there are none.
641 : // If timeout is infinite we can just loop until we get some events.
642 : if (timeout_ < 0) {
643 : if (first_pass)
644 : first_pass = false;
645 : continue;
646 : }
647 :
648 : // The timeout is finite and there are no events. In the first pass
649 : // we get a timestamp of when the polling have begun. (We assume that
650 : // first pass have taken negligible time). We also compute the time
651 : // when the polling should time out.
652 : if (first_pass) {
653 : now = clock.now_ms ();
654 : end = now + timeout_;
655 : if (now == end)
656 : break;
657 : first_pass = false;
658 : continue;
659 : }
660 :
661 : // Find out whether timeout have expired.
662 : now = clock.now_ms ();
663 : if (now >= end)
664 : break;
665 : }
666 :
667 : errno = ETIMEDOUT;
668 : return -1;
669 :
670 : #else
671 : // Exotic platforms that support neither poll() nor select().
672 : errno = ENOTSUP;
673 : return -1;
674 : #endif
675 : }
|