32 #if defined ZMQ_USE_SELECT 34 #include "platform.hpp" 36 #if defined ZMQ_HAVE_WINDOWS 38 #elif defined ZMQ_HAVE_HPUX 39 #include <sys/param.h> 40 #include <sys/types.h> 42 #elif defined ZMQ_HAVE_OPENVMS 43 #include <sys/types.h> 46 #include <sys/select.h> 53 zmq::select_t::select_t (
const zmq::ctx_t &ctx_) :
55 #if defined ZMQ_HAVE_WINDOWS
57 current_family_entry_it (family_entries.end ()),
66 zmq::select_t::~select_t ()
71 zmq::select_t::handle_t zmq::select_t::add_fd (
fd_t fd_, i_poll_events *events_)
75 fd_entry.events = events_;
77 #if defined ZMQ_HAVE_WINDOWS 78 u_short family = get_fd_family (fd_);
79 wsa_assert (family != AF_UNSPEC);
80 family_entry_t& family_entry = family_entries [family];
81 family_entry.fd_entries.push_back (fd_entry);
82 FD_SET (fd_, &family_entry.fds_set.error);
84 fd_entries.push_back (fd_entry);
85 FD_SET (fd_, &fds_set.error);
96 void zmq::select_t::rm_fd (handle_t handle_)
98 #if defined ZMQ_HAVE_WINDOWS 99 u_short family = get_fd_family (handle_);
100 wsa_assert (family != AF_UNSPEC);
102 family_entries_t::iterator family_entry_it = family_entries.find (family);
103 family_entry_t& family_entry = family_entry_it->second;
105 if (family_entry_it != current_family_entry_it) {
109 fd_entries_t::iterator fd_entry_it;
110 for (fd_entry_it = family_entry.fd_entries.begin ();
111 fd_entry_it != family_entry.fd_entries.end (); ++fd_entry_it)
112 if (fd_entry_it->fd == handle_)
114 zmq_assert (fd_entry_it != family_entry.fd_entries.end ());
116 family_entry.fd_entries.erase (fd_entry_it);
117 family_entry.fds_set.remove_fd (handle_);
121 fd_entries_t::iterator fd_entry_it;
122 for (fd_entry_it = family_entry.fd_entries.begin ();
123 fd_entry_it != family_entry.fd_entries.end (); ++fd_entry_it)
124 if (fd_entry_it->fd == handle_)
126 zmq_assert (fd_entry_it != family_entry.fd_entries.end ());
129 family_entry.fds_set.remove_fd (handle_);
130 family_entry.retired =
true;
133 fd_entries_t::iterator fd_entry_it;
134 for (fd_entry_it = fd_entries.begin ();
135 fd_entry_it != fd_entries.end (); ++fd_entry_it)
136 if (fd_entry_it->fd == handle_)
138 zmq_assert (fd_entry_it != fd_entries.end ());
141 fds_set.remove_fd (handle_);
143 if (handle_ == maxfd) {
145 for (fd_entry_it = fd_entries.begin (); fd_entry_it != fd_entries.end ();
147 if (fd_entry_it->fd > maxfd)
148 maxfd = fd_entry_it->fd;
156 void zmq::select_t::set_pollin (handle_t handle_)
158 #if defined ZMQ_HAVE_WINDOWS 159 u_short family = get_fd_family (handle_);
160 wsa_assert (family != AF_UNSPEC);
161 FD_SET (handle_, &family_entries [family].fds_set.read);
163 FD_SET (handle_, &fds_set.read);
167 void zmq::select_t::reset_pollin (handle_t handle_)
169 #if defined ZMQ_HAVE_WINDOWS 170 u_short family = get_fd_family (handle_);
171 wsa_assert (family != AF_UNSPEC);
172 FD_CLR (handle_, &family_entries [family].fds_set.read);
174 FD_CLR (handle_, &fds_set.read);
178 void zmq::select_t::set_pollout (handle_t handle_)
180 #if defined ZMQ_HAVE_WINDOWS 181 u_short family = get_fd_family (handle_);
182 wsa_assert (family != AF_UNSPEC);
183 FD_SET (handle_, &family_entries [family].fds_set.write);
185 FD_SET (handle_, &fds_set.write);
189 void zmq::select_t::reset_pollout (handle_t handle_)
191 #if defined ZMQ_HAVE_WINDOWS 192 u_short family = get_fd_family (handle_);
193 wsa_assert (family != AF_UNSPEC);
194 FD_CLR (handle_, &family_entries [family].fds_set.write);
196 FD_CLR (handle_, &fds_set.write);
200 void zmq::select_t::start ()
210 int zmq::select_t::max_fds ()
215 void zmq::select_t::loop ()
219 int timeout = (
int) execute_timers ();
221 #if defined ZMQ_HAVE_OSX 222 struct timeval tv = { (long) (timeout / 1000), timeout % 1000 * 1000 };
224 struct timeval tv = { (long) (timeout / 1000), (long) (timeout % 1000 * 1000) };
229 #if defined ZMQ_HAVE_WINDOWS 244 wsa_events_t wsa_events;
247 if (family_entries.size () > 1) {
248 for (family_entries_t::iterator family_entry_it = family_entries.begin ();
249 family_entry_it != family_entries.end (); ++family_entry_it) {
250 family_entry_t& family_entry = family_entry_it->second;
252 for (fd_entries_t::iterator fd_entry_it = family_entry.fd_entries.begin ();
253 fd_entry_it != family_entry.fd_entries.end (); ++fd_entry_it) {
254 fd_t fd = fd_entry_it->fd;
257 if (FD_ISSET (fd, &family_entry.fds_set.read) &&
258 FD_ISSET (fd, &family_entry.fds_set.write))
259 rc = WSAEventSelect (fd, wsa_events.events [3],
260 FD_READ | FD_ACCEPT | FD_CLOSE | FD_WRITE | FD_CONNECT | FD_OOB);
261 else if (FD_ISSET (fd, &family_entry.fds_set.read))
262 rc = WSAEventSelect (fd, wsa_events.events [0],
263 FD_READ | FD_ACCEPT | FD_CLOSE | FD_OOB);
264 else if (FD_ISSET (fd, &family_entry.fds_set.write))
265 rc = WSAEventSelect (fd, wsa_events.events [1],
266 FD_WRITE | FD_CONNECT | FD_OOB);
267 else if (FD_ISSET (fd, &family_entry.fds_set.error))
268 rc = WSAEventSelect (fd, wsa_events.events [2],
273 wsa_assert (rc != SOCKET_ERROR);
279 #if defined ZMQ_HAVE_WINDOWS 280 if (family_entries.size () > 1) {
281 rc = WSAWaitForMultipleEvents (4, wsa_events.events, FALSE,
282 timeout ? timeout : INFINITE, FALSE);
283 wsa_assert (rc != WSA_WAIT_FAILED);
286 if (rc == WSA_WAIT_TIMEOUT)
290 for (current_family_entry_it = family_entries.begin ();
291 current_family_entry_it != family_entries.end (); ++current_family_entry_it) {
292 family_entry_t& family_entry = current_family_entry_it->second;
295 if (family_entry.fd_entries.empty ())
298 fds_set_t local_fds_set = family_entry.fds_set;
300 if (family_entries.size () > 1) {
303 struct timeval tv_nodelay = { 0, 0 };
304 rc = select (0, &local_fds_set.read, &local_fds_set.write, &local_fds_set.error,
308 rc = select (0, &local_fds_set.read, &local_fds_set.write,
309 &local_fds_set.error, timeout > 0 ? &tv : NULL);
311 wsa_assert (rc != SOCKET_ERROR);
314 for (fd_entries_t::size_type i = 0,
size = family_entry.fd_entries.size (); i < size && rc > 0; ++i) {
315 fd_entry_t& fd_entry = family_entry.fd_entries [i];
320 if (FD_ISSET (fd_entry.fd, &local_fds_set.read)) {
321 fd_entry.events->in_event ();
328 if (FD_ISSET (fd_entry.fd, &local_fds_set.write)) {
329 fd_entry.events->out_event ();
336 if (FD_ISSET (fd_entry.fd, &local_fds_set.error)) {
337 fd_entry.events->in_event ();
342 if (family_entry.retired) {
343 family_entry.retired =
false;
344 family_entry.fd_entries.erase (std::remove_if (family_entry.fd_entries.begin (),
345 family_entry.fd_entries.end (), is_retired_fd), family_entry.fd_entries.end ());
349 fds_set_t local_fds_set = fds_set;
350 rc = select (maxfd + 1, &local_fds_set.read, &local_fds_set.write,
351 &local_fds_set.error, timeout ? &tv : NULL);
359 for (fd_entries_t::size_type i = 0,
size = fd_entries.size (); i < size && rc > 0; ++i) {
360 fd_entry_t& fd_entry = fd_entries [i];
365 if (FD_ISSET (fd_entry.fd, &local_fds_set.read)) {
366 fd_entry.events->in_event ();
373 if (FD_ISSET (fd_entry.fd, &local_fds_set.write)) {
374 fd_entry.events->out_event ();
381 if (FD_ISSET (fd_entry.fd, &local_fds_set.error)) {
382 fd_entry.events->in_event ();
389 fd_entries.erase (std::remove_if (fd_entries.begin (), fd_entries.end (),
390 is_retired_fd), fd_entries.end ());
396 void zmq::select_t::worker_routine (
void *arg_)
398 ((select_t*) arg_)->loop ();
401 zmq::select_t::fds_set_t::fds_set_t ()
408 zmq::select_t::fds_set_t::fds_set_t (
const fds_set_t& other_)
410 memcpy (&read, &other_.read,
sizeof other_.read);
411 memcpy (&write, &other_.write,
sizeof other_.write);
412 memcpy (&error, &other_.error,
sizeof other_.error);
415 zmq::select_t::fds_set_t& zmq::select_t::fds_set_t::operator= (
const fds_set_t& other_)
417 memcpy (&read, &other_.read,
sizeof other_.read);
418 memcpy (&write, &other_.write,
sizeof other_.write);
419 memcpy (&error, &other_.error,
sizeof other_.error);
423 void zmq::select_t::fds_set_t::remove_fd (
const fd_t& fd_)
426 FD_CLR (fd_, &write);
427 FD_CLR (fd_, &error);
430 bool zmq::select_t::is_retired_fd (
const fd_entry_t &entry)
435 #if defined ZMQ_HAVE_WINDOWS 436 u_short zmq::select_t::get_fd_family (
fd_t fd_)
439 sockaddr_storage addr = { 0 };
440 int addr_size =
sizeof addr;
443 int type_length =
sizeof(
int);
445 int rc =
getsockopt(fd_, SOL_SOCKET, SO_TYPE, (
char*) &type, &type_length);
448 if (type == SOCK_DGRAM)
451 rc = getsockname(fd_, (sockaddr *)&addr, &addr_size);
455 if (rc != SOCKET_ERROR)
456 return addr.ss_family == AF_INET6 ? AF_INET : addr.ss_family;
463 zmq::select_t::family_entry_t::family_entry_t () :
469 zmq::select_t::wsa_events_t::wsa_events_t ()
471 events [0] = WSACreateEvent ();
472 wsa_assert (events [0] != WSA_INVALID_EVENT);
473 events [1] = WSACreateEvent ();
474 wsa_assert (events [1] != WSA_INVALID_EVENT);
475 events [2] = WSACreateEvent ();
476 wsa_assert (events [2] != WSA_INVALID_EVENT);
477 events [3] = WSACreateEvent ();
478 wsa_assert (events [3] != WSA_INVALID_EVENT);
481 zmq::select_t::wsa_events_t::~wsa_events_t ()
483 wsa_assert (WSACloseEvent (events [0]));
484 wsa_assert (WSACloseEvent (events [1]));
485 wsa_assert (WSACloseEvent (events [2]));
486 wsa_assert (WSACloseEvent (events [3]));
static void worker(void *s)
int getsockopt(int option_, void *optval_, size_t *optvallen_)
void start_thread(thread_t &thread_, thread_fn *tfn_, void *arg_) const