libzmq  master
ZeroMQ C++ Core Engine (LIBZMQ)
select.cpp
Go to the documentation of this file.
1 /*
2  Copyright (c) 2007-2016 Contributors as noted in the AUTHORS file
3 
4  This file is part of libzmq, the ZeroMQ core engine in C++.
5 
6  libzmq is free software; you can redistribute it and/or modify it under
7  the terms of the GNU Lesser General Public License (LGPL) as published
8  by the Free Software Foundation; either version 3 of the License, or
9  (at your option) any later version.
10 
11  As a special exception, the Contributors give you permission to link
12  this library with independent modules to produce an executable,
13  regardless of the license terms of these independent modules, and to
14  copy and distribute the resulting executable under terms of your choice,
15  provided that you also meet, for each linked independent module, the
16  terms and conditions of the license of that module. An independent
17  module is a module which is not derived from or based on this library.
18  If you modify this library, you must extend this exception to your
19  version of the library.
20 
21  libzmq is distributed in the hope that it will be useful, but WITHOUT
22  ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
23  FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public
24  License for more details.
25 
26  You should have received a copy of the GNU Lesser General Public License
27  along with this program. If not, see <http://www.gnu.org/licenses/>.
28 */
29 
30 #include "precompiled.hpp"
31 #include "select.hpp"
32 #if defined ZMQ_USE_SELECT
33 
34 #include "platform.hpp"
35 
36 #if defined ZMQ_HAVE_WINDOWS
37 #include "windows.hpp"
38 #elif defined ZMQ_HAVE_HPUX
39 #include <sys/param.h>
40 #include <sys/types.h>
41 #include <sys/time.h>
42 #elif defined ZMQ_HAVE_OPENVMS
43 #include <sys/types.h>
44 #include <sys/time.h>
45 #else
46 #include <sys/select.h>
47 #endif
48 
49 #include "err.hpp"
50 #include "config.hpp"
51 #include "i_poll_events.hpp"
52 
53 zmq::select_t::select_t (const zmq::ctx_t &ctx_) :
54  ctx (ctx_),
55 #if defined ZMQ_HAVE_WINDOWS
56  // Fine as long as map is not cleared.
57  current_family_entry_it (family_entries.end ()),
58 #else
59  retired (false),
60  maxfd (retired_fd),
61 #endif
62  stopping (false)
63 {
64 }
65 
66 zmq::select_t::~select_t ()
67 {
68  worker.stop ();
69 }
70 
71 zmq::select_t::handle_t zmq::select_t::add_fd (fd_t fd_, i_poll_events *events_)
72 {
73  fd_entry_t fd_entry;
74  fd_entry.fd = fd_;
75  fd_entry.events = events_;
76 
77 #if defined ZMQ_HAVE_WINDOWS
78  u_short family = get_fd_family (fd_);
79  wsa_assert (family != AF_UNSPEC);
80  family_entry_t& family_entry = family_entries [family];
81  family_entry.fd_entries.push_back (fd_entry);
82  FD_SET (fd_, &family_entry.fds_set.error);
83 #else
84  fd_entries.push_back (fd_entry);
85  FD_SET (fd_, &fds_set.error);
86 
87  if (fd_ > maxfd)
88  maxfd = fd_;
89 #endif
90 
91  adjust_load (1);
92 
93  return fd_;
94 }
95 
96 void zmq::select_t::rm_fd (handle_t handle_)
97 {
98 #if defined ZMQ_HAVE_WINDOWS
99  u_short family = get_fd_family (handle_);
100  wsa_assert (family != AF_UNSPEC);
101 
102  family_entries_t::iterator family_entry_it = family_entries.find (family);
103  family_entry_t& family_entry = family_entry_it->second;
104 
105  if (family_entry_it != current_family_entry_it) {
106  // Family is not currently being iterated and can be safely
107  // modified in-place. So later it can be skipped without
108  // re-verifying its content.
109  fd_entries_t::iterator fd_entry_it;
110  for (fd_entry_it = family_entry.fd_entries.begin ();
111  fd_entry_it != family_entry.fd_entries.end (); ++fd_entry_it)
112  if (fd_entry_it->fd == handle_)
113  break;
114  zmq_assert (fd_entry_it != family_entry.fd_entries.end ());
115 
116  family_entry.fd_entries.erase (fd_entry_it);
117  family_entry.fds_set.remove_fd (handle_);
118  } else {
119  // Otherwise mark removed entries as retired. It will be cleaned up
120  // at the end of the iteration. See zmq::select_t::loop
121  fd_entries_t::iterator fd_entry_it;
122  for (fd_entry_it = family_entry.fd_entries.begin ();
123  fd_entry_it != family_entry.fd_entries.end (); ++fd_entry_it)
124  if (fd_entry_it->fd == handle_)
125  break;
126  zmq_assert (fd_entry_it != family_entry.fd_entries.end ());
127 
128  fd_entry_it->fd = retired_fd;
129  family_entry.fds_set.remove_fd (handle_);
130  family_entry.retired = true;
131  }
132 #else
133  fd_entries_t::iterator fd_entry_it;
134  for (fd_entry_it = fd_entries.begin ();
135  fd_entry_it != fd_entries.end (); ++fd_entry_it)
136  if (fd_entry_it->fd == handle_)
137  break;
138  zmq_assert (fd_entry_it != fd_entries.end ());
139 
140  fd_entry_it->fd = retired_fd;
141  fds_set.remove_fd (handle_);
142 
143  if (handle_ == maxfd) {
144  maxfd = retired_fd;
145  for (fd_entry_it = fd_entries.begin (); fd_entry_it != fd_entries.end ();
146  ++fd_entry_it)
147  if (fd_entry_it->fd > maxfd)
148  maxfd = fd_entry_it->fd;
149  }
150 
151  retired = true;
152 #endif
153  adjust_load (-1);
154 }
155 
156 void zmq::select_t::set_pollin (handle_t handle_)
157 {
158 #if defined ZMQ_HAVE_WINDOWS
159  u_short family = get_fd_family (handle_);
160  wsa_assert (family != AF_UNSPEC);
161  FD_SET (handle_, &family_entries [family].fds_set.read);
162 #else
163  FD_SET (handle_, &fds_set.read);
164 #endif
165 }
166 
167 void zmq::select_t::reset_pollin (handle_t handle_)
168 {
169 #if defined ZMQ_HAVE_WINDOWS
170  u_short family = get_fd_family (handle_);
171  wsa_assert (family != AF_UNSPEC);
172  FD_CLR (handle_, &family_entries [family].fds_set.read);
173 #else
174  FD_CLR (handle_, &fds_set.read);
175 #endif
176 }
177 
178 void zmq::select_t::set_pollout (handle_t handle_)
179 {
180 #if defined ZMQ_HAVE_WINDOWS
181  u_short family = get_fd_family (handle_);
182  wsa_assert (family != AF_UNSPEC);
183  FD_SET (handle_, &family_entries [family].fds_set.write);
184 #else
185  FD_SET (handle_, &fds_set.write);
186 #endif
187 }
188 
189 void zmq::select_t::reset_pollout (handle_t handle_)
190 {
191 #if defined ZMQ_HAVE_WINDOWS
192  u_short family = get_fd_family (handle_);
193  wsa_assert (family != AF_UNSPEC);
194  FD_CLR (handle_, &family_entries [family].fds_set.write);
195 #else
196  FD_CLR (handle_, &fds_set.write);
197 #endif
198 }
199 
200 void zmq::select_t::start ()
201 {
202  ctx.start_thread (worker, worker_routine, this);
203 }
204 
205 void zmq::select_t::stop ()
206 {
207  stopping = true;
208 }
209 
210 int zmq::select_t::max_fds ()
211 {
212  return FD_SETSIZE;
213 }
214 
215 void zmq::select_t::loop ()
216 {
217  while (!stopping) {
218  // Execute any due timers.
219  int timeout = (int) execute_timers ();
220 
221 #if defined ZMQ_HAVE_OSX
222  struct timeval tv = { (long) (timeout / 1000), timeout % 1000 * 1000 };
223 #else
224  struct timeval tv = { (long) (timeout / 1000), (long) (timeout % 1000 * 1000) };
225 #endif
226 
227  int rc = 0;
228 
229 #if defined ZMQ_HAVE_WINDOWS
230  /*
231  On Windows select does not allow to mix descriptors from different
232  service providers. It seems to work for AF_INET and AF_INET6,
233  but fails for AF_INET and VMCI. The workaround is to use
234  WSAEventSelect and WSAWaitForMultipleEvents to wait, then use
235  select to find out what actually changed. WSAWaitForMultipleEvents
236  cannot be used alone, because it does not support more than 64 events
237  which is not enough.
238 
239  To reduce unncessary overhead, WSA is only used when there are more
240  than one family. Moreover, AF_INET and AF_INET6 are considered the same
241  family because Windows seems to handle them properly.
242  See get_fd_family for details.
243  */
244  wsa_events_t wsa_events;
245 
246  // If there is just one family, there is no reason to use WSA events.
247  if (family_entries.size () > 1) {
248  for (family_entries_t::iterator family_entry_it = family_entries.begin ();
249  family_entry_it != family_entries.end (); ++family_entry_it) {
250  family_entry_t& family_entry = family_entry_it->second;
251 
252  for (fd_entries_t::iterator fd_entry_it = family_entry.fd_entries.begin ();
253  fd_entry_it != family_entry.fd_entries.end (); ++fd_entry_it) {
254  fd_t fd = fd_entry_it->fd;
255 
256  // http://stackoverflow.com/q/35043420/188530
257  if (FD_ISSET (fd, &family_entry.fds_set.read) &&
258  FD_ISSET (fd, &family_entry.fds_set.write))
259  rc = WSAEventSelect (fd, wsa_events.events [3],
260  FD_READ | FD_ACCEPT | FD_CLOSE | FD_WRITE | FD_CONNECT | FD_OOB);
261  else if (FD_ISSET (fd, &family_entry.fds_set.read))
262  rc = WSAEventSelect (fd, wsa_events.events [0],
263  FD_READ | FD_ACCEPT | FD_CLOSE | FD_OOB);
264  else if (FD_ISSET (fd, &family_entry.fds_set.write))
265  rc = WSAEventSelect (fd, wsa_events.events [1],
266  FD_WRITE | FD_CONNECT | FD_OOB);
267  else if (FD_ISSET (fd, &family_entry.fds_set.error))
268  rc = WSAEventSelect (fd, wsa_events.events [2],
269  FD_OOB);
270  else
271  rc = 0;
272 
273  wsa_assert (rc != SOCKET_ERROR);
274  }
275  }
276  }
277 #endif
278 
279 #if defined ZMQ_HAVE_WINDOWS
280  if (family_entries.size () > 1) {
281  rc = WSAWaitForMultipleEvents (4, wsa_events.events, FALSE,
282  timeout ? timeout : INFINITE, FALSE);
283  wsa_assert (rc != WSA_WAIT_FAILED);
284  zmq_assert (rc != WSA_WAIT_IO_COMPLETION);
285 
286  if (rc == WSA_WAIT_TIMEOUT)
287  continue;
288  }
289 
290  for (current_family_entry_it = family_entries.begin ();
291  current_family_entry_it != family_entries.end (); ++current_family_entry_it) {
292  family_entry_t& family_entry = current_family_entry_it->second;
293 
294  // select will fail when run with empty sets.
295  if (family_entry.fd_entries.empty ())
296  continue;
297 
298  fds_set_t local_fds_set = family_entry.fds_set;
299 
300  if (family_entries.size () > 1) {
301  // There is no reason to wait again after WSAWaitForMultipleEvents.
302  // Simply collect what is ready.
303  struct timeval tv_nodelay = { 0, 0 };
304  rc = select (0, &local_fds_set.read, &local_fds_set.write, &local_fds_set.error,
305  &tv_nodelay);
306  }
307  else
308  rc = select (0, &local_fds_set.read, &local_fds_set.write,
309  &local_fds_set.error, timeout > 0 ? &tv : NULL);
310 
311  wsa_assert (rc != SOCKET_ERROR);
312 
313  // Size is cached to avoid iteration through recently added descriptors.
314  for (fd_entries_t::size_type i = 0, size = family_entry.fd_entries.size (); i < size && rc > 0; ++i) {
315  fd_entry_t& fd_entry = family_entry.fd_entries [i];
316 
317  if (fd_entry.fd == retired_fd)
318  continue;
319 
320  if (FD_ISSET (fd_entry.fd, &local_fds_set.read)) {
321  fd_entry.events->in_event ();
322  --rc;
323  }
324 
325  if (fd_entry.fd == retired_fd || rc == 0)
326  continue;
327 
328  if (FD_ISSET (fd_entry.fd, &local_fds_set.write)) {
329  fd_entry.events->out_event ();
330  --rc;
331  }
332 
333  if (fd_entry.fd == retired_fd || rc == 0)
334  continue;
335 
336  if (FD_ISSET (fd_entry.fd, &local_fds_set.error)) {
337  fd_entry.events->in_event ();
338  --rc;
339  }
340  }
341 
342  if (family_entry.retired) {
343  family_entry.retired = false;
344  family_entry.fd_entries.erase (std::remove_if (family_entry.fd_entries.begin (),
345  family_entry.fd_entries.end (), is_retired_fd), family_entry.fd_entries.end ());
346  }
347  }
348 #else
349  fds_set_t local_fds_set = fds_set;
350  rc = select (maxfd + 1, &local_fds_set.read, &local_fds_set.write,
351  &local_fds_set.error, timeout ? &tv : NULL);
352 
353  if (rc == -1) {
354  errno_assert (errno == EINTR);
355  continue;
356  }
357 
358  // Size is cached to avoid iteration through just added descriptors.
359  for (fd_entries_t::size_type i = 0, size = fd_entries.size (); i < size && rc > 0; ++i) {
360  fd_entry_t& fd_entry = fd_entries [i];
361 
362  if (fd_entry.fd == retired_fd)
363  continue;
364 
365  if (FD_ISSET (fd_entry.fd, &local_fds_set.read)) {
366  fd_entry.events->in_event ();
367  --rc;
368  }
369 
370  if (fd_entry.fd == retired_fd || rc == 0)
371  continue;
372 
373  if (FD_ISSET (fd_entry.fd, &local_fds_set.write)) {
374  fd_entry.events->out_event ();
375  --rc;
376  }
377 
378  if (fd_entry.fd == retired_fd || rc == 0)
379  continue;
380 
381  if (FD_ISSET (fd_entry.fd, &local_fds_set.error)) {
382  fd_entry.events->in_event ();
383  --rc;
384  }
385  }
386 
387  if (retired) {
388  retired = false;
389  fd_entries.erase (std::remove_if (fd_entries.begin (), fd_entries.end (),
390  is_retired_fd), fd_entries.end ());
391  }
392 #endif
393  }
394 }
395 
396 void zmq::select_t::worker_routine (void *arg_)
397 {
398  ((select_t*) arg_)->loop ();
399 }
400 
401 zmq::select_t::fds_set_t::fds_set_t ()
402 {
403  FD_ZERO (&read);
404  FD_ZERO (&write);
405  FD_ZERO (&error);
406 }
407 
408 zmq::select_t::fds_set_t::fds_set_t (const fds_set_t& other_)
409 {
410  memcpy (&read, &other_.read, sizeof other_.read);
411  memcpy (&write, &other_.write, sizeof other_.write);
412  memcpy (&error, &other_.error, sizeof other_.error);
413 }
414 
415 zmq::select_t::fds_set_t& zmq::select_t::fds_set_t::operator= (const fds_set_t& other_)
416 {
417  memcpy (&read, &other_.read, sizeof other_.read);
418  memcpy (&write, &other_.write, sizeof other_.write);
419  memcpy (&error, &other_.error, sizeof other_.error);
420  return *this;
421 }
422 
423 void zmq::select_t::fds_set_t::remove_fd (const fd_t& fd_)
424 {
425  FD_CLR (fd_, &read);
426  FD_CLR (fd_, &write);
427  FD_CLR (fd_, &error);
428 }
429 
430 bool zmq::select_t::is_retired_fd (const fd_entry_t &entry)
431 {
432  return (entry.fd == retired_fd);
433 }
434 
435 #if defined ZMQ_HAVE_WINDOWS
436 u_short zmq::select_t::get_fd_family (fd_t fd_)
437 {
438  // Use sockaddr_storage instead of sockaddr to accomodate differect structure sizes
439  sockaddr_storage addr = { 0 };
440  int addr_size = sizeof addr;
441 
442  int type;
443  int type_length = sizeof(int);
444 
445  int rc = getsockopt(fd_, SOL_SOCKET, SO_TYPE, (char*) &type, &type_length);
446 
447  if (rc == 0) {
448  if (type == SOCK_DGRAM)
449  return AF_INET;
450  else {
451  rc = getsockname(fd_, (sockaddr *)&addr, &addr_size);
452 
453  // AF_INET and AF_INET6 can be mixed in select
454  // TODO: If proven otherwise, should simply return addr.sa_family
455  if (rc != SOCKET_ERROR)
456  return addr.ss_family == AF_INET6 ? AF_INET : addr.ss_family;
457  }
458  }
459 
460  return AF_UNSPEC;
461 }
462 
463 zmq::select_t::family_entry_t::family_entry_t () :
464  retired (false)
465 {
466 }
467 
468 
469 zmq::select_t::wsa_events_t::wsa_events_t ()
470 {
471  events [0] = WSACreateEvent ();
472  wsa_assert (events [0] != WSA_INVALID_EVENT);
473  events [1] = WSACreateEvent ();
474  wsa_assert (events [1] != WSA_INVALID_EVENT);
475  events [2] = WSACreateEvent ();
476  wsa_assert (events [2] != WSA_INVALID_EVENT);
477  events [3] = WSACreateEvent ();
478  wsa_assert (events [3] != WSA_INVALID_EVENT);
479 }
480 
481 zmq::select_t::wsa_events_t::~wsa_events_t ()
482 {
483  wsa_assert (WSACloseEvent (events [0]));
484  wsa_assert (WSACloseEvent (events [1]));
485  wsa_assert (WSACloseEvent (events [2]));
486  wsa_assert (WSACloseEvent (events [3]));
487 }
488 #endif
489 
490 #endif
#define size
int fd_t
Definition: fd.hpp:50
#define zmq_assert(x)
Definition: err.hpp:119
zmq::ctx_t * ctx
Definition: object.hpp:139
static void worker(void *s)
int getsockopt(int option_, void *optval_, size_t *optvallen_)
void start_thread(thread_t &thread_, thread_fn *tfn_, void *arg_) const
Definition: ctx.cpp:422
enum type_t type
#define errno_assert(x)
Definition: err.hpp:129
Definition: command.hpp:80