libzmq  master
ZeroMQ C++ Core Engine (LIBZMQ)
socket_base.cpp
Go to the documentation of this file.
1 /*
2  Copyright (c) 2007-2016 Contributors as noted in the AUTHORS file
3 
4  This file is part of libzmq, the ZeroMQ core engine in C++.
5 
6  libzmq is free software; you can redistribute it and/or modify it under
7  the terms of the GNU Lesser General Public License (LGPL) as published
8  by the Free Software Foundation; either version 3 of the License, or
9  (at your option) any later version.
10 
11  As a special exception, the Contributors give you permission to link
12  this library with independent modules to produce an executable,
13  regardless of the license terms of these independent modules, and to
14  copy and distribute the resulting executable under terms of your choice,
15  provided that you also meet, for each linked independent module, the
16  terms and conditions of the license of that module. An independent
17  module is a module which is not derived from or based on this library.
18  If you modify this library, you must extend this exception to your
19  version of the library.
20 
21  libzmq is distributed in the hope that it will be useful, but WITHOUT
22  ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
23  FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public
24  License for more details.
25 
26  You should have received a copy of the GNU Lesser General Public License
27  along with this program. If not, see <http://www.gnu.org/licenses/>.
28 */
29 
30 #include "precompiled.hpp"
31 #include <new>
32 #include <string>
33 #include <algorithm>
34 
35 #include "macros.hpp"
36 #include "platform.hpp"
37 
38 #if defined ZMQ_HAVE_WINDOWS
39 #include "windows.hpp"
40 #if defined _MSC_VER
41 #if defined _WIN32_WCE
42 #include <cmnintrin.h>
43 #else
44 #include <intrin.h>
45 #endif
46 #endif
47 #else
48 #include <unistd.h>
49 #include <ctype.h>
50 #endif
51 
52 #include "socket_base.hpp"
53 #include "tcp_listener.hpp"
54 #include "ipc_listener.hpp"
55 #include "tipc_listener.hpp"
56 #include "tcp_connecter.hpp"
57 #include "io_thread.hpp"
58 #include "session_base.hpp"
59 #include "config.hpp"
60 #include "pipe.hpp"
61 #include "err.hpp"
62 #include "ctx.hpp"
63 #include "platform.hpp"
64 #include "likely.hpp"
65 #include "msg.hpp"
66 #include "address.hpp"
67 #include "ipc_address.hpp"
68 #include "tcp_address.hpp"
69 #include "udp_address.hpp"
70 #include "tipc_address.hpp"
71 #include "mailbox.hpp"
72 #include "mailbox_safe.hpp"
73 
74 #if defined ZMQ_HAVE_VMCI
75 #include "vmci_address.hpp"
76 #include "vmci_listener.hpp"
77 #endif
78 
79 #ifdef ZMQ_HAVE_OPENPGM
80 #include "pgm_socket.hpp"
81 #endif
82 
83 #include "pair.hpp"
84 #include "pub.hpp"
85 #include "sub.hpp"
86 #include "req.hpp"
87 #include "rep.hpp"
88 #include "pull.hpp"
89 #include "push.hpp"
90 #include "dealer.hpp"
91 #include "router.hpp"
92 #include "xpub.hpp"
93 #include "xsub.hpp"
94 #include "stream.hpp"
95 #include "server.hpp"
96 #include "client.hpp"
97 #include "radio.hpp"
98 #include "dish.hpp"
99 #include "gather.hpp"
100 #include "scatter.hpp"
101 
102 #define ENTER_MUTEX() \
103  if (thread_safe) \
104  sync.lock();
105 
106 #define EXIT_MUTEX(); \
107  if (thread_safe) \
108  sync.unlock();
109 
111 {
112  return tag == 0xbaddecaf;
113 }
114 
116  uint32_t tid_, int sid_)
117 {
118  socket_base_t *s = NULL;
119  switch (type_) {
120  case ZMQ_PAIR:
121  s = new (std::nothrow) pair_t (parent_, tid_, sid_);
122  break;
123  case ZMQ_PUB:
124  s = new (std::nothrow) pub_t (parent_, tid_, sid_);
125  break;
126  case ZMQ_SUB:
127  s = new (std::nothrow) sub_t (parent_, tid_, sid_);
128  break;
129  case ZMQ_REQ:
130  s = new (std::nothrow) req_t (parent_, tid_, sid_);
131  break;
132  case ZMQ_REP:
133  s = new (std::nothrow) rep_t (parent_, tid_, sid_);
134  break;
135  case ZMQ_DEALER:
136  s = new (std::nothrow) dealer_t (parent_, tid_, sid_);
137  break;
138  case ZMQ_ROUTER:
139  s = new (std::nothrow) router_t (parent_, tid_, sid_);
140  break;
141  case ZMQ_PULL:
142  s = new (std::nothrow) pull_t (parent_, tid_, sid_);
143  break;
144  case ZMQ_PUSH:
145  s = new (std::nothrow) push_t (parent_, tid_, sid_);
146  break;
147  case ZMQ_XPUB:
148  s = new (std::nothrow) xpub_t (parent_, tid_, sid_);
149  break;
150  case ZMQ_XSUB:
151  s = new (std::nothrow) xsub_t (parent_, tid_, sid_);
152  break;
153  case ZMQ_STREAM:
154  s = new (std::nothrow) stream_t (parent_, tid_, sid_);
155  break;
156  case ZMQ_SERVER:
157  s = new (std::nothrow) server_t (parent_, tid_, sid_);
158  break;
159  case ZMQ_CLIENT:
160  s = new (std::nothrow) client_t (parent_, tid_, sid_);
161  break;
162  case ZMQ_RADIO:
163  s = new (std::nothrow) radio_t (parent_, tid_, sid_);
164  break;
165  case ZMQ_DISH:
166  s = new (std::nothrow) dish_t (parent_, tid_, sid_);
167  break;
168  case ZMQ_GATHER:
169  s = new (std::nothrow) gather_t (parent_, tid_, sid_);
170  break;
171  case ZMQ_SCATTER:
172  s = new (std::nothrow) scatter_t (parent_, tid_, sid_);
173  break;
174  default:
175  errno = EINVAL;
176  return NULL;
177  }
178 
179  alloc_assert (s);
180 
181  if (s->mailbox == NULL) {
182  s->destroyed = true;
183  LIBZMQ_DELETE(s);
184  return NULL;
185  }
186 
187  return s;
188 }
189 
190 zmq::socket_base_t::socket_base_t (ctx_t *parent_, uint32_t tid_, int sid_, bool thread_safe_) :
191  own_t (parent_, tid_),
192  tag (0xbaddecaf),
193  ctx_terminated (false),
194  destroyed (false),
195  poller(NULL),
196  handle(NULL),
197  last_tsc (0),
198  ticks (0),
199  rcvmore (false),
200  monitor_socket (NULL),
201  monitor_events (0),
202  thread_safe (thread_safe_),
203  reaper_signaler (NULL)
204 {
205  options.socket_id = sid_;
206  options.ipv6 = (parent_->get (ZMQ_IPV6) != 0);
207  options.linger = parent_->get (ZMQ_BLOCKY)? -1: 0;
208 
209  if (thread_safe)
210  mailbox = new mailbox_safe_t(&sync);
211  else {
212  mailbox_t *m = new mailbox_t();
213  if (m->get_fd () != retired_fd)
214  mailbox = m;
215  else {
216  LIBZMQ_DELETE (m);
217  mailbox = NULL;
218  }
219  }
220 }
221 
223 {
224  if (mailbox)
226 
227  if (reaper_signaler)
229 
230  stop_monitor ();
232 }
233 
235 {
236  return mailbox;
237 }
238 
240 {
241  // Called by ctx when it is terminated (zmq_ctx_term).
242  // 'stop' command is sent from the threads that called zmq_ctx_term to
243  // the thread owning the socket. This way, blocking call in the
244  // owner thread can be interrupted.
245  send_stop ();
246 }
247 
248 int zmq::socket_base_t::parse_uri (const char *uri_,
249  std::string &protocol_, std::string &address_)
250 {
251  zmq_assert (uri_ != NULL);
252 
253  std::string uri (uri_);
254  std::string::size_type pos = uri.find ("://");
255  if (pos == std::string::npos) {
256  errno = EINVAL;
257  return -1;
258  }
259  protocol_ = uri.substr (0, pos);
260  address_ = uri.substr (pos + 3);
261 
262  if (protocol_.empty () || address_.empty ()) {
263  errno = EINVAL;
264  return -1;
265  }
266  return 0;
267 }
268 
269 int zmq::socket_base_t::check_protocol (const std::string &protocol_)
270 {
271  // First check out whether the protocol is something we are aware of.
272  if (protocol_ != "inproc"
273 #if !defined ZMQ_HAVE_WINDOWS && !defined ZMQ_HAVE_OPENVMS
274  && protocol_ != "ipc"
275 #endif
276  && protocol_ != "tcp"
277 #if defined ZMQ_HAVE_OPENPGM
278  // pgm/epgm transports only available if 0MQ is compiled with OpenPGM.
279  && protocol_ != "pgm"
280  && protocol_ != "epgm"
281 #endif
282 #if defined ZMQ_HAVE_TIPC
283  // TIPC transport is only available on Linux.
284  && protocol_ != "tipc"
285 #endif
286 #if defined ZMQ_HAVE_NORM
287  && protocol_ != "norm"
288 #endif
289 #if defined ZMQ_HAVE_VMCI
290  && protocol_ != "vmci"
291 #endif
292  && protocol_ != "udp") {
293  errno = EPROTONOSUPPORT;
294  return -1;
295  }
296 
297  // Check whether socket type and transport protocol match.
298  // Specifically, multicast protocols can't be combined with
299  // bi-directional messaging patterns (socket types).
300 #if defined ZMQ_HAVE_OPENPGM || defined ZMQ_HAVE_NORM
301  if ((protocol_ == "pgm" || protocol_ == "epgm" || protocol_ == "norm") &&
302  options.type != ZMQ_PUB && options.type != ZMQ_SUB &&
304  errno = ENOCOMPATPROTO;
305  return -1;
306  }
307 #endif
308 
309  if (protocol_ == "udp" && (options.type != ZMQ_DISH &&
310  options.type != ZMQ_RADIO)) {
311  errno = ENOCOMPATPROTO;
312  return -1;
313  }
314 
315  // Protocol is available.
316  return 0;
317 }
318 
319 void zmq::socket_base_t::attach_pipe (pipe_t *pipe_, bool subscribe_to_all_)
320 {
321  // First, register the pipe so that we can terminate it later on.
322  pipe_->set_event_sink (this);
323  pipes.push_back (pipe_);
324 
325  // Let the derived socket type know about new pipe.
326  xattach_pipe (pipe_, subscribe_to_all_);
327 
328  // If the socket is already being closed, ask any new pipes to terminate
329  // straight away.
330  if (is_terminating ()) {
331  register_term_acks (1);
332  pipe_->terminate (false);
333  }
334 }
335 
336 int zmq::socket_base_t::setsockopt (int option_, const void *optval_,
337  size_t optvallen_)
338 {
339  ENTER_MUTEX ();
340 
341  if (!options.is_valid(option_)) {
342  errno = EINVAL;
343  EXIT_MUTEX ();
344  return -1;
345  }
346 
347  if (unlikely (ctx_terminated)) {
348  errno = ETERM;
349  EXIT_MUTEX ();
350  return -1;
351  }
352 
353  // First, check whether specific socket type overloads the option.
354  int rc = xsetsockopt (option_, optval_, optvallen_);
355  if (rc == 0 || errno != EINVAL) {
356  EXIT_MUTEX ();
357  return rc;
358  }
359 
360  // If the socket type doesn't support the option, pass it to
361  // the generic option parser.
362  rc = options.setsockopt (option_, optval_, optvallen_);
363  update_pipe_options(option_);
364 
365  EXIT_MUTEX ();
366  return rc;
367 }
368 
369 int zmq::socket_base_t::getsockopt (int option_, void *optval_,
370  size_t *optvallen_)
371 {
372  ENTER_MUTEX ();
373 
374  if (unlikely (ctx_terminated)) {
375  errno = ETERM;
376  EXIT_MUTEX ();
377  return -1;
378  }
379 
380  if (option_ == ZMQ_RCVMORE) {
381  if (*optvallen_ < sizeof (int)) {
382  errno = EINVAL;
383  EXIT_MUTEX ();
384  return -1;
385  }
386  memset(optval_, 0, *optvallen_);
387  *((int*) optval_) = rcvmore ? 1 : 0;
388  *optvallen_ = sizeof (int);
389  EXIT_MUTEX ();
390  return 0;
391  }
392 
393  if (option_ == ZMQ_FD) {
394  if (*optvallen_ < sizeof (fd_t)) {
395  errno = EINVAL;
396  EXIT_MUTEX ();
397  return -1;
398  }
399 
400  if (thread_safe) {
401  // thread safe socket doesn't provide file descriptor
402  errno = EINVAL;
403  EXIT_MUTEX ();
404  return -1;
405  }
406 
407  *((fd_t*)optval_) = ((mailbox_t*)mailbox)->get_fd();
408  *optvallen_ = sizeof(fd_t);
409 
410  EXIT_MUTEX ();
411  return 0;
412  }
413 
414  if (option_ == ZMQ_EVENTS) {
415  if (*optvallen_ < sizeof (int)) {
416  errno = EINVAL;
417  EXIT_MUTEX ();
418  return -1;
419  }
420  int rc = process_commands (0, false);
421  if (rc != 0 && (errno == EINTR || errno == ETERM)) {
422  EXIT_MUTEX ();
423  return -1;
424  }
425  errno_assert (rc == 0);
426  *((int*) optval_) = 0;
427  if (has_out ())
428  *((int*) optval_) |= ZMQ_POLLOUT;
429  if (has_in ())
430  *((int*) optval_) |= ZMQ_POLLIN;
431  *optvallen_ = sizeof (int);
432  EXIT_MUTEX ();
433  return 0;
434  }
435 
436  if (option_ == ZMQ_LAST_ENDPOINT) {
437  if (*optvallen_ < last_endpoint.size () + 1) {
438  errno = EINVAL;
439  EXIT_MUTEX ();
440  return -1;
441  }
442  strncpy(static_cast <char *> (optval_), last_endpoint.c_str(), last_endpoint.size() + 1);
443  *optvallen_ = last_endpoint.size () + 1;
444  EXIT_MUTEX ();
445  return 0;
446  }
447 
448  if (option_ == ZMQ_THREAD_SAFE) {
449  if (*optvallen_ < sizeof (int)) {
450  errno = EINVAL;
451  EXIT_MUTEX ();
452  return -1;
453  }
454  memset(optval_, 0, *optvallen_);
455  *((int*) optval_) = thread_safe ? 1 : 0;
456  *optvallen_ = sizeof (int);
457  EXIT_MUTEX ();
458  return 0;
459  }
460 
461  int rc = options.getsockopt (option_, optval_, optvallen_);
462  EXIT_MUTEX ();
463  return rc;
464 }
465 
466 int zmq::socket_base_t::join (const char* group_)
467 {
468  ENTER_MUTEX ();
469 
470  int rc = xjoin (group_);
471 
472  EXIT_MUTEX();
473 
474  return rc;
475 }
476 
477 int zmq::socket_base_t::leave (const char* group_)
478 {
479  ENTER_MUTEX ();
480 
481  int rc = xleave (group_);
482 
483  EXIT_MUTEX();
484 
485  return rc;
486 }
487 
489 {
490  ENTER_MUTEX ();
491 
492  if (!thread_safe) {
493  errno = EINVAL;
494  EXIT_MUTEX ();
495  return -1;
496  }
497 
498  ((mailbox_safe_t*)mailbox)->add_signaler(s_);
499 
500  EXIT_MUTEX ();
501  return 0;
502 }
503 
505 {
506  ENTER_MUTEX ();
507 
508  if (!thread_safe) {
509  errno = EINVAL;
510  EXIT_MUTEX ();
511  return -1;
512  }
513 
514  ((mailbox_safe_t*)mailbox)->remove_signaler(s_);
515 
516  EXIT_MUTEX ();
517  return 0;
518 }
519 
520 int zmq::socket_base_t::bind (const char *addr_)
521 {
522  ENTER_MUTEX ();
523 
524  if (unlikely (ctx_terminated)) {
525  errno = ETERM;
526  EXIT_MUTEX ();
527  return -1;
528  }
529 
530  // Process pending commands, if any.
531  int rc = process_commands (0, false);
532  if (unlikely (rc != 0)) {
533  EXIT_MUTEX ();
534  return -1;
535  }
536 
537  // Parse addr_ string.
538  std::string protocol;
539  std::string address;
540  if (parse_uri (addr_, protocol, address) || check_protocol (protocol)) {
541  EXIT_MUTEX ();
542  return -1;
543  }
544 
545  if (protocol == "inproc") {
546  const endpoint_t endpoint = { this, options };
547  rc = register_endpoint (addr_, endpoint);
548  if (rc == 0) {
549  connect_pending (addr_, this);
550  last_endpoint.assign (addr_);
551  options.connected = true;
552  }
553  EXIT_MUTEX ();
554  return rc;
555  }
556 
557  if (protocol == "pgm" || protocol == "epgm" || protocol == "norm" || protocol == "udp") {
558  // For convenience's sake, bind can be used interchangeable with
559  // connect for PGM, EPGM, NORM and UDP transports.
560  EXIT_MUTEX ();
561  rc = connect (addr_);
562  if (rc != -1)
563  options.connected = true;
564  return rc;
565  }
566 
567  // Remaining transports require to be run in an I/O thread, so at this
568  // point we'll choose one.
570  if (!io_thread) {
571  errno = EMTHREAD;
572  EXIT_MUTEX ();
573  return -1;
574  }
575 
576  if (protocol == "tcp") {
577  tcp_listener_t *listener = new (std::nothrow) tcp_listener_t (
578  io_thread, this, options);
579  alloc_assert (listener);
580  rc = listener->set_address (address.c_str ());
581  if (rc != 0) {
582  LIBZMQ_DELETE(listener);
583  event_bind_failed (address, zmq_errno());
584  EXIT_MUTEX ();
585  return -1;
586  }
587 
588  // Save last endpoint URI
589  listener->get_address (last_endpoint);
590 
591  add_endpoint (last_endpoint.c_str (), (own_t *) listener, NULL);
592  options.connected = true;
593  EXIT_MUTEX ();
594  return 0;
595  }
596 
597 #if !defined ZMQ_HAVE_WINDOWS && !defined ZMQ_HAVE_OPENVMS
598  if (protocol == "ipc") {
599  ipc_listener_t *listener = new (std::nothrow) ipc_listener_t (
600  io_thread, this, options);
601  alloc_assert (listener);
602  int rc = listener->set_address (address.c_str ());
603  if (rc != 0) {
604  LIBZMQ_DELETE(listener);
605  event_bind_failed (address, zmq_errno());
606  EXIT_MUTEX ();
607  return -1;
608  }
609 
610  // Save last endpoint URI
611  listener->get_address (last_endpoint);
612 
613  add_endpoint (last_endpoint.c_str (), (own_t *) listener, NULL);
614  options.connected = true;
615  EXIT_MUTEX ();
616  return 0;
617  }
618 #endif
619 #if defined ZMQ_HAVE_TIPC
620  if (protocol == "tipc") {
621  tipc_listener_t *listener = new (std::nothrow) tipc_listener_t (
622  io_thread, this, options);
623  alloc_assert (listener);
624  int rc = listener->set_address (address.c_str ());
625  if (rc != 0) {
626  LIBZMQ_DELETE(listener);
627  event_bind_failed (address, zmq_errno());
628  EXIT_MUTEX ();
629  return -1;
630  }
631 
632  // Save last endpoint URI
633  listener->get_address (last_endpoint);
634 
635  add_endpoint (addr_, (own_t *) listener, NULL);
636  options.connected = true;
637  EXIT_MUTEX ();
638  return 0;
639  }
640 #endif
641 #if defined ZMQ_HAVE_VMCI
642  if (protocol == "vmci") {
643  vmci_listener_t *listener = new (std::nothrow) vmci_listener_t (
644  io_thread, this, options);
645  alloc_assert (listener);
646  int rc = listener->set_address (address.c_str ());
647  if (rc != 0) {
648  LIBZMQ_DELETE(listener);
649  event_bind_failed (address, zmq_errno ());
650  EXIT_MUTEX ();
651  return -1;
652  }
653 
654  listener->get_address (last_endpoint);
655 
656  add_endpoint (last_endpoint.c_str(), (own_t *) listener, NULL);
657  options.connected = true;
658  EXIT_MUTEX ();
659  return 0;
660  }
661 #endif
662 
663  EXIT_MUTEX ();
664  zmq_assert (false);
665  return -1;
666 }
667 
668 int zmq::socket_base_t::connect (const char *addr_)
669 {
670  ENTER_MUTEX ();
671 
672  if (unlikely (ctx_terminated)) {
673  errno = ETERM;
674  EXIT_MUTEX ();
675  return -1;
676  }
677 
678  // Process pending commands, if any.
679  int rc = process_commands (0, false);
680  if (unlikely (rc != 0)) {
681  EXIT_MUTEX ();
682  return -1;
683  }
684 
685  // Parse addr_ string.
686  std::string protocol;
687  std::string address;
688  if (parse_uri (addr_, protocol, address) || check_protocol (protocol)) {
689  EXIT_MUTEX ();
690  return -1;
691  }
692 
693  if (protocol == "inproc") {
694 
695  // TODO: inproc connect is specific with respect to creating pipes
696  // as there's no 'reconnect' functionality implemented. Once that
697  // is in place we should follow generic pipe creation algorithm.
698 
699  // Find the peer endpoint.
700  endpoint_t peer = find_endpoint (addr_);
701 
702  // The total HWM for an inproc connection should be the sum of
703  // the binder's HWM and the connector's HWM.
704  int sndhwm = 0;
705  if (peer.socket == NULL)
706  sndhwm = options.sndhwm;
707  else if (options.sndhwm != 0 && peer.options.rcvhwm != 0)
708  sndhwm = options.sndhwm + peer.options.rcvhwm;
709  int rcvhwm = 0;
710  if (peer.socket == NULL)
711  rcvhwm = options.rcvhwm;
712  else
713  if (options.rcvhwm != 0 && peer.options.sndhwm != 0)
714  rcvhwm = options.rcvhwm + peer.options.sndhwm;
715 
716  // Create a bi-directional pipe to connect the peers.
717  object_t *parents [2] = {this, peer.socket == NULL ? this : peer.socket};
718  pipe_t *new_pipes [2] = {NULL, NULL};
719 
720  bool conflate = options.conflate &&
721  (options.type == ZMQ_DEALER ||
722  options.type == ZMQ_PULL ||
723  options.type == ZMQ_PUSH ||
724  options.type == ZMQ_PUB ||
725  options.type == ZMQ_SUB);
726 
727  int hwms [2] = {conflate? -1 : sndhwm, conflate? -1 : rcvhwm};
728  bool conflates [2] = {conflate, conflate};
729  rc = pipepair (parents, new_pipes, hwms, conflates);
730  if (!conflate) {
731  new_pipes[0]->set_hwms_boost(peer.options.sndhwm, peer.options.rcvhwm);
732  new_pipes[1]->set_hwms_boost(options.sndhwm, options.rcvhwm);
733  }
734 
735  errno_assert (rc == 0);
736 
737  if (!peer.socket) {
738  // The peer doesn't exist yet so we don't know whether
739  // to send the identity message or not. To resolve this,
740  // we always send our identity and drop it later if
741  // the peer doesn't expect it.
742  msg_t id;
743  rc = id.init_size (options.identity_size);
744  errno_assert (rc == 0);
745  memcpy (id.data (), options.identity, options.identity_size);
746  id.set_flags (msg_t::identity);
747  bool written = new_pipes [0]->write (&id);
748  zmq_assert (written);
749  new_pipes [0]->flush ();
750 
751  const endpoint_t endpoint = {this, options};
752  pend_connection (std::string (addr_), endpoint, new_pipes);
753  }
754  else {
755  // If required, send the identity of the local socket to the peer.
756  if (peer.options.recv_identity) {
757  msg_t id;
758  rc = id.init_size (options.identity_size);
759  errno_assert (rc == 0);
760  memcpy (id.data (), options.identity, options.identity_size);
761  id.set_flags (msg_t::identity);
762  bool written = new_pipes [0]->write (&id);
763  zmq_assert (written);
764  new_pipes [0]->flush ();
765  }
766 
767  // If required, send the identity of the peer to the local socket.
768  if (options.recv_identity) {
769  msg_t id;
770  rc = id.init_size (peer.options.identity_size);
771  errno_assert (rc == 0);
772  memcpy (id.data (), peer.options.identity, peer.options.identity_size);
773  id.set_flags (msg_t::identity);
774  bool written = new_pipes [1]->write (&id);
775  zmq_assert (written);
776  new_pipes [1]->flush ();
777  }
778 
779  // Attach remote end of the pipe to the peer socket. Note that peer's
780  // seqnum was incremented in find_endpoint function. We don't need it
781  // increased here.
782  send_bind (peer.socket, new_pipes [1], false);
783  }
784 
785  // Attach local end of the pipe to this socket object.
786  attach_pipe (new_pipes [0]);
787 
788  // Save last endpoint URI
789  last_endpoint.assign (addr_);
790 
791  // remember inproc connections for disconnect
792  inprocs.insert (inprocs_t::value_type (std::string (addr_), new_pipes [0]));
793 
794  options.connected = true;
795  EXIT_MUTEX ();
796  return 0;
797  }
798  bool is_single_connect = (options.type == ZMQ_DEALER ||
799  options.type == ZMQ_SUB ||
800  options.type == ZMQ_REQ);
801  if (unlikely (is_single_connect)) {
802  const endpoints_t::iterator it = endpoints.find (addr_);
803  if (it != endpoints.end ()) {
804  // There is no valid use for multiple connects for SUB-PUB nor
805  // DEALER-ROUTER nor REQ-REP. Multiple connects produces
806  // nonsensical results.
807  EXIT_MUTEX ();
808  return 0;
809  }
810  }
811 
812  // Choose the I/O thread to run the session in.
814  if (!io_thread) {
815  errno = EMTHREAD;
816  EXIT_MUTEX ();
817  return -1;
818  }
819 
820  address_t *paddr = new (std::nothrow) address_t (protocol, address, this->get_ctx ());
821  alloc_assert (paddr);
822 
823  // Resolve address (if needed by the protocol)
824  if (protocol == "tcp") {
825  // Do some basic sanity checks on tcp:// address syntax
826  // - hostname starts with digit or letter, with embedded '-' or '.'
827  // - IPv6 address may contain hex chars and colons.
828  // - IPv6 link local address may contain % followed by interface name / zone_id
829  // (Reference: https://tools.ietf.org/html/rfc4007)
830  // - IPv4 address may contain decimal digits and dots.
831  // - Address must end in ":port" where port is *, or numeric
832  // - Address may contain two parts separated by ':'
833  // Following code is quick and dirty check to catch obvious errors,
834  // without trying to be fully accurate.
835  const char *check = address.c_str ();
836  if (isalnum (*check) || isxdigit (*check) || *check == '[') {
837  check++;
838  while (isalnum (*check)
839  || isxdigit (*check)
840  || *check == '.' || *check == '-' || *check == ':' || *check == '%'
841  || *check == ';' || *check == ']' || *check == '_'
842  ) {
843  check++;
844  }
845  }
846  // Assume the worst, now look for success
847  rc = -1;
848  // Did we reach the end of the address safely?
849  if (*check == 0) {
850  // Do we have a valid port string? (cannot be '*' in connect
851  check = strrchr (address.c_str (), ':');
852  if (check) {
853  check++;
854  if (*check && (isdigit (*check)))
855  rc = 0; // Valid
856  }
857  }
858  if (rc == -1) {
859  errno = EINVAL;
860  LIBZMQ_DELETE(paddr);
861  EXIT_MUTEX ();
862  return -1;
863  }
864  // Defer resolution until a socket is opened
865  paddr->resolved.tcp_addr = NULL;
866  }
867 #if !defined ZMQ_HAVE_WINDOWS && !defined ZMQ_HAVE_OPENVMS
868  else
869  if (protocol == "ipc") {
870  paddr->resolved.ipc_addr = new (std::nothrow) ipc_address_t ();
871  alloc_assert (paddr->resolved.ipc_addr);
872  int rc = paddr->resolved.ipc_addr->resolve (address.c_str ());
873  if (rc != 0) {
874  LIBZMQ_DELETE(paddr);
875  EXIT_MUTEX ();
876  return -1;
877  }
878  }
879 #endif
880 
881 if (protocol == "udp") {
882  paddr->resolved.udp_addr = new (std::nothrow) udp_address_t ();
883  alloc_assert (paddr->resolved.udp_addr);
884  rc = paddr->resolved.udp_addr->resolve (address.c_str(), options.type == ZMQ_DISH);
885  if (rc != 0) {
886  LIBZMQ_DELETE(paddr);
887  EXIT_MUTEX ();
888  return -1;
889  }
890 }
891 
892 // TBD - Should we check address for ZMQ_HAVE_NORM???
893 
894 #ifdef ZMQ_HAVE_OPENPGM
895  if (protocol == "pgm" || protocol == "epgm") {
896  struct pgm_addrinfo_t *res = NULL;
897  uint16_t port_number = 0;
898  int rc = pgm_socket_t::init_address(address.c_str(), &res, &port_number);
899  if (res != NULL)
900  pgm_freeaddrinfo (res);
901  if (rc != 0 || port_number == 0) {
902  EXIT_MUTEX ();
903  return -1;
904  }
905  }
906 #endif
907 #if defined ZMQ_HAVE_TIPC
908  else
909  if (protocol == "tipc") {
910  paddr->resolved.tipc_addr = new (std::nothrow) tipc_address_t ();
911  alloc_assert (paddr->resolved.tipc_addr);
912  int rc = paddr->resolved.tipc_addr->resolve (address.c_str());
913  if (rc != 0) {
914  LIBZMQ_DELETE(paddr);
915  EXIT_MUTEX ();
916  return -1;
917  }
918  }
919 #endif
920 #if defined ZMQ_HAVE_VMCI
921  else
922  if (protocol == "vmci") {
923  paddr->resolved.vmci_addr = new (std::nothrow) vmci_address_t (this->get_ctx ());
924  alloc_assert (paddr->resolved.vmci_addr);
925  int rc = paddr->resolved.vmci_addr->resolve (address.c_str ());
926  if (rc != 0) {
927  LIBZMQ_DELETE(paddr);
928  EXIT_MUTEX ();
929  return -1;
930  }
931  }
932 #endif
933 
934  // Create session.
935  session_base_t *session = session_base_t::create (io_thread, true, this,
936  options, paddr);
937  errno_assert (session);
938 
939  // PGM does not support subscription forwarding; ask for all data to be
940  // sent to this pipe. (same for NORM, currently?)
941  bool subscribe_to_all = protocol == "pgm" || protocol == "epgm" || protocol == "norm" || protocol == "udp";
942  pipe_t *newpipe = NULL;
943 
944  if (options.immediate != 1 || subscribe_to_all) {
945  // Create a bi-directional pipe.
946  object_t *parents [2] = {this, session};
947  pipe_t *new_pipes [2] = {NULL, NULL};
948 
949  bool conflate = options.conflate &&
950  (options.type == ZMQ_DEALER ||
951  options.type == ZMQ_PULL ||
952  options.type == ZMQ_PUSH ||
953  options.type == ZMQ_PUB ||
954  options.type == ZMQ_SUB);
955 
956  int hwms [2] = {conflate? -1 : options.sndhwm,
957  conflate? -1 : options.rcvhwm};
958  bool conflates [2] = {conflate, conflate};
959  rc = pipepair (parents, new_pipes, hwms, conflates);
960  errno_assert (rc == 0);
961 
962  // Attach local end of the pipe to the socket object.
963  attach_pipe (new_pipes [0], subscribe_to_all);
964  newpipe = new_pipes [0];
965 
966  // Attach remote end of the pipe to the session object later on.
967  session->attach_pipe (new_pipes [1]);
968  }
969 
970  // Save last endpoint URI
971  paddr->to_string (last_endpoint);
972 
973  add_endpoint (addr_, (own_t *) session, newpipe);
974  EXIT_MUTEX ();
975  return 0;
976 }
977 
978 void zmq::socket_base_t::add_endpoint (const char *addr_, own_t *endpoint_, pipe_t *pipe)
979 {
980  // Activate the session. Make it a child of this socket.
981  launch_child (endpoint_);
982  endpoints.insert (endpoints_t::value_type (std::string (addr_), endpoint_pipe_t (endpoint_, pipe)));
983 }
984 
985 int zmq::socket_base_t::term_endpoint (const char *addr_)
986 {
987  ENTER_MUTEX ();
988 
989  // Check whether the library haven't been shut down yet.
990  if (unlikely (ctx_terminated)) {
991  errno = ETERM;
992  EXIT_MUTEX ();
993  return -1;
994  }
995 
996  // Check whether endpoint address passed to the function is valid.
997  if (unlikely (!addr_)) {
998  errno = EINVAL;
999  EXIT_MUTEX ();
1000  return -1;
1001  }
1002 
1003  // Process pending commands, if any, since there could be pending unprocessed process_own()'s
1004  // (from launch_child() for example) we're asked to terminate now.
1005  int rc = process_commands (0, false);
1006  if (unlikely(rc != 0)) {
1007  EXIT_MUTEX ();
1008  return -1;
1009  }
1010 
1011  // Parse addr_ string.
1012  std::string protocol;
1013  std::string address;
1014  if (parse_uri(addr_, protocol, address) || check_protocol(protocol)) {
1015  EXIT_MUTEX ();
1016  return -1;
1017  }
1018 
1019  // Disconnect an inproc socket
1020  if (protocol == "inproc") {
1021  if (unregister_endpoint (std::string(addr_), this) == 0) {
1022  EXIT_MUTEX ();
1023  return 0;
1024  }
1025  std::pair <inprocs_t::iterator, inprocs_t::iterator> range = inprocs.equal_range (std::string (addr_));
1026  if (range.first == range.second) {
1027  errno = ENOENT;
1028  EXIT_MUTEX ();
1029  return -1;
1030  }
1031 
1032  for (inprocs_t::iterator it = range.first; it != range.second; ++it)
1033  it->second->terminate (true);
1034  inprocs.erase (range.first, range.second);
1035  EXIT_MUTEX ();
1036  return 0;
1037  }
1038 
1039  std::string resolved_addr = std::string (addr_);
1040  std::pair <endpoints_t::iterator, endpoints_t::iterator> range;
1041 
1042  // The resolved last_endpoint is used as a key in the endpoints map.
1043  // The address passed by the user might not match in the TCP case due to
1044  // IPv4-in-IPv6 mapping (EG: tcp://[::ffff:127.0.0.1]:9999), so try to
1045  // resolve before giving up. Given at this stage we don't know whether a
1046  // socket is connected or bound, try with both.
1047  if (protocol == "tcp") {
1048  range = endpoints.equal_range (resolved_addr);
1049  if (range.first == range.second) {
1050  tcp_address_t *tcp_addr = new (std::nothrow) tcp_address_t ();
1051  alloc_assert (tcp_addr);
1052  rc = tcp_addr->resolve (address.c_str (), false, options.ipv6);
1053 
1054  if (rc == 0) {
1055  tcp_addr->to_string (resolved_addr);
1056  range = endpoints.equal_range (resolved_addr);
1057 
1058  if (range.first == range.second) {
1059  rc = tcp_addr->resolve (address.c_str (), true, options.ipv6);
1060  if (rc == 0) {
1061  tcp_addr->to_string (resolved_addr);
1062  }
1063  }
1064  }
1065  LIBZMQ_DELETE(tcp_addr);
1066  }
1067  }
1068 
1069  // Find the endpoints range (if any) corresponding to the addr_ string.
1070  range = endpoints.equal_range (resolved_addr);
1071  if (range.first == range.second) {
1072  errno = ENOENT;
1073  EXIT_MUTEX ();
1074  return -1;
1075  }
1076 
1077  for (endpoints_t::iterator it = range.first; it != range.second; ++it) {
1078  // If we have an associated pipe, terminate it.
1079  if (it->second.second != NULL)
1080  it->second.second->terminate (false);
1081  term_child (it->second.first);
1082  }
1083  endpoints.erase (range.first, range.second);
1084  EXIT_MUTEX ();
1085  return 0;
1086 }
1087 
1088 int zmq::socket_base_t::send (msg_t *msg_, int flags_)
1089 {
1090  ENTER_MUTEX ();
1091 
1092  // Check whether the library haven't been shut down yet.
1093  if (unlikely (ctx_terminated)) {
1094  errno = ETERM;
1095  EXIT_MUTEX ();
1096  return -1;
1097  }
1098 
1099  // Check whether message passed to the function is valid.
1100  if (unlikely (!msg_ || !msg_->check ())) {
1101  errno = EFAULT;
1102  EXIT_MUTEX ();
1103  return -1;
1104  }
1105 
1106  // Process pending commands, if any.
1107  int rc = process_commands (0, true);
1108  if (unlikely (rc != 0)) {
1109  EXIT_MUTEX ();
1110  return -1;
1111  }
1112 
1113  // Clear any user-visible flags that are set on the message.
1114  msg_->reset_flags (msg_t::more);
1115 
1116  // At this point we impose the flags on the message.
1117  if (flags_ & ZMQ_SNDMORE)
1118  msg_->set_flags (msg_t::more);
1119 
1120  msg_->reset_metadata ();
1121 
1122  // Try to send the message using method in each socket class
1123  rc = xsend (msg_);
1124  if (rc == 0) {
1125  EXIT_MUTEX ();
1126  return 0;
1127  }
1128  if (unlikely (errno != EAGAIN)) {
1129  EXIT_MUTEX ();
1130  return -1;
1131  }
1132 
1133  // In case of non-blocking send we'll simply propagate
1134  // the error - including EAGAIN - up the stack.
1135  if (flags_ & ZMQ_DONTWAIT || options.sndtimeo == 0) {
1136  EXIT_MUTEX ();
1137  return -1;
1138  }
1139 
1140  // Compute the time when the timeout should occur.
1141  // If the timeout is infinite, don't care.
1142  int timeout = options.sndtimeo;
1143  uint64_t end = timeout < 0 ? 0 : (clock.now_ms () + timeout);
1144 
1145  // Oops, we couldn't send the message. Wait for the next
1146  // command, process it and try to send the message again.
1147  // If timeout is reached in the meantime, return EAGAIN.
1148  while (true) {
1149  if (unlikely (process_commands (timeout, false) != 0)) {
1150  EXIT_MUTEX ();
1151  return -1;
1152  }
1153  rc = xsend (msg_);
1154  if (rc == 0)
1155  break;
1156  if (unlikely (errno != EAGAIN)) {
1157  EXIT_MUTEX ();
1158  return -1;
1159  }
1160  if (timeout > 0) {
1161  timeout = (int) (end - clock.now_ms ());
1162  if (timeout <= 0) {
1163  errno = EAGAIN;
1164  EXIT_MUTEX ();
1165  return -1;
1166  }
1167  }
1168  }
1169 
1170  EXIT_MUTEX ();
1171  return 0;
1172 }
1173 
1174 int zmq::socket_base_t::recv (msg_t *msg_, int flags_)
1175 {
1176  ENTER_MUTEX ();
1177 
1178  // Check whether the library haven't been shut down yet.
1179  if (unlikely (ctx_terminated)) {
1180  errno = ETERM;
1181  EXIT_MUTEX ();
1182  return -1;
1183  }
1184 
1185  // Check whether message passed to the function is valid.
1186  if (unlikely (!msg_ || !msg_->check ())) {
1187  errno = EFAULT;
1188  EXIT_MUTEX ();
1189  return -1;
1190  }
1191 
1192  // Once every inbound_poll_rate messages check for signals and process
1193  // incoming commands. This happens only if we are not polling altogether
1194  // because there are messages available all the time. If poll occurs,
1195  // ticks is set to zero and thus we avoid this code.
1196  //
1197  // Note that 'recv' uses different command throttling algorithm (the one
1198  // described above) from the one used by 'send'. This is because counting
1199  // ticks is more efficient than doing RDTSC all the time.
1200  if (++ticks == inbound_poll_rate) {
1201  if (unlikely (process_commands (0, false) != 0)) {
1202  EXIT_MUTEX ();
1203  return -1;
1204  }
1205  ticks = 0;
1206  }
1207 
1208  // Get the message.
1209  int rc = xrecv (msg_);
1210  if (unlikely (rc != 0 && errno != EAGAIN)) {
1211  EXIT_MUTEX ();
1212  return -1;
1213  }
1214 
1215  // If we have the message, return immediately.
1216  if (rc == 0) {
1217  extract_flags (msg_);
1218  EXIT_MUTEX ();
1219  return 0;
1220  }
1221 
1222  // If the message cannot be fetched immediately, there are two scenarios.
1223  // For non-blocking recv, commands are processed in case there's an
1224  // activate_reader command already waiting int a command pipe.
1225  // If it's not, return EAGAIN.
1226  if (flags_ & ZMQ_DONTWAIT || options.rcvtimeo == 0) {
1227  if (unlikely (process_commands (0, false) != 0)) {
1228  EXIT_MUTEX ();
1229  return -1;
1230  }
1231  ticks = 0;
1232 
1233  rc = xrecv (msg_);
1234  if (rc < 0) {
1235  EXIT_MUTEX ();
1236  return rc;
1237  }
1238  extract_flags (msg_);
1239 
1240  EXIT_MUTEX ();
1241  return 0;
1242  }
1243 
1244  // Compute the time when the timeout should occur.
1245  // If the timeout is infinite, don't care.
1246  int timeout = options.rcvtimeo;
1247  uint64_t end = timeout < 0 ? 0 : (clock.now_ms () + timeout);
1248 
1249  // In blocking scenario, commands are processed over and over again until
1250  // we are able to fetch a message.
1251  bool block = (ticks != 0);
1252  while (true) {
1253  if (unlikely (process_commands (block ? timeout : 0, false) != 0)) {
1254  EXIT_MUTEX ();
1255  return -1;
1256  }
1257  rc = xrecv (msg_);
1258  if (rc == 0) {
1259  ticks = 0;
1260  break;
1261  }
1262  if (unlikely (errno != EAGAIN)) {
1263  EXIT_MUTEX ();
1264  return -1;
1265  }
1266  block = true;
1267  if (timeout > 0) {
1268  timeout = (int) (end - clock.now_ms ());
1269  if (timeout <= 0) {
1270  errno = EAGAIN;
1271  EXIT_MUTEX ();
1272  return -1;
1273  }
1274  }
1275  }
1276 
1277  extract_flags (msg_);
1278  EXIT_MUTEX ();
1279  return 0;
1280 }
1281 
1283 {
1284  ENTER_MUTEX ();
1285 
1286  // Remove all existing signalers for thread safe sockets
1287  if (thread_safe)
1288  ((mailbox_safe_t*)mailbox)->clear_signalers();
1289 
1290  // Mark the socket as dead
1291  tag = 0xdeadbeef;
1292 
1293  EXIT_MUTEX ();
1294 
1295  // Transfer the ownership of the socket from this application thread
1296  // to the reaper thread which will take care of the rest of shutdown
1297  // process.
1298  send_reap (this);
1299 
1300  return 0;
1301 }
1302 
1304 {
1305  return xhas_in ();
1306 }
1307 
1309 {
1310  return xhas_out ();
1311 }
1312 
1313 void zmq::socket_base_t::start_reaping (poller_t *poller_)
1314 {
1315  // Plug the socket to the reaper thread.
1316  poller = poller_;
1317 
1318  fd_t fd;
1319 
1320  if (!thread_safe)
1321  fd = ((mailbox_t*)mailbox)->get_fd();
1322  else {
1323  ENTER_MUTEX ();
1324 
1325  reaper_signaler = new signaler_t();
1326 
1327  // Add signaler to the safe mailbox
1328  fd = reaper_signaler->get_fd();
1329  ((mailbox_safe_t*)mailbox)->add_signaler(reaper_signaler);
1330 
1331  // Send a signal to make sure reaper handle existing commands
1332  reaper_signaler->send();
1333 
1334  EXIT_MUTEX ();
1335  }
1336 
1337  handle = poller->add_fd (fd, this);
1338  poller->set_pollin (handle);
1339 
1340  // Initialise the termination and check whether it can be deallocated
1341  // immediately.
1342  terminate ();
1343  check_destroy ();
1344 }
1345 
1346 int zmq::socket_base_t::process_commands (int timeout_, bool throttle_)
1347 {
1348  int rc;
1349  command_t cmd;
1350  if (timeout_ != 0) {
1351 
1352  // If we are asked to wait, simply ask mailbox to wait.
1353  rc = mailbox->recv (&cmd, timeout_);
1354  }
1355  else {
1356 
1357  // If we are asked not to wait, check whether we haven't processed
1358  // commands recently, so that we can throttle the new commands.
1359 
1360  // Get the CPU's tick counter. If 0, the counter is not available.
1361  const uint64_t tsc = zmq::clock_t::rdtsc ();
1362 
1363  // Optimised version of command processing - it doesn't have to check
1364  // for incoming commands each time. It does so only if certain time
1365  // elapsed since last command processing. Command delay varies
1366  // depending on CPU speed: It's ~1ms on 3GHz CPU, ~2ms on 1.5GHz CPU
1367  // etc. The optimisation makes sense only on platforms where getting
1368  // a timestamp is a very cheap operation (tens of nanoseconds).
1369  if (tsc && throttle_) {
1370 
1371  // Check whether TSC haven't jumped backwards (in case of migration
1372  // between CPU cores) and whether certain time have elapsed since
1373  // last command processing. If it didn't do nothing.
1374  if (tsc >= last_tsc && tsc - last_tsc <= max_command_delay)
1375  return 0;
1376  last_tsc = tsc;
1377  }
1378 
1379  // Check whether there are any commands pending for this thread.
1380  rc = mailbox->recv (&cmd, 0);
1381  }
1382 
1383  // Process all available commands.
1384  while (rc == 0) {
1385  cmd.destination->process_command (cmd);
1386  rc = mailbox->recv (&cmd, 0);
1387  }
1388 
1389  if (errno == EINTR)
1390  return -1;
1391 
1392  zmq_assert (errno == EAGAIN);
1393 
1394  if (ctx_terminated) {
1395  errno = ETERM;
1396  return -1;
1397  }
1398 
1399  return 0;
1400 }
1401 
1403 {
1404  // Here, someone have called zmq_ctx_term while the socket was still alive.
1405  // We'll remember the fact so that any blocking call is interrupted and any
1406  // further attempt to use the socket will return ETERM. The user is still
1407  // responsible for calling zmq_close on the socket though!
1408  stop_monitor ();
1409  ctx_terminated = true;
1410 }
1411 
1413 {
1414  attach_pipe (pipe_);
1415 }
1416 
1418 {
1419  // Unregister all inproc endpoints associated with this socket.
1420  // Doing this we make sure that no new pipes from other sockets (inproc)
1421  // will be initiated.
1422  unregister_endpoints (this);
1423 
1424  // Ask all attached pipes to terminate.
1425  for (pipes_t::size_type i = 0; i != pipes.size (); ++i)
1426  pipes [i]->terminate (false);
1427  register_term_acks ((int) pipes.size ());
1428 
1429  // Continue the termination process immediately.
1430  own_t::process_term (linger_);
1431 }
1432 
1434 {
1435  if (option_ == ZMQ_SNDHWM || option_ == ZMQ_RCVHWM)
1436  {
1437  for (pipes_t::size_type i = 0; i != pipes.size(); ++i)
1438  {
1439  pipes[i]->set_hwms(options.rcvhwm, options.sndhwm);
1440  }
1441  }
1442 
1443 }
1444 
1446 {
1447  destroyed = true;
1448 }
1449 
1450 int zmq::socket_base_t::xsetsockopt (int, const void *, size_t)
1451 {
1452  errno = EINVAL;
1453  return -1;
1454 }
1455 
1457 {
1458  return false;
1459 }
1460 
1462 {
1463  errno = ENOTSUP;
1464  return -1;
1465 }
1466 
1468 {
1469  return false;
1470 }
1471 
1472 int zmq::socket_base_t::xjoin (const char *group_)
1473 {
1474  LIBZMQ_UNUSED (group_);
1475  errno = ENOTSUP;
1476  return -1;
1477 }
1478 
1479 int zmq::socket_base_t::xleave (const char *group_)
1480 {
1481  LIBZMQ_UNUSED (group_);
1482  errno = ENOTSUP;
1483  return -1;
1484 }
1485 
1487 {
1488  errno = ENOTSUP;
1489  return -1;
1490 }
1491 
1493 {
1494  return blob_t ();
1495 }
1496 
1498 {
1499  zmq_assert (false);
1500 }
1502 {
1503  zmq_assert (false);
1504 }
1505 
1507 {
1508  zmq_assert (false);
1509 }
1510 
1512 {
1513  // This function is invoked only once the socket is running in the context
1514  // of the reaper thread. Process any commands from other threads/sockets
1515  // that may be available at the moment. Ultimately, the socket will
1516  // be destroyed.
1517  ENTER_MUTEX ();
1518 
1519  // If the socket is thread safe we need to unsignal the reaper signaler
1520  if (thread_safe)
1521  reaper_signaler->recv();
1522 
1523  process_commands (0, false);
1524  EXIT_MUTEX();
1525  check_destroy();
1526 }
1527 
1529 {
1530  zmq_assert (false);
1531 }
1532 
1534 {
1535  zmq_assert (false);
1536 }
1537 
1539 {
1540  // If the object was already marked as destroyed, finish the deallocation.
1541  if (destroyed) {
1542 
1543  // Remove the socket from the reaper's poller.
1544  poller->rm_fd (handle);
1545 
1546  // Remove the socket from the context.
1547  destroy_socket (this);
1548 
1549  // Notify the reaper about the fact.
1550  send_reaped ();
1551 
1552  // Deallocate.
1554  }
1555 }
1556 
1558 {
1559  xread_activated (pipe_);
1560 }
1561 
1563 {
1564  xwrite_activated (pipe_);
1565 }
1566 
1568 {
1569  if (options.immediate == 1)
1570  pipe_->terminate (false);
1571  else
1572  // Notify derived sockets of the hiccup
1573  xhiccuped (pipe_);
1574 }
1575 
1577 {
1578  // Notify the specific socket type about the pipe termination.
1579  xpipe_terminated (pipe_);
1580 
1581  // Remove pipe from inproc pipes
1582  for (inprocs_t::iterator it = inprocs.begin (); it != inprocs.end (); ++it)
1583  if (it->second == pipe_) {
1584  inprocs.erase (it);
1585  break;
1586  }
1587 
1588  // Remove the pipe from the list of attached pipes and confirm its
1589  // termination if we are already shutting down.
1590  pipes.erase (pipe_);
1591  if (is_terminating ())
1593 }
1594 
1596 {
1597  // Test whether IDENTITY flag is valid for this socket type.
1598  if (unlikely (msg_->flags () & msg_t::identity))
1600 
1601  // Remove MORE flag.
1602  rcvmore = msg_->flags () & msg_t::more ? true : false;
1603 }
1604 
1605 int zmq::socket_base_t::monitor (const char *addr_, int events_)
1606 {
1607  if (unlikely (ctx_terminated)) {
1608  errno = ETERM;
1609  return -1;
1610  }
1611  // Support deregistering monitoring endpoints as well
1612  if (addr_ == NULL) {
1613  stop_monitor ();
1614  return 0;
1615  }
1616  // Parse addr_ string.
1617  std::string protocol;
1618  std::string address;
1619  if (parse_uri (addr_, protocol, address) || check_protocol (protocol))
1620  return -1;
1621 
1622  // Event notification only supported over inproc://
1623  if (protocol != "inproc") {
1624  errno = EPROTONOSUPPORT;
1625  return -1;
1626  }
1627  // already monitoring. Stop previous monitor before starting new one.
1628  if (monitor_socket != NULL) {
1629  stop_monitor (true);
1630  }
1631  // Register events to monitor
1632  monitor_events = events_;
1634  if (monitor_socket == NULL)
1635  return -1;
1636 
1637  // Never block context termination on pending event messages
1638  int linger = 0;
1639  int rc = zmq_setsockopt (monitor_socket, ZMQ_LINGER, &linger, sizeof (linger));
1640  if (rc == -1)
1641  stop_monitor (false);
1642 
1643  // Spawn the monitor socket endpoint
1644  rc = zmq_bind (monitor_socket, addr_);
1645  if (rc == -1)
1646  stop_monitor (false);
1647  return rc;
1648 }
1649 
1650 void zmq::socket_base_t::event_connected (const std::string &addr_, int fd_)
1651 {
1653  monitor_event (ZMQ_EVENT_CONNECTED, fd_, addr_);
1654 }
1655 
1656 void zmq::socket_base_t::event_connect_delayed (const std::string &addr_, int err_)
1657 {
1659  monitor_event (ZMQ_EVENT_CONNECT_DELAYED, err_, addr_);
1660 }
1661 
1662 void zmq::socket_base_t::event_connect_retried (const std::string &addr_, int interval_)
1663 {
1665  monitor_event (ZMQ_EVENT_CONNECT_RETRIED, interval_, addr_);
1666 }
1667 
1668 void zmq::socket_base_t::event_listening (const std::string &addr_, int fd_)
1669 {
1671  monitor_event (ZMQ_EVENT_LISTENING, fd_, addr_);
1672 }
1673 
1674 void zmq::socket_base_t::event_bind_failed (const std::string &addr_, int err_)
1675 {
1677  monitor_event (ZMQ_EVENT_BIND_FAILED, err_, addr_);
1678 }
1679 
1680 void zmq::socket_base_t::event_accepted (const std::string &addr_, int fd_)
1681 {
1683  monitor_event (ZMQ_EVENT_ACCEPTED, fd_, addr_);
1684 }
1685 
1686 void zmq::socket_base_t::event_accept_failed (const std::string &addr_, int err_)
1687 {
1689  monitor_event (ZMQ_EVENT_ACCEPT_FAILED, err_, addr_);
1690 }
1691 
1692 void zmq::socket_base_t::event_closed (const std::string &addr_, int fd_)
1693 {
1695  monitor_event (ZMQ_EVENT_CLOSED, fd_, addr_);
1696 }
1697 
1698 void zmq::socket_base_t::event_close_failed (const std::string &addr_, int err_)
1699 {
1701  monitor_event (ZMQ_EVENT_CLOSE_FAILED, err_, addr_);
1702 }
1703 
1704 void zmq::socket_base_t::event_disconnected (const std::string &addr_, int fd_)
1705 {
1707  monitor_event (ZMQ_EVENT_DISCONNECTED, fd_, addr_);
1708 }
1709 
1710 // Send a monitor event
1711 void zmq::socket_base_t::monitor_event (int event_, int value_, const std::string &addr_)
1712 {
1713  if (monitor_socket) {
1714  // Send event in first frame
1715  zmq_msg_t msg;
1716  zmq_msg_init_size (&msg, 6);
1717  uint8_t *data = (uint8_t *) zmq_msg_data (&msg);
1718  // Avoid dereferencing uint32_t on unaligned address
1719  uint16_t event = (uint16_t) event_;
1720  uint32_t value = (uint32_t) value_;
1721  memcpy (data + 0, &event, sizeof(event));
1722  memcpy (data + 2, &value, sizeof(value));
1724 
1725  // Send address in second frame
1726  zmq_msg_init_size (&msg, addr_.size());
1727  memcpy (zmq_msg_data (&msg), addr_.c_str (), addr_.size ());
1728  zmq_sendmsg (monitor_socket, &msg, 0);
1729  }
1730 }
1731 
1732 void zmq::socket_base_t::stop_monitor (bool send_monitor_stopped_event_)
1733 {
1734  if (monitor_socket) {
1735  if ((monitor_events & ZMQ_EVENT_MONITOR_STOPPED) && send_monitor_stopped_event_)
1736  monitor_event (ZMQ_EVENT_MONITOR_STOPPED, 0, "");
1738  monitor_socket = NULL;
1739  monitor_events = 0;
1740  }
1741 }
#define ZMQ_EVENT_CONNECT_DELAYED
Definition: zmq.h:378
void pend_connection(const std::string &addr_, const endpoint_t &endpoint, pipe_t **pipes_)
Definition: object.cpp:173
void pipe_terminated(pipe_t *pipe_)
int process_commands(int timeout_, bool throttle_)
static session_base_t * create(zmq::io_thread_t *io_thread_, bool active_, zmq::socket_base_t *socket_, const options_t &options_, address_t *addr_)
virtual int to_string(std::string &addr_)
#define LIBZMQ_DELETE(p_object)
Definition: macros.hpp:7
void process_command(zmq::command_t &cmd_)
Definition: object.cpp:73
ZMQ_EXPORT int zmq_setsockopt(void *s, int option, const void *optval, size_t optvallen)
Definition: zmq.cpp:265
#define ZMQ_EVENT_BIND_FAILED
Definition: zmq.h:381
#define ZMQ_LAST_ENDPOINT
Definition: zmq.h:286
size_type size()
Definition: array.hpp:103
void process_term(int linger_)
Definition: own.cpp:158
virtual void xpipe_terminated(pipe_t *pipe_)=0
#define ZMQ_STREAM
Definition: zmq.h:257
void event_accept_failed(const std::string &addr_, int err_)
#define ZMQ_EVENT_CLOSE_FAILED
Definition: zmq.h:385
virtual void xhiccuped(pipe_t *pipe_)
bool write(msg_t *msg_)
Definition: pipe.cpp:221
std::vector< pipe_t * >::size_type size_type
Definition: array.hpp:93
ipc_address_t * ipc_addr
Definition: address.hpp:63
virtual void xattach_pipe(zmq::pipe_t *pipe_, bool subscribe_to_all_=false)=0
void attach_pipe(zmq::pipe_t *pipe_)
#define ZMQ_DISH
#define ZMQ_EVENT_CONNECTED
Definition: zmq.h:377
int pipepair(zmq::object_t *parents_[2], zmq::pipe_t *pipes_[2], int hwms_[2], bool conflate_[2])
Definition: pipe.cpp:41
int fd_t
Definition: fd.hpp:50
#define ZMQ_RCVHWM
Definition: zmq.h:282
void monitor_event(int event_, int value_, const std::string &addr_)
fd_t get_fd() const
Definition: mailbox.cpp:54
int get_address(std::string &addr_)
#define ZMQ_SNDMORE
Definition: zmq.h:346
int join(const char *group)
tcp_address_t * tcp_addr
Definition: address.hpp:60
void write_activated(pipe_t *pipe_)
#define ZMQ_DEALER
Definition: zmq.h:251
#define ZMQ_RADIO
void start_reaping(poller_t *poller_)
#define zmq_assert(x)
Definition: err.hpp:119
#define ZMQ_SUB
Definition: zmq.h:248
uint64_t now_ms()
Definition: clock.cpp:182
#define ZMQ_XPUB
Definition: zmq.h:255
#define ENOTSUP
Definition: zmq.h:112
bool recv_identity
Definition: options.hpp:146
#define ZMQ_REP
Definition: zmq.h:250
int add_signaler(signaler_t *s)
virtual blob_t get_credential() const
#define ZMQ_XSUB
Definition: zmq.h:256
int setsockopt(int option_, const void *optval_, size_t optvallen_)
int getsockopt(int option_, void *optval_, size_t *optvallen_) const
Definition: options.cpp:642
virtual void process_destroy()
Definition: own.cpp:212
zmq::io_thread_t * choose_io_thread(uint64_t affinity_)
Definition: object.cpp:189
options_t options
Definition: ctx.hpp:62
void attach_pipe(zmq::pipe_t *pipe_, bool subscribe_to_all_=false)
#define ZMQ_SCATTER
i_mailbox * mailbox
void event_listening(const std::string &addr_, int fd_)
int resolve(const char *path_)
Definition: ipc_address.cpp:58
fd_t get_fd() const
Definition: signaler.cpp:166
#define ZMQ_EVENT_CLOSED
Definition: zmq.h:384
void send_reaped()
Definition: object.cpp:330
int bind(const char *addr_)
void register_term_acks(int count_)
Definition: own.cpp:175
ZMQ_EXPORT int zmq_errno(void)
Definition: zmq.cpp:107
virtual int recv(command_t *cmd_, int timeout_)=0
int term_endpoint(const char *addr_)
#define ZMQ_PUB
Definition: zmq.h:247
void push_back(T *item_)
Definition: array.hpp:118
bool is_terminating()
Definition: own.cpp:153
void event_connected(const std::string &addr_, int fd_)
#define ZMQ_IPV6
Definition: zmq.h:295
int unregister_endpoint(const std::string &addr_, socket_base_t *socket_)
Definition: object.cpp:157
int get_address(std::string &addr_)
uint64_t affinity
Definition: options.hpp:70
void terminate()
Definition: own.cpp:135
int set_address(const char *addr_)
virtual void xwrite_activated(pipe_t *pipe_)
int register_endpoint(const char *addr_, const zmq::endpoint_t &endpoint_)
Definition: object.cpp:151
#define ZMQ_ROUTER
Definition: zmq.h:252
ZMQ_EXPORT void * zmq_msg_data(zmq_msg_t *msg)
Definition: zmq.cpp:666
ZMQ_EXPORT void * zmq_socket(void *, int type)
Definition: zmq.cpp:244
signaler_t * reaper_signaler
void destroy_socket(zmq::socket_base_t *socket_)
Definition: object.cpp:184
#define ZMQ_PUSH
Definition: zmq.h:254
#define ZMQ_SNDHWM
Definition: zmq.h:281
#define ZMQ_LINGER
Definition: zmq.h:276
#define ETERM
Definition: zmq.h:169
virtual void xread_activated(pipe_t *pipe_)
#define ZMQ_FD
Definition: zmq.h:273
udp_address_t * udp_addr
Definition: address.hpp:61
void reset_flags(unsigned char flags_)
Definition: msg.cpp:389
#define ZMQ_EVENT_CONNECT_RETRIED
Definition: zmq.h:379
bool check()
Definition: msg.cpp:50
void event_close_failed(const std::string &addr_, int fd_)
int resolve(const char *name_, bool local_, bool ipv6_, bool is_src_=false)
int check_protocol(const std::string &protocol_)
int set_address(const char *addr_)
void unregister_endpoints(zmq::socket_base_t *socket_)
Definition: object.cpp:163
int getsockopt(int option_, void *optval_, size_t *optvallen_)
void event_connect_delayed(const std::string &addr_, int err_)
void erase(T *item_)
Definition: array.hpp:125
void connect_pending(const char *addr_, zmq::socket_base_t *bind_socket_)
Definition: object.cpp:179
endpoints_t endpoints
int send(zmq::msg_t *msg_, int flags_)
#define ZMQ_REQ
Definition: zmq.h:249
#define LIBZMQ_UNUSED(object)
Definition: macros.hpp:6
void set_hwms_boost(int inhwmboost_, int outhwmboost_)
Definition: pipe.cpp:525
#define ZMQ_BLOCKY
Definition: zmq.h:321
#define ZMQ_THREAD_SAFE
Definition: zmq.h:332
void stop_monitor(bool send_monitor_stopped_event_=true)
#define ZMQ_EVENT_DISCONNECTED
Definition: zmq.h:386
#define unlikely(x)
Definition: likely.hpp:38
void update_pipe_options(int option_)
void send_bind(zmq::own_t *destination_, zmq::pipe_t *pipe_, bool inc_seqnum_=true)
Definition: object.cpp:238
unsigned char identity[256]
Definition: options.hpp:74
std::basic_string< unsigned char > blob_t
Definition: blob.hpp:134
void flush()
Definition: pipe.cpp:248
zmq::object_t * destination
Definition: command.hpp:53
ZMQ_EXPORT int zmq_close(void *s)
Definition: zmq.cpp:255
#define ENTER_MUTEX()
int monitor(const char *endpoint_, int events_)
ZMQ_EXPORT int zmq_sendmsg(void *s, zmq_msg_t *msg, int flags)
Definition: zmq.cpp:382
virtual int xleave(const char *group_)
#define ZMQ_GATHER
void timer_event(int id_)
static socket_base_t * create(int type_, zmq::ctx_t *parent_, uint32_t tid_, int sid_)
void set_event_sink(i_pipe_events *sink_)
Definition: pipe.cpp:111
void event_closed(const std::string &addr_, int fd_)
Definition: zmq.h:221
#define ZMQ_RCVMORE
Definition: zmq.h:272
void term_child(own_t *object_)
Definition: own.cpp:93
int get(int option_)
Definition: ctx.cpp:293
#define ZMQ_SERVER
virtual int xsend(zmq::msg_t *msg_)
#define ZMQ_EVENT_ACCEPTED
Definition: zmq.h:382
socket_base_t(zmq::ctx_t *parent_, uint32_t tid_, int sid_, bool thread_safe_=false)
#define ZMQ_EVENTS
Definition: zmq.h:274
void event_connect_retried(const std::string &addr_, int interval_)
static uint64_t rdtsc()
Definition: clock.cpp:215
void set_flags(unsigned char flags_)
Definition: msg.cpp:384
void hiccuped(pipe_t *pipe_)
#define ZMQ_EVENT_LISTENING
Definition: zmq.h:380
void terminate(bool delay_)
Definition: pipe.cpp:385
#define EPROTONOSUPPORT
Definition: zmq.h:115
void launch_child(own_t *object_)
Definition: own.cpp:81
bool is_valid(int option_) const
Definition: options.cpp:1038
#define ZMQ_POLLOUT
Definition: zmq.h:411
void extract_flags(msg_t *msg_)
ZMQ_EXPORT int zmq_bind(void *s, const char *addr)
Definition: zmq.cpp:321
void event_disconnected(const std::string &addr_, int fd_)
#define ZMQ_EVENT_MONITOR_STOPPED
Definition: zmq.h:387
void send_stop()
Definition: object.cpp:194
#define alloc_assert(x)
Definition: err.hpp:159
#define errno_assert(x)
Definition: err.hpp:129
void event_bind_failed(const std::string &addr_, int err_)
#define ENOCOMPATPROTO
Definition: zmq.h:168
void process_term(int linger_)
int remove_signaler(signaler_t *s)
int resolve(const char *name_, bool receiver_)
Definition: udp_address.cpp:61
#define EXIT_MUTEX()
ctx_t * get_ctx()
Definition: object.cpp:68
std::pair< own_t *, pipe_t * > endpoint_pipe_t
#define ZMQ_PAIR
Definition: zmq.h:246
socket_base_t * socket
Definition: ctx.hpp:61
void add_endpoint(const char *addr_, own_t *endpoint_, pipe_t *pipe)
int setsockopt(int option_, const void *optval_, size_t optvallen_)
Definition: options.cpp:93
int leave(const char *group)
virtual int xrecv(zmq::msg_t *msg_)
void send_reap(zmq::socket_base_t *socket_)
Definition: object.cpp:321
ZMQ_EXPORT int zmq_msg_init_size(zmq_msg_t *msg, size_t size)
Definition: zmq.cpp:618
i_mailbox * get_mailbox()
#define ZMQ_POLLIN
Definition: zmq.h:410
std::string last_endpoint
void process_bind(zmq::pipe_t *pipe_)
poller_t::handle_t handle
int parse_uri(const char *uri_, std::string &protocol_, std::string &address_)
options_t options
Definition: own.hpp:109
void event_accepted(const std::string &addr_, int fd_)
virtual ~socket_base_t()
const char * address
Definition: test_fork.cpp:32
unsigned char identity_size
Definition: options.hpp:73
union zmq::address_t::@0 resolved
int to_string(std::string &addr_) const
Definition: address.cpp:95
zmq::endpoint_t find_endpoint(const char *addr_)
Definition: object.cpp:168
#define ZMQ_DONTWAIT
Definition: zmq.h:345
void read_activated(pipe_t *pipe_)
void reset_metadata()
Definition: msg.cpp:407
void unregister_term_ack()
Definition: own.cpp:180
#define ZMQ_EVENT_ACCEPT_FAILED
Definition: zmq.h:383
int recv(zmq::msg_t *msg_, int flags_)
virtual int xjoin(const char *group_)
unsigned char flags
Definition: msg.hpp:181
#define EMTHREAD
Definition: zmq.h:170
virtual bool xhas_in()
#define ZMQ_PULL
Definition: zmq.h:253
#define ZMQ_CLIENT
virtual bool xhas_out()
int connect(const char *addr_)
virtual int xsetsockopt(int option_, const void *optval_, size_t optvallen_)