Line data Source code
1 : /*
2 : Copyright (c) 2007-2016 Contributors as noted in the AUTHORS file
3 :
4 : This file is part of libzmq, the ZeroMQ core engine in C++.
5 :
6 : libzmq is free software; you can redistribute it and/or modify it under
7 : the terms of the GNU Lesser General Public License (LGPL) as published
8 : by the Free Software Foundation; either version 3 of the License, or
9 : (at your option) any later version.
10 :
11 : As a special exception, the Contributors give you permission to link
12 : this library with independent modules to produce an executable,
13 : regardless of the license terms of these independent modules, and to
14 : copy and distribute the resulting executable under terms of your choice,
15 : provided that you also meet, for each linked independent module, the
16 : terms and conditions of the license of that module. An independent
17 : module is a module which is not derived from or based on this library.
18 : If you modify this library, you must extend this exception to your
19 : version of the library.
20 :
21 : libzmq is distributed in the hope that it will be useful, but WITHOUT
22 : ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
23 : FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public
24 : License for more details.
25 :
26 : You should have received a copy of the GNU Lesser General Public License
27 : along with this program. If not, see <http://www.gnu.org/licenses/>.
28 : */
29 :
30 : #include "precompiled.hpp"
31 : #include "macros.hpp"
32 : #include "session_base.hpp"
33 : #include "i_engine.hpp"
34 : #include "err.hpp"
35 : #include "pipe.hpp"
36 : #include "likely.hpp"
37 : #include "tcp_connecter.hpp"
38 : #include "ipc_connecter.hpp"
39 : #include "tipc_connecter.hpp"
40 : #include "socks_connecter.hpp"
41 : #include "vmci_connecter.hpp"
42 : #include "pgm_sender.hpp"
43 : #include "pgm_receiver.hpp"
44 : #include "address.hpp"
45 : #include "norm_engine.hpp"
46 : #include "udp_engine.hpp"
47 :
48 : #include "ctx.hpp"
49 : #include "req.hpp"
50 : #include "radio.hpp"
51 : #include "dish.hpp"
52 :
53 7258 : zmq::session_base_t *zmq::session_base_t::create (class io_thread_t *io_thread_,
54 : bool active_, class socket_base_t *socket_, const options_t &options_,
55 : address_t *addr_)
56 : {
57 7258 : session_base_t *s = NULL;
58 7258 : switch (options_.type) {
59 : case ZMQ_REQ:
60 : s = new (std::nothrow) req_session_t (io_thread_, active_,
61 99 : socket_, options_, addr_);
62 99 : break;
63 : case ZMQ_RADIO:
64 : s = new (std::nothrow) radio_session_t (io_thread_, active_,
65 6 : socket_, options_, addr_);
66 6 : break;
67 : case ZMQ_DISH:
68 : s = new (std::nothrow) dish_session_t (io_thread_, active_,
69 6 : socket_, options_, addr_);
70 6 : break;
71 : case ZMQ_DEALER:
72 : case ZMQ_REP:
73 : case ZMQ_ROUTER:
74 : case ZMQ_PUB:
75 : case ZMQ_XPUB:
76 : case ZMQ_SUB:
77 : case ZMQ_XSUB:
78 : case ZMQ_PUSH:
79 : case ZMQ_PULL:
80 : case ZMQ_PAIR:
81 : case ZMQ_STREAM:
82 : case ZMQ_SERVER:
83 : case ZMQ_CLIENT:
84 : case ZMQ_GATHER:
85 : case ZMQ_SCATTER:
86 : s = new (std::nothrow) session_base_t (io_thread_, active_,
87 7147 : socket_, options_, addr_);
88 7146 : break;
89 : default:
90 0 : errno = EINVAL;
91 0 : return NULL;
92 : }
93 7257 : alloc_assert (s);
94 7257 : return s;
95 : }
96 :
97 7262 : zmq::session_base_t::session_base_t (class io_thread_t *io_thread_,
98 : bool active_, class socket_base_t *socket_, const options_t &options_,
99 : address_t *addr_) :
100 : own_t (io_thread_, options_),
101 : io_object_t (io_thread_),
102 : active (active_),
103 : pipe (NULL),
104 : zap_pipe (NULL),
105 : incomplete_in (false),
106 : pending (false),
107 : engine (NULL),
108 : socket (socket_),
109 : io_thread (io_thread_),
110 : has_linger_timer (false),
111 21778 : addr (addr_)
112 : {
113 7258 : }
114 :
115 36196 : zmq::session_base_t::~session_base_t ()
116 : {
117 7262 : zmq_assert (!pipe);
118 7262 : zmq_assert (!zap_pipe);
119 :
120 : // If there's still a pending linger timer, remove it.
121 7262 : if (has_linger_timer) {
122 0 : cancel_timer (linger_timer_id);
123 0 : has_linger_timer = false;
124 : }
125 :
126 : // Close the engine.
127 7262 : if (engine)
128 4347 : engine->terminate ();
129 :
130 7262 : LIBZMQ_DELETE(addr);
131 14406 : }
132 :
133 3738 : void zmq::session_base_t::attach_pipe (pipe_t *pipe_)
134 : {
135 3738 : zmq_assert (!is_terminating ());
136 3738 : zmq_assert (!pipe);
137 3738 : zmq_assert (pipe_);
138 3738 : pipe = pipe_;
139 3738 : pipe->set_event_sink (this);
140 3738 : }
141 :
142 649832 : int zmq::session_base_t::pull_msg (msg_t *msg_)
143 : {
144 649832 : if (!pipe || !pipe->read (msg_)) {
145 14828 : errno = EAGAIN;
146 14828 : return -1;
147 : }
148 :
149 635004 : incomplete_in = msg_->flags () & msg_t::more ? true : false;
150 :
151 635004 : return 0;
152 : }
153 :
154 636982 : int zmq::session_base_t::push_msg (msg_t *msg_)
155 : {
156 636982 : if(msg_->flags() & msg_t::command)
157 : return 0;
158 636772 : if (pipe && pipe->write (msg_)) {
159 635565 : int rc = msg_->init ();
160 635565 : errno_assert (rc == 0);
161 : return 0;
162 : }
163 :
164 1207 : errno = EAGAIN;
165 1207 : return -1;
166 : }
167 :
168 166 : int zmq::session_base_t::read_zap_msg (msg_t *msg_)
169 : {
170 166 : if (zap_pipe == NULL) {
171 0 : errno = ENOTCONN;
172 0 : return -1;
173 : }
174 :
175 166 : if (!zap_pipe->read (msg_)) {
176 19 : errno = EAGAIN;
177 19 : return -1;
178 : }
179 :
180 : return 0;
181 : }
182 :
183 165 : int zmq::session_base_t::write_zap_msg (msg_t *msg_)
184 : {
185 165 : if (zap_pipe == NULL) {
186 0 : errno = ENOTCONN;
187 0 : return -1;
188 : }
189 :
190 165 : const bool ok = zap_pipe->write (msg_);
191 165 : zmq_assert (ok);
192 :
193 165 : if ((msg_->flags () & msg_t::more) == 0)
194 21 : zap_pipe->flush ();
195 :
196 165 : const int rc = msg_->init ();
197 165 : errno_assert (rc == 0);
198 : return 0;
199 : }
200 :
201 210 : void zmq::session_base_t::reset ()
202 : {
203 210 : }
204 :
205 9562 : void zmq::session_base_t::flush ()
206 : {
207 9562 : if (pipe)
208 9498 : pipe->flush ();
209 9563 : }
210 :
211 2373 : void zmq::session_base_t::clean_pipes ()
212 : {
213 2373 : zmq_assert (pipe != NULL);
214 :
215 : // Get rid of half-processed messages in the out pipe. Flush any
216 : // unflushed messages upstream.
217 2373 : pipe->rollback ();
218 2373 : pipe->flush ();
219 :
220 : // Remove any half-read message from the in pipe.
221 4746 : while (incomplete_in) {
222 : msg_t msg;
223 0 : int rc = msg.init ();
224 0 : errno_assert (rc == 0);
225 0 : rc = pull_msg (&msg);
226 0 : errno_assert (rc == 0);
227 0 : rc = msg.close ();
228 0 : errno_assert (rc == 0);
229 : }
230 2373 : }
231 :
232 7278 : void zmq::session_base_t::pipe_terminated (pipe_t *pipe_)
233 : {
234 : // Drop the reference to the deallocated pipe if required.
235 7281 : zmq_assert (pipe_ == pipe
236 : || pipe_ == zap_pipe
237 : || terminating_pipes.count (pipe_) == 1);
238 :
239 7278 : if (pipe_ == pipe) {
240 : // If this is our current pipe, remove it
241 7254 : pipe = NULL;
242 7254 : if (has_linger_timer) {
243 3 : cancel_timer (linger_timer_id);
244 3 : has_linger_timer = false;
245 : }
246 : }
247 : else
248 24 : if (pipe_ == zap_pipe)
249 21 : zap_pipe = NULL;
250 : else
251 : // Remove the pipe from the detached pipes set
252 3 : terminating_pipes.erase (pipe_);
253 :
254 7278 : if (!is_terminating () && options.raw_socket) {
255 33 : if (engine) {
256 20 : engine->terminate ();
257 20 : engine = NULL;
258 : }
259 33 : terminate ();
260 : }
261 :
262 : // If we are waiting for pending messages to be sent, at this point
263 : // we are sure that there will be no more messages and we can proceed
264 : // with termination safely.
265 14182 : if (pending && !pipe && !zap_pipe && terminating_pipes.empty ()) {
266 6904 : pending = false;
267 6904 : own_t::process_term (0);
268 : }
269 7281 : }
270 :
271 8048 : void zmq::session_base_t::read_activated (pipe_t *pipe_)
272 : {
273 : // Skip activating if we're detaching this pipe
274 8048 : if (unlikely (pipe_ != pipe && pipe_ != zap_pipe)) {
275 0 : zmq_assert (terminating_pipes.count (pipe_) == 1);
276 : return;
277 : }
278 :
279 8048 : if (unlikely (engine == NULL)) {
280 148 : pipe->check_read ();
281 148 : return;
282 : }
283 :
284 7900 : if (likely (pipe_ == pipe))
285 7881 : engine->restart_output ();
286 : else
287 19 : engine->zap_msg_available ();
288 : }
289 :
290 1191 : void zmq::session_base_t::write_activated (pipe_t *pipe_)
291 : {
292 : // Skip activating if we're detaching this pipe
293 1191 : if (pipe != pipe_) {
294 0 : zmq_assert (terminating_pipes.count (pipe_) == 1);
295 1191 : return;
296 : }
297 :
298 1191 : if (engine)
299 1191 : engine->restart_input ();
300 : }
301 :
302 0 : void zmq::session_base_t::hiccuped (pipe_t *)
303 : {
304 : // Hiccups are always sent from session to socket, not the other
305 : // way round.
306 0 : zmq_assert (false);
307 0 : }
308 :
309 10704 : zmq::socket_base_t *zmq::session_base_t::get_socket ()
310 : {
311 10704 : return socket;
312 : }
313 :
314 7262 : void zmq::session_base_t::process_plug ()
315 : {
316 7262 : if (active)
317 3747 : start_connecting (false);
318 7262 : }
319 :
320 24 : int zmq::session_base_t::zap_connect ()
321 : {
322 24 : zmq_assert (zap_pipe == NULL);
323 :
324 24 : endpoint_t peer = find_endpoint ("inproc://zeromq.zap.01");
325 24 : if (peer.socket == NULL) {
326 3 : errno = ECONNREFUSED;
327 3 : return -1;
328 : }
329 21 : if (peer.options.type != ZMQ_REP
330 21 : && peer.options.type != ZMQ_ROUTER
331 21 : && peer.options.type != ZMQ_SERVER) {
332 0 : errno = ECONNREFUSED;
333 0 : return -1;
334 : }
335 :
336 : // Create a bi-directional pipe that will connect
337 : // session with zap socket.
338 21 : object_t *parents [2] = {this, peer.socket};
339 21 : pipe_t *new_pipes [2] = {NULL, NULL};
340 21 : int hwms [2] = {0, 0};
341 21 : bool conflates [2] = {false, false};
342 21 : int rc = pipepair (parents, new_pipes, hwms, conflates);
343 21 : errno_assert (rc == 0);
344 :
345 : // Attach local end of the pipe to this socket object.
346 21 : zap_pipe = new_pipes [0];
347 21 : zap_pipe->set_nodelay ();
348 21 : zap_pipe->set_event_sink (this);
349 :
350 21 : send_bind (peer.socket, new_pipes [1], false);
351 :
352 : // Send empty identity if required by the peer.
353 21 : if (peer.options.recv_identity) {
354 : msg_t id;
355 21 : rc = id.init ();
356 21 : errno_assert (rc == 0);
357 21 : id.set_flags (msg_t::identity);
358 21 : bool ok = zap_pipe->write (&id);
359 21 : zmq_assert (ok);
360 21 : zap_pipe->flush ();
361 : }
362 :
363 : return 0;
364 : }
365 :
366 9 : bool zmq::session_base_t::zap_enabled ()
367 : {
368 : return (
369 9 : options.mechanism != ZMQ_NULL ||
370 6 : (options.mechanism == ZMQ_NULL && options.zap_domain.length() > 0)
371 9 : );
372 : }
373 :
374 6768 : void zmq::session_base_t::process_attach (i_engine *engine_)
375 : {
376 6768 : zmq_assert (engine_ != NULL);
377 :
378 : // Create the pipe if it does not exist yet.
379 6768 : if (!pipe && !is_terminating ()) {
380 3524 : object_t *parents [2] = {this, socket};
381 3524 : pipe_t *pipes [2] = {NULL, NULL};
382 :
383 3527 : bool conflate = options.conflate &&
384 3 : (options.type == ZMQ_DEALER ||
385 0 : options.type == ZMQ_PULL ||
386 0 : options.type == ZMQ_PUSH ||
387 0 : options.type == ZMQ_PUB ||
388 3524 : options.type == ZMQ_SUB);
389 :
390 : int hwms [2] = {conflate? -1 : options.rcvhwm,
391 3524 : conflate? -1 : options.sndhwm};
392 3524 : bool conflates [2] = {conflate, conflate};
393 3524 : int rc = pipepair (parents, pipes, hwms, conflates);
394 3524 : errno_assert (rc == 0);
395 :
396 : // Plug the local end of the pipe.
397 3524 : pipes [0]->set_event_sink (this);
398 :
399 : // Remember the local end of the pipe.
400 3524 : zmq_assert (!pipe);
401 3524 : pipe = pipes [0];
402 :
403 : // Ask socket to plug into the remote end of the pipe.
404 3524 : send_bind (socket, pipes [1]);
405 : }
406 :
407 : // Plug in the engine.
408 6768 : zmq_assert (!engine);
409 6768 : engine = engine_;
410 6768 : engine->plug (io_thread, this);
411 6768 : }
412 :
413 2401 : void zmq::session_base_t::engine_error (
414 : zmq::stream_engine_t::error_reason_t reason)
415 : {
416 : // Engine is dead. Let's forget about it.
417 2401 : engine = NULL;
418 :
419 : // Remove any half-done messages from the pipes.
420 2401 : if (pipe)
421 2373 : clean_pipes ();
422 :
423 2401 : zmq_assert (reason == stream_engine_t::connection_error
424 : || reason == stream_engine_t::timeout_error
425 : || reason == stream_engine_t::protocol_error);
426 :
427 2401 : switch (reason) {
428 : case stream_engine_t::timeout_error:
429 : case stream_engine_t::connection_error:
430 2341 : if (active)
431 210 : reconnect ();
432 : else
433 2131 : terminate ();
434 : break;
435 : case stream_engine_t::protocol_error:
436 60 : terminate ();
437 60 : break;
438 : }
439 :
440 : // Just in case there's only a delimiter in the pipe.
441 2401 : if (pipe)
442 2370 : pipe->check_read ();
443 :
444 2401 : if (zap_pipe)
445 18 : zap_pipe->check_read ();
446 2401 : }
447 :
448 7259 : void zmq::session_base_t::process_term (int linger_)
449 : {
450 7259 : zmq_assert (!pending);
451 :
452 : // If the termination of the pipe happens before the term command is
453 : // delivered there's nothing much to do. We can proceed with the
454 : // standard termination immediately.
455 7613 : if (!pipe && !zap_pipe && terminating_pipes.empty ()) {
456 354 : own_t::process_term (0);
457 7615 : return;
458 : }
459 :
460 6905 : pending = true;
461 :
462 6905 : if (pipe != NULL) {
463 : // If there's finite linger value, delay the termination.
464 : // If linger is infinite (negative) we don't even have to set
465 : // the timer.
466 6902 : if (linger_ > 0) {
467 3 : zmq_assert (!has_linger_timer);
468 3 : add_timer (linger_, linger_timer_id);
469 3 : has_linger_timer = true;
470 : }
471 :
472 : // Start pipe termination process. Delay the termination till all messages
473 : // are processed in case the linger time is non-zero.
474 6902 : pipe->terminate (linger_ != 0);
475 :
476 : // TODO: Should this go into pipe_t::terminate ?
477 : // In case there's no engine and there's only delimiter in the
478 : // pipe it wouldn't be ever read. Thus we check for it explicitly.
479 6904 : if (!engine)
480 3186 : pipe->check_read ();
481 : }
482 :
483 6907 : if (zap_pipe != NULL)
484 21 : zap_pipe->terminate (false);
485 : }
486 :
487 0 : void zmq::session_base_t::timer_event (int id_)
488 : {
489 : // Linger period expired. We can proceed with termination even though
490 : // there are still pending messages to be sent.
491 0 : zmq_assert (id_ == linger_timer_id);
492 0 : has_linger_timer = false;
493 :
494 : // Ask pipe to terminate even though there may be pending messages in it.
495 0 : zmq_assert (pipe);
496 0 : pipe->terminate (false);
497 0 : }
498 :
499 210 : void zmq::session_base_t::reconnect ()
500 : {
501 : // For delayed connect situations, terminate the pipe
502 : // and reestablish later on
503 397 : if (pipe && options.immediate == 1
504 9 : && addr->protocol != "pgm" && addr->protocol != "epgm"
505 219 : && addr->protocol != "norm" && addr->protocol != "udp") {
506 3 : pipe->hiccup ();
507 3 : pipe->terminate (false);
508 3 : terminating_pipes.insert (pipe);
509 3 : pipe = NULL;
510 : }
511 :
512 210 : reset ();
513 :
514 : // Reconnect.
515 210 : if (options.reconnect_ivl != -1)
516 201 : start_connecting (true);
517 :
518 : // For subscriber sockets we hiccup the inbound pipe, which will cause
519 : // the socket object to resend all the subscriptions.
520 210 : if (pipe && (options.type == ZMQ_SUB || options.type == ZMQ_XSUB))
521 112 : pipe->hiccup ();
522 210 : }
523 :
524 3948 : void zmq::session_base_t::start_connecting (bool wait_)
525 : {
526 3948 : zmq_assert (active);
527 :
528 : // Choose I/O thread to run connecter in. Given that we are already
529 : // running in an I/O thread, there must be at least one available.
530 3948 : io_thread_t *io_thread = choose_io_thread (options.affinity);
531 3947 : zmq_assert (io_thread);
532 :
533 : // Create the connecter object.
534 :
535 7893 : if (addr->protocol == "tcp") {
536 7750 : if (!options.socks_proxy_address.empty()) {
537 : address_t *proxy_address = new (std::nothrow)
538 0 : address_t ("tcp", options.socks_proxy_address, this->get_ctx ());
539 0 : alloc_assert (proxy_address);
540 : socks_connecter_t *connecter =
541 : new (std::nothrow) socks_connecter_t (
542 0 : io_thread, this, options, addr, proxy_address, wait_);
543 0 : alloc_assert (connecter);
544 0 : launch_child (connecter);
545 : }
546 : else {
547 : tcp_connecter_t *connecter = new (std::nothrow)
548 3875 : tcp_connecter_t (io_thread, this, options, addr, wait_);
549 3877 : alloc_assert (connecter);
550 3877 : launch_child (connecter);
551 : }
552 : return;
553 : }
554 :
555 : #if !defined ZMQ_HAVE_WINDOWS && !defined ZMQ_HAVE_OPENVMS
556 142 : if (addr->protocol == "ipc") {
557 : ipc_connecter_t *connecter = new (std::nothrow) ipc_connecter_t (
558 65 : io_thread, this, options, addr, wait_);
559 65 : alloc_assert (connecter);
560 65 : launch_child (connecter);
561 65 : return;
562 : }
563 : #endif
564 : #if defined ZMQ_HAVE_TIPC
565 : if (addr->protocol == "tipc") {
566 : tipc_connecter_t *connecter = new (std::nothrow) tipc_connecter_t (
567 : io_thread, this, options, addr, wait_);
568 : alloc_assert (connecter);
569 : launch_child (connecter);
570 : return;
571 : }
572 : #endif
573 :
574 12 : if (addr->protocol == "udp") {
575 6 : zmq_assert (options.type == ZMQ_DISH || options.type == ZMQ_RADIO);
576 :
577 6 : udp_engine_t* engine = new (std::nothrow) udp_engine_t ();
578 6 : alloc_assert (engine);
579 :
580 6 : bool recv = false;
581 6 : bool send = false;
582 :
583 6 : if (options.type == ZMQ_RADIO) {
584 : send = true;
585 : recv = false;
586 : }
587 3 : else if (options.type == ZMQ_DISH) {
588 3 : send = false;
589 3 : recv = true;
590 : }
591 :
592 6 : int rc = engine->init (addr, send, recv);
593 6 : errno_assert (rc == 0);
594 :
595 6 : send_attach (this, engine);
596 :
597 6 : return;
598 : }
599 :
600 : #ifdef ZMQ_HAVE_OPENPGM
601 :
602 : // Both PGM and EPGM transports are using the same infrastructure.
603 : if (addr->protocol == "pgm" || addr->protocol == "epgm") {
604 :
605 : zmq_assert (options.type == ZMQ_PUB || options.type == ZMQ_XPUB
606 : || options.type == ZMQ_SUB || options.type == ZMQ_XSUB);
607 :
608 : // For EPGM transport with UDP encapsulation of PGM is used.
609 : bool const udp_encapsulation = addr->protocol == "epgm";
610 :
611 : // At this point we'll create message pipes to the session straight
612 : // away. There's no point in delaying it as no concept of 'connect'
613 : // exists with PGM anyway.
614 : if (options.type == ZMQ_PUB || options.type == ZMQ_XPUB) {
615 :
616 : // PGM sender.
617 : pgm_sender_t *pgm_sender = new (std::nothrow) pgm_sender_t (
618 : io_thread, options);
619 : alloc_assert (pgm_sender);
620 :
621 : int rc = pgm_sender->init (udp_encapsulation, addr->address.c_str ());
622 : errno_assert (rc == 0);
623 :
624 : send_attach (this, pgm_sender);
625 : }
626 : else {
627 :
628 : // PGM receiver.
629 : pgm_receiver_t *pgm_receiver = new (std::nothrow) pgm_receiver_t (
630 : io_thread, options);
631 : alloc_assert (pgm_receiver);
632 :
633 : int rc = pgm_receiver->init (udp_encapsulation, addr->address.c_str ());
634 : errno_assert (rc == 0);
635 :
636 : send_attach (this, pgm_receiver);
637 : }
638 :
639 : return;
640 : }
641 : #endif
642 :
643 : #ifdef ZMQ_HAVE_NORM
644 : if (addr->protocol == "norm") {
645 : // At this point we'll create message pipes to the session straight
646 : // away. There's no point in delaying it as no concept of 'connect'
647 : // exists with NORM anyway.
648 : if (options.type == ZMQ_PUB || options.type == ZMQ_XPUB) {
649 :
650 : // NORM sender.
651 : norm_engine_t* norm_sender = new (std::nothrow) norm_engine_t(io_thread, options);
652 : alloc_assert (norm_sender);
653 :
654 : int rc = norm_sender->init (addr->address.c_str (), true, false);
655 : errno_assert (rc == 0);
656 :
657 : send_attach (this, norm_sender);
658 : }
659 : else { // ZMQ_SUB or ZMQ_XSUB
660 :
661 : // NORM receiver.
662 : norm_engine_t* norm_receiver = new (std::nothrow) norm_engine_t (io_thread, options);
663 : alloc_assert (norm_receiver);
664 :
665 : int rc = norm_receiver->init (addr->address.c_str (), false, true);
666 : errno_assert (rc == 0);
667 :
668 : send_attach (this, norm_receiver);
669 : }
670 : return;
671 : }
672 : #endif // ZMQ_HAVE_NORM
673 :
674 : #if defined ZMQ_HAVE_VMCI
675 : if (addr->protocol == "vmci") {
676 : vmci_connecter_t *connecter = new (std::nothrow) vmci_connecter_t (
677 : io_thread, this, options, addr, wait_);
678 : alloc_assert (connecter);
679 : launch_child (connecter);
680 : return;
681 : }
682 : #endif
683 :
684 0 : zmq_assert (false);
685 : }
|