Line data Source code
1 : /*
2 : Copyright (c) 2007-2016 Contributors as noted in the AUTHORS file
3 :
4 : This file is part of libzmq, the ZeroMQ core engine in C++.
5 :
6 : libzmq is free software; you can redistribute it and/or modify it under
7 : the terms of the GNU Lesser General Public License (LGPL) as published
8 : by the Free Software Foundation; either version 3 of the License, or
9 : (at your option) any later version.
10 :
11 : As a special exception, the Contributors give you permission to link
12 : this library with independent modules to produce an executable,
13 : regardless of the license terms of these independent modules, and to
14 : copy and distribute the resulting executable under terms of your choice,
15 : provided that you also meet, for each linked independent module, the
16 : terms and conditions of the license of that module. An independent
17 : module is a module which is not derived from or based on this library.
18 : If you modify this library, you must extend this exception to your
19 : version of the library.
20 :
21 : libzmq is distributed in the hope that it will be useful, but WITHOUT
22 : ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
23 : FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public
24 : License for more details.
25 :
26 : You should have received a copy of the GNU Lesser General Public License
27 : along with this program. If not, see <http://www.gnu.org/licenses/>.
28 : */
29 :
30 : #include "precompiled.hpp"
31 : #include "poller.hpp"
32 :
33 : // On AIX, poll.h has to be included before zmq.h to get consistent
34 : // definition of pollfd structure (AIX uses 'reqevents' and 'retnevents'
35 : // instead of 'events' and 'revents' and defines macros to map from POSIX-y
36 : // names to AIX-specific names).
37 : #if defined ZMQ_POLL_BASED_ON_POLL
38 : #include <poll.h>
39 : #elif defined ZMQ_POLL_BASED_ON_SELECT
40 : #if defined ZMQ_HAVE_WINDOWS
41 : #include "windows.hpp"
42 : #elif defined ZMQ_HAVE_HPUX
43 : #include <sys/param.h>
44 : #include <sys/types.h>
45 : #include <sys/time.h>
46 : #elif defined ZMQ_HAVE_OPENVMS
47 : #include <sys/types.h>
48 : #include <sys/time.h>
49 : #else
50 : #include <sys/select.h>
51 : #endif
52 : #endif
53 :
54 : #include "signaler.hpp"
55 : #include "likely.hpp"
56 : #include "stdint.hpp"
57 : #include "config.hpp"
58 : #include "err.hpp"
59 : #include "fd.hpp"
60 : #include "ip.hpp"
61 :
62 : #if defined ZMQ_HAVE_EVENTFD
63 : #include <sys/eventfd.h>
64 : #endif
65 :
66 : #if defined ZMQ_HAVE_WINDOWS
67 : #include "windows.hpp"
68 : #else
69 : #include <unistd.h>
70 : #include <netinet/tcp.h>
71 : #include <sys/types.h>
72 : #include <sys/socket.h>
73 : #endif
74 :
75 : #if !defined (ZMQ_HAVE_WINDOWS)
//  Pauses the calling thread for ms_ milliseconds. A zero interval returns
//  immediately with 0. On the default POSIX path the result of usleep () is
//  passed through to the caller; the Android and Windows paths always
//  report 0.
static int sleep_ms (unsigned int ms_)
{
    //  Nothing to wait for.
    if (!ms_)
        return 0;

#if defined ZMQ_HAVE_WINDOWS
    Sleep (ms_ > 0 ? ms_ : INFINITE);
    return 0;
#elif defined ZMQ_HAVE_ANDROID
    //  The result of usleep () is deliberately not propagated here.
    usleep (ms_ * 1000);
    return 0;
#else
    //  usleep () takes microseconds.
    return usleep (ms_ * 1000);
#endif
}
92 :
93 : // Helper to wait on close(), for non-blocking sockets, until it completes
94 : // If EAGAIN is received, will sleep briefly (1-100ms) then try again, until
95 : // the overall timeout is reached.
96 : //
97 12583 : static int close_wait_ms (int fd_, unsigned int max_ms_ = 2000)
98 : {
99 12583 : unsigned int ms_so_far = 0;
100 12583 : unsigned int step_ms = max_ms_ / 10;
101 12583 : if (step_ms < 1)
102 0 : step_ms = 1;
103 12583 : if (step_ms > 100)
104 12583 : step_ms = 100;
105 :
106 12583 : int rc = 0; // do not sleep on first attempt
107 12583 : do {
108 12583 : if (rc == -1 && errno == EAGAIN) {
109 : sleep_ms (step_ms);
110 0 : ms_so_far += step_ms;
111 : }
112 12583 : rc = close (fd_);
113 12583 : } while (ms_so_far < max_ms_ && rc == -1 && errno == EAGAIN);
114 :
115 12583 : return rc;
116 : }
117 : #endif
118 :
119 12649 : zmq::signaler_t::signaler_t ()
120 : {
121 : // Create the socketpair for signaling.
122 12649 : if (make_fdpair (&r, &w) == 0) {
123 12583 : unblock_socket (w);
124 12583 : unblock_socket (r);
125 : }
126 : #ifdef HAVE_FORK
127 12649 : pid = getpid ();
128 : #endif
129 12649 : }
130 :
131 : // This might get run after some part of construction failed, leaving one or
132 : // both of r and w retired_fd.
133 12649 : zmq::signaler_t::~signaler_t ()
134 : {
135 : #if defined ZMQ_HAVE_EVENTFD
136 12649 : if (r == retired_fd) return;
137 12583 : int rc = close_wait_ms (r);
138 12583 : errno_assert (rc == 0);
139 : #elif defined ZMQ_HAVE_WINDOWS
140 : if (w != retired_fd) {
141 : const struct linger so_linger = { 1, 0 };
142 : int rc = setsockopt (w, SOL_SOCKET, SO_LINGER,
143 : (const char *) &so_linger, sizeof so_linger);
144 : // Only check shutdown if WSASTARTUP was previously done
145 : if (rc == 0 || WSAGetLastError () != WSANOTINITIALISED) {
146 : wsa_assert (rc != SOCKET_ERROR);
147 : rc = closesocket (w);
148 : wsa_assert (rc != SOCKET_ERROR);
149 : if (r == retired_fd) return;
150 : rc = closesocket (r);
151 : wsa_assert (rc != SOCKET_ERROR);
152 : }
153 : }
154 : #else
155 : if (w != retired_fd) {
156 : int rc = close_wait_ms (w);
157 : errno_assert (rc == 0);
158 : }
159 : if (r != retired_fd) {
160 : int rc = close_wait_ms (r);
161 : errno_assert (rc == 0);
162 : }
163 : #endif
164 12649 : }
165 :
166 1349610 : zmq::fd_t zmq::signaler_t::get_fd () const
167 : {
168 1349610 : return r;
169 : }
170 :
171 37701 : void zmq::signaler_t::send ()
172 : {
173 : #if defined HAVE_FORK
174 37701 : if (unlikely (pid != getpid ())) {
175 : //printf("Child process %d signaler_t::send returning without sending #1\n", getpid());
176 0 : return; // do not send anything in forked child context
177 : }
178 : #endif
179 : #if defined ZMQ_HAVE_EVENTFD
180 37698 : const uint64_t inc = 1;
181 37698 : ssize_t sz = write (w, &inc, sizeof (inc));
182 37693 : errno_assert (sz == sizeof (inc));
183 : #elif defined ZMQ_HAVE_WINDOWS
184 : unsigned char dummy = 0;
185 : int nbytes = ::send (w, (char *) &dummy, sizeof (dummy), 0);
186 : wsa_assert (nbytes != SOCKET_ERROR);
187 : zmq_assert (nbytes == sizeof (dummy));
188 : #else
189 : unsigned char dummy = 0;
190 : while (true) {
191 : ssize_t nbytes = ::send (w, &dummy, sizeof (dummy), 0);
192 : if (unlikely (nbytes == -1 && errno == EINTR))
193 : continue;
194 : #if defined(HAVE_FORK)
195 : if (unlikely (pid != getpid ())) {
196 : //printf("Child process %d signaler_t::send returning without sending #2\n", getpid());
197 : errno = EINTR;
198 : break;
199 : }
200 : #endif
201 : zmq_assert (nbytes == sizeof dummy);
202 : break;
203 : }
204 : #endif
205 : }
206 :
207 4134438 : int zmq::signaler_t::wait (int timeout_)
208 : {
209 : #ifdef HAVE_FORK
210 4134438 : if (unlikely (pid != getpid ())) {
211 : // we have forked and the file descriptor is closed. Emulate an interrupt
212 : // response.
213 : //printf("Child process %d signaler_t::wait returning simulating interrupt #1\n", getpid());
214 0 : errno = EINTR;
215 0 : return -1;
216 : }
217 : #endif
218 :
219 : #ifdef ZMQ_POLL_BASED_ON_POLL
220 : struct pollfd pfd;
221 4134581 : pfd.fd = r;
222 4134581 : pfd.events = POLLIN;
223 4547841 : int rc = poll (&pfd, 1, timeout_);
224 4547841 : if (unlikely (rc < 0)) {
225 0 : errno_assert (errno == EINTR);
226 : return -1;
227 : }
228 : else
229 4547841 : if (unlikely (rc == 0)) {
230 4516264 : errno = EAGAIN;
231 4516164 : return -1;
232 : }
233 : #ifdef HAVE_FORK
234 : else
235 31577 : if (unlikely (pid != getpid ())) {
236 : // we have forked and the file descriptor is closed. Emulate an interrupt
237 : // response.
238 : //printf("Child process %d signaler_t::wait returning simulating interrupt #2\n", getpid());
239 0 : errno = EINTR;
240 0 : return -1;
241 : }
242 : #endif
243 31578 : zmq_assert (rc == 1);
244 31577 : zmq_assert (pfd.revents & POLLIN);
245 : return 0;
246 :
247 : #elif defined ZMQ_POLL_BASED_ON_SELECT
248 :
249 : fd_set fds;
250 : FD_ZERO (&fds);
251 : FD_SET (r, &fds);
252 : struct timeval timeout;
253 : if (timeout_ >= 0) {
254 : timeout.tv_sec = timeout_ / 1000;
255 : timeout.tv_usec = timeout_ % 1000 * 1000;
256 : }
257 : #ifdef ZMQ_HAVE_WINDOWS
258 : int rc = select (0, &fds, NULL, NULL,
259 : timeout_ >= 0 ? &timeout : NULL);
260 : wsa_assert (rc != SOCKET_ERROR);
261 : #else
262 : int rc = select (r + 1, &fds, NULL, NULL,
263 : timeout_ >= 0 ? &timeout : NULL);
264 : if (unlikely (rc < 0)) {
265 : errno_assert (errno == EINTR);
266 : return -1;
267 : }
268 : #endif
269 : if (unlikely (rc == 0)) {
270 : errno = EAGAIN;
271 : return -1;
272 : }
273 : zmq_assert (rc == 1);
274 : return 0;
275 :
276 : #else
277 : #error
278 : #endif
279 : }
280 :
281 88 : void zmq::signaler_t::recv ()
282 : {
283 : // Attempt to read a signal.
284 : #if defined ZMQ_HAVE_EVENTFD
285 : uint64_t dummy;
286 176 : ssize_t sz = read (r, &dummy, sizeof (dummy));
287 88 : errno_assert (sz == sizeof (dummy));
288 :
289 : // If we accidentally grabbed the next signal(s) along with the current
290 : // one, return it back to the eventfd object.
291 88 : if (unlikely (dummy > 1)) {
292 6 : const uint64_t inc = dummy - 1;
293 6 : ssize_t sz2 = write (w, &inc, sizeof (inc));
294 6 : errno_assert (sz2 == sizeof (inc));
295 88 : return;
296 : }
297 :
298 82 : zmq_assert (dummy == 1);
299 : #else
300 : unsigned char dummy;
301 : #if defined ZMQ_HAVE_WINDOWS
302 : int nbytes = ::recv (r, (char *) &dummy, sizeof (dummy), 0);
303 : wsa_assert (nbytes != SOCKET_ERROR);
304 : #else
305 : ssize_t nbytes = ::recv (r, &dummy, sizeof (dummy), 0);
306 : errno_assert (nbytes >= 0);
307 : #endif
308 : zmq_assert (nbytes == sizeof (dummy));
309 : zmq_assert (dummy == 0);
310 : #endif
311 : }
312 :
313 31577 : int zmq::signaler_t::recv_failable ()
314 : {
315 : // Attempt to read a signal.
316 : #if defined ZMQ_HAVE_EVENTFD
317 : uint64_t dummy;
318 63152 : ssize_t sz = read (r, &dummy, sizeof (dummy));
319 31575 : if (sz == -1) {
320 0 : errno_assert (errno == EAGAIN);
321 : return -1;
322 : }
323 : else {
324 31575 : errno_assert (sz == sizeof (dummy));
325 :
326 : // If we accidentally grabbed the next signal(s) along with the current
327 : // one, return it back to the eventfd object.
328 31576 : if (unlikely (dummy > 1)) {
329 0 : const uint64_t inc = dummy - 1;
330 0 : ssize_t sz2 = write (w, &inc, sizeof (inc));
331 0 : errno_assert (sz2 == sizeof (inc));
332 : return 0;
333 : }
334 :
335 31576 : zmq_assert (dummy == 1);
336 : }
337 : #else
338 : unsigned char dummy;
339 : #if defined ZMQ_HAVE_WINDOWS
340 : int nbytes = ::recv (r, (char *) &dummy, sizeof (dummy), 0);
341 : if (nbytes == SOCKET_ERROR) {
342 : const int last_error = WSAGetLastError();
343 : if (last_error == WSAEWOULDBLOCK) {
344 : errno = EAGAIN;
345 : return -1;
346 : }
347 : wsa_assert (last_error == WSAEWOULDBLOCK);
348 : }
349 : #else
350 : ssize_t nbytes = ::recv (r, &dummy, sizeof (dummy), 0);
351 : if (nbytes == -1) {
352 : if (errno == EAGAIN || errno == EWOULDBLOCK || errno == EINTR) {
353 : errno = EAGAIN;
354 : return -1;
355 : }
356 : errno_assert (errno == EAGAIN || errno == EWOULDBLOCK || errno == EINTR);
357 : }
358 : #endif
359 : zmq_assert (nbytes == sizeof (dummy));
360 : zmq_assert (dummy == 0);
361 : #endif
362 : return 0;
363 : }
364 :
365 : #ifdef HAVE_FORK
366 0 : void zmq::signaler_t::forked ()
367 : {
368 : // Close file descriptors created in the parent and create new pair
369 0 : close (r);
370 0 : close (w);
371 0 : make_fdpair (&r, &w);
372 0 : }
373 : #endif
374 :
375 : // Returns -1 if we could not make the socket pair successfully
376 12649 : int zmq::signaler_t::make_fdpair (fd_t *r_, fd_t *w_)
377 : {
378 : #if defined ZMQ_HAVE_EVENTFD
379 12649 : fd_t fd = eventfd (0, 0);
380 12649 : if (fd == -1) {
381 66 : errno_assert (errno == ENFILE || errno == EMFILE);
382 66 : *w_ = *r_ = -1;
383 66 : return -1;
384 : }
385 : else {
386 12583 : *w_ = *r_ = fd;
387 12583 : return 0;
388 : }
389 :
390 : #elif defined ZMQ_HAVE_WINDOWS
391 : # if !defined _WIN32_WCE
392 : // Windows CE does not manage security attributes
393 : SECURITY_DESCRIPTOR sd;
394 : SECURITY_ATTRIBUTES sa;
395 : memset (&sd, 0, sizeof sd);
396 : memset (&sa, 0, sizeof sa);
397 :
398 : InitializeSecurityDescriptor (&sd, SECURITY_DESCRIPTOR_REVISION);
399 : SetSecurityDescriptorDacl (&sd, TRUE, 0, FALSE);
400 :
401 : sa.nLength = sizeof (SECURITY_ATTRIBUTES);
402 : sa.lpSecurityDescriptor = &sd;
403 : # endif
404 :
405 : // This function has to be in a system-wide critical section so that
406 : // two instances of the library don't accidentally create signaler
407 : // crossing the process boundary.
408 : // We'll use named event object to implement the critical section.
409 : // Note that if the event object already exists, the CreateEvent requests
410 : // EVENT_ALL_ACCESS access right. If this fails, we try to open
411 : // the event object asking for SYNCHRONIZE access only.
412 : HANDLE sync = NULL;
413 :
414 : // Create critical section only if using fixed signaler port
415 : // Use problematic Event implementation for compatibility if using old port 5905.
416 : // Otherwise use Mutex implementation.
417 : int event_signaler_port = 5905;
418 :
419 : if (signaler_port == event_signaler_port) {
420 : # if !defined _WIN32_WCE
421 : sync = CreateEventW (&sa, FALSE, TRUE, L"Global\\zmq-signaler-port-sync");
422 : # else
423 : sync = CreateEventW (NULL, FALSE, TRUE, L"Global\\zmq-signaler-port-sync");
424 : # endif
425 : if (sync == NULL && GetLastError () == ERROR_ACCESS_DENIED)
426 : sync = OpenEventW (SYNCHRONIZE | EVENT_MODIFY_STATE,
427 : FALSE, L"Global\\zmq-signaler-port-sync");
428 :
429 : win_assert (sync != NULL);
430 : }
431 : else
432 : if (signaler_port != 0) {
433 : wchar_t mutex_name [MAX_PATH];
434 : # ifdef __MINGW32__
435 : _snwprintf (mutex_name, MAX_PATH, L"Global\\zmq-signaler-port-%d", signaler_port);
436 : # else
437 : swprintf (mutex_name, MAX_PATH, L"Global\\zmq-signaler-port-%d", signaler_port);
438 : # endif
439 :
440 : # if !defined _WIN32_WCE
441 : sync = CreateMutexW (&sa, FALSE, mutex_name);
442 : # else
443 : sync = CreateMutexW (NULL, FALSE, mutex_name);
444 : # endif
445 : if (sync == NULL && GetLastError () == ERROR_ACCESS_DENIED)
446 : sync = OpenMutexW (SYNCHRONIZE, FALSE, mutex_name);
447 :
448 : win_assert (sync != NULL);
449 : }
450 :
451 : // Windows has no 'socketpair' function. CreatePipe is no good as pipe
452 : // handles cannot be polled on. Here we create the socketpair by hand.
453 : *w_ = INVALID_SOCKET;
454 : *r_ = INVALID_SOCKET;
455 :
456 : // Create listening socket.
457 : SOCKET listener;
458 : listener = open_socket (AF_INET, SOCK_STREAM, 0);
459 : wsa_assert (listener != INVALID_SOCKET);
460 :
461 : // Set SO_REUSEADDR and TCP_NODELAY on listening socket.
462 : BOOL so_reuseaddr = 1;
463 : int rc = setsockopt (listener, SOL_SOCKET, SO_REUSEADDR,
464 : (char *) &so_reuseaddr, sizeof so_reuseaddr);
465 : wsa_assert (rc != SOCKET_ERROR);
466 : BOOL tcp_nodelay = 1;
467 : rc = setsockopt (listener, IPPROTO_TCP, TCP_NODELAY,
468 : (char *) &tcp_nodelay, sizeof tcp_nodelay);
469 : wsa_assert (rc != SOCKET_ERROR);
470 :
471 : // Init sockaddr to signaler port.
472 : struct sockaddr_in addr;
473 : memset (&addr, 0, sizeof addr);
474 : addr.sin_family = AF_INET;
475 : addr.sin_addr.s_addr = htonl (INADDR_LOOPBACK);
476 : addr.sin_port = htons (signaler_port);
477 :
478 : // Create the writer socket.
479 : *w_ = open_socket (AF_INET, SOCK_STREAM, 0);
480 : wsa_assert (*w_ != INVALID_SOCKET);
481 :
482 : // Set TCP_NODELAY on writer socket.
483 : rc = setsockopt (*w_, IPPROTO_TCP, TCP_NODELAY,
484 : (char *) &tcp_nodelay, sizeof tcp_nodelay);
485 : wsa_assert (rc != SOCKET_ERROR);
486 :
487 : if (sync != NULL) {
488 : // Enter the critical section.
489 : DWORD dwrc = WaitForSingleObject (sync, INFINITE);
490 : zmq_assert (dwrc == WAIT_OBJECT_0 || dwrc == WAIT_ABANDONED);
491 : }
492 :
493 : // Bind listening socket to signaler port.
494 : rc = bind (listener, (const struct sockaddr *) &addr, sizeof addr);
495 :
496 : if (rc != SOCKET_ERROR && signaler_port == 0) {
497 : // Retrieve ephemeral port number
498 : int addrlen = sizeof addr;
499 : rc = getsockname (listener, (struct sockaddr *) &addr, &addrlen);
500 : }
501 :
502 : // Listen for incoming connections.
503 : if (rc != SOCKET_ERROR)
504 : rc = listen (listener, 1);
505 :
506 : // Connect writer to the listener.
507 : if (rc != SOCKET_ERROR)
508 : rc = connect (*w_, (struct sockaddr *) &addr, sizeof addr);
509 :
510 : // Accept connection from writer.
511 : if (rc != SOCKET_ERROR)
512 : *r_ = accept (listener, NULL, NULL);
513 :
514 : // Send/receive large chunk to work around TCP slow start
515 : // This code is a workaround for #1608
516 : if (*r_ != INVALID_SOCKET) {
517 : size_t dummy_size = 1024 * 1024; // 1M to overload default receive buffer
518 : unsigned char *dummy = (unsigned char *) malloc (dummy_size);
519 : int still_to_send = (int) dummy_size;
520 : int still_to_recv = (int) dummy_size;
521 : while (still_to_send || still_to_recv) {
522 : int nbytes;
523 : if (still_to_send > 0) {
524 : nbytes = ::send (*w_, (char *) (dummy + dummy_size - still_to_send), still_to_send, 0);
525 : wsa_assert (nbytes != SOCKET_ERROR);
526 : still_to_send -= nbytes;
527 : }
528 : nbytes = ::recv (*r_, (char *) (dummy + dummy_size - still_to_recv), still_to_recv, 0);
529 : wsa_assert (nbytes != SOCKET_ERROR);
530 : still_to_recv -= nbytes;
531 : }
532 : free (dummy);
533 : }
534 :
535 : // Save errno if error occurred in bind/listen/connect/accept.
536 : int saved_errno = 0;
537 : if (*r_ == INVALID_SOCKET)
538 : saved_errno = WSAGetLastError ();
539 :
540 : // We don't need the listening socket anymore. Close it.
541 : rc = closesocket (listener);
542 : wsa_assert(rc != SOCKET_ERROR);
543 :
544 : if (sync != NULL) {
545 : // Exit the critical section.
546 : BOOL brc;
547 : if (signaler_port == event_signaler_port)
548 : brc = SetEvent (sync);
549 : else
550 : brc = ReleaseMutex (sync);
551 : win_assert (brc != 0);
552 :
553 : // Release the kernel object
554 : brc = CloseHandle (sync);
555 : win_assert (brc != 0);
556 : }
557 :
558 : if (*r_ != INVALID_SOCKET) {
559 : # if !defined _WIN32_WCE
560 : // On Windows, preventing sockets to be inherited by child processes.
561 : BOOL brc = SetHandleInformation ((HANDLE) *r_, HANDLE_FLAG_INHERIT, 0);
562 : win_assert (brc);
563 : # endif
564 : return 0;
565 : }
566 : else {
567 : // Cleanup writer if connection failed
568 : if (*w_ != INVALID_SOCKET) {
569 : rc = closesocket (*w_);
570 : wsa_assert (rc != SOCKET_ERROR);
571 : *w_ = INVALID_SOCKET;
572 : }
573 : // Set errno from saved value
574 : errno = wsa_error_to_errno (saved_errno);
575 : return -1;
576 : }
577 :
578 : #elif defined ZMQ_HAVE_OPENVMS
579 :
580 : // Whilst OpenVMS supports socketpair - it maps to AF_INET only. Further,
581 : // it does not set the socket options TCP_NODELAY and TCP_NODELACK which
582 : // can lead to performance problems.
583 : //
584 : // The bug will be fixed in V5.6 ECO4 and beyond. In the meantime, we'll
585 : // create the socket pair manually.
586 : struct sockaddr_in lcladdr;
587 : memset (&lcladdr, 0, sizeof lcladdr);
588 : lcladdr.sin_family = AF_INET;
589 : lcladdr.sin_addr.s_addr = htonl (INADDR_LOOPBACK);
590 : lcladdr.sin_port = 0;
591 :
592 : int listener = open_socket (AF_INET, SOCK_STREAM, 0);
593 : errno_assert (listener != -1);
594 :
595 : int on = 1;
596 : int rc = setsockopt (listener, IPPROTO_TCP, TCP_NODELAY, &on, sizeof on);
597 : errno_assert (rc != -1);
598 :
599 : rc = setsockopt (listener, IPPROTO_TCP, TCP_NODELACK, &on, sizeof on);
600 : errno_assert (rc != -1);
601 :
602 : rc = bind (listener, (struct sockaddr *) &lcladdr, sizeof lcladdr);
603 : errno_assert (rc != -1);
604 :
605 : socklen_t lcladdr_len = sizeof lcladdr;
606 :
607 : rc = getsockname (listener, (struct sockaddr *) &lcladdr, &lcladdr_len);
608 : errno_assert (rc != -1);
609 :
610 : rc = listen (listener, 1);
611 : errno_assert (rc != -1);
612 :
613 : *w_ = open_socket (AF_INET, SOCK_STREAM, 0);
614 : errno_assert (*w_ != -1);
615 :
616 : rc = setsockopt (*w_, IPPROTO_TCP, TCP_NODELAY, &on, sizeof on);
617 : errno_assert (rc != -1);
618 :
619 : rc = setsockopt (*w_, IPPROTO_TCP, TCP_NODELACK, &on, sizeof on);
620 : errno_assert (rc != -1);
621 :
622 : rc = connect (*w_, (struct sockaddr *) &lcladdr, sizeof lcladdr);
623 : errno_assert (rc != -1);
624 :
625 : *r_ = accept (listener, NULL, NULL);
626 : errno_assert (*r_ != -1);
627 :
628 : close (listener);
629 :
630 : return 0;
631 :
632 : #else
633 : // All other implementations support socketpair()
634 : int sv [2];
635 : int rc = socketpair (AF_UNIX, SOCK_STREAM, 0, sv);
636 : if (rc == -1) {
637 : errno_assert (errno == ENFILE || errno == EMFILE);
638 : *w_ = *r_ = -1;
639 : return -1;
640 : }
641 : else {
642 : *w_ = sv [0];
643 : *r_ = sv [1];
644 : return 0;
645 : }
646 : #endif
647 : }
|