42 #elif defined ZMQ_POLL_BASED_ON_SELECT
47 #if defined ZMQ_POLL_BASED_ON_SELECT 48 memset(&pollset_in, 0,
sizeof(pollset_in));
49 memset(&pollset_out, 0,
sizeof(pollset_in));
50 memset(&pollset_err, 0,
sizeof(pollset_in));
59 for (items_t::iterator it =
items.begin(); it !=
items.end(); ++it) {
60 if (it->socket && it->socket->check_tag()) {
62 size_t thread_safe_size =
sizeof(
int);
64 if (it->socket->getsockopt (
ZMQ_THREAD_SAFE, &thread_safe, &thread_safe_size) == 0 && thread_safe)
65 it->socket->remove_signaler (&
signaler);
69 #if defined ZMQ_POLL_BASED_ON_POLL 79 return tag == 0xCAFEBABE;
84 for (items_t::iterator it =
items.begin (); it !=
items.end (); ++it) {
85 if (it->socket == socket_) {
92 size_t thread_safe_size =
sizeof(
int);
102 item_t item = {socket_, 0, user_data_, events_
103 #if defined ZMQ_POLL_BASED_ON_POLL 107 items.push_back (item);
115 for (items_t::iterator it =
items.begin (); it !=
items.end (); ++it) {
116 if (!it->socket && it->fd == fd_) {
122 item_t item = {NULL, fd_, user_data_, events_
123 #if defined ZMQ_POLL_BASED_ON_POLL 127 items.push_back (item);
135 items_t::iterator it;
137 for (it =
items.begin (); it !=
items.end (); ++it) {
138 if (it->socket == socket_)
142 if (it ==
items.end()) {
147 it->events = events_;
156 items_t::iterator it;
158 for (it =
items.begin (); it !=
items.end (); ++it) {
159 if (!it->socket && it->fd == fd_)
163 if (it ==
items.end()) {
168 it->events = events_;
177 items_t::iterator it;
179 for (it =
items.begin (); it !=
items.end (); ++it) {
180 if (it->socket == socket_)
184 if (it ==
items.end()) {
193 size_t thread_safe_size =
sizeof(
int);
203 items_t::iterator it;
205 for (it =
items.begin (); it !=
items.end (); ++it) {
206 if (!it->socket && it->fd == fd_)
210 if (it ==
items.end()) {
223 #if defined ZMQ_POLL_BASED_ON_POLL 234 for (items_t::iterator it =
items.begin (); it !=
items.end (); ++it) {
238 size_t thread_safe_size =
sizeof(
int);
240 if (it->socket->getsockopt (
ZMQ_THREAD_SAFE, &thread_safe, &thread_safe_size) == -1)
271 for (items_t::iterator it =
items.begin (); it !=
items.end (); ++it) {
275 size_t thread_safe_size =
sizeof(
int);
277 if (it->socket->getsockopt (
ZMQ_THREAD_SAFE, &thread_safe, &thread_safe_size) == -1)
282 if (it->socket->getsockopt (
ZMQ_FD, &
pollfds [item_nbr].fd, &fd_size) == -1) {
286 pollfds [item_nbr].events = POLLIN;
291 pollfds [item_nbr].fd = it->fd;
296 it->pollfd_index = item_nbr;
302 #elif defined ZMQ_POLL_BASED_ON_SELECT 304 FD_ZERO (&pollset_in);
305 FD_ZERO (&pollset_out);
306 FD_ZERO (&pollset_err);
316 for (items_t::iterator it =
items.begin (); it !=
items.end (); ++it) {
319 size_t thread_safe_size =
sizeof(
int);
321 if (it->socket->getsockopt (
ZMQ_THREAD_SAFE, &thread_safe, &thread_safe_size) == -1)
324 if (thread_safe && it->events) {
336 for (items_t::iterator it =
items.begin (); it !=
items.end (); ++it) {
342 size_t thread_safe_size =
sizeof(
int);
344 if (it->socket->getsockopt (
ZMQ_THREAD_SAFE, &thread_safe, &thread_safe_size) == -1)
350 if (it->socket->getsockopt (
ZMQ_FD, ¬ify_fd, &fd_size) == -1)
353 FD_SET (notify_fd, &pollset_in);
354 if (maxfd < notify_fd)
364 FD_SET (it->fd, &pollset_in);
366 FD_SET (it->fd, &pollset_out);
368 FD_SET (it->fd, &pollset_err);
389 #if defined ZMQ_POLL_BASED_ON_POLL 398 #if defined ZMQ_HAVE_WINDOWS 399 Sleep (timeout_ > 0 ? timeout_ : INFINITE);
401 #elif defined ZMQ_HAVE_ANDROID 402 usleep (timeout_ * 1000);
405 usleep (timeout_ * 1000);
414 bool first_pass =
true;
430 if (rc == -1 && errno == EINTR) {
442 for (items_t::iterator it =
items.begin (); it !=
items.end (); ++it) {
447 size_t events_size =
sizeof (uint32_t);
449 if (it->socket->getsockopt (
ZMQ_EVENTS, &events, &events_size) == -1) {
453 if (it->events & events) {
454 event_->
socket = it->socket;
456 event_->
events = it->events & events;
465 short revents =
pollfds [it->pollfd_index].revents;
468 if (revents & POLLIN)
470 if (revents & POLLOUT)
472 if (revents & POLLPRI)
474 if (revents & ~(POLLIN | POLLOUT | POLLPRI))
507 end = now + timeout_;
523 #elif defined ZMQ_POLL_BASED_ON_SELECT 533 #if defined ZMQ_HAVE_WINDOWS 534 Sleep (timeout_ > 0 ? timeout_ : INFINITE);
537 usleep (timeout_ * 1000);
545 bool first_pass =
true;
546 fd_set inset, outset, errset;
562 timeout.tv_sec = (long) ((end - now) / 1000);
563 timeout.tv_usec = (long) ((end - now) % 1000 * 1000);
569 memcpy (&inset, &pollset_in,
sizeof (fd_set));
570 memcpy (&outset, &pollset_out,
sizeof (fd_set));
571 memcpy (&errset, &pollset_err,
sizeof (fd_set));
572 #if defined ZMQ_HAVE_WINDOWS 573 int rc = select (0, &inset, &outset, &errset, ptimeout);
574 if (
unlikely (rc == SOCKET_ERROR)) {
575 errno = zmq::wsa_error_to_errno (WSAGetLastError ());
580 int rc = select (maxfd + 1, &inset, &outset, &errset, ptimeout);
593 for (items_t::iterator it =
items.begin (); it !=
items.end (); ++it) {
598 size_t events_size =
sizeof (uint32_t);
600 if (it->socket->getsockopt (
ZMQ_EVENTS, &events, &events_size) == -1)
603 if (it->events & events) {
604 event_->
socket = it->socket;
606 event_->
events = it->events & events;
617 if (FD_ISSET (it->fd, &inset))
619 if (FD_ISSET (it->fd, &outset))
621 if (FD_ISSET (it->fd, &errset))
654 end = now + timeout_;
int wait(event_t *event, long timeout)
int add_signaler(signaler_t *s)
int modify(socket_base_t *socket, short events)
int add(socket_base_t *socket, void *user_data, short events)
int getsockopt(int option_, void *optval_, size_t *optvallen_)
int modify_fd(fd_t fd, short events)
int remove(socket_base_t *socket)
int remove_signaler(signaler_t *s)
int add_fd(fd_t fd, void *user_data, short events)
#define ZMQ_POLL_BASED_ON_POLL