libzmq  master
ZeroMQ C++ Core Engine (LIBZMQ)
kqueue.cpp
Go to the documentation of this file.
1 /*
2  Copyright (c) 2007-2016 Contributors as noted in the AUTHORS file
3 
4  This file is part of libzmq, the ZeroMQ core engine in C++.
5 
6  libzmq is free software; you can redistribute it and/or modify it under
7  the terms of the GNU Lesser General Public License (LGPL) as published
8  by the Free Software Foundation; either version 3 of the License, or
9  (at your option) any later version.
10 
11  As a special exception, the Contributors give you permission to link
12  this library with independent modules to produce an executable,
13  regardless of the license terms of these independent modules, and to
14  copy and distribute the resulting executable under terms of your choice,
15  provided that you also meet, for each linked independent module, the
16  terms and conditions of the license of that module. An independent
17  module is a module which is not derived from or based on this library.
18  If you modify this library, you must extend this exception to your
19  version of the library.
20 
21  libzmq is distributed in the hope that it will be useful, but WITHOUT
22  ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
23  FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public
24  License for more details.
25 
26  You should have received a copy of the GNU Lesser General Public License
27  along with this program. If not, see <http://www.gnu.org/licenses/>.
28 */
29 
30 #include "precompiled.hpp"
31 #include "kqueue.hpp"
32 #if defined ZMQ_USE_KQUEUE
33 
34 #include <sys/time.h>
35 #include <sys/types.h>
36 #include <sys/event.h>
37 #include <stdlib.h>
38 #include <unistd.h>
39 #include <algorithm>
40 #include <new>
41 
42 #include "macros.hpp"
43 #include "kqueue.hpp"
44 #include "err.hpp"
45 #include "config.hpp"
46 #include "i_poll_events.hpp"
47 #include "likely.hpp"
48 
//  The udata member of struct kevent is a void * on every platform
//  except NetBSD, where it is declared as intptr_t. kevent_udata_t names
//  whichever type the current platform expects so that casts compile.
#ifndef ZMQ_HAVE_NETBSD
#define kevent_udata_t void *
#else
#define kevent_udata_t intptr_t
#endif
56 
57 zmq::kqueue_t::kqueue_t (const zmq::ctx_t &ctx_) :
58  ctx(ctx_),
59  stopping (false)
60 {
61  // Create event queue
62  kqueue_fd = kqueue ();
63  errno_assert (kqueue_fd != -1);
64 #ifdef HAVE_FORK
65  pid = getpid();
66 #endif
67 }
68 
69 zmq::kqueue_t::~kqueue_t ()
70 {
71  worker.stop ();
72  close (kqueue_fd);
73 }
74 
75 void zmq::kqueue_t::kevent_add (fd_t fd_, short filter_, void *udata_)
76 {
77  struct kevent ev;
78 
79  EV_SET (&ev, fd_, filter_, EV_ADD, 0, 0, (kevent_udata_t)udata_);
80  int rc = kevent (kqueue_fd, &ev, 1, NULL, 0, NULL);
81  errno_assert (rc != -1);
82 }
83 
84 void zmq::kqueue_t::kevent_delete (fd_t fd_, short filter_)
85 {
86  struct kevent ev;
87 
88  EV_SET (&ev, fd_, filter_, EV_DELETE, 0, 0, 0);
89  int rc = kevent (kqueue_fd, &ev, 1, NULL, 0, NULL);
90  errno_assert (rc != -1);
91 }
92 
93 zmq::kqueue_t::handle_t zmq::kqueue_t::add_fd (fd_t fd_,
94  i_poll_events *reactor_)
95 {
96  poll_entry_t *pe = new (std::nothrow) poll_entry_t;
97  alloc_assert (pe);
98 
99  pe->fd = fd_;
100  pe->flag_pollin = 0;
101  pe->flag_pollout = 0;
102  pe->reactor = reactor_;
103 
104  adjust_load (1);
105 
106  return pe;
107 }
108 
109 void zmq::kqueue_t::rm_fd (handle_t handle_)
110 {
111  poll_entry_t *pe = (poll_entry_t*) handle_;
112  if (pe->flag_pollin)
113  kevent_delete (pe->fd, EVFILT_READ);
114  if (pe->flag_pollout)
115  kevent_delete (pe->fd, EVFILT_WRITE);
116  pe->fd = retired_fd;
117  retired.push_back (pe);
118 
119  adjust_load (-1);
120 }
121 
122 void zmq::kqueue_t::set_pollin (handle_t handle_)
123 {
124  poll_entry_t *pe = (poll_entry_t*) handle_;
125  if (likely (!pe->flag_pollin)) {
126  pe->flag_pollin = true;
127  kevent_add (pe->fd, EVFILT_READ, pe);
128  }
129 }
130 
131 void zmq::kqueue_t::reset_pollin (handle_t handle_)
132 {
133  poll_entry_t *pe = (poll_entry_t*) handle_;
134  if (likely (pe->flag_pollin)) {
135  pe->flag_pollin = false;
136  kevent_delete (pe->fd, EVFILT_READ);
137  }
138 }
139 
140 void zmq::kqueue_t::set_pollout (handle_t handle_)
141 {
142  poll_entry_t *pe = (poll_entry_t*) handle_;
143  if (likely (!pe->flag_pollout)) {
144  pe->flag_pollout = true;
145  kevent_add (pe->fd, EVFILT_WRITE, pe);
146  }
147 }
148 
149 void zmq::kqueue_t::reset_pollout (handle_t handle_)
150 {
151  poll_entry_t *pe = (poll_entry_t*) handle_;
152  if (likely (pe->flag_pollout)) {
153  pe->flag_pollout = false;
154  kevent_delete (pe->fd, EVFILT_WRITE);
155  }
156 }
157 
158 void zmq::kqueue_t::start ()
159 {
160  ctx.start_thread (worker, worker_routine, this);
161 }
162 
163 void zmq::kqueue_t::stop ()
164 {
165  stopping = true;
166 }
167 
168 int zmq::kqueue_t::max_fds ()
169 {
170  return -1;
171 }
172 
173 void zmq::kqueue_t::loop ()
174 {
175  while (!stopping) {
176 
177  // Execute any due timers.
178  int timeout = (int) execute_timers ();
179 
180  // Wait for events.
181  struct kevent ev_buf [max_io_events];
182  timespec ts = {timeout / 1000, (timeout % 1000) * 1000000};
183  int n = kevent (kqueue_fd, NULL, 0, &ev_buf [0], max_io_events,
184  timeout ? &ts: NULL);
185 #ifdef HAVE_FORK
186  if (unlikely(pid != getpid())) {
187  //printf("zmq::kqueue_t::loop aborting on forked child %d\n", (int)getpid());
188  // simply exit the loop in a forked process.
189  return;
190  }
191 #endif
192  if (n == -1) {
193  errno_assert (errno == EINTR);
194  continue;
195  }
196 
197  for (int i = 0; i < n; i ++) {
198  poll_entry_t *pe = (poll_entry_t*) ev_buf [i].udata;
199 
200  if (pe->fd == retired_fd)
201  continue;
202  if (ev_buf [i].flags & EV_EOF)
203  pe->reactor->in_event ();
204  if (pe->fd == retired_fd)
205  continue;
206  if (ev_buf [i].filter == EVFILT_WRITE)
207  pe->reactor->out_event ();
208  if (pe->fd == retired_fd)
209  continue;
210  if (ev_buf [i].filter == EVFILT_READ)
211  pe->reactor->in_event ();
212  }
213 
214  // Destroy retired event sources.
215  for (retired_t::iterator it = retired.begin (); it != retired.end (); ++it) {
216  LIBZMQ_DELETE(*it);
217  }
218  retired.clear ();
219  }
220 }
221 
222 void zmq::kqueue_t::worker_routine (void *arg_)
223 {
224  ((kqueue_t*) arg_)->loop ();
225 }
226 
227 #endif
#define LIBZMQ_DELETE(p_object)
Definition: macros.hpp:7
int fd_t
Definition: fd.hpp:50
zmq::ctx_t * ctx
Definition: object.hpp:139
static void worker(void *s)
#define unlikely(x)
Definition: likely.hpp:38
void start_thread(thread_t &thread_, thread_fn *tfn_, void *arg_) const
Definition: ctx.cpp:422
#define alloc_assert(x)
Definition: err.hpp:159
#define errno_assert(x)
Definition: err.hpp:129
#define likely(x)
Definition: likely.hpp:37
Definition: command.hpp:80
poller_t::handle_t handle_t
Definition: io_object.hpp:62