37 #if defined ZMQ_POLL_BASED_ON_POLL 39 #elif defined ZMQ_POLL_BASED_ON_SELECT 40 #if defined ZMQ_HAVE_WINDOWS 42 #elif defined ZMQ_HAVE_HPUX 43 #include <sys/param.h> 44 #include <sys/types.h> 46 #elif defined ZMQ_HAVE_OPENVMS 47 #include <sys/types.h> 50 #include <sys/select.h> 62 #if defined ZMQ_HAVE_EVENTFD 63 #include <sys/eventfd.h> 66 #if defined ZMQ_HAVE_WINDOWS 70 #include <netinet/tcp.h> 71 #include <sys/types.h> 72 #include <sys/socket.h> 75 #if !defined (ZMQ_HAVE_WINDOWS) 82 #if defined ZMQ_HAVE_WINDOWS 83 Sleep (ms_ > 0 ? ms_ : INFINITE);
85 #elif defined ZMQ_HAVE_ANDROID 89 return usleep (ms_ * 1000);
99 unsigned int ms_so_far = 0;
100 unsigned int step_ms = max_ms_ / 10;
108 if (rc == -1 && errno == EAGAIN) {
110 ms_so_far += step_ms;
113 }
while (ms_so_far < max_ms_ && rc == -1 && errno == EAGAIN);
135 #if defined ZMQ_HAVE_EVENTFD 139 #elif defined ZMQ_HAVE_WINDOWS 141 const struct linger so_linger = { 1, 0 };
142 int rc = setsockopt (
w, SOL_SOCKET, SO_LINGER,
143 (
const char *) &so_linger,
sizeof so_linger);
145 if (rc == 0 || WSAGetLastError () != WSANOTINITIALISED) {
146 wsa_assert (rc != SOCKET_ERROR);
147 rc = closesocket (
w);
148 wsa_assert (rc != SOCKET_ERROR);
150 rc = closesocket (
r);
151 wsa_assert (rc != SOCKET_ERROR);
173 #if defined HAVE_FORK 179 #if defined ZMQ_HAVE_EVENTFD 180 const uint64_t inc = 1;
181 ssize_t sz = write (
w, &inc,
sizeof (inc));
183 #elif defined ZMQ_HAVE_WINDOWS 184 unsigned char dummy = 0;
185 int nbytes =
::send (
w, (
char *) &dummy,
sizeof (dummy), 0);
186 wsa_assert (nbytes != SOCKET_ERROR);
189 unsigned char dummy = 0;
191 ssize_t nbytes =
::send (
w, &dummy,
sizeof (dummy), 0);
192 if (
unlikely (nbytes == -1 && errno == EINTR))
194 #if defined(HAVE_FORK) 219 #ifdef ZMQ_POLL_BASED_ON_POLL 223 int rc = poll (&pfd, 1, timeout_);
247 #elif defined ZMQ_POLL_BASED_ON_SELECT 252 struct timeval timeout;
254 timeout.tv_sec = timeout_ / 1000;
255 timeout.tv_usec = timeout_ % 1000 * 1000;
257 #ifdef ZMQ_HAVE_WINDOWS 258 int rc = select (0, &fds, NULL, NULL,
259 timeout_ >= 0 ? &timeout : NULL);
260 wsa_assert (rc != SOCKET_ERROR);
262 int rc = select (
r + 1, &fds, NULL, NULL,
263 timeout_ >= 0 ? &timeout : NULL);
284 #if defined ZMQ_HAVE_EVENTFD 286 ssize_t sz = read (
r, &dummy,
sizeof (dummy));
292 const uint64_t inc = dummy - 1;
293 ssize_t sz2 = write (
w, &inc,
sizeof (inc));
301 #if defined ZMQ_HAVE_WINDOWS 302 int nbytes =
::recv (
r, (
char *) &dummy,
sizeof (dummy), 0);
303 wsa_assert (nbytes != SOCKET_ERROR);
305 ssize_t nbytes =
::recv (
r, &dummy,
sizeof (dummy), 0);
316 #if defined ZMQ_HAVE_EVENTFD 318 ssize_t sz = read (
r, &dummy,
sizeof (dummy));
329 const uint64_t inc = dummy - 1;
330 ssize_t sz2 = write (
w, &inc,
sizeof (inc));
339 #if defined ZMQ_HAVE_WINDOWS 340 int nbytes =
::recv (
r, (
char *) &dummy,
sizeof (dummy), 0);
341 if (nbytes == SOCKET_ERROR) {
342 const int last_error = WSAGetLastError();
343 if (last_error == WSAEWOULDBLOCK) {
347 wsa_assert (last_error == WSAEWOULDBLOCK);
350 ssize_t nbytes =
::recv (
r, &dummy,
sizeof (dummy), 0);
352 if (errno == EAGAIN || errno == EWOULDBLOCK || errno == EINTR) {
356 errno_assert (errno == EAGAIN || errno == EWOULDBLOCK || errno == EINTR);
366 void zmq::signaler_t::forked ()
378 #if defined ZMQ_HAVE_EVENTFD 379 fd_t fd = eventfd (0, 0);
390 #elif defined ZMQ_HAVE_WINDOWS 391 # if !defined _WIN32_WCE 393 SECURITY_DESCRIPTOR sd;
394 SECURITY_ATTRIBUTES sa;
395 memset (&sd, 0,
sizeof sd);
396 memset (&sa, 0,
sizeof sa);
398 InitializeSecurityDescriptor (&sd, SECURITY_DESCRIPTOR_REVISION);
399 SetSecurityDescriptorDacl (&sd, TRUE, 0, FALSE);
401 sa.nLength =
sizeof (SECURITY_ATTRIBUTES);
402 sa.lpSecurityDescriptor = &sd;
417 int event_signaler_port = 5905;
420 # if !defined _WIN32_WCE 421 sync = CreateEventW (&sa, FALSE, TRUE, L
"Global\\zmq-signaler-port-sync");
423 sync = CreateEventW (NULL, FALSE, TRUE, L
"Global\\zmq-signaler-port-sync");
425 if (sync == NULL && GetLastError () == ERROR_ACCESS_DENIED)
426 sync = OpenEventW (SYNCHRONIZE | EVENT_MODIFY_STATE,
427 FALSE, L
"Global\\zmq-signaler-port-sync");
429 win_assert (sync != NULL);
433 wchar_t mutex_name [MAX_PATH];
435 _snwprintf (mutex_name, MAX_PATH, L
"Global\\zmq-signaler-port-%d",
signaler_port);
437 swprintf (mutex_name, MAX_PATH, L
"Global\\zmq-signaler-port-%d",
signaler_port);
440 # if !defined _WIN32_WCE 441 sync = CreateMutexW (&sa, FALSE, mutex_name);
443 sync = CreateMutexW (NULL, FALSE, mutex_name);
445 if (sync == NULL && GetLastError () == ERROR_ACCESS_DENIED)
446 sync = OpenMutexW (SYNCHRONIZE, FALSE, mutex_name);
448 win_assert (sync != NULL);
453 *w_ = INVALID_SOCKET;
454 *r_ = INVALID_SOCKET;
459 wsa_assert (listener != INVALID_SOCKET);
462 BOOL so_reuseaddr = 1;
463 int rc = setsockopt (listener, SOL_SOCKET, SO_REUSEADDR,
464 (
char *) &so_reuseaddr,
sizeof so_reuseaddr);
465 wsa_assert (rc != SOCKET_ERROR);
466 BOOL tcp_nodelay = 1;
467 rc = setsockopt (listener, IPPROTO_TCP, TCP_NODELAY,
468 (
char *) &tcp_nodelay,
sizeof tcp_nodelay);
469 wsa_assert (rc != SOCKET_ERROR);
472 struct sockaddr_in addr;
473 memset (&addr, 0,
sizeof addr);
474 addr.sin_family = AF_INET;
475 addr.sin_addr.s_addr = htonl (INADDR_LOOPBACK);
480 wsa_assert (*w_ != INVALID_SOCKET);
483 rc = setsockopt (*w_, IPPROTO_TCP, TCP_NODELAY,
484 (
char *) &tcp_nodelay,
sizeof tcp_nodelay);
485 wsa_assert (rc != SOCKET_ERROR);
489 DWORD dwrc = WaitForSingleObject (sync, INFINITE);
490 zmq_assert (dwrc == WAIT_OBJECT_0 || dwrc == WAIT_ABANDONED);
494 rc =
bind (listener, (
const struct sockaddr *) &addr,
sizeof addr);
498 int addrlen =
sizeof addr;
499 rc = getsockname (listener, (
struct sockaddr *) &addr, &addrlen);
503 if (rc != SOCKET_ERROR)
504 rc = listen (listener, 1);
507 if (rc != SOCKET_ERROR)
508 rc = connect (*w_, (
struct sockaddr *) &addr,
sizeof addr);
511 if (rc != SOCKET_ERROR)
512 *r_ = accept (listener, NULL, NULL);
516 if (*r_ != INVALID_SOCKET) {
517 size_t dummy_size = 1024 * 1024;
518 unsigned char *dummy = (
unsigned char *) malloc (dummy_size);
519 int still_to_send = (
int) dummy_size;
520 int still_to_recv = (
int) dummy_size;
521 while (still_to_send || still_to_recv) {
523 if (still_to_send > 0) {
524 nbytes =
::send (*w_, (
char *) (dummy + dummy_size - still_to_send), still_to_send, 0);
525 wsa_assert (nbytes != SOCKET_ERROR);
526 still_to_send -= nbytes;
528 nbytes =
::recv (*r_, (
char *) (dummy + dummy_size - still_to_recv), still_to_recv, 0);
529 wsa_assert (nbytes != SOCKET_ERROR);
530 still_to_recv -= nbytes;
537 if (*r_ == INVALID_SOCKET)
538 saved_errno = WSAGetLastError ();
541 rc = closesocket (listener);
542 wsa_assert(rc != SOCKET_ERROR);
548 brc = SetEvent (sync);
550 brc = ReleaseMutex (sync);
551 win_assert (brc != 0);
554 brc = CloseHandle (sync);
555 win_assert (brc != 0);
558 if (*r_ != INVALID_SOCKET) {
559 # if !defined _WIN32_WCE 561 BOOL brc = SetHandleInformation ((HANDLE) *r_, HANDLE_FLAG_INHERIT, 0);
568 if (*w_ != INVALID_SOCKET) {
569 rc = closesocket (*w_);
570 wsa_assert (rc != SOCKET_ERROR);
571 *w_ = INVALID_SOCKET;
574 errno = wsa_error_to_errno (saved_errno);
578 #elif defined ZMQ_HAVE_OPENVMS 586 struct sockaddr_in lcladdr;
587 memset (&lcladdr, 0,
sizeof lcladdr);
588 lcladdr.sin_family = AF_INET;
589 lcladdr.sin_addr.s_addr = htonl (INADDR_LOOPBACK);
590 lcladdr.sin_port = 0;
592 int listener =
open_socket (AF_INET, SOCK_STREAM, 0);
596 int rc = setsockopt (listener, IPPROTO_TCP, TCP_NODELAY, &on,
sizeof on);
599 rc = setsockopt (listener, IPPROTO_TCP, TCP_NODELACK, &on,
sizeof on);
602 rc =
bind (listener, (
struct sockaddr *) &lcladdr,
sizeof lcladdr);
605 socklen_t lcladdr_len =
sizeof lcladdr;
607 rc = getsockname (listener, (
struct sockaddr *) &lcladdr, &lcladdr_len);
610 rc = listen (listener, 1);
616 rc = setsockopt (*w_, IPPROTO_TCP, TCP_NODELAY, &on,
sizeof on);
619 rc = setsockopt (*w_, IPPROTO_TCP, TCP_NODELACK, &on,
sizeof on);
622 rc = connect (*w_, (
struct sockaddr *) &lcladdr,
sizeof lcladdr);
625 *r_ = accept (listener, NULL, NULL);
635 int rc = socketpair (AF_UNIX, SOCK_STREAM, 0, sv);
fd_t open_socket(int domain_, int type_, int protocol_)
void unblock_socket(fd_t s_)
static int close_wait_ms(int fd_, unsigned int max_ms_=2000)
static int sleep_ms(unsigned int ms_)
static int make_fdpair(fd_t *r_, fd_t *w_)