Line data Source code
1 : /*
2 : Copyright (c) 2007-2016 Contributors as noted in the AUTHORS file
3 :
4 : This file is part of libzmq, the ZeroMQ core engine in C++.
5 :
6 : libzmq is free software; you can redistribute it and/or modify it under
7 : the terms of the GNU Lesser General Public License (LGPL) as published
8 : by the Free Software Foundation; either version 3 of the License, or
9 : (at your option) any later version.
10 :
11 : As a special exception, the Contributors give you permission to link
12 : this library with independent modules to produce an executable,
13 : regardless of the license terms of these independent modules, and to
14 : copy and distribute the resulting executable under terms of your choice,
15 : provided that you also meet, for each linked independent module, the
16 : terms and conditions of the license of that module. An independent
17 : module is a module which is not derived from or based on this library.
18 : If you modify this library, you must extend this exception to your
19 : version of the library.
20 :
21 : libzmq is distributed in the hope that it will be useful, but WITHOUT
22 : ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
23 : FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public
24 : License for more details.
25 :
26 : You should have received a copy of the GNU Lesser General Public License
27 : along with this program. If not, see <http://www.gnu.org/licenses/>.
28 : */
29 :
30 : #include "precompiled.hpp"
31 : #include "macros.hpp"
32 : #include "platform.hpp"
33 : #ifdef ZMQ_HAVE_WINDOWS
34 : #include "windows.hpp"
35 : #else
36 : #include <unistd.h>
37 : #endif
38 :
39 : #include <limits>
40 : #include <climits>
41 : #include <new>
42 : #include <string.h>
43 :
44 : #include "ctx.hpp"
45 : #include "socket_base.hpp"
46 : #include "io_thread.hpp"
47 : #include "reaper.hpp"
48 : #include "pipe.hpp"
49 : #include "err.hpp"
50 : #include "msg.hpp"
51 :
52 : #if defined (ZMQ_USE_TWEETNACL)
53 : # include "tweetnacl.h"
54 : #elif defined (ZMQ_USE_LIBSODIUM)
55 : # include "sodium.h"
56 : #endif
57 :
58 : #ifdef ZMQ_HAVE_VMCI
59 : #include <vmci_sockets.h>
60 : #endif
61 :
62 : #define ZMQ_CTX_TAG_VALUE_GOOD 0xabadcafe
63 : #define ZMQ_CTX_TAG_VALUE_BAD 0xdeadbeef
64 :
65 429 : int clipped_maxsocket (int max_requested)
66 : {
67 429 : if (max_requested >= zmq::poller_t::max_fds () && zmq::poller_t::max_fds () != -1)
68 : // -1 because we need room for the reaper mailbox.
69 0 : max_requested = zmq::poller_t::max_fds () - 1;
70 :
71 429 : return max_requested;
72 : }
73 :
74 423 : zmq::ctx_t::ctx_t () :
75 : tag (ZMQ_CTX_TAG_VALUE_GOOD),
76 : starting (true),
77 : terminating (false),
78 : reaper (NULL),
79 : slot_count (0),
80 : slots (NULL),
81 423 : max_sockets (clipped_maxsocket (ZMQ_MAX_SOCKETS_DFLT)),
82 : max_msgsz (INT_MAX),
83 : io_thread_count (ZMQ_IO_THREADS_DFLT),
84 : blocky (true),
85 : ipv6 (false),
86 : thread_priority (ZMQ_THREAD_PRIORITY_DFLT),
87 1692 : thread_sched_policy (ZMQ_THREAD_SCHED_POLICY_DFLT)
88 : {
89 : #ifdef HAVE_FORK
90 423 : pid = getpid();
91 : #endif
92 : #ifdef ZMQ_HAVE_VMCI
93 : vmci_fd = -1;
94 : vmci_family = -1;
95 : #endif
96 :
97 423 : crypto_sync.lock ();
98 : #if defined (ZMQ_USE_TWEETNACL)
99 : // allow opening of /dev/urandom
100 : unsigned char tmpbytes[4];
101 423 : randombytes(tmpbytes, 4);
102 : #elif defined (ZMQ_USE_SODIUM)
103 : int rc = sodium_init ();
104 : zmq_assert (rc != -1);
105 : #endif
106 423 : crypto_sync.unlock ();
107 423 : }
108 :
109 11670 : bool zmq::ctx_t::check_tag ()
110 : {
111 11670 : return tag == ZMQ_CTX_TAG_VALUE_GOOD;
112 : }
113 :
114 2538 : zmq::ctx_t::~ctx_t ()
115 : {
116 : // Check that there are no remaining sockets.
117 846 : zmq_assert (sockets.empty ());
118 :
119 : // Ask I/O threads to terminate. If stop signal wasn't sent to I/O
120 : // thread subsequent invocation of destructor would hang-up.
121 3681 : for (io_threads_t::size_type i = 0; i != io_threads.size (); i++) {
122 5913 : io_threads [i]->stop ();
123 : }
124 :
125 : // Wait till I/O threads actually terminate.
126 3681 : for (io_threads_t::size_type i = 0; i != io_threads.size (); i++) {
127 2412 : LIBZMQ_DELETE(io_threads [i]);
128 : }
129 :
130 : // Deallocate the reaper thread object.
131 423 : LIBZMQ_DELETE(reaper);
132 :
133 : // Deallocate the array of mailboxes. No special work is
134 : // needed as mailboxes themselves were deallocated with their
135 : // corresponding io_thread/socket objects.
136 423 : free (slots);
137 :
138 : // If we've done any Curve encryption, we may have a file handle
139 : // to /dev/urandom open that needs to be cleaned up.
140 : #ifdef ZMQ_HAVE_CURVE
141 423 : randombytes_close ();
142 : #endif
143 :
144 : // Remove the tag, so that the object is considered dead.
145 423 : tag = ZMQ_CTX_TAG_VALUE_BAD;
146 423 : }
147 :
148 423 : int zmq::ctx_t::terminate ()
149 : {
150 423 : slot_sync.lock();
151 :
152 423 : bool saveTerminating = terminating;
153 423 : terminating = false;
154 :
155 : // Connect up any pending inproc connections, otherwise we will hang
156 423 : pending_connections_t copy = pending_connections;
157 862 : for (pending_connections_t::iterator p = copy.begin (); p != copy.end (); ++p) {
158 16 : zmq::socket_base_t *s = create_socket (ZMQ_PAIR);
159 32 : s->bind (p->first.c_str ());
160 16 : s->close ();
161 : }
162 423 : terminating = saveTerminating;
163 :
164 423 : if (!starting) {
165 :
166 : #ifdef HAVE_FORK
167 423 : if (pid != getpid ()) {
168 : // we are a forked child process. Close all file descriptors
169 : // inherited from the parent.
170 0 : for (sockets_t::size_type i = 0; i != sockets.size (); i++)
171 0 : sockets [i]->get_mailbox ()->forked ();
172 :
173 0 : term_mailbox.forked ();
174 : }
175 : #endif
176 :
177 : // Check whether termination was already underway, but interrupted and now
178 : // restarted.
179 423 : bool restarted = terminating;
180 423 : terminating = true;
181 :
182 : // First attempt to terminate the context.
183 423 : if (!restarted) {
184 : // First send stop command to sockets so that any blocking calls
185 : // can be interrupted. If there are no sockets we can ask reaper
186 : // thread to stop.
187 16899 : for (sockets_t::size_type i = 0; i != sockets.size (); i++)
188 16482 : sockets [i]->stop ();
189 834 : if (sockets.empty ())
190 16 : reaper->stop ();
191 : }
192 423 : slot_sync.unlock();
193 :
194 : // Wait till reaper thread closes all the sockets.
195 : command_t cmd;
196 423 : int rc = term_mailbox.recv (&cmd, -1);
197 423 : if (rc == -1 && errno == EINTR)
198 0 : return -1;
199 423 : errno_assert (rc == 0);
200 423 : zmq_assert (cmd.type == command_t::done);
201 423 : slot_sync.lock ();
202 846 : zmq_assert (sockets.empty ());
203 : }
204 423 : slot_sync.unlock ();
205 :
206 : #ifdef ZMQ_HAVE_VMCI
207 : vmci_sync.lock ();
208 :
209 : VMCISock_ReleaseAFValueFd (vmci_fd);
210 : vmci_family = -1;
211 : vmci_fd = -1;
212 :
213 : vmci_sync.unlock ();
214 : #endif
215 :
216 : // Deallocate the resources.
217 423 : delete this;
218 :
219 : return 0;
220 : }
221 :
222 6 : int zmq::ctx_t::shutdown ()
223 : {
224 6 : slot_sync.lock ();
225 6 : if (!starting && !terminating) {
226 6 : terminating = true;
227 :
228 : // Send stop command to sockets so that any blocking calls
229 : // can be interrupted. If there are no sockets we can ask reaper
230 : // thread to stop.
231 24 : for (sockets_t::size_type i = 0; i != sockets.size (); i++)
232 12 : sockets [i]->stop ();
233 12 : if (sockets.empty ())
234 0 : reaper->stop ();
235 : }
236 6 : slot_sync.unlock ();
237 :
238 6 : return 0;
239 : }
240 :
241 42 : int zmq::ctx_t::set (int option_, int optval_)
242 : {
243 42 : int rc = 0;
244 84 : if (option_ == ZMQ_MAX_SOCKETS
245 42 : && optval_ >= 1 && optval_ == clipped_maxsocket (optval_)) {
246 3 : opt_sync.lock ();
247 3 : max_sockets = optval_;
248 3 : opt_sync.unlock ();
249 : }
250 : else
251 39 : if (option_ == ZMQ_IO_THREADS && optval_ >= 0) {
252 33 : opt_sync.lock ();
253 33 : io_thread_count = optval_;
254 33 : opt_sync.unlock ();
255 : }
256 : else
257 6 : if (option_ == ZMQ_IPV6 && optval_ >= 0) {
258 3 : opt_sync.lock ();
259 3 : ipv6 = (optval_ != 0);
260 3 : opt_sync.unlock ();
261 : }
262 : else
263 3 : if (option_ == ZMQ_THREAD_PRIORITY && optval_ >= 0) {
264 0 : opt_sync.lock();
265 0 : thread_priority = optval_;
266 0 : opt_sync.unlock ();
267 : }
268 : else
269 3 : if (option_ == ZMQ_THREAD_SCHED_POLICY && optval_ >= 0) {
270 0 : opt_sync.lock();
271 0 : thread_sched_policy = optval_;
272 0 : opt_sync.unlock ();
273 : }
274 : else
275 3 : if (option_ == ZMQ_BLOCKY && optval_ >= 0) {
276 3 : opt_sync.lock ();
277 3 : blocky = (optval_ != 0);
278 3 : opt_sync.unlock ();
279 : }
280 : else
281 0 : if (option_ == ZMQ_MAX_MSGSZ && optval_ >= 0) {
282 0 : opt_sync.lock ();
283 0 : max_msgsz = optval_ < INT_MAX? optval_: INT_MAX;
284 0 : opt_sync.unlock ();
285 : }
286 : else {
287 0 : errno = EINVAL;
288 0 : rc = -1;
289 : }
290 42 : return rc;
291 : }
292 :
293 22412 : int zmq::ctx_t::get (int option_)
294 : {
295 22412 : int rc = 0;
296 22412 : if (option_ == ZMQ_MAX_SOCKETS)
297 3 : rc = max_sockets;
298 : else
299 22409 : if (option_ == ZMQ_SOCKET_LIMIT)
300 3 : rc = clipped_maxsocket (65535);
301 : else
302 22406 : if (option_ == ZMQ_IO_THREADS)
303 3 : rc = io_thread_count;
304 : else
305 22403 : if (option_ == ZMQ_IPV6)
306 11203 : rc = ipv6;
307 : else
308 11200 : if (option_ == ZMQ_BLOCKY)
309 11200 : rc = blocky;
310 : else
311 0 : if (option_ == ZMQ_MAX_MSGSZ)
312 0 : rc = max_msgsz;
313 : else {
314 0 : errno = EINVAL;
315 0 : rc = -1;
316 : }
317 22412 : return rc;
318 : }
319 :
320 11197 : zmq::socket_base_t *zmq::ctx_t::create_socket (int type_)
321 : {
322 11197 : slot_sync.lock ();
323 11197 : if (unlikely (starting)) {
324 :
325 423 : starting = false;
326 : // Initialise the array of mailboxes. Additional three slots are for
327 : // zmq_ctx_term thread and reaper thread.
328 423 : opt_sync.lock ();
329 423 : int mazmq = max_sockets;
330 423 : int ios = io_thread_count;
331 423 : opt_sync.unlock ();
332 423 : slot_count = mazmq + ios + 2;
333 423 : slots = (i_mailbox **) malloc (sizeof (i_mailbox*) * slot_count);
334 423 : alloc_assert (slots);
335 :
336 : // Initialise the infrastructure for zmq_ctx_term thread.
337 423 : slots [term_tid] = &term_mailbox;
338 :
339 : // Create the reaper thread.
340 423 : reaper = new (std::nothrow) reaper_t (this, reaper_tid);
341 423 : alloc_assert (reaper);
342 423 : slots [reaper_tid] = reaper->get_mailbox ();
343 423 : reaper->start ();
344 :
345 : // Create I/O thread objects and launch them.
346 603 : for (int i = 2; i != ios + 2; i++) {
347 603 : io_thread_t *io_thread = new (std::nothrow) io_thread_t (this, i);
348 603 : alloc_assert (io_thread);
349 603 : io_threads.push_back (io_thread);
350 603 : slots [i] = io_thread->get_mailbox ();
351 603 : io_thread->start ();
352 : }
353 :
354 : // In the unused part of the slot array, create a list of empty slots.
355 1646598 : for (int32_t i = (int32_t) slot_count - 1;
356 823299 : i >= (int32_t) ios + 2; i--) {
357 1645752 : empty_slots.push_back (i);
358 822876 : slots [i] = NULL;
359 : }
360 : }
361 :
362 : // Once zmq_ctx_term() was called, we can't create new sockets.
363 11197 : if (terminating) {
364 0 : slot_sync.unlock ();
365 0 : errno = ETERM;
366 0 : return NULL;
367 : }
368 :
369 : // If max_sockets limit was reached, return error.
370 22394 : if (empty_slots.empty ()) {
371 0 : slot_sync.unlock ();
372 0 : errno = EMFILE;
373 0 : return NULL;
374 : }
375 :
376 : // Choose a slot for the socket.
377 22394 : uint32_t slot = empty_slots.back ();
378 11197 : empty_slots.pop_back ();
379 :
380 : // Generate new unique socket ID.
381 11197 : int sid = ((int) max_socket_id.add (1)) + 1;
382 :
383 : // Create the socket and register its mailbox.
384 11197 : socket_base_t *s = socket_base_t::create (type_, this, slot, sid);
385 11197 : if (!s) {
386 66 : empty_slots.push_back (slot);
387 66 : slot_sync.unlock ();
388 : return NULL;
389 : }
390 11131 : sockets.push_back (s);
391 11131 : slots [slot] = s->get_mailbox ();
392 :
393 11131 : slot_sync.unlock ();
394 : return s;
395 : }
396 :
397 11131 : void zmq::ctx_t::destroy_socket (class socket_base_t *socket_)
398 : {
399 11131 : slot_sync.lock ();
400 :
401 : // Free the associated thread slot.
402 11131 : uint32_t tid = socket_->get_tid ();
403 11131 : empty_slots.push_back (tid);
404 11131 : slots [tid] = NULL;
405 :
406 : // Remove the socket from the list of sockets.
407 11131 : sockets.erase (socket_);
408 :
409 : // If zmq_ctx_term() was already called and there are no more socket
410 : // we can ask reaper thread to terminate.
411 19381 : if (terminating && sockets.empty ())
412 407 : reaper->stop ();
413 :
414 11131 : slot_sync.unlock ();
415 11131 : }
416 :
417 22240 : zmq::object_t *zmq::ctx_t::get_reaper ()
418 : {
419 22240 : return reaper;
420 : }
421 :
422 1026 : void zmq::ctx_t::start_thread (thread_t &thread_, thread_fn *tfn_, void *arg_) const
423 : {
424 1026 : thread_.start(tfn_, arg_);
425 1026 : thread_.setSchedulingParameters(thread_priority, thread_sched_policy);
426 1026 : }
427 :
428 135236 : void zmq::ctx_t::send_command (uint32_t tid_, const command_t &command_)
429 : {
430 135236 : slots [tid_]->send (command_);
431 135470 : }
432 :
433 11559 : zmq::io_thread_t *zmq::ctx_t::choose_io_thread (uint64_t affinity_)
434 : {
435 196248 : if (io_threads.empty ())
436 : return NULL;
437 :
438 : // Find the I/O thread with minimum load.
439 : int min_load = -1;
440 : io_thread_t *selected_io_thread = NULL;
441 294865 : for (io_threads_t::size_type i = 0; i != io_threads.size (); i++) {
442 65045 : if (!affinity_ || (affinity_ & (uint64_t (1) << i))) {
443 130090 : int load = io_threads [i]->get_load ();
444 65047 : if (selected_io_thread == NULL || load < min_load) {
445 19918 : min_load = load;
446 59754 : selected_io_thread = io_threads [i];
447 : }
448 : }
449 : }
450 : return selected_io_thread;
451 : }
452 :
453 358 : int zmq::ctx_t::register_endpoint (const char *addr_,
454 : const endpoint_t &endpoint_)
455 : {
456 358 : endpoints_sync.lock ();
457 :
458 : const bool inserted = endpoints.insert (
459 1074 : endpoints_t::value_type (std::string (addr_), endpoint_)).second;
460 :
461 358 : endpoints_sync.unlock ();
462 :
463 358 : if (!inserted) {
464 0 : errno = EADDRINUSE;
465 0 : return -1;
466 : }
467 : return 0;
468 : }
469 :
470 9 : int zmq::ctx_t::unregister_endpoint (
471 : const std::string &addr_, socket_base_t *socket_)
472 : {
473 9 : endpoints_sync.lock ();
474 :
475 18 : const endpoints_t::iterator it = endpoints.find (addr_);
476 27 : if (it == endpoints.end () || it->second.socket != socket_) {
477 3 : endpoints_sync.unlock ();
478 3 : errno = ENOENT;
479 3 : return -1;
480 : }
481 :
482 : // Remove endpoint.
483 6 : endpoints.erase (it);
484 :
485 6 : endpoints_sync.unlock ();
486 :
487 : return 0;
488 : }
489 :
490 11131 : void zmq::ctx_t::unregister_endpoints (socket_base_t *socket_)
491 : {
492 11131 : endpoints_sync.lock ();
493 :
494 22262 : endpoints_t::iterator it = endpoints.begin ();
495 42085 : while (it != endpoints.end ()) {
496 4346 : if (it->second.socket == socket_) {
497 352 : endpoints_t::iterator to_erase = it;
498 : ++it;
499 352 : endpoints.erase (to_erase);
500 : continue;
501 : }
502 : ++it;
503 : }
504 :
505 11131 : endpoints_sync.unlock ();
506 11131 : }
507 :
508 591 : zmq::endpoint_t zmq::ctx_t::find_endpoint (const char *addr_)
509 : {
510 591 : endpoints_sync.lock ();
511 :
512 1773 : endpoints_t::iterator it = endpoints.find (addr_);
513 1182 : if (it == endpoints.end ()) {
514 279 : endpoints_sync.unlock ();
515 279 : errno = ECONNREFUSED;
516 279 : endpoint_t empty = {NULL, options_t()};
517 : return empty;
518 : }
519 312 : endpoint_t endpoint = it->second;
520 :
521 : // Increment the command sequence number of the peer so that it won't
522 : // get deallocated until "bind" command is issued by the caller.
523 : // The subsequent 'bind' has to be called with inc_seqnum parameter
524 : // set to false, so that the seqnum isn't incremented twice.
525 312 : endpoint.socket->inc_seqnum ();
526 :
527 312 : endpoints_sync.unlock ();
528 : return endpoint;
529 : }
530 :
531 276 : void zmq::ctx_t::pend_connection (const std::string &addr_,
532 : const endpoint_t &endpoint_, pipe_t **pipes_)
533 : {
534 : const pending_connection_t pending_connection =
535 276 : {endpoint_, pipes_ [0], pipes_ [1]};
536 :
537 276 : endpoints_sync.lock ();
538 :
539 552 : endpoints_t::iterator it = endpoints.find (addr_);
540 552 : if (it == endpoints.end ()) {
541 : // Still no bind.
542 256 : endpoint_.socket->inc_seqnum ();
543 512 : pending_connections.insert (pending_connections_t::value_type (addr_, pending_connection));
544 : }
545 : else
546 : // Bind has happened in the mean time, connect directly
547 40 : connect_inproc_sockets (it->second.socket, it->second.options, pending_connection, connect_side);
548 :
549 276 : endpoints_sync.unlock ();
550 276 : }
551 :
552 358 : void zmq::ctx_t::connect_pending (const char *addr_, zmq::socket_base_t *bind_socket_)
553 : {
554 358 : endpoints_sync.lock ();
555 :
556 1074 : std::pair<pending_connections_t::iterator, pending_connections_t::iterator> pending = pending_connections.equal_range(addr_);
557 :
558 972 : for (pending_connections_t::iterator p = pending.first; p != pending.second; ++p)
559 512 : connect_inproc_sockets(bind_socket_, endpoints[addr_].options, p->second, bind_side);
560 :
561 358 : pending_connections.erase(pending.first, pending.second);
562 358 : endpoints_sync.unlock ();
563 358 : }
564 :
565 276 : void zmq::ctx_t::connect_inproc_sockets (zmq::socket_base_t *bind_socket_,
566 : options_t& bind_options, const pending_connection_t &pending_connection_, side side_)
567 : {
568 276 : bind_socket_->inc_seqnum();
569 276 : pending_connection_.bind_pipe->set_tid (bind_socket_->get_tid ());
570 :
571 276 : if (!bind_options.recv_identity) {
572 : msg_t msg;
573 273 : const bool ok = pending_connection_.bind_pipe->read (&msg);
574 273 : zmq_assert (ok);
575 273 : const int rc = msg.close ();
576 273 : errno_assert (rc == 0);
577 : }
578 :
579 276 : bool conflate = pending_connection_.endpoint.options.conflate &&
580 0 : (pending_connection_.endpoint.options.type == ZMQ_DEALER ||
581 0 : pending_connection_.endpoint.options.type == ZMQ_PULL ||
582 0 : pending_connection_.endpoint.options.type == ZMQ_PUSH ||
583 0 : pending_connection_.endpoint.options.type == ZMQ_PUB ||
584 276 : pending_connection_.endpoint.options.type == ZMQ_SUB);
585 :
586 276 : if (!conflate) {
587 276 : pending_connection_.connect_pipe->set_hwms_boost(bind_options.sndhwm, bind_options.rcvhwm);
588 276 : pending_connection_.bind_pipe->set_hwms_boost(pending_connection_.endpoint.options.sndhwm, pending_connection_.endpoint.options.rcvhwm);
589 :
590 276 : pending_connection_.connect_pipe->set_hwms(pending_connection_.endpoint.options.rcvhwm, pending_connection_.endpoint.options.sndhwm);
591 276 : pending_connection_.bind_pipe->set_hwms(bind_options.rcvhwm, bind_options.sndhwm);
592 : }
593 : else {
594 0 : pending_connection_.connect_pipe->set_hwms(-1, -1);
595 0 : pending_connection_.bind_pipe->set_hwms(-1, -1);
596 : }
597 :
598 276 : if (side_ == bind_side) {
599 : command_t cmd;
600 256 : cmd.type = command_t::bind;
601 256 : cmd.args.bind.pipe = pending_connection_.bind_pipe;
602 256 : bind_socket_->process_command (cmd);
603 256 : bind_socket_->send_inproc_connected (pending_connection_.endpoint.socket);
604 : }
605 : else
606 20 : pending_connection_.connect_pipe->send_bind (bind_socket_, pending_connection_.bind_pipe, false);
607 :
608 276 : if (pending_connection_.endpoint.options.recv_identity) {
609 : msg_t id;
610 0 : int rc = id.init_size (bind_options.identity_size);
611 0 : errno_assert (rc == 0);
612 0 : memcpy (id.data (), bind_options.identity, bind_options.identity_size);
613 0 : id.set_flags (msg_t::identity);
614 0 : bool written = pending_connection_.bind_pipe->write (&id);
615 0 : zmq_assert (written);
616 0 : pending_connection_.bind_pipe->flush ();
617 : }
618 276 : }
619 :
620 : #ifdef ZMQ_HAVE_VMCI
621 :
622 : int zmq::ctx_t::get_vmci_socket_family ()
623 : {
624 : vmci_sync.lock ();
625 :
626 : if (vmci_fd == -1) {
627 : vmci_family = VMCISock_GetAFValueFd (&vmci_fd);
628 :
629 : if (vmci_fd != -1) {
630 : #ifdef FD_CLOEXEC
631 : int rc = fcntl (vmci_fd, F_SETFD, FD_CLOEXEC);
632 : errno_assert (rc != -1);
633 : #endif
634 : }
635 : }
636 :
637 : vmci_sync.unlock ();
638 :
639 : return vmci_family;
640 : }
641 :
642 : #endif
643 :
644 : // The last used socket ID, or 0 if no socket was used so far. Note that this
645 : // is a global variable (a single counter defined once for the ctx_t class).
646 : // Thus, even sockets created in different contexts have unique IDs.
647 729 : zmq::atomic_counter_t zmq::ctx_t::max_socket_id;
|