// Prelude + constructor. The fused first line carries the ZMQ_USE_KQUEUE build
// guard, the <sys/types.h> / <sys/event.h> includes, and the kevent_udata_t
// alias: intptr_t when ZMQ_HAVE_NETBSD is defined, with a second definition as
// void * (presumably the #else branch -- the directive itself is not visible).
// The constructor takes the owning context by const reference and stores the
// descriptor returned by kqueue () in kqueue_fd.
// NOTE(review): the initializer list and any error check on kqueue_fd are not
// visible in this excerpt.
32 #if defined ZMQ_USE_KQUEUE 35 #include <sys/types.h> 36 #include <sys/event.h> 51 #if defined ZMQ_HAVE_NETBSD 52 #define kevent_udata_t intptr_t 54 #define kevent_udata_t void * 57 zmq::kqueue_t::kqueue_t (
const zmq::ctx_t &ctx_) :
62 kqueue_fd = kqueue ();
// Destructor. Its body is not part of this excerpt.
// NOTE(review): presumably releases kqueue_fd -- confirm against the full source.
69 zmq::kqueue_t::~kqueue_t ()
// Registers a single filter on the kqueue.
//   fd_     - descriptor to watch.
//   filter_ - kqueue filter to add (callers in this file pass EVFILT_READ or
//             EVFILT_WRITE).
//   udata_  - opaque pointer stored with the registration; loop () reads it
//             back from the reported event's udata field.
// The cast through kevent_udata_t adapts udata_ to the platform's udata type.
// NOTE(review): the handling of rc is not visible in this excerpt.
75 void zmq::kqueue_t::kevent_add (
fd_t fd_,
short filter_,
void *udata_)
79 EV_SET (&ev, fd_, filter_, EV_ADD, 0, 0, (kevent_udata_t)udata_);
// Submit one change and request no events back (NULL event list, count 0).
80 int rc = kevent (kqueue_fd, &ev, 1, NULL, 0, NULL);
// Removes a single filter registration from the kqueue.
//   fd_     - descriptor whose registration is removed.
//   filter_ - the filter to delete (EVFILT_READ or EVFILT_WRITE at the call
//             sites visible in this file).
// No udata is supplied for a delete (the last EV_SET argument is 0).
// NOTE(review): the handling of rc is not visible in this excerpt.
84 void zmq::kqueue_t::kevent_delete (
fd_t fd_,
short filter_)
88 EV_SET (&ev, fd_, filter_, EV_DELETE, 0, 0, 0);
// Submit one change and request no events back (NULL event list, count 0).
89 int rc = kevent (kqueue_fd, &ev, 1, NULL, 0, NULL);
// Creates the bookkeeping entry for a descriptor and returns a handle for it.
//   fd_      - descriptor being added.
//   reactor_ - event sink whose in_event () / out_event () are invoked by
//              loop () for this entry.
// The visible lines allocate a poll_entry_t, clear its flag_pollout and
// record the reactor.
// NOTE(review): the remaining field initialisation, the allocation check and
// the return statement are not visible in this excerpt.
93 zmq::kqueue_t::handle_t zmq::kqueue_t::add_fd (
fd_t fd_,
94 i_poll_events *reactor_)
// nothrow form: allocation failure yields a null pointer rather than an exception.
96 poll_entry_t *pe =
new (std::nothrow) poll_entry_t;
101 pe->flag_pollout = 0;
102 pe->reactor = reactor_;
// Unregisters the entry behind handle_ from the kqueue and queues it on the
// retired list instead of freeing it here.
//   handle_ - handle for the entry; cast back to poll_entry_t *.
// NOTE(review): listing line 112 is missing from this excerpt, so whether the
// EVFILT_READ delete is guarded (as the EVFILT_WRITE one is) cannot be told
// from here -- confirm against the full source.
109 void zmq::kqueue_t::rm_fd (
handle_t handle_)
111 poll_entry_t *pe = (poll_entry_t*) handle_;
113 kevent_delete (pe->fd, EVFILT_READ);
// The write filter is only removed if it is currently registered.
114 if (pe->flag_pollout)
115 kevent_delete (pe->fd, EVFILT_WRITE);
// Deferred: loop () walks the retired list after dispatching events.
117 retired.push_back (pe);
// Starts watching the entry's descriptor for input.
//   handle_ - handle for the entry; cast back to poll_entry_t *.
// Does nothing when flag_pollin is already set; otherwise sets the flag and
// registers EVFILT_READ with the entry itself as udata, so loop () can recover
// the entry from a reported event.
122 void zmq::kqueue_t::set_pollin (
handle_t handle_)
124 poll_entry_t *pe = (poll_entry_t*) handle_;
125 if (
likely (!pe->flag_pollin)) {
126 pe->flag_pollin =
true;
127 kevent_add (pe->fd, EVFILT_READ, pe);
// Stops watching the entry's descriptor for input.
//   handle_ - handle for the entry; cast back to poll_entry_t *.
// Does nothing when flag_pollin is already clear; otherwise clears the flag
// and deletes the EVFILT_READ registration for the descriptor.
131 void zmq::kqueue_t::reset_pollin (
handle_t handle_)
133 poll_entry_t *pe = (poll_entry_t*) handle_;
134 if (
likely (pe->flag_pollin)) {
135 pe->flag_pollin =
false;
136 kevent_delete (pe->fd, EVFILT_READ);
// Starts watching the entry's descriptor for output readiness.
//   handle_ - handle for the entry; cast back to poll_entry_t *.
// Does nothing when flag_pollout is already set; otherwise sets the flag and
// registers EVFILT_WRITE with the entry itself as udata.
140 void zmq::kqueue_t::set_pollout (
handle_t handle_)
142 poll_entry_t *pe = (poll_entry_t*) handle_;
143 if (
likely (!pe->flag_pollout)) {
144 pe->flag_pollout =
true;
145 kevent_add (pe->fd, EVFILT_WRITE, pe);
// Stops watching the entry's descriptor for output readiness.
//   handle_ - handle for the entry; cast back to poll_entry_t *.
// Does nothing when flag_pollout is already clear; otherwise clears the flag
// and deletes the EVFILT_WRITE registration for the descriptor.
149 void zmq::kqueue_t::reset_pollout (
handle_t handle_)
151 poll_entry_t *pe = (poll_entry_t*) handle_;
152 if (
likely (pe->flag_pollout)) {
153 pe->flag_pollout =
false;
154 kevent_delete (pe->fd, EVFILT_WRITE);
// start (): takes no arguments and returns nothing; its body is not part of
// this excerpt.
// NOTE(review): presumably launches worker_routine on a worker thread --
// confirm against the full source.
158 void zmq::kqueue_t::start ()
// max_fds (): returns an int; its body is not part of this excerpt.
// NOTE(review): the name suggests an upper bound on descriptors this poller
// handles -- confirm the actual value and meaning against the full source.
168 int zmq::kqueue_t::max_fds ()
// Event loop body: runs timers, waits on the kqueue, dispatches each reported
// event to the owning entry's reactor, then walks the retired list.
// NOTE(review): the enclosing loop construct, the error handling for n and
// several lines between the dispatch checks are not visible in this excerpt.
173 void zmq::kqueue_t::loop ()
// The result of execute_timers () is narrowed to int and used as the wait
// timeout.
178 int timeout = (
int) execute_timers ();
// The arithmetic treats timeout as milliseconds: whole seconds in the first
// field, the remainder scaled by 1000000 into nanoseconds.
182 timespec ts = {timeout / 1000, (timeout % 1000) * 1000000};
// No changes are submitted (NULL, 0); up to max_io_events events are read
// into ev_buf. A timeout of 0 passes NULL instead of &ts, which per the
// kevent manual means wait indefinitely rather than poll.
183 int n = kevent (kqueue_fd, NULL, 0, &ev_buf [0],
max_io_events,
184 timeout ? &ts: NULL);
197 for (
int i = 0; i < n; i ++) {
// udata is the poll_entry_t pointer supplied when the filter was registered
// (set_pollin / set_pollout pass the entry to kevent_add).
198 poll_entry_t *pe = (poll_entry_t*) ev_buf [i].udata;
// An EV_EOF flag is reported to the reactor through in_event ().
202 if (ev_buf [i].flags & EV_EOF)
203 pe->reactor->in_event ();
// Write-filter events go to out_event (), read-filter events to in_event ().
// NOTE(review): the lines between these checks are missing here; they
// presumably skip entries retired by an earlier callback -- confirm.
206 if (ev_buf [i].filter == EVFILT_WRITE)
207 pe->reactor->out_event ();
210 if (ev_buf [i].filter == EVFILT_READ)
211 pe->reactor->in_event ();
// Walks the entries that rm_fd () pushed onto the retired list; the body of
// this loop is not visible in this excerpt.
215 for (retired_t::iterator it = retired.begin (); it != retired.end (); ++it) {
// Entry point taking an untyped argument.
//   arg_ - pointer to the kqueue_t instance whose loop () should run; it is
//          cast back to kqueue_t * without any check.
222 void zmq::kqueue_t::worker_routine (
void *arg_)
224 ((kqueue_t*) arg_)->loop ();
#define LIBZMQ_DELETE(p_object)
static void worker(void *s)
void start_thread(thread_t &thread_, thread_fn *tfn_, void *arg_) const
poller_t::handle_t handle_t