Line data Source code
1 : /*
2 : Copyright (c) 2007-2016 Contributors as noted in the AUTHORS file
3 :
4 : This file is part of libzmq, the ZeroMQ core engine in C++.
5 :
6 : libzmq is free software; you can redistribute it and/or modify it under
7 : the terms of the GNU Lesser General Public License (LGPL) as published
8 : by the Free Software Foundation; either version 3 of the License, or
9 : (at your option) any later version.
10 :
11 : As a special exception, the Contributors give you permission to link
12 : this library with independent modules to produce an executable,
13 : regardless of the license terms of these independent modules, and to
14 : copy and distribute the resulting executable under terms of your choice,
15 : provided that you also meet, for each linked independent module, the
16 : terms and conditions of the license of that module. An independent
17 : module is a module which is not derived from or based on this library.
18 : If you modify this library, you must extend this exception to your
19 : version of the library.
20 :
21 : libzmq is distributed in the hope that it will be useful, but WITHOUT
22 : ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
23 : FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public
24 : License for more details.
25 :
26 : You should have received a copy of the GNU Lesser General Public License
27 : along with this program. If not, see <http://www.gnu.org/licenses/>.
28 : */
29 :
30 : #include "precompiled.hpp"
31 : #include <new>
32 : #include <string>
33 : #include <algorithm>
34 :
35 : #include "macros.hpp"
36 : #include "platform.hpp"
37 :
38 : #if defined ZMQ_HAVE_WINDOWS
39 : #include "windows.hpp"
40 : #if defined _MSC_VER
41 : #if defined _WIN32_WCE
42 : #include <cmnintrin.h>
43 : #else
44 : #include <intrin.h>
45 : #endif
46 : #endif
47 : #else
48 : #include <unistd.h>
49 : #include <ctype.h>
50 : #endif
51 :
52 : #include "socket_base.hpp"
53 : #include "tcp_listener.hpp"
54 : #include "ipc_listener.hpp"
55 : #include "tipc_listener.hpp"
56 : #include "tcp_connecter.hpp"
57 : #include "io_thread.hpp"
58 : #include "session_base.hpp"
59 : #include "config.hpp"
60 : #include "pipe.hpp"
61 : #include "err.hpp"
62 : #include "ctx.hpp"
63 : #include "platform.hpp"
64 : #include "likely.hpp"
65 : #include "msg.hpp"
66 : #include "address.hpp"
67 : #include "ipc_address.hpp"
68 : #include "tcp_address.hpp"
69 : #include "udp_address.hpp"
70 : #include "tipc_address.hpp"
71 : #include "mailbox.hpp"
72 : #include "mailbox_safe.hpp"
73 :
74 : #if defined ZMQ_HAVE_VMCI
75 : #include "vmci_address.hpp"
76 : #include "vmci_listener.hpp"
77 : #endif
78 :
79 : #ifdef ZMQ_HAVE_OPENPGM
80 : #include "pgm_socket.hpp"
81 : #endif
82 :
83 : #include "pair.hpp"
84 : #include "pub.hpp"
85 : #include "sub.hpp"
86 : #include "req.hpp"
87 : #include "rep.hpp"
88 : #include "pull.hpp"
89 : #include "push.hpp"
90 : #include "dealer.hpp"
91 : #include "router.hpp"
92 : #include "xpub.hpp"
93 : #include "xsub.hpp"
94 : #include "stream.hpp"
95 : #include "server.hpp"
96 : #include "client.hpp"
97 : #include "radio.hpp"
98 : #include "dish.hpp"
99 : #include "gather.hpp"
100 : #include "scatter.hpp"
101 :
//  Lock/unlock helpers for optionally thread-safe sockets: the socket's
//  mutex is taken/released only when the socket was created thread-safe.
//  Both macros are wrapped in do { } while (false) so that each expands to
//  a single statement: this makes them safe inside un-braced if/else and
//  forces the call-site semicolon. (The previous definition of EXIT_MUTEX
//  had a stray ';' after the macro name, so its expansion *began* with an
//  empty statement — harmless at the current call sites, but a latent bug
//  anywhere the macro would follow an un-braced `if`.)
#define ENTER_MUTEX() \
    do { \
        if (thread_safe) \
            sync.lock (); \
    } while (false)

#define EXIT_MUTEX() \
    do { \
        if (thread_safe) \
            sync.unlock (); \
    } while (false)
109 :
110 7508899 : bool zmq::socket_base_t::check_tag ()
111 : {
112 7508899 : return tag == 0xbaddecaf;
113 : }
114 :
//  Factory used by zmq_socket: instantiates the concrete socket class
//  matching type_. Returns NULL with errno set to EINVAL for an unknown
//  socket type, or NULL after cleanup when the new socket's mailbox could
//  not be created (the constructor signals that by leaving mailbox NULL).
zmq::socket_base_t *zmq::socket_base_t::create (int type_, class ctx_t *parent_,
    uint32_t tid_, int sid_)
{
    socket_base_t *s = NULL;
    switch (type_) {
        case ZMQ_PAIR:
            s = new (std::nothrow) pair_t (parent_, tid_, sid_);
            break;
        case ZMQ_PUB:
            s = new (std::nothrow) pub_t (parent_, tid_, sid_);
            break;
        case ZMQ_SUB:
            s = new (std::nothrow) sub_t (parent_, tid_, sid_);
            break;
        case ZMQ_REQ:
            s = new (std::nothrow) req_t (parent_, tid_, sid_);
            break;
        case ZMQ_REP:
            s = new (std::nothrow) rep_t (parent_, tid_, sid_);
            break;
        case ZMQ_DEALER:
            s = new (std::nothrow) dealer_t (parent_, tid_, sid_);
            break;
        case ZMQ_ROUTER:
            s = new (std::nothrow) router_t (parent_, tid_, sid_);
            break;
        case ZMQ_PULL:
            s = new (std::nothrow) pull_t (parent_, tid_, sid_);
            break;
        case ZMQ_PUSH:
            s = new (std::nothrow) push_t (parent_, tid_, sid_);
            break;
        case ZMQ_XPUB:
            s = new (std::nothrow) xpub_t (parent_, tid_, sid_);
            break;
        case ZMQ_XSUB:
            s = new (std::nothrow) xsub_t (parent_, tid_, sid_);
            break;
        case ZMQ_STREAM:
            s = new (std::nothrow) stream_t (parent_, tid_, sid_);
            break;
        case ZMQ_SERVER:
            s = new (std::nothrow) server_t (parent_, tid_, sid_);
            break;
        case ZMQ_CLIENT:
            s = new (std::nothrow) client_t (parent_, tid_, sid_);
            break;
        case ZMQ_RADIO:
            s = new (std::nothrow) radio_t (parent_, tid_, sid_);
            break;
        case ZMQ_DISH:
            s = new (std::nothrow) dish_t (parent_, tid_, sid_);
            break;
        case ZMQ_GATHER:
            s = new (std::nothrow) gather_t (parent_, tid_, sid_);
            break;
        case ZMQ_SCATTER:
            s = new (std::nothrow) scatter_t (parent_, tid_, sid_);
            break;
        default:
            errno = EINVAL;
            return NULL;
    }

    alloc_assert (s);

    //  The constructor leaves mailbox NULL when the underlying mailbox
    //  file descriptor could not be allocated; roll the socket back.
    if (s->mailbox == NULL) {
        //  Satisfy the zmq_assert (destroyed) in ~socket_base_t.
        s->destroyed = true;
        LIBZMQ_DELETE(s);
        return NULL;
    }

    return s;
}
189 :
190 11197 : zmq::socket_base_t::socket_base_t (ctx_t *parent_, uint32_t tid_, int sid_, bool thread_safe_) :
191 : own_t (parent_, tid_),
192 : tag (0xbaddecaf),
193 : ctx_terminated (false),
194 : destroyed (false),
195 : poller(NULL),
196 : handle(NULL),
197 : last_tsc (0),
198 : ticks (0),
199 : rcvmore (false),
200 : monitor_socket (NULL),
201 : monitor_events (0),
202 : thread_safe (thread_safe_),
203 100773 : reaper_signaler (NULL)
204 : {
205 11197 : options.socket_id = sid_;
206 11197 : options.ipv6 = (parent_->get (ZMQ_IPV6) != 0);
207 11197 : options.linger = parent_->get (ZMQ_BLOCKY)? -1: 0;
208 :
209 11197 : if (thread_safe)
210 51 : mailbox = new mailbox_safe_t(&sync);
211 : else {
212 11146 : mailbox_t *m = new mailbox_t();
213 11146 : if (m->get_fd () != retired_fd)
214 11080 : mailbox = m;
215 : else {
216 66 : LIBZMQ_DELETE (m);
217 66 : mailbox = NULL;
218 : }
219 : }
220 11197 : }
221 :
222 111970 : zmq::socket_base_t::~socket_base_t ()
223 : {
224 11197 : if (mailbox)
225 11131 : LIBZMQ_DELETE(mailbox);
226 :
227 11197 : if (reaper_signaler)
228 51 : LIBZMQ_DELETE(reaper_signaler);
229 :
230 11197 : stop_monitor ();
231 11197 : zmq_assert (destroyed);
232 11197 : }
233 :
234 11131 : zmq::i_mailbox *zmq::socket_base_t::get_mailbox ()
235 : {
236 11131 : return mailbox;
237 : }
238 :
//  Interrupts any blocking call in progress on this socket so that context
//  termination can proceed.
void zmq::socket_base_t::stop ()
{
    //  Called by ctx when it is terminated (zmq_ctx_term).
    //  'stop' command is sent from the threads that called zmq_ctx_term to
    //  the thread owning the socket. This way, blocking call in the
    //  owner thread can be interrupted.
    send_stop ();
}
247 :
248 5143 : int zmq::socket_base_t::parse_uri (const char *uri_,
249 : std::string &protocol_, std::string &address_)
250 : {
251 5143 : zmq_assert (uri_ != NULL);
252 :
253 5143 : std::string uri (uri_);
254 5143 : std::string::size_type pos = uri.find ("://");
255 5143 : if (pos == std::string::npos) {
256 0 : errno = EINVAL;
257 0 : return -1;
258 : }
259 10286 : protocol_ = uri.substr (0, pos);
260 10285 : address_ = uri.substr (pos + 3);
261 :
262 10284 : if (protocol_.empty () || address_.empty ()) {
263 3 : errno = EINVAL;
264 3 : return -1;
265 : }
266 : return 0;
267 : }
268 :
//  Validates that protocol_ names a transport compiled into this build and
//  that the transport is compatible with this socket's type. Returns 0 on
//  success; otherwise sets errno (EPROTONOSUPPORT for unknown transports,
//  ENOCOMPATPROTO for a type/transport mismatch) and returns -1.
int zmq::socket_base_t::check_protocol (const std::string &protocol_)
{
    // First check out whether the protocol is something we are aware of.
    if (protocol_ != "inproc"
#if !defined ZMQ_HAVE_WINDOWS && !defined ZMQ_HAVE_OPENVMS
            && protocol_ != "ipc"
#endif
            && protocol_ != "tcp"
#if defined ZMQ_HAVE_OPENPGM
            //  pgm/epgm transports only available if 0MQ is compiled with OpenPGM.
            && protocol_ != "pgm"
            && protocol_ != "epgm"
#endif
#if defined ZMQ_HAVE_TIPC
            // TIPC transport is only available on Linux.
            && protocol_ != "tipc"
#endif
#if defined ZMQ_HAVE_NORM
            && protocol_ != "norm"
#endif
#if defined ZMQ_HAVE_VMCI
            && protocol_ != "vmci"
#endif
            && protocol_ != "udp") {
        errno = EPROTONOSUPPORT;
        return -1;
    }

    //  Check whether socket type and transport protocol match.
    //  Specifically, multicast protocols can't be combined with
    //  bi-directional messaging patterns (socket types).
#if defined ZMQ_HAVE_OPENPGM || defined ZMQ_HAVE_NORM
    if ((protocol_ == "pgm" || protocol_ == "epgm" || protocol_ == "norm") &&
          options.type != ZMQ_PUB && options.type != ZMQ_SUB &&
          options.type != ZMQ_XPUB && options.type != ZMQ_XSUB) {
        errno = ENOCOMPATPROTO;
        return -1;
    }
#endif

    //  UDP is restricted to the RADIO/DISH thin-message patterns.
    if (protocol_ == "udp" && (options.type != ZMQ_DISH &&
        options.type != ZMQ_RADIO)) {
        errno = ENOCOMPATPROTO;
        return -1;
    }

    //  Protocol is available.
    return 0;
}
318 :
//  Registers pipe_ with this socket: the socket becomes the pipe's event
//  sink, the concrete socket type is notified (xattach_pipe), and — if the
//  socket is already shutting down — the new pipe is asked to terminate
//  immediately, with a term-ack registered for it.
void zmq::socket_base_t::attach_pipe (pipe_t *pipe_, bool subscribe_to_all_)
{
    //  First, register the pipe so that we can terminate it later on.
    pipe_->set_event_sink (this);
    pipes.push_back (pipe_);

    //  Let the derived socket type know about new pipe.
    xattach_pipe (pipe_, subscribe_to_all_);

    //  If the socket is already being closed, ask any new pipes to terminate
    //  straight away.
    if (is_terminating ()) {
        register_term_acks (1);
        pipe_->terminate (false);
    }
}
335 :
//  Implements zmq_setsockopt. The option is first offered to the concrete
//  socket type (xsetsockopt); only when that reports EINVAL is it handled
//  by the generic options_t parser. Serialized via the socket mutex when
//  the socket is thread-safe. Returns 0 on success, -1 with errno set on
//  failure (EINVAL for unknown/invalid options, ETERM after ctx shutdown).
int zmq::socket_base_t::setsockopt (int option_, const void *optval_,
    size_t optvallen_)
{
    ENTER_MUTEX ();

    if (!options.is_valid(option_)) {
        errno = EINVAL;
        EXIT_MUTEX ();
        return -1;
    }

    if (unlikely (ctx_terminated)) {
        errno = ETERM;
        EXIT_MUTEX ();
        return -1;
    }

    //  First, check whether specific socket type overloads the option.
    int rc = xsetsockopt (option_, optval_, optvallen_);
    if (rc == 0 || errno != EINVAL) {
        EXIT_MUTEX ();
        return rc;
    }

    //  If the socket type doesn't support the option, pass it to
    //  the generic option parser.
    rc = options.setsockopt (option_, optval_, optvallen_);
    //  NOTE(review): presumably pushes options that affect already-attached
    //  pipes (e.g. HWMs) down to them — confirm in update_pipe_options.
    update_pipe_options(option_);

    EXIT_MUTEX ();
    return rc;
}
368 :
//  Implements zmq_getsockopt. Options that live on the socket object
//  itself (ZMQ_RCVMORE, ZMQ_FD, ZMQ_EVENTS, ZMQ_LAST_ENDPOINT,
//  ZMQ_THREAD_SAFE) are answered here; everything else is delegated to
//  options_t::getsockopt. Returns 0 on success, -1 with errno set
//  (EINVAL for a too-small buffer, ETERM after ctx shutdown).
int zmq::socket_base_t::getsockopt (int option_, void *optval_,
    size_t *optvallen_)
{
    ENTER_MUTEX ();

    if (unlikely (ctx_terminated)) {
        errno = ETERM;
        EXIT_MUTEX ();
        return -1;
    }

    //  1 iff the last received message has more frames pending.
    if (option_ == ZMQ_RCVMORE) {
        if (*optvallen_ < sizeof (int)) {
            errno = EINVAL;
            EXIT_MUTEX ();
            return -1;
        }
        memset(optval_, 0, *optvallen_);
        *((int*) optval_) = rcvmore ? 1 : 0;
        *optvallen_ = sizeof (int);
        EXIT_MUTEX ();
        return 0;
    }

    //  The mailbox descriptor, usable with poll/select by the caller.
    if (option_ == ZMQ_FD) {
        if (*optvallen_ < sizeof (fd_t)) {
            errno = EINVAL;
            EXIT_MUTEX ();
            return -1;
        }

        if (thread_safe) {
            // thread safe socket doesn't provide file descriptor
            errno = EINVAL;
            EXIT_MUTEX ();
            return -1;
        }

        //  Safe cast: non-thread-safe sockets always carry a mailbox_t
        //  (see the constructor).
        *((fd_t*)optval_) = ((mailbox_t*)mailbox)->get_fd();
        *optvallen_ = sizeof(fd_t);

        EXIT_MUTEX ();
        return 0;
    }

    //  Current readiness flags; pending commands are processed first so
    //  that the answer reflects the up-to-date pipe state.
    if (option_ == ZMQ_EVENTS) {
        if (*optvallen_ < sizeof (int)) {
            errno = EINVAL;
            EXIT_MUTEX ();
            return -1;
        }
        int rc = process_commands (0, false);
        if (rc != 0 && (errno == EINTR || errno == ETERM)) {
            EXIT_MUTEX ();
            return -1;
        }
        errno_assert (rc == 0);
        *((int*) optval_) = 0;
        if (has_out ())
            *((int*) optval_) |= ZMQ_POLLOUT;
        if (has_in ())
            *((int*) optval_) |= ZMQ_POLLIN;
        *optvallen_ = sizeof (int);
        EXIT_MUTEX ();
        return 0;
    }

    //  The URI of the last endpoint bound/connected, NUL terminated.
    if (option_ == ZMQ_LAST_ENDPOINT) {
        if (*optvallen_ < last_endpoint.size () + 1) {
            errno = EINVAL;
            EXIT_MUTEX ();
            return -1;
        }
        strncpy(static_cast <char *> (optval_), last_endpoint.c_str(), last_endpoint.size() + 1);
        *optvallen_ = last_endpoint.size () + 1;
        EXIT_MUTEX ();
        return 0;
    }

    //  1 iff this socket was created with a thread-safe type.
    if (option_ == ZMQ_THREAD_SAFE) {
        if (*optvallen_ < sizeof (int)) {
            errno = EINVAL;
            EXIT_MUTEX ();
            return -1;
        }
        memset(optval_, 0, *optvallen_);
        *((int*) optval_) = thread_safe ? 1 : 0;
        *optvallen_ = sizeof (int);
        EXIT_MUTEX ();
        return 0;
    }

    //  All remaining options are stored in the generic options object.
    int rc = options.getsockopt (option_, optval_, optvallen_);
    EXIT_MUTEX ();
    return rc;
}
465 :
466 15 : int zmq::socket_base_t::join (const char* group_)
467 : {
468 15 : ENTER_MUTEX ();
469 :
470 15 : int rc = xjoin (group_);
471 :
472 15 : EXIT_MUTEX();
473 :
474 15 : return rc;
475 : }
476 :
477 6 : int zmq::socket_base_t::leave (const char* group_)
478 : {
479 6 : ENTER_MUTEX ();
480 :
481 6 : int rc = xleave (group_);
482 :
483 6 : EXIT_MUTEX();
484 :
485 6 : return rc;
486 : }
487 :
488 3 : int zmq::socket_base_t::add_signaler(signaler_t *s_)
489 : {
490 3 : ENTER_MUTEX ();
491 :
492 3 : if (!thread_safe) {
493 0 : errno = EINVAL;
494 0 : EXIT_MUTEX ();
495 : return -1;
496 : }
497 :
498 3 : ((mailbox_safe_t*)mailbox)->add_signaler(s_);
499 :
500 3 : EXIT_MUTEX ();
501 : return 0;
502 : }
503 :
504 0 : int zmq::socket_base_t::remove_signaler(signaler_t *s_)
505 : {
506 0 : ENTER_MUTEX ();
507 :
508 0 : if (!thread_safe) {
509 0 : errno = EINVAL;
510 0 : EXIT_MUTEX ();
511 : return -1;
512 : }
513 :
514 0 : ((mailbox_safe_t*)mailbox)->remove_signaler(s_);
515 :
516 0 : EXIT_MUTEX ();
517 : return 0;
518 : }
519 :
//  Implements zmq_bind. inproc endpoints are registered directly with the
//  context; pgm/epgm/norm/udp are redirected to connect () (bind and
//  connect are interchangeable for those transports); all remaining
//  transports create a listener object running on an I/O thread. Returns
//  0 on success, -1 with errno set on failure.
int zmq::socket_base_t::bind (const char *addr_)
{
    ENTER_MUTEX ();

    if (unlikely (ctx_terminated)) {
        errno = ETERM;
        EXIT_MUTEX ();
        return -1;
    }

    //  Process pending commands, if any.
    int rc = process_commands (0, false);
    if (unlikely (rc != 0)) {
        EXIT_MUTEX ();
        return -1;
    }

    //  Parse addr_ string.
    std::string protocol;
    std::string address;
    if (parse_uri (addr_, protocol, address) || check_protocol (protocol)) {
        EXIT_MUTEX ();
        return -1;
    }

    //  inproc: no I/O object at all; just register this socket as the
    //  endpoint and wake up any connects that were pending on it.
    if (protocol == "inproc") {
        const endpoint_t endpoint = { this, options };
        rc = register_endpoint (addr_, endpoint);
        if (rc == 0) {
            connect_pending (addr_, this);
            last_endpoint.assign (addr_);
            options.connected = true;
        }
        EXIT_MUTEX ();
        return rc;
    }

    if (protocol == "pgm" || protocol == "epgm" || protocol == "norm" || protocol == "udp") {
        //  For convenience's sake, bind can be used interchangeable with
        //  connect for PGM, EPGM, NORM and UDP transports.
        //  The mutex is released first because connect () re-enters it.
        EXIT_MUTEX ();
        rc = connect (addr_);
        if (rc != -1)
            options.connected = true;
        return rc;
    }

    //  Remaining transports require to be run in an I/O thread, so at this
    //  point we'll choose one.
    io_thread_t *io_thread = choose_io_thread (options.affinity);
    if (!io_thread) {
        errno = EMTHREAD;
        EXIT_MUTEX ();
        return -1;
    }

    if (protocol == "tcp") {
        tcp_listener_t *listener = new (std::nothrow) tcp_listener_t (
            io_thread, this, options);
        alloc_assert (listener);
        rc = listener->set_address (address.c_str ());
        if (rc != 0) {
            LIBZMQ_DELETE(listener);
            event_bind_failed (address, zmq_errno());
            EXIT_MUTEX ();
            return -1;
        }

        //  Save last endpoint URI (the resolved form, e.g. with the actual
        //  port when binding to a wildcard).
        listener->get_address (last_endpoint);

        add_endpoint (last_endpoint.c_str (), (own_t *) listener, NULL);
        options.connected = true;
        EXIT_MUTEX ();
        return 0;
    }

#if !defined ZMQ_HAVE_WINDOWS && !defined ZMQ_HAVE_OPENVMS
    if (protocol == "ipc") {
        ipc_listener_t *listener = new (std::nothrow) ipc_listener_t (
            io_thread, this, options);
        alloc_assert (listener);
        int rc = listener->set_address (address.c_str ());
        if (rc != 0) {
            LIBZMQ_DELETE(listener);
            event_bind_failed (address, zmq_errno());
            EXIT_MUTEX ();
            return -1;
        }

        //  Save last endpoint URI
        listener->get_address (last_endpoint);

        add_endpoint (last_endpoint.c_str (), (own_t *) listener, NULL);
        options.connected = true;
        EXIT_MUTEX ();
        return 0;
    }
#endif
#if defined ZMQ_HAVE_TIPC
    if (protocol == "tipc") {
        tipc_listener_t *listener = new (std::nothrow) tipc_listener_t (
            io_thread, this, options);
        alloc_assert (listener);
        int rc = listener->set_address (address.c_str ());
        if (rc != 0) {
            LIBZMQ_DELETE(listener);
            event_bind_failed (address, zmq_errno());
            EXIT_MUTEX ();
            return -1;
        }

        //  Save last endpoint URI
        listener->get_address (last_endpoint);

        //  NOTE(review): unlike tcp/ipc/vmci above, this registers the
        //  caller-supplied addr_, not the resolved last_endpoint — confirm
        //  whether term_endpoint lookups rely on this.
        add_endpoint (addr_, (own_t *) listener, NULL);
        options.connected = true;
        EXIT_MUTEX ();
        return 0;
    }
#endif
#if defined ZMQ_HAVE_VMCI
    if (protocol == "vmci") {
        vmci_listener_t *listener = new (std::nothrow) vmci_listener_t (
            io_thread, this, options);
        alloc_assert (listener);
        int rc = listener->set_address (address.c_str ());
        if (rc != 0) {
            LIBZMQ_DELETE(listener);
            event_bind_failed (address, zmq_errno ());
            EXIT_MUTEX ();
            return -1;
        }

        listener->get_address (last_endpoint);

        add_endpoint (last_endpoint.c_str(), (own_t *) listener, NULL);
        options.connected = true;
        EXIT_MUTEX ();
        return 0;
    }
#endif

    //  check_protocol accepted the transport, so one of the branches above
    //  must have handled it; reaching this point is a logic error.
    EXIT_MUTEX ();
    zmq_assert (false);
    return -1;
}
667 :
//  Implements zmq_connect. inproc connections are wired directly with a
//  pipe pair (left pending when the binder does not exist yet); all other
//  transports get a session object on an I/O thread, optionally with an
//  immediately attached local pipe (unless ZMQ_IMMEDIATE is set). Returns
//  0 on success, -1 with errno set on failure.
int zmq::socket_base_t::connect (const char *addr_)
{
    ENTER_MUTEX ();

    if (unlikely (ctx_terminated)) {
        errno = ETERM;
        EXIT_MUTEX ();
        return -1;
    }

    //  Process pending commands, if any.
    int rc = process_commands (0, false);
    if (unlikely (rc != 0)) {
        EXIT_MUTEX ();
        return -1;
    }

    //  Parse addr_ string.
    std::string protocol;
    std::string address;
    if (parse_uri (addr_, protocol, address) || check_protocol (protocol)) {
        EXIT_MUTEX ();
        return -1;
    }

    if (protocol == "inproc") {

        //  TODO: inproc connect is specific with respect to creating pipes
        //  as there's no 'reconnect' functionality implemented. Once that
        //  is in place we should follow generic pipe creation algorithm.

        //  Find the peer endpoint.
        endpoint_t peer = find_endpoint (addr_);

        //  The total HWM for an inproc connection should be the sum of
        //  the binder's HWM and the connector's HWM. A zero on either side
        //  means "unlimited" and wins over the sum.
        int sndhwm = 0;
        if (peer.socket == NULL)
            sndhwm = options.sndhwm;
        else if (options.sndhwm != 0 && peer.options.rcvhwm != 0)
            sndhwm = options.sndhwm + peer.options.rcvhwm;
        int rcvhwm = 0;
        if (peer.socket == NULL)
            rcvhwm = options.rcvhwm;
        else
        if (options.rcvhwm != 0 && peer.options.sndhwm != 0)
            rcvhwm = options.rcvhwm + peer.options.sndhwm;

        //  Create a bi-directional pipe to connect the peers.
        object_t *parents [2] = {this, peer.socket == NULL ? this : peer.socket};
        pipe_t *new_pipes [2] = {NULL, NULL};

        //  ZMQ_CONFLATE (keep only the newest message) only applies to the
        //  non-multipart, non-subscription-forwarding socket types.
        bool conflate = options.conflate &&
            (options.type == ZMQ_DEALER ||
             options.type == ZMQ_PULL ||
             options.type == ZMQ_PUSH ||
             options.type == ZMQ_PUB ||
             options.type == ZMQ_SUB);

        int hwms [2] = {conflate? -1 : sndhwm, conflate? -1 : rcvhwm};
        bool conflates [2] = {conflate, conflate};
        rc = pipepair (parents, new_pipes, hwms, conflates);
        if (!conflate) {
            new_pipes[0]->set_hwms_boost(peer.options.sndhwm, peer.options.rcvhwm);
            new_pipes[1]->set_hwms_boost(options.sndhwm, options.rcvhwm);
        }

        errno_assert (rc == 0);

        if (!peer.socket) {
            //  The peer doesn't exist yet so we don't know whether
            //  to send the identity message or not. To resolve this,
            //  we always send our identity and drop it later if
            //  the peer doesn't expect it.
            msg_t id;
            rc = id.init_size (options.identity_size);
            errno_assert (rc == 0);
            memcpy (id.data (), options.identity, options.identity_size);
            id.set_flags (msg_t::identity);
            bool written = new_pipes [0]->write (&id);
            zmq_assert (written);
            new_pipes [0]->flush ();

            //  Park the connection; a later bind to this address will pick
            //  it up via connect_pending.
            const endpoint_t endpoint = {this, options};
            pend_connection (std::string (addr_), endpoint, new_pipes);
        }
        else {
            //  If required, send the identity of the local socket to the peer.
            if (peer.options.recv_identity) {
                msg_t id;
                rc = id.init_size (options.identity_size);
                errno_assert (rc == 0);
                memcpy (id.data (), options.identity, options.identity_size);
                id.set_flags (msg_t::identity);
                bool written = new_pipes [0]->write (&id);
                zmq_assert (written);
                new_pipes [0]->flush ();
            }

            //  If required, send the identity of the peer to the local socket.
            if (options.recv_identity) {
                msg_t id;
                rc = id.init_size (peer.options.identity_size);
                errno_assert (rc == 0);
                memcpy (id.data (), peer.options.identity, peer.options.identity_size);
                id.set_flags (msg_t::identity);
                bool written = new_pipes [1]->write (&id);
                zmq_assert (written);
                new_pipes [1]->flush ();
            }

            //  Attach remote end of the pipe to the peer socket. Note that peer's
            //  seqnum was incremented in find_endpoint function. We don't need it
            //  increased here.
            send_bind (peer.socket, new_pipes [1], false);
        }

        //  Attach local end of the pipe to this socket object.
        attach_pipe (new_pipes [0]);

        //  Save last endpoint URI
        last_endpoint.assign (addr_);

        //  remember inproc connections for disconnect
        inprocs.insert (inprocs_t::value_type (std::string (addr_), new_pipes [0]));

        options.connected = true;
        EXIT_MUTEX ();
        return 0;
    }
    bool is_single_connect = (options.type == ZMQ_DEALER ||
                              options.type == ZMQ_SUB ||
                              options.type == ZMQ_REQ);
    if (unlikely (is_single_connect)) {
        const endpoints_t::iterator it = endpoints.find (addr_);
        if (it != endpoints.end ()) {
            // There is no valid use for multiple connects for SUB-PUB nor
            // DEALER-ROUTER nor REQ-REP. Multiple connects produces
            // nonsensical results.
            EXIT_MUTEX ();
            return 0;
        }
    }

    //  Choose the I/O thread to run the session in.
    io_thread_t *io_thread = choose_io_thread (options.affinity);
    if (!io_thread) {
        errno = EMTHREAD;
        EXIT_MUTEX ();
        return -1;
    }

    address_t *paddr = new (std::nothrow) address_t (protocol, address, this->get_ctx ());
    alloc_assert (paddr);

    //  Resolve address (if needed by the protocol)
    if (protocol == "tcp") {
        //  Do some basic sanity checks on tcp:// address syntax
        //  - hostname starts with digit or letter, with embedded '-' or '.'
        //  - IPv6 address may contain hex chars and colons.
        //  - IPv6 link local address may contain % followed by interface name / zone_id
        //    (Reference: https://tools.ietf.org/html/rfc4007)
        //  - IPv4 address may contain decimal digits and dots.
        //  - Address must end in ":port" where port is *, or numeric
        //  - Address may contain two parts separated by ':'
        //  Following code is quick and dirty check to catch obvious errors,
        //  without trying to be fully accurate.
        const char *check = address.c_str ();
        if (isalnum (*check) || isxdigit (*check) || *check == '[') {
            check++;
            while (isalnum (*check)
                || isxdigit (*check)
                || *check == '.' || *check == '-' || *check == ':' || *check == '%'
                || *check == ';' || *check == ']' || *check == '_'
            ) {
                check++;
            }
        }
        //  Assume the worst, now look for success
        rc = -1;
        //  Did we reach the end of the address safely?
        if (*check == 0) {
            //  Do we have a valid port string? (cannot be '*' in connect).
            check = strrchr (address.c_str (), ':');
            if (check) {
                check++;
                if (*check && (isdigit (*check)))
                    rc = 0; // Valid
            }
        }
        if (rc == -1) {
            errno = EINVAL;
            LIBZMQ_DELETE(paddr);
            EXIT_MUTEX ();
            return -1;
        }
        //  Defer resolution until a socket is opened
        paddr->resolved.tcp_addr = NULL;
    }
#if !defined ZMQ_HAVE_WINDOWS && !defined ZMQ_HAVE_OPENVMS
    else
    if (protocol == "ipc") {
        paddr->resolved.ipc_addr = new (std::nothrow) ipc_address_t ();
        alloc_assert (paddr->resolved.ipc_addr);
        int rc = paddr->resolved.ipc_addr->resolve (address.c_str ());
        if (rc != 0) {
            LIBZMQ_DELETE(paddr);
            EXIT_MUTEX ();
            return -1;
        }
    }
#endif

    if (protocol == "udp") {
        paddr->resolved.udp_addr = new (std::nothrow) udp_address_t ();
        alloc_assert (paddr->resolved.udp_addr);
        rc = paddr->resolved.udp_addr->resolve (address.c_str(), options.type == ZMQ_DISH);
        if (rc != 0) {
            LIBZMQ_DELETE(paddr);
            EXIT_MUTEX ();
            return -1;
        }
    }

    // TBD - Should we check address for ZMQ_HAVE_NORM???

#ifdef ZMQ_HAVE_OPENPGM
    if (protocol == "pgm" || protocol == "epgm") {
        struct pgm_addrinfo_t *res = NULL;
        uint16_t port_number = 0;
        int rc = pgm_socket_t::init_address(address.c_str(), &res, &port_number);
        if (res != NULL)
            pgm_freeaddrinfo (res);
        if (rc != 0 || port_number == 0) {
            EXIT_MUTEX ();
            return -1;
        }
    }
#endif
#if defined ZMQ_HAVE_TIPC
    //  NOTE(review): this 'else' pairs with the *preceding visible if*: the
    //  OPENPGM block when that transport is compiled in, otherwise the
    //  `protocol == "udp"` block above — fragile preprocessor/else coupling;
    //  confirm intent before reorganizing.
    else
    if (protocol == "tipc") {
        paddr->resolved.tipc_addr = new (std::nothrow) tipc_address_t ();
        alloc_assert (paddr->resolved.tipc_addr);
        int rc = paddr->resolved.tipc_addr->resolve (address.c_str());
        if (rc != 0) {
            LIBZMQ_DELETE(paddr);
            EXIT_MUTEX ();
            return -1;
        }
    }
#endif
#if defined ZMQ_HAVE_VMCI
    else
    if (protocol == "vmci") {
        paddr->resolved.vmci_addr = new (std::nothrow) vmci_address_t (this->get_ctx ());
        alloc_assert (paddr->resolved.vmci_addr);
        int rc = paddr->resolved.vmci_addr->resolve (address.c_str ());
        if (rc != 0) {
            LIBZMQ_DELETE(paddr);
            EXIT_MUTEX ();
            return -1;
        }
    }
#endif

    //  Create session.
    session_base_t *session = session_base_t::create (io_thread, true, this,
        options, paddr);
    errno_assert (session);

    //  PGM does not support subscription forwarding; ask for all data to be
    //  sent to this pipe. (same for NORM, currently?)
    bool subscribe_to_all = protocol == "pgm" || protocol == "epgm" || protocol == "norm" || protocol == "udp";
    pipe_t *newpipe = NULL;

    //  Unless ZMQ_IMMEDIATE is set, attach a local pipe right away so the
    //  application can queue messages before the connection is established.
    if (options.immediate != 1 || subscribe_to_all) {
        //  Create a bi-directional pipe.
        object_t *parents [2] = {this, session};
        pipe_t *new_pipes [2] = {NULL, NULL};

        bool conflate = options.conflate &&
            (options.type == ZMQ_DEALER ||
             options.type == ZMQ_PULL ||
             options.type == ZMQ_PUSH ||
             options.type == ZMQ_PUB ||
             options.type == ZMQ_SUB);

        int hwms [2] = {conflate? -1 : options.sndhwm,
            conflate? -1 : options.rcvhwm};
        bool conflates [2] = {conflate, conflate};
        rc = pipepair (parents, new_pipes, hwms, conflates);
        errno_assert (rc == 0);

        //  Attach local end of the pipe to the socket object.
        attach_pipe (new_pipes [0], subscribe_to_all);
        newpipe = new_pipes [0];

        //  Attach remote end of the pipe to the session object later on.
        session->attach_pipe (new_pipes [1]);
    }

    //  Save last endpoint URI
    paddr->to_string (last_endpoint);

    add_endpoint (addr_, (own_t *) session, newpipe);
    EXIT_MUTEX ();
    return 0;
}
977 :
978 4088 : void zmq::socket_base_t::add_endpoint (const char *addr_, own_t *endpoint_, pipe_t *pipe)
979 : {
980 : // Activate the session. Make it a child of this socket.
981 4088 : launch_child (endpoint_);
982 20445 : endpoints.insert (endpoints_t::value_type (std::string (addr_), endpoint_pipe_t (endpoint_, pipe)));
983 4089 : }
984 :
//  Terminate (unbind/disconnect) the endpoint identified by 'addr_'.
//  Handles inproc endpoints specially; for TCP the user-supplied address
//  is re-resolved because the endpoints map is keyed by the resolved form.
//  Returns 0 on success, -1 with errno set (ETERM, EINVAL, ENOENT, ...).
int zmq::socket_base_t::term_endpoint (const char *addr_)
{
    ENTER_MUTEX ();

    // Check whether the library haven't been shut down yet.
    if (unlikely (ctx_terminated)) {
        errno = ETERM;
        EXIT_MUTEX ();
        return -1;
    }

    // Check whether endpoint address passed to the function is valid.
    if (unlikely (!addr_)) {
        errno = EINVAL;
        EXIT_MUTEX ();
        return -1;
    }

    // Process pending commands, if any, since there could be pending unprocessed process_own()'s
    // (from launch_child() for example) we're asked to terminate now.
    int rc = process_commands (0, false);
    if (unlikely(rc != 0)) {
        EXIT_MUTEX ();
        return -1;
    }

    // Parse addr_ string.
    std::string protocol;
    std::string address;
    if (parse_uri(addr_, protocol, address) || check_protocol(protocol)) {
        EXIT_MUTEX ();
        return -1;
    }

    // Disconnect an inproc socket
    if (protocol == "inproc") {
        //  An inproc endpoint may still be pending (no peer connected yet);
        //  in that case unregistering it from the context is sufficient.
        if (unregister_endpoint (std::string(addr_), this) == 0) {
            EXIT_MUTEX ();
            return 0;
        }
        std::pair <inprocs_t::iterator, inprocs_t::iterator> range = inprocs.equal_range (std::string (addr_));
        if (range.first == range.second) {
            errno = ENOENT;
            EXIT_MUTEX ();
            return -1;
        }

        for (inprocs_t::iterator it = range.first; it != range.second; ++it)
            it->second->terminate (true);
        inprocs.erase (range.first, range.second);
        EXIT_MUTEX ();
        return 0;
    }

    std::string resolved_addr = std::string (addr_);
    std::pair <endpoints_t::iterator, endpoints_t::iterator> range;

    // The resolved last_endpoint is used as a key in the endpoints map.
    // The address passed by the user might not match in the TCP case due to
    // IPv4-in-IPv6 mapping (EG: tcp://[::ffff:127.0.0.1]:9999), so try to
    // resolve before giving up. Given at this stage we don't know whether a
    // socket is connected or bound, try with both.
    if (protocol == "tcp") {
        range = endpoints.equal_range (resolved_addr);
        if (range.first == range.second) {
            tcp_address_t *tcp_addr = new (std::nothrow) tcp_address_t ();
            alloc_assert (tcp_addr);
            //  First resolve as a connect-side (remote) address ...
            rc = tcp_addr->resolve (address.c_str (), false, options.ipv6);

            if (rc == 0) {
                tcp_addr->to_string (resolved_addr);
                range = endpoints.equal_range (resolved_addr);

                if (range.first == range.second) {
                    //  ... and if nothing matched, retry as a bind-side
                    //  (local) address.
                    rc = tcp_addr->resolve (address.c_str (), true, options.ipv6);
                    if (rc == 0) {
                        tcp_addr->to_string (resolved_addr);
                    }
                }
            }
            LIBZMQ_DELETE(tcp_addr);
        }
    }

    // Find the endpoints range (if any) corresponding to the addr_ string.
    range = endpoints.equal_range (resolved_addr);
    if (range.first == range.second) {
        errno = ENOENT;
        EXIT_MUTEX ();
        return -1;
    }

    for (endpoints_t::iterator it = range.first; it != range.second; ++it) {
        // If we have an associated pipe, terminate it.
        if (it->second.second != NULL)
            it->second.second->terminate (false);
        term_child (it->second.first);
    }
    endpoints.erase (range.first, range.second);
    EXIT_MUTEX ();
    return 0;
}
1087 :
//  Send a message on the socket. Honours ZMQ_DONTWAIT and ZMQ_SNDMORE in
//  'flags_' and the ZMQ_SNDTIMEO socket option. Returns 0 on success, -1
//  with errno set (ETERM, EFAULT, EAGAIN, ...) on failure.
int zmq::socket_base_t::send (msg_t *msg_, int flags_)
{
    ENTER_MUTEX ();

    // Check whether the library haven't been shut down yet.
    if (unlikely (ctx_terminated)) {
        errno = ETERM;
        EXIT_MUTEX ();
        return -1;
    }

    // Check whether message passed to the function is valid.
    if (unlikely (!msg_ || !msg_->check ())) {
        errno = EFAULT;
        EXIT_MUTEX ();
        return -1;
    }

    // Process pending commands, if any.
    int rc = process_commands (0, true);
    if (unlikely (rc != 0)) {
        EXIT_MUTEX ();
        return -1;
    }

    // Clear any user-visible flags that are set on the message.
    msg_->reset_flags (msg_t::more);

    // At this point we impose the flags on the message.
    if (flags_ & ZMQ_SNDMORE)
        msg_->set_flags (msg_t::more);

    msg_->reset_metadata ();

    // Try to send the message using method in each socket class
    rc = xsend (msg_);
    if (rc == 0) {
        EXIT_MUTEX ();
        return 0;
    }
    //  Any error other than EAGAIN is fatal for this call.
    if (unlikely (errno != EAGAIN)) {
        EXIT_MUTEX ();
        return -1;
    }

    // In case of non-blocking send we'll simply propagate
    // the error - including EAGAIN - up the stack.
    if (flags_ & ZMQ_DONTWAIT || options.sndtimeo == 0) {
        EXIT_MUTEX ();
        return -1;
    }

    // Compute the time when the timeout should occur.
    // If the timeout is infinite, don't care.
    int timeout = options.sndtimeo;
    uint64_t end = timeout < 0 ? 0 : (clock.now_ms () + timeout);

    // Oops, we couldn't send the message. Wait for the next
    // command, process it and try to send the message again.
    // If timeout is reached in the meantime, return EAGAIN.
    while (true) {
        if (unlikely (process_commands (timeout, false) != 0)) {
            EXIT_MUTEX ();
            return -1;
        }
        rc = xsend (msg_);
        if (rc == 0)
            break;
        if (unlikely (errno != EAGAIN)) {
            EXIT_MUTEX ();
            return -1;
        }
        //  timeout < 0 means "block forever", so only a positive timeout
        //  is recomputed against the deadline.
        if (timeout > 0) {
            timeout = (int) (end - clock.now_ms ());
            if (timeout <= 0) {
                errno = EAGAIN;
                EXIT_MUTEX ();
                return -1;
            }
        }
    }

    EXIT_MUTEX ();
    return 0;
}
1173 :
//  Receive a message from the socket. Honours ZMQ_DONTWAIT in 'flags_'
//  and the ZMQ_RCVTIMEO socket option. Returns 0 on success, -1 with
//  errno set (ETERM, EFAULT, EAGAIN, ...) on failure.
int zmq::socket_base_t::recv (msg_t *msg_, int flags_)
{
    ENTER_MUTEX ();

    // Check whether the library haven't been shut down yet.
    if (unlikely (ctx_terminated)) {
        errno = ETERM;
        EXIT_MUTEX ();
        return -1;
    }

    // Check whether message passed to the function is valid.
    if (unlikely (!msg_ || !msg_->check ())) {
        errno = EFAULT;
        EXIT_MUTEX ();
        return -1;
    }

    // Once every inbound_poll_rate messages check for signals and process
    // incoming commands. This happens only if we are not polling altogether
    // because there are messages available all the time. If poll occurs,
    // ticks is set to zero and thus we avoid this code.
    //
    // Note that 'recv' uses different command throttling algorithm (the one
    // described above) from the one used by 'send'. This is because counting
    // ticks is more efficient than doing RDTSC all the time.
    if (++ticks == inbound_poll_rate) {
        if (unlikely (process_commands (0, false) != 0)) {
            EXIT_MUTEX ();
            return -1;
        }
        ticks = 0;
    }

    // Get the message.
    int rc = xrecv (msg_);
    if (unlikely (rc != 0 && errno != EAGAIN)) {
        EXIT_MUTEX ();
        return -1;
    }

    // If we have the message, return immediately.
    if (rc == 0) {
        extract_flags (msg_);
        EXIT_MUTEX ();
        return 0;
    }

    // If the message cannot be fetched immediately, there are two scenarios.
    // For non-blocking recv, commands are processed in case there's an
    // activate_reader command already waiting int a command pipe.
    // If it's not, return EAGAIN.
    if (flags_ & ZMQ_DONTWAIT || options.rcvtimeo == 0) {
        if (unlikely (process_commands (0, false) != 0)) {
            EXIT_MUTEX ();
            return -1;
        }
        ticks = 0;

        //  Retry once after the command processing — an activate_reader
        //  may just have made a message available.
        rc = xrecv (msg_);
        if (rc < 0) {
            EXIT_MUTEX ();
            return rc;
        }
        extract_flags (msg_);

        EXIT_MUTEX ();
        return 0;
    }

    // Compute the time when the timeout should occur.
    // If the timeout is infinite, don't care.
    int timeout = options.rcvtimeo;
    uint64_t end = timeout < 0 ? 0 : (clock.now_ms () + timeout);

    // In blocking scenario, commands are processed over and over again until
    // we are able to fetch a message.
    bool block = (ticks != 0);
    while (true) {
        if (unlikely (process_commands (block ? timeout : 0, false) != 0)) {
            EXIT_MUTEX ();
            return -1;
        }
        rc = xrecv (msg_);
        if (rc == 0) {
            ticks = 0;
            break;
        }
        if (unlikely (errno != EAGAIN)) {
            EXIT_MUTEX ();
            return -1;
        }
        block = true;
        //  timeout < 0 means "block forever"; only a positive timeout is
        //  recomputed against the deadline.
        if (timeout > 0) {
            timeout = (int) (end - clock.now_ms ());
            if (timeout <= 0) {
                errno = EAGAIN;
                EXIT_MUTEX ();
                return -1;
            }
        }
    }

    extract_flags (msg_);
    EXIT_MUTEX ();
    return 0;
}
1281 :
//  Close the socket from the application thread. The socket is merely
//  marked dead here; actual teardown happens asynchronously in the reaper
//  thread. Always returns 0.
int zmq::socket_base_t::close ()
{
    ENTER_MUTEX ();

    // Remove all existing signalers for thread safe sockets
    if (thread_safe)
        ((mailbox_safe_t*)mailbox)->clear_signalers();

    // Mark the socket as dead
    tag = 0xdeadbeef;

    EXIT_MUTEX ();

    // Transfer the ownership of the socket from this application thread
    // to the reaper thread which will take care of the rest of shutdown
    // process.
    send_reap (this);

    return 0;
}
1302 :
//  True if at least one message can be read; delegates to the
//  socket-type-specific implementation.
bool zmq::socket_base_t::has_in ()
{
    return xhas_in ();
}
1307 :
//  True if at least one message can be written; delegates to the
//  socket-type-specific implementation.
bool zmq::socket_base_t::has_out ()
{
    return xhas_out ();
}
1312 :
//  Hand the socket over to the reaper thread: register its mailbox fd
//  with the reaper's poller and kick off termination.
void zmq::socket_base_t::start_reaping (poller_t *poller_)
{
    // Plug the socket to the reaper thread.
    poller = poller_;

    fd_t fd;

    if (!thread_safe)
        fd = ((mailbox_t*)mailbox)->get_fd();
    else {
        //  A thread-safe mailbox has no single fd, so create a dedicated
        //  signaler for the reaper to poll on.
        ENTER_MUTEX ();

        reaper_signaler = new signaler_t();

        // Add signaler to the safe mailbox
        fd = reaper_signaler->get_fd();
        ((mailbox_safe_t*)mailbox)->add_signaler(reaper_signaler);

        // Send a signal to make sure reaper handle existing commands
        reaper_signaler->send();

        EXIT_MUTEX ();
    }

    handle = poller->add_fd (fd, this);
    poller->set_pollin (handle);

    // Initialise the termination and check whether it can be deallocated
    // immediately.
    terminate ();
    check_destroy ();
}
1345 :
//  Drain and execute commands waiting in the socket's mailbox.
//  timeout_ != 0 makes the initial receive blocking; throttle_ enables the
//  TSC-based rate limit so hot paths don't poll the mailbox on every call.
//  Returns 0 on success, -1 with errno set to EINTR or ETERM.
int zmq::socket_base_t::process_commands (int timeout_, bool throttle_)
{
    int rc;
    command_t cmd;
    if (timeout_ != 0) {

        // If we are asked to wait, simply ask mailbox to wait.
        rc = mailbox->recv (&cmd, timeout_);
    }
    else {

        // If we are asked not to wait, check whether we haven't processed
        // commands recently, so that we can throttle the new commands.

        // Get the CPU's tick counter. If 0, the counter is not available.
        const uint64_t tsc = zmq::clock_t::rdtsc ();

        // Optimised version of command processing - it doesn't have to check
        // for incoming commands each time. It does so only if certain time
        // elapsed since last command processing. Command delay varies
        // depending on CPU speed: It's ~1ms on 3GHz CPU, ~2ms on 1.5GHz CPU
        // etc. The optimisation makes sense only on platforms where getting
        // a timestamp is a very cheap operation (tens of nanoseconds).
        if (tsc && throttle_) {

            // Check whether TSC haven't jumped backwards (in case of migration
            // between CPU cores) and whether certain time have elapsed since
            // last command processing. If it didn't do nothing.
            if (tsc >= last_tsc && tsc - last_tsc <= max_command_delay)
                return 0;
            last_tsc = tsc;
        }

        // Check whether there are any commands pending for this thread.
        rc = mailbox->recv (&cmd, 0);
    }

    // Process all available commands.
    while (rc == 0) {
        cmd.destination->process_command (cmd);
        rc = mailbox->recv (&cmd, 0);
    }

    //  The loop above exits with rc != 0; the only acceptable reasons are
    //  EINTR (interrupted) or EAGAIN (mailbox drained).
    if (errno == EINTR)
        return -1;

    zmq_assert (errno == EAGAIN);

    if (ctx_terminated) {
        errno = ETERM;
        return -1;
    }

    return 0;
}
1401 :
//  Command handler for the 'stop' command sent during zmq_ctx_term.
void zmq::socket_base_t::process_stop ()
{
    // Here, someone have called zmq_ctx_term while the socket was still alive.
    // We'll remember the fact so that any blocking call is interrupted and any
    // further attempt to use the socket will return ETERM. The user is still
    // responsible for calling zmq_close on the socket though!
    stop_monitor ();
    ctx_terminated = true;
}
1411 :
//  Command handler for 'bind': a peer created a pipe pair and asks this
//  socket to attach its end of it.
void zmq::socket_base_t::process_bind (pipe_t *pipe_)
{
    attach_pipe (pipe_);
}
1416 :
//  Command handler for 'term': start shutting the socket down.
void zmq::socket_base_t::process_term (int linger_)
{
    // Unregister all inproc endpoints associated with this socket.
    // Doing this we make sure that no new pipes from other sockets (inproc)
    // will be initiated.
    unregister_endpoints (this);

    // Ask all attached pipes to terminate.
    for (pipes_t::size_type i = 0; i != pipes.size (); ++i)
        pipes [i]->terminate (false);
    //  One term-ack is expected per pipe before the socket can be destroyed.
    register_term_acks ((int) pipes.size ());

    // Continue the termination process immediately.
    own_t::process_term (linger_);
}
1432 :
1433 2397 : void zmq::socket_base_t::update_pipe_options(int option_)
1434 : {
1435 2397 : if (option_ == ZMQ_SNDHWM || option_ == ZMQ_RCVHWM)
1436 : {
1437 114 : for (pipes_t::size_type i = 0; i != pipes.size(); ++i)
1438 : {
1439 12 : pipes[i]->set_hwms(options.rcvhwm, options.sndhwm);
1440 : }
1441 : }
1442 :
1443 2397 : }
1444 :
//  Final phase of termination: mark the socket so check_destroy () will
//  deallocate it from the reaper thread.
void zmq::socket_base_t::process_destroy ()
{
    destroyed = true;
}
1449 :
//  Default implementation: the base class has no socket-type-specific
//  options, so any option that reaches this point is invalid.
int zmq::socket_base_t::xsetsockopt (int, const void *, size_t)
{
    errno = EINVAL;
    return -1;
}
1455 :
//  Default implementation: the base socket can never send.
bool zmq::socket_base_t::xhas_out ()
{
    return false;
}
1460 :
//  Default implementation: sending is not supported unless a derived
//  socket type overrides this hook.
int zmq::socket_base_t::xsend (msg_t *)
{
    errno = ENOTSUP;
    return -1;
}
1466 :
//  Default implementation: the base socket can never receive.
bool zmq::socket_base_t::xhas_in ()
{
    return false;
}
1471 :
//  Default implementation: joining a group is not supported unless a
//  derived socket type overrides this hook.
int zmq::socket_base_t::xjoin (const char *group_)
{
    LIBZMQ_UNUSED (group_);
    errno = ENOTSUP;
    return -1;
}
1478 :
//  Default implementation: leaving a group is not supported unless a
//  derived socket type overrides this hook.
int zmq::socket_base_t::xleave (const char *group_)
{
    LIBZMQ_UNUSED (group_);
    errno = ENOTSUP;
    return -1;
}
1485 :
//  Default implementation: receiving is not supported unless a derived
//  socket type overrides this hook.
int zmq::socket_base_t::xrecv (msg_t *)
{
    errno = ENOTSUP;
    return -1;
}
1491 :
//  Default implementation: no peer credential is associated with the
//  base socket; derived types may override.
zmq::blob_t zmq::socket_base_t::get_credential () const
{
    return blob_t ();
}
1496 :
//  Pipe-readable hook; socket types that attach pipes must override this,
//  so reaching the base implementation is a logic error.
void zmq::socket_base_t::xread_activated (pipe_t *)
{
    zmq_assert (false);
}
//  Pipe-writable hook; socket types that attach pipes must override this,
//  so reaching the base implementation is a logic error.
void zmq::socket_base_t::xwrite_activated (pipe_t *)
{
    zmq_assert (false);
}
1505 :
//  Pipe-hiccup hook; only reached when options.immediate != 1 (see
//  hiccuped ()), and must then be overridden by the socket type.
void zmq::socket_base_t::xhiccuped (pipe_t *)
{
    zmq_assert (false);
}
1510 :
//  i_poll_events callback invoked by the reaper thread's poller when the
//  socket's mailbox fd becomes readable.
void zmq::socket_base_t::in_event ()
{
    // This function is invoked only once the socket is running in the context
    // of the reaper thread. Process any commands from other threads/sockets
    // that may be available at the moment. Ultimately, the socket will
    // be destroyed.
    ENTER_MUTEX ();

    // If the socket is thread safe we need to unsignal the reaper signaler
    if (thread_safe)
        reaper_signaler->recv();

    //  Return value deliberately ignored: errors here (e.g. ETERM) do not
    //  change the teardown path; check_destroy () decides what happens next.
    process_commands (0, false);
    EXIT_MUTEX();
    check_destroy();
}
1527 :
//  i_poll_events callback; the reaper only registers this socket for
//  input (see start_reaping), so an out event must never be delivered.
void zmq::socket_base_t::out_event ()
{
    zmq_assert (false);
}
1532 :
//  i_poll_events callback; no timers are registered for the socket in
//  the reaper poller, so this must never be delivered.
void zmq::socket_base_t::timer_event (int)
{
    zmq_assert (false);
}
1537 :
//  Complete socket deallocation once process_destroy () has marked the
//  socket as destroyed; a no-op otherwise. Runs on the reaper thread.
void zmq::socket_base_t::check_destroy ()
{
    // If the object was already marked as destroyed, finish the deallocation.
    if (destroyed) {

        // Remove the socket from the reaper's poller.
        poller->rm_fd (handle);

        // Remove the socket from the context.
        destroy_socket (this);

        // Notify the reaper about the fact.
        send_reaped ();

        // Deallocate.
        own_t::process_destroy ();
    }
}
1556 :
//  Pipe callback: a pipe became readable; forward to the socket-type hook.
void zmq::socket_base_t::read_activated (pipe_t *pipe_)
{
    xread_activated (pipe_);
}
1561 :
//  Pipe callback: a pipe became writable; forward to the socket-type hook.
void zmq::socket_base_t::write_activated (pipe_t *pipe_)
{
    xwrite_activated (pipe_);
}
1566 :
1567 3 : void zmq::socket_base_t::hiccuped (pipe_t *pipe_)
1568 : {
1569 3 : if (options.immediate == 1)
1570 3 : pipe_->terminate (false);
1571 : else
1572 : // Notify derived sockets of the hiccup
1573 0 : xhiccuped (pipe_);
1574 3 : }
1575 :
//  Pipe callback: a pipe finished terminating. Detach it from all
//  bookkeeping structures and acknowledge the termination if the socket
//  itself is shutting down.
void zmq::socket_base_t::pipe_terminated (pipe_t *pipe_)
{
    // Notify the specific socket type about the pipe termination.
    xpipe_terminated (pipe_);

    // Remove pipe from inproc pipes
    //  (erase + break keeps the iterator valid; at most one entry per pipe
    //  is removed.)
    for (inprocs_t::iterator it = inprocs.begin (); it != inprocs.end (); ++it)
        if (it->second == pipe_) {
            inprocs.erase (it);
            break;
        }

    // Remove the pipe from the list of attached pipes and confirm its
    // termination if we are already shutting down.
    pipes.erase (pipe_);
    if (is_terminating ())
        unregister_term_ack ();
}
1594 :
1595 852057 : void zmq::socket_base_t::extract_flags (msg_t *msg_)
1596 : {
1597 : // Test whether IDENTITY flag is valid for this socket type.
1598 852057 : if (unlikely (msg_->flags () & msg_t::identity))
1599 0 : zmq_assert (options.recv_identity);
1600 :
1601 : // Remove MORE flag.
1602 852057 : rcvmore = msg_->flags () & msg_t::more ? true : false;
1603 852057 : }
1604 :
1605 27 : int zmq::socket_base_t::monitor (const char *addr_, int events_)
1606 : {
1607 27 : if (unlikely (ctx_terminated)) {
1608 0 : errno = ETERM;
1609 0 : return -1;
1610 : }
1611 : // Support deregistering monitoring endpoints as well
1612 27 : if (addr_ == NULL) {
1613 0 : stop_monitor ();
1614 : return 0;
1615 : }
1616 : // Parse addr_ string.
1617 : std::string protocol;
1618 : std::string address;
1619 27 : if (parse_uri (addr_, protocol, address) || check_protocol (protocol))
1620 : return -1;
1621 :
1622 : // Event notification only supported over inproc://
1623 27 : if (protocol != "inproc") {
1624 3 : errno = EPROTONOSUPPORT;
1625 3 : return -1;
1626 : }
1627 : // already monitoring. Stop previous monitor before starting new one.
1628 24 : if (monitor_socket != NULL) {
1629 0 : stop_monitor (true);
1630 : }
1631 : // Register events to monitor
1632 24 : monitor_events = events_;
1633 24 : monitor_socket = zmq_socket (get_ctx (), ZMQ_PAIR);
1634 24 : if (monitor_socket == NULL)
1635 : return -1;
1636 :
1637 : // Never block context termination on pending event messages
1638 24 : int linger = 0;
1639 24 : int rc = zmq_setsockopt (monitor_socket, ZMQ_LINGER, &linger, sizeof (linger));
1640 24 : if (rc == -1)
1641 0 : stop_monitor (false);
1642 :
1643 : // Spawn the monitor socket endpoint
1644 24 : rc = zmq_bind (monitor_socket, addr_);
1645 24 : if (rc == -1)
1646 0 : stop_monitor (false);
1647 24 : return rc;
1648 : }
1649 :
//  Emit ZMQ_EVENT_CONNECTED (value = new connection's fd) if subscribed.
void zmq::socket_base_t::event_connected (const std::string &addr_, int fd_)
{
    if (monitor_events & ZMQ_EVENT_CONNECTED)
        monitor_event (ZMQ_EVENT_CONNECTED, fd_, addr_);
}
1655 :
//  Emit ZMQ_EVENT_CONNECT_DELAYED (value = errno) if subscribed.
void zmq::socket_base_t::event_connect_delayed (const std::string &addr_, int err_)
{
    if (monitor_events & ZMQ_EVENT_CONNECT_DELAYED)
        monitor_event (ZMQ_EVENT_CONNECT_DELAYED, err_, addr_);
}
1661 :
//  Emit ZMQ_EVENT_CONNECT_RETRIED (value = reconnect interval) if subscribed.
void zmq::socket_base_t::event_connect_retried (const std::string &addr_, int interval_)
{
    if (monitor_events & ZMQ_EVENT_CONNECT_RETRIED)
        monitor_event (ZMQ_EVENT_CONNECT_RETRIED, interval_, addr_);
}
1667 :
//  Emit ZMQ_EVENT_LISTENING (value = listening fd) if subscribed.
void zmq::socket_base_t::event_listening (const std::string &addr_, int fd_)
{
    if (monitor_events & ZMQ_EVENT_LISTENING)
        monitor_event (ZMQ_EVENT_LISTENING, fd_, addr_);
}
1673 :
//  Emit ZMQ_EVENT_BIND_FAILED (value = errno) if subscribed.
void zmq::socket_base_t::event_bind_failed (const std::string &addr_, int err_)
{
    if (monitor_events & ZMQ_EVENT_BIND_FAILED)
        monitor_event (ZMQ_EVENT_BIND_FAILED, err_, addr_);
}
1679 :
//  Emit ZMQ_EVENT_ACCEPTED (value = accepted connection's fd) if subscribed.
void zmq::socket_base_t::event_accepted (const std::string &addr_, int fd_)
{
    if (monitor_events & ZMQ_EVENT_ACCEPTED)
        monitor_event (ZMQ_EVENT_ACCEPTED, fd_, addr_);
}
1685 :
//  Emit ZMQ_EVENT_ACCEPT_FAILED (value = errno) if subscribed.
void zmq::socket_base_t::event_accept_failed (const std::string &addr_, int err_)
{
    if (monitor_events & ZMQ_EVENT_ACCEPT_FAILED)
        monitor_event (ZMQ_EVENT_ACCEPT_FAILED, err_, addr_);
}
1691 :
//  Emit ZMQ_EVENT_CLOSED (value = closed fd) if subscribed.
void zmq::socket_base_t::event_closed (const std::string &addr_, int fd_)
{
    if (monitor_events & ZMQ_EVENT_CLOSED)
        monitor_event (ZMQ_EVENT_CLOSED, fd_, addr_);
}
1697 :
//  Emit ZMQ_EVENT_CLOSE_FAILED (value = errno) if subscribed.
void zmq::socket_base_t::event_close_failed (const std::string &addr_, int err_)
{
    if (monitor_events & ZMQ_EVENT_CLOSE_FAILED)
        monitor_event (ZMQ_EVENT_CLOSE_FAILED, err_, addr_);
}
1703 :
//  Emit ZMQ_EVENT_DISCONNECTED (value = fd of the lost connection) if subscribed.
void zmq::socket_base_t::event_disconnected (const std::string &addr_, int fd_)
{
    if (monitor_events & ZMQ_EVENT_DISCONNECTED)
        monitor_event (ZMQ_EVENT_DISCONNECTED, fd_, addr_);
}
1709 :
// Send a monitor event
//  Wire format on the PAIR monitor socket: frame 1 is 6 bytes
//  (uint16 event id followed by uint32 event value, native byte order),
//  frame 2 is the endpoint address string. No-op if monitoring is off.
void zmq::socket_base_t::monitor_event (int event_, int value_, const std::string &addr_)
{
    if (monitor_socket) {
        // Send event in first frame
        zmq_msg_t msg;
        zmq_msg_init_size (&msg, 6);
        uint8_t *data = (uint8_t *) zmq_msg_data (&msg);
        // Avoid dereferencing uint32_t on unaligned address
        uint16_t event = (uint16_t) event_;
        uint32_t value = (uint32_t) value_;
        memcpy (data + 0, &event, sizeof(event));
        memcpy (data + 2, &value, sizeof(value));
        zmq_sendmsg (monitor_socket, &msg, ZMQ_SNDMORE);

        // Send address in second frame
        zmq_msg_init_size (&msg, addr_.size());
        memcpy (zmq_msg_data (&msg), addr_.c_str (), addr_.size ());
        zmq_sendmsg (monitor_socket, &msg, 0);
    }
}
1731 :
//  Tear down monitoring: optionally publish a final
//  ZMQ_EVENT_MONITOR_STOPPED event, then close and forget the monitor
//  socket. Safe to call when monitoring is not active.
void zmq::socket_base_t::stop_monitor (bool send_monitor_stopped_event_)
{
    if (monitor_socket) {
        if ((monitor_events & ZMQ_EVENT_MONITOR_STOPPED) && send_monitor_stopped_event_)
            monitor_event (ZMQ_EVENT_MONITOR_STOPPED, 0, "");
        zmq_close (monitor_socket);
        monitor_socket = NULL;
        monitor_events = 0;
    }
}
|