LCOV - code coverage report
Current view: top level - home/h/core/forks/m4-libzmq/src - signaler.cpp (source / functions) Hit Total Coverage
Test: zeromq-4.2.0 Code Coverage Lines: 65 84 77.4 %
Date: 2016-05-09 Functions: 9 10 90.0 %
Legend: Lines: hit not hit

          Line data    Source code
       1             : /*
       2             :     Copyright (c) 2007-2016 Contributors as noted in the AUTHORS file
       3             : 
       4             :     This file is part of libzmq, the ZeroMQ core engine in C++.
       5             : 
       6             :     libzmq is free software; you can redistribute it and/or modify it under
       7             :     the terms of the GNU Lesser General Public License (LGPL) as published
       8             :     by the Free Software Foundation; either version 3 of the License, or
       9             :     (at your option) any later version.
      10             : 
      11             :     As a special exception, the Contributors give you permission to link
      12             :     this library with independent modules to produce an executable,
      13             :     regardless of the license terms of these independent modules, and to
      14             :     copy and distribute the resulting executable under terms of your choice,
      15             :     provided that you also meet, for each linked independent module, the
      16             :     terms and conditions of the license of that module. An independent
      17             :     module is a module which is not derived from or based on this library.
      18             :     If you modify this library, you must extend this exception to your
      19             :     version of the library.
      20             : 
      21             :     libzmq is distributed in the hope that it will be useful, but WITHOUT
      22             :     ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
      23             :     FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public
      24             :     License for more details.
      25             : 
      26             :     You should have received a copy of the GNU Lesser General Public License
      27             :     along with this program.  If not, see <http://www.gnu.org/licenses/>.
      28             : */
      29             : 
      30             : #include "precompiled.hpp"
      31             : #include "poller.hpp"
      32             : 
      33             : //  On AIX, poll.h has to be included before zmq.h to get consistent
      34             : //  definition of pollfd structure (AIX uses 'reqevents' and 'retnevents'
      35             : //  instead of 'events' and 'revents' and defines macros to map from POSIX-y
      36             : //  names to AIX-specific names).
      37             : #if defined ZMQ_POLL_BASED_ON_POLL
      38             : #include <poll.h>
      39             : #elif defined ZMQ_POLL_BASED_ON_SELECT
      40             : #if defined ZMQ_HAVE_WINDOWS
      41             : #include "windows.hpp"
      42             : #elif defined ZMQ_HAVE_HPUX
      43             : #include <sys/param.h>
      44             : #include <sys/types.h>
      45             : #include <sys/time.h>
      46             : #elif defined ZMQ_HAVE_OPENVMS
      47             : #include <sys/types.h>
      48             : #include <sys/time.h>
      49             : #else
      50             : #include <sys/select.h>
      51             : #endif
      52             : #endif
      53             : 
      54             : #include "signaler.hpp"
      55             : #include "likely.hpp"
      56             : #include "stdint.hpp"
      57             : #include "config.hpp"
      58             : #include "err.hpp"
      59             : #include "fd.hpp"
      60             : #include "ip.hpp"
      61             : 
      62             : #if defined ZMQ_HAVE_EVENTFD
      63             : #include <sys/eventfd.h>
      64             : #endif
      65             : 
      66             : #if defined ZMQ_HAVE_WINDOWS
      67             : #include "windows.hpp"
      68             : #else
      69             : #include <unistd.h>
      70             : #include <netinet/tcp.h>
      71             : #include <sys/types.h>
      72             : #include <sys/socket.h>
      73             : #endif
      74             : 
      75             : #if !defined (ZMQ_HAVE_WINDOWS)
      76             : // Helper to sleep for specific number of milliseconds (or until signal)
      77             : //
      78             : static int sleep_ms (unsigned int ms_)
      79             : {
      80           0 :     if (ms_ == 0)
      81             :         return 0;
      82             : #if defined ZMQ_HAVE_WINDOWS
      83             :     Sleep (ms_ > 0 ? ms_ : INFINITE);
      84             :     return 0;
      85             : #elif defined ZMQ_HAVE_ANDROID
      86             :     usleep (ms_ * 1000);
      87             :     return 0;
      88             : #else
      89           0 :     return usleep (ms_ * 1000);
      90             : #endif
      91             : }
      92             : 
      93             : // Helper to wait on close(), for non-blocking sockets, until it completes
      94             : // If EAGAIN is received, will sleep briefly (1-100ms) then try again, until
      95             : // the overall timeout is reached.
      96             : //
      97       12583 : static int close_wait_ms (int fd_, unsigned int max_ms_ = 2000)
      98             : {
      99       12583 :     unsigned int ms_so_far = 0;
     100       12583 :     unsigned int step_ms   = max_ms_ / 10;
     101       12583 :     if (step_ms < 1)
     102           0 :         step_ms = 1;
     103       12583 :     if (step_ms > 100)
     104       12583 :         step_ms = 100;
     105             : 
     106       12583 :     int rc = 0;       // do not sleep on first attempt
     107       12583 :     do {
     108       12583 :         if (rc == -1 && errno == EAGAIN) {
     109             :             sleep_ms (step_ms);
     110           0 :             ms_so_far += step_ms;
     111             :         }
     112       12583 :         rc = close (fd_);
     113       12583 :     } while (ms_so_far < max_ms_ && rc == -1 && errno == EAGAIN);
     114             : 
     115       12583 :     return rc;
     116             : }
     117             : #endif
     118             : 
     119       12649 : zmq::signaler_t::signaler_t ()
     120             : {
     121             :     //  Create the socketpair for signaling.
     122       12649 :     if (make_fdpair (&r, &w) == 0) {
     123       12583 :         unblock_socket (w);
     124       12583 :         unblock_socket (r);
     125             :     }
     126             : #ifdef HAVE_FORK
     127       12649 :     pid = getpid ();
     128             : #endif
     129       12649 : }
     130             : 
     131             : // This might get run after some part of construction failed, leaving one or
     132             : // both of r and w retired_fd.
     133       12649 : zmq::signaler_t::~signaler_t ()
     134             : {
     135             : #if defined ZMQ_HAVE_EVENTFD
     136       12649 :     if (r == retired_fd) return;
     137       12583 :     int rc = close_wait_ms (r);
     138       12583 :     errno_assert (rc == 0);
     139             : #elif defined ZMQ_HAVE_WINDOWS
     140             :     if (w != retired_fd) {
     141             :         const struct linger so_linger = { 1, 0 };
     142             :         int rc = setsockopt (w, SOL_SOCKET, SO_LINGER,
     143             :             (const char *) &so_linger, sizeof so_linger);
     144             :         //  Only check shutdown if WSASTARTUP was previously done
     145             :         if (rc == 0 || WSAGetLastError () != WSANOTINITIALISED) {
     146             :             wsa_assert (rc != SOCKET_ERROR);
     147             :             rc = closesocket (w);
     148             :             wsa_assert (rc != SOCKET_ERROR);
     149             :             if (r == retired_fd) return;
     150             :             rc = closesocket (r);
     151             :             wsa_assert (rc != SOCKET_ERROR);
     152             :         }
     153             :     }
     154             : #else
     155             :     if (w != retired_fd) {
     156             :         int rc = close_wait_ms (w);
     157             :         errno_assert (rc == 0);
     158             :     }
     159             :     if (r != retired_fd) {
     160             :         int rc = close_wait_ms (r);
     161             :         errno_assert (rc == 0);
     162             :     }
     163             : #endif
     164       12649 : }
     165             : 
     166     1349610 : zmq::fd_t zmq::signaler_t::get_fd () const
     167             : {
     168     1349610 :     return r;
     169             : }
     170             : 
     171       37701 : void zmq::signaler_t::send ()
     172             : {
     173             : #if defined HAVE_FORK
     174       37701 :     if (unlikely (pid != getpid ())) {
     175             :         //printf("Child process %d signaler_t::send returning without sending #1\n", getpid());
     176           0 :         return; // do not send anything in forked child context
     177             :     }
     178             : #endif
     179             : #if defined ZMQ_HAVE_EVENTFD
     180       37698 :     const uint64_t inc = 1;
     181       37698 :     ssize_t sz = write (w, &inc, sizeof (inc));
     182       37693 :     errno_assert (sz == sizeof (inc));
     183             : #elif defined ZMQ_HAVE_WINDOWS
     184             :     unsigned char dummy = 0;
     185             :     int nbytes = ::send (w, (char *) &dummy, sizeof (dummy), 0);
     186             :     wsa_assert (nbytes != SOCKET_ERROR);
     187             :     zmq_assert (nbytes == sizeof (dummy));
     188             : #else
     189             :     unsigned char dummy = 0;
     190             :     while (true) {
     191             :         ssize_t nbytes = ::send (w, &dummy, sizeof (dummy), 0);
     192             :         if (unlikely (nbytes == -1 && errno == EINTR))
     193             :             continue;
     194             : #if defined(HAVE_FORK)
     195             :         if (unlikely (pid != getpid ())) {
     196             :             //printf("Child process %d signaler_t::send returning without sending #2\n", getpid());
     197             :             errno = EINTR;
     198             :             break;
     199             :         }
     200             : #endif
     201             :         zmq_assert (nbytes == sizeof dummy);
     202             :         break;
     203             :     }
     204             : #endif
     205             : }
     206             : 
     207     4134438 : int zmq::signaler_t::wait (int timeout_)
     208             : {
     209             : #ifdef HAVE_FORK
     210     4134438 :     if (unlikely (pid != getpid ())) {
     211             :         // we have forked and the file descriptor is closed. Emulate an interrupt
     212             :         // response.
     213             :         //printf("Child process %d signaler_t::wait returning simulating interrupt #1\n", getpid());
     214           0 :         errno = EINTR;
     215           0 :         return -1;
     216             :     }
     217             : #endif
     218             : 
     219             : #ifdef ZMQ_POLL_BASED_ON_POLL
     220             :     struct pollfd pfd;
     221     4134581 :     pfd.fd = r;
     222     4134581 :     pfd.events = POLLIN;
     223     4547841 :     int rc = poll (&pfd, 1, timeout_);
     224     4547841 :     if (unlikely (rc < 0)) {
     225           0 :         errno_assert (errno == EINTR);
     226             :         return -1;
     227             :     }
     228             :     else
     229     4547841 :     if (unlikely (rc == 0)) {
     230     4516264 :         errno = EAGAIN;
     231     4516164 :         return -1;
     232             :     }
     233             : #ifdef HAVE_FORK
     234             :     else
     235       31577 :     if (unlikely (pid != getpid ())) {
     236             :         // we have forked and the file descriptor is closed. Emulate an interrupt
     237             :         // response.
     238             :         //printf("Child process %d signaler_t::wait returning simulating interrupt #2\n", getpid());
     239           0 :         errno = EINTR;
     240           0 :         return -1;
     241             :     }
     242             : #endif
     243       31578 :     zmq_assert (rc == 1);
     244       31577 :     zmq_assert (pfd.revents & POLLIN);
     245             :     return 0;
     246             : 
     247             : #elif defined ZMQ_POLL_BASED_ON_SELECT
     248             : 
     249             :     fd_set fds;
     250             :     FD_ZERO (&fds);
     251             :     FD_SET (r, &fds);
     252             :     struct timeval timeout;
     253             :     if (timeout_ >= 0) {
     254             :         timeout.tv_sec = timeout_ / 1000;
     255             :         timeout.tv_usec = timeout_ % 1000 * 1000;
     256             :     }
     257             : #ifdef ZMQ_HAVE_WINDOWS
     258             :     int rc = select (0, &fds, NULL, NULL,
     259             :         timeout_ >= 0 ? &timeout : NULL);
     260             :     wsa_assert (rc != SOCKET_ERROR);
     261             : #else
     262             :     int rc = select (r + 1, &fds, NULL, NULL,
     263             :         timeout_ >= 0 ? &timeout : NULL);
     264             :     if (unlikely (rc < 0)) {
     265             :         errno_assert (errno == EINTR);
     266             :         return -1;
     267             :     }
     268             : #endif
     269             :     if (unlikely (rc == 0)) {
     270             :         errno = EAGAIN;
     271             :         return -1;
     272             :     }
     273             :     zmq_assert (rc == 1);
     274             :     return 0;
     275             : 
     276             : #else
     277             : #error
     278             : #endif
     279             : }
     280             : 
     281          88 : void zmq::signaler_t::recv ()
     282             : {
     283             :     //  Attempt to read a signal.
     284             : #if defined ZMQ_HAVE_EVENTFD
     285             :     uint64_t dummy;
     286         176 :     ssize_t sz = read (r, &dummy, sizeof (dummy));
     287          88 :     errno_assert (sz == sizeof (dummy));
     288             : 
     289             :     //  If we accidentally grabbed the next signal(s) along with the current
     290             :     //  one, return it back to the eventfd object.
     291          88 :     if (unlikely (dummy > 1)) {
     292           6 :         const uint64_t inc = dummy - 1;
     293           6 :         ssize_t sz2 = write (w, &inc, sizeof (inc));
     294           6 :         errno_assert (sz2 == sizeof (inc));
     295          88 :         return;
     296             :     }
     297             : 
     298          82 :     zmq_assert (dummy == 1);
     299             : #else
     300             :     unsigned char dummy;
     301             : #if defined ZMQ_HAVE_WINDOWS
     302             :     int nbytes = ::recv (r, (char *) &dummy, sizeof (dummy), 0);
     303             :     wsa_assert (nbytes != SOCKET_ERROR);
     304             : #else
     305             :     ssize_t nbytes = ::recv (r, &dummy, sizeof (dummy), 0);
     306             :     errno_assert (nbytes >= 0);
     307             : #endif
     308             :     zmq_assert (nbytes == sizeof (dummy));
     309             :     zmq_assert (dummy == 0);
     310             : #endif
     311             : }
     312             : 
     313       31577 : int zmq::signaler_t::recv_failable ()
     314             : {
     315             :     //  Attempt to read a signal.
     316             : #if defined ZMQ_HAVE_EVENTFD
     317             :     uint64_t dummy;
     318       63152 :     ssize_t sz = read (r, &dummy, sizeof (dummy));
     319       31575 :     if (sz == -1) {
     320           0 :         errno_assert (errno == EAGAIN);
     321             :         return -1;
     322             :     }
     323             :     else {
     324       31575 :         errno_assert (sz == sizeof (dummy));
     325             : 
     326             :         //  If we accidentally grabbed the next signal(s) along with the current
     327             :         //  one, return it back to the eventfd object.
     328       31576 :         if (unlikely (dummy > 1)) {
     329           0 :             const uint64_t inc = dummy - 1;
     330           0 :             ssize_t sz2 = write (w, &inc, sizeof (inc));
     331           0 :             errno_assert (sz2 == sizeof (inc));
     332             :             return 0;
     333             :         }
     334             : 
     335       31576 :         zmq_assert (dummy == 1);
     336             :     }
     337             : #else
     338             :     unsigned char dummy;
     339             : #if defined ZMQ_HAVE_WINDOWS
     340             :     int nbytes = ::recv (r, (char *) &dummy, sizeof (dummy), 0);
     341             :     if (nbytes == SOCKET_ERROR) {
     342             :                 const int last_error = WSAGetLastError();
     343             :                 if (last_error == WSAEWOULDBLOCK) {
     344             :                         errno = EAGAIN;
     345             :             return -1;
     346             :                 }
     347             :         wsa_assert (last_error == WSAEWOULDBLOCK);
     348             :     }
     349             : #else
     350             :     ssize_t nbytes = ::recv (r, &dummy, sizeof (dummy), 0);
     351             :     if (nbytes == -1) {
     352             :         if (errno == EAGAIN || errno == EWOULDBLOCK || errno == EINTR) {
     353             :             errno = EAGAIN;
     354             :             return -1;
     355             :         }
     356             :         errno_assert (errno == EAGAIN || errno == EWOULDBLOCK || errno == EINTR);
     357             :     }
     358             : #endif
     359             :     zmq_assert (nbytes == sizeof (dummy));
     360             :     zmq_assert (dummy == 0);
     361             : #endif
     362             :     return 0;
     363             : }
     364             : 
     365             : #ifdef HAVE_FORK
     366           0 : void zmq::signaler_t::forked ()
     367             : {
     368             :     //  Close file descriptors created in the parent and create new pair
     369           0 :     close (r);
     370           0 :     close (w);
     371           0 :     make_fdpair (&r, &w);
     372           0 : }
     373             : #endif
     374             : 
     375             : //  Returns -1 if we could not make the socket pair successfully
     376       12649 : int zmq::signaler_t::make_fdpair (fd_t *r_, fd_t *w_)
     377             : {
     378             : #if defined ZMQ_HAVE_EVENTFD
     379       12649 :     fd_t fd = eventfd (0, 0);
     380       12649 :     if (fd == -1) {
     381          66 :         errno_assert (errno == ENFILE || errno == EMFILE);
     382          66 :         *w_ = *r_ = -1;
     383          66 :         return -1;
     384             :     }
     385             :     else {
     386       12583 :         *w_ = *r_ = fd;
     387       12583 :         return 0;
     388             :     }
     389             : 
     390             : #elif defined ZMQ_HAVE_WINDOWS
     391             : #   if !defined _WIN32_WCE
     392             :     // Windows CE does not manage security attributes
     393             :     SECURITY_DESCRIPTOR sd;
     394             :     SECURITY_ATTRIBUTES sa;
     395             :     memset (&sd, 0, sizeof sd);
     396             :     memset (&sa, 0, sizeof sa);
     397             : 
     398             :     InitializeSecurityDescriptor (&sd, SECURITY_DESCRIPTOR_REVISION);
     399             :     SetSecurityDescriptorDacl (&sd, TRUE, 0, FALSE);
     400             : 
     401             :     sa.nLength = sizeof (SECURITY_ATTRIBUTES);
     402             :     sa.lpSecurityDescriptor = &sd;
     403             : #   endif
     404             : 
     405             :     //  This function has to be in a system-wide critical section so that
     406             :     //  two instances of the library don't accidentally create signaler
     407             :     //  crossing the process boundary.
     408             :     //  We'll use named event object to implement the critical section.
     409             :     //  Note that if the event object already exists, the CreateEvent requests
     410             :     //  EVENT_ALL_ACCESS access right. If this fails, we try to open
     411             :     //  the event object asking for SYNCHRONIZE access only.
     412             :     HANDLE sync = NULL;
     413             : 
     414             :     //  Create critical section only if using fixed signaler port
     415             :     //  Use problematic Event implementation for compatibility if using old port 5905.
     416             :     //  Otherwise use Mutex implementation.
     417             :     int event_signaler_port = 5905;
     418             : 
     419             :     if (signaler_port == event_signaler_port) {
     420             : #       if !defined _WIN32_WCE
     421             :         sync = CreateEventW (&sa, FALSE, TRUE, L"Global\\zmq-signaler-port-sync");
     422             : #       else
     423             :         sync = CreateEventW (NULL, FALSE, TRUE, L"Global\\zmq-signaler-port-sync");
     424             : #       endif
     425             :         if (sync == NULL && GetLastError () == ERROR_ACCESS_DENIED)
     426             :             sync = OpenEventW (SYNCHRONIZE | EVENT_MODIFY_STATE,
     427             :                               FALSE, L"Global\\zmq-signaler-port-sync");
     428             : 
     429             :         win_assert (sync != NULL);
     430             :     }
     431             :     else
     432             :     if (signaler_port != 0) {
     433             :         wchar_t mutex_name [MAX_PATH];
     434             : #       ifdef __MINGW32__
     435             :         _snwprintf (mutex_name, MAX_PATH, L"Global\\zmq-signaler-port-%d", signaler_port);
     436             : #       else
     437             :         swprintf (mutex_name, MAX_PATH, L"Global\\zmq-signaler-port-%d", signaler_port);
     438             : #       endif
     439             : 
     440             : #       if !defined _WIN32_WCE
     441             :         sync = CreateMutexW (&sa, FALSE, mutex_name);
     442             : #       else
     443             :         sync = CreateMutexW (NULL, FALSE, mutex_name);
     444             : #       endif
     445             :         if (sync == NULL && GetLastError () == ERROR_ACCESS_DENIED)
     446             :             sync = OpenMutexW (SYNCHRONIZE, FALSE, mutex_name);
     447             : 
     448             :         win_assert (sync != NULL);
     449             :     }
     450             : 
     451             :     //  Windows has no 'socketpair' function. CreatePipe is no good as pipe
     452             :     //  handles cannot be polled on. Here we create the socketpair by hand.
     453             :     *w_ = INVALID_SOCKET;
     454             :     *r_ = INVALID_SOCKET;
     455             : 
     456             :     //  Create listening socket.
     457             :     SOCKET listener;
     458             :     listener = open_socket (AF_INET, SOCK_STREAM, 0);
     459             :     wsa_assert (listener != INVALID_SOCKET);
     460             : 
     461             :     //  Set SO_REUSEADDR and TCP_NODELAY on listening socket.
     462             :     BOOL so_reuseaddr = 1;
     463             :     int rc = setsockopt (listener, SOL_SOCKET, SO_REUSEADDR,
     464             :         (char *) &so_reuseaddr, sizeof so_reuseaddr);
     465             :     wsa_assert (rc != SOCKET_ERROR);
     466             :     BOOL tcp_nodelay = 1;
     467             :     rc = setsockopt (listener, IPPROTO_TCP, TCP_NODELAY,
     468             :         (char *) &tcp_nodelay, sizeof tcp_nodelay);
     469             :     wsa_assert (rc != SOCKET_ERROR);
     470             : 
     471             :     //  Init sockaddr to signaler port.
     472             :     struct sockaddr_in addr;
     473             :     memset (&addr, 0, sizeof addr);
     474             :     addr.sin_family = AF_INET;
     475             :     addr.sin_addr.s_addr = htonl (INADDR_LOOPBACK);
     476             :     addr.sin_port = htons (signaler_port);
     477             : 
     478             :     //  Create the writer socket.
     479             :     *w_ = open_socket (AF_INET, SOCK_STREAM, 0);
     480             :     wsa_assert (*w_ != INVALID_SOCKET);
     481             : 
     482             :     //  Set TCP_NODELAY on writer socket.
     483             :     rc = setsockopt (*w_, IPPROTO_TCP, TCP_NODELAY,
     484             :         (char *) &tcp_nodelay, sizeof tcp_nodelay);
     485             :     wsa_assert (rc != SOCKET_ERROR);
     486             : 
     487             :     if (sync != NULL) {
     488             :         //  Enter the critical section.
     489             :         DWORD dwrc = WaitForSingleObject (sync, INFINITE);
     490             :         zmq_assert (dwrc == WAIT_OBJECT_0 || dwrc == WAIT_ABANDONED);
     491             :     }
     492             : 
     493             :     //  Bind listening socket to signaler port.
     494             :     rc = bind (listener, (const struct sockaddr *) &addr, sizeof addr);
     495             : 
     496             :     if (rc != SOCKET_ERROR && signaler_port == 0) {
     497             :         //  Retrieve ephemeral port number
     498             :         int addrlen = sizeof addr;
     499             :         rc = getsockname (listener, (struct sockaddr *) &addr, &addrlen);
     500             :     }
     501             : 
     502             :     //  Listen for incoming connections.
     503             :     if (rc != SOCKET_ERROR)
     504             :         rc = listen (listener, 1);
     505             : 
     506             :     //  Connect writer to the listener.
     507             :     if (rc != SOCKET_ERROR)
     508             :         rc = connect (*w_, (struct sockaddr *) &addr, sizeof addr);
     509             : 
     510             :     //  Accept connection from writer.
     511             :     if (rc != SOCKET_ERROR)
     512             :         *r_ = accept (listener, NULL, NULL);
     513             : 
     514             :     //  Send/receive large chunk to work around TCP slow start
     515             :     //  This code is a workaround for #1608
     516             :     if (*r_ != INVALID_SOCKET) {
     517             :         size_t dummy_size = 1024 * 1024;        //  1M to overload default receive buffer
     518             :         unsigned char *dummy = (unsigned char *) malloc (dummy_size);
     519             :         int still_to_send = (int) dummy_size;
     520             :         int still_to_recv = (int) dummy_size;
     521             :         while (still_to_send || still_to_recv) {
     522             :             int nbytes;
     523             :             if (still_to_send > 0) {
     524             :                 nbytes = ::send (*w_, (char *) (dummy + dummy_size - still_to_send), still_to_send, 0);
     525             :                 wsa_assert (nbytes != SOCKET_ERROR);
     526             :                 still_to_send -= nbytes;
     527             :             }
     528             :             nbytes = ::recv (*r_, (char *) (dummy + dummy_size - still_to_recv), still_to_recv, 0);
     529             :             wsa_assert (nbytes != SOCKET_ERROR);
     530             :             still_to_recv -= nbytes;
     531             :         }
     532             :         free (dummy);
     533             :     }
     534             : 
     535             :     //  Save errno if error occurred in bind/listen/connect/accept.
     536             :     int saved_errno = 0;
     537             :     if (*r_ == INVALID_SOCKET)
     538             :         saved_errno = WSAGetLastError ();
     539             : 
     540             :     //  We don't need the listening socket anymore. Close it.
     541             :     rc = closesocket (listener);
     542             :     wsa_assert(rc != SOCKET_ERROR);
     543             : 
     544             :     if (sync != NULL) {
     545             :         //  Exit the critical section.
     546             :         BOOL brc;
     547             :         if (signaler_port == event_signaler_port)
     548             :             brc = SetEvent (sync);
     549             :         else
     550             :             brc = ReleaseMutex (sync);
     551             :         win_assert (brc != 0);
     552             : 
     553             :         //  Release the kernel object
     554             :         brc = CloseHandle (sync);
     555             :         win_assert (brc != 0);
     556             :     }
     557             : 
     558             :     if (*r_ != INVALID_SOCKET) {
     559             : #   if !defined _WIN32_WCE
     560             :         //  On Windows, preventing sockets to be inherited by child processes.
     561             :         BOOL brc = SetHandleInformation ((HANDLE) *r_, HANDLE_FLAG_INHERIT, 0);
     562             :         win_assert (brc);
     563             : #   endif
     564             :         return 0;
     565             :     }
     566             :     else {
     567             :         //  Cleanup writer if connection failed
     568             :         if (*w_ != INVALID_SOCKET) {
     569             :             rc = closesocket (*w_);
     570             :             wsa_assert (rc != SOCKET_ERROR);
     571             :             *w_ = INVALID_SOCKET;
     572             :         }
     573             :         //  Set errno from saved value
     574             :         errno = wsa_error_to_errno (saved_errno);
     575             :         return -1;
     576             :     }
     577             : 
     578             : #elif defined ZMQ_HAVE_OPENVMS
     579             : 
     580             :     //  Whilst OpenVMS supports socketpair - it maps to AF_INET only.  Further,
     581             :     //  it does not set the socket options TCP_NODELAY and TCP_NODELACK which
     582             :     //  can lead to performance problems.
     583             :     //
     584             :     //  The bug will be fixed in V5.6 ECO4 and beyond.  In the meantime, we'll
     585             :     //  create the socket pair manually.
     586             :     struct sockaddr_in lcladdr;
     587             :     memset (&lcladdr, 0, sizeof lcladdr);
     588             :     lcladdr.sin_family = AF_INET;
     589             :     lcladdr.sin_addr.s_addr = htonl (INADDR_LOOPBACK);
     590             :     lcladdr.sin_port = 0;
     591             : 
     592             :     int listener = open_socket (AF_INET, SOCK_STREAM, 0);
     593             :     errno_assert (listener != -1);
     594             : 
     595             :     int on = 1;
     596             :     int rc = setsockopt (listener, IPPROTO_TCP, TCP_NODELAY, &on, sizeof on);
     597             :     errno_assert (rc != -1);
     598             : 
     599             :     rc = setsockopt (listener, IPPROTO_TCP, TCP_NODELACK, &on, sizeof on);
     600             :     errno_assert (rc != -1);
     601             : 
     602             :     rc = bind (listener, (struct sockaddr *) &lcladdr, sizeof lcladdr);
     603             :     errno_assert (rc != -1);
     604             : 
     605             :     socklen_t lcladdr_len = sizeof lcladdr;
     606             : 
     607             :     rc = getsockname (listener, (struct sockaddr *) &lcladdr, &lcladdr_len);
     608             :     errno_assert (rc != -1);
     609             : 
     610             :     rc = listen (listener, 1);
     611             :     errno_assert (rc != -1);
     612             : 
     613             :     *w_ = open_socket (AF_INET, SOCK_STREAM, 0);
     614             :     errno_assert (*w_ != -1);
     615             : 
     616             :     rc = setsockopt (*w_, IPPROTO_TCP, TCP_NODELAY, &on, sizeof on);
     617             :     errno_assert (rc != -1);
     618             : 
     619             :     rc = setsockopt (*w_, IPPROTO_TCP, TCP_NODELACK, &on, sizeof on);
     620             :     errno_assert (rc != -1);
     621             : 
     622             :     rc = connect (*w_, (struct sockaddr *) &lcladdr, sizeof lcladdr);
     623             :     errno_assert (rc != -1);
     624             : 
     625             :     *r_ = accept (listener, NULL, NULL);
     626             :     errno_assert (*r_ != -1);
     627             : 
     628             :     close (listener);
     629             : 
     630             :     return 0;
     631             : 
     632             : #else
     633             :     // All other implementations support socketpair()
     634             :     int sv [2];
     635             :     int rc = socketpair (AF_UNIX, SOCK_STREAM, 0, sv);
     636             :     if (rc == -1) {
     637             :         errno_assert (errno == ENFILE || errno == EMFILE);
     638             :         *w_ = *r_ = -1;
     639             :         return -1;
     640             :     }
     641             :     else {
     642             :         *w_ = sv [0];
     643             :         *r_ = sv [1];
     644             :         return 0;
     645             :     }
     646             : #endif
     647             : }

Generated by: LCOV version 1.10