libzmq (ZeroMQ C++ Core Engine): ctx.cpp
/*
    Copyright (c) 2007-2016 Contributors as noted in the AUTHORS file

    This file is part of libzmq, the ZeroMQ core engine in C++.

    libzmq is free software; you can redistribute it and/or modify it under
    the terms of the GNU Lesser General Public License (LGPL) as published
    by the Free Software Foundation; either version 3 of the License, or
    (at your option) any later version.

    As a special exception, the Contributors give you permission to link
    this library with independent modules to produce an executable,
    regardless of the license terms of these independent modules, and to
    copy and distribute the resulting executable under terms of your choice,
    provided that you also meet, for each linked independent module, the
    terms and conditions of the license of that module. An independent
    module is a module which is not derived from or based on this library.
    If you modify this library, you must extend this exception to your
    version of the library.

    libzmq is distributed in the hope that it will be useful, but WITHOUT
    ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
    FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public
    License for more details.

    You should have received a copy of the GNU Lesser General Public License
    along with this program. If not, see <http://www.gnu.org/licenses/>.
*/

#include "precompiled.hpp"
#include "macros.hpp"
#include "platform.hpp"
#ifdef ZMQ_HAVE_WINDOWS
#include "windows.hpp"
#else
#include <unistd.h>
#endif

#include <limits>
#include <climits>
#include <new>
#include <string.h>

#include "ctx.hpp"
#include "socket_base.hpp"
#include "io_thread.hpp"
#include "reaper.hpp"
#include "pipe.hpp"
#include "err.hpp"
#include "msg.hpp"

#if defined (ZMQ_USE_TWEETNACL)
#   include "tweetnacl.h"
#elif defined (ZMQ_USE_LIBSODIUM)
#   include "sodium.h"
#endif

#ifdef ZMQ_HAVE_VMCI
#include <vmci_sockets.h>
#endif

#define ZMQ_CTX_TAG_VALUE_GOOD 0xabadcafe
#define ZMQ_CTX_TAG_VALUE_BAD 0xdeadbeef

int clipped_maxsocket (int max_requested)
{
    if (max_requested >= zmq::poller_t::max_fds ()
    &&  zmq::poller_t::max_fds () != -1)
        //  -1 because we need room for the reaper mailbox.
        max_requested = zmq::poller_t::max_fds () - 1;

    return max_requested;
}
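
/*
    Illustrative sketch (not part of libzmq): the cap computed by
    clipped_maxsocket () surfaces through the public context API.
    ZMQ_SOCKET_LIMIT reports clipped_maxsocket (65535), and ZMQ_MAX_SOCKETS
    is accepted by ctx_t::set () only if it already fits under that limit.

        #include <zmq.h>
        #include <assert.h>

        int main (void)
        {
            void *ctx = zmq_ctx_new ();
            //  Hard limit imposed by the poller backend (minus the reaper slot).
            int limit = zmq_ctx_get (ctx, ZMQ_SOCKET_LIMIT);
            //  Requests above the limit fail with EINVAL in ctx_t::set ().
            int rc = zmq_ctx_set (ctx, ZMQ_MAX_SOCKETS, limit);
            assert (rc == 0);
            zmq_ctx_term (ctx);
            return 0;
        }
*/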

zmq::ctx_t::ctx_t () :
    tag (ZMQ_CTX_TAG_VALUE_GOOD),
    starting (true),
    terminating (false),
    reaper (NULL),
    slot_count (0),
    slots (NULL),
    max_sockets (clipped_maxsocket (ZMQ_MAX_SOCKETS_DFLT)),
    max_msgsz (INT_MAX),
    io_thread_count (ZMQ_IO_THREADS_DFLT),
    blocky (true),
    ipv6 (false),
    thread_priority (ZMQ_THREAD_PRIORITY_DFLT),
    thread_sched_policy (ZMQ_THREAD_SCHED_POLICY_DFLT)
{
#ifdef HAVE_FORK
    pid = getpid ();
#endif
#ifdef ZMQ_HAVE_VMCI
    vmci_fd = -1;
    vmci_family = -1;
#endif

    //  Initialise the crypto backend, if one is compiled in.
    crypto_sync.lock ();
#if defined (ZMQ_USE_TWEETNACL)
    //  allow opening of /dev/urandom
    unsigned char tmpbytes[4];
    randombytes (tmpbytes, 4);
#elif defined (ZMQ_USE_LIBSODIUM)
    int rc = sodium_init ();
    zmq_assert (rc != -1);
#endif
    crypto_sync.unlock ();
}

bool zmq::ctx_t::check_tag ()
{
    return tag == ZMQ_CTX_TAG_VALUE_GOOD;
}

zmq::ctx_t::~ctx_t ()
{
    //  Check that there are no remaining sockets.
    zmq_assert (sockets.empty ());

    //  Ask I/O threads to terminate. If the stop signal wasn't sent to an
    //  I/O thread, a subsequent invocation of its destructor would hang.
    for (io_threads_t::size_type i = 0; i != io_threads.size (); i++) {
        io_threads [i]->stop ();
    }

    //  Wait till I/O threads actually terminate.
    for (io_threads_t::size_type i = 0; i != io_threads.size (); i++) {
        LIBZMQ_DELETE(io_threads [i]);
    }

    //  Deallocate the reaper thread object.
    LIBZMQ_DELETE(reaper);

    //  Deallocate the array of mailboxes. No special work is
    //  needed as mailboxes themselves were deallocated with their
    //  corresponding io_thread/socket objects.
    free (slots);

    //  If we've done any Curve encryption, we may have a file handle
    //  to /dev/urandom open that needs to be cleaned up.
#ifdef ZMQ_HAVE_CURVE
    randombytes_close ();
#endif

    //  Remove the tag, so that the object is considered dead.
    tag = ZMQ_CTX_TAG_VALUE_BAD;
}

int zmq::ctx_t::terminate ()
{
    slot_sync.lock ();

    bool saveTerminating = terminating;
    terminating = false;

    //  Connect up any pending inproc connections, otherwise we will hang.
    pending_connections_t copy = pending_connections;
    for (pending_connections_t::iterator p = copy.begin (); p != copy.end (); ++p) {
        zmq::socket_base_t *s = create_socket (ZMQ_PAIR);
        s->bind (p->first.c_str ());
        s->close ();
    }
    terminating = saveTerminating;

    if (!starting) {

#ifdef HAVE_FORK
        if (pid != getpid ()) {
            //  We are a forked child process. Close all file descriptors
            //  inherited from the parent.
            for (sockets_t::size_type i = 0; i != sockets.size (); i++)
                sockets [i]->get_mailbox ()->forked ();

            term_mailbox.forked ();
        }
#endif

        //  Check whether termination was already underway, but interrupted
        //  and now restarted.
        bool restarted = terminating;
        terminating = true;

        //  First attempt to terminate the context.
        if (!restarted) {
            //  First send stop command to sockets so that any blocking calls
            //  can be interrupted. If there are no sockets we can ask reaper
            //  thread to stop.
            for (sockets_t::size_type i = 0; i != sockets.size (); i++)
                sockets [i]->stop ();
            if (sockets.empty ())
                reaper->stop ();
        }
        slot_sync.unlock ();

        //  Wait till reaper thread closes all the sockets.
        command_t cmd;
        int rc = term_mailbox.recv (&cmd, -1);
        if (rc == -1 && errno == EINTR)
            return -1;
        errno_assert (rc == 0);
        zmq_assert (cmd.type == command_t::done);
        slot_sync.lock ();
        zmq_assert (sockets.empty ());
    }
    slot_sync.unlock ();

#ifdef ZMQ_HAVE_VMCI
    vmci_sync.lock ();

    VMCISock_ReleaseAFValueFd (vmci_fd);
    vmci_family = -1;
    vmci_fd = -1;

    vmci_sync.unlock ();
#endif

    //  Deallocate the resources.
    delete this;

    return 0;
}
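
/*
    Illustrative note (not part of libzmq): terminate () is what backs
    zmq_ctx_term (). It blocks until every socket created from the context
    has been closed, so the usual shutdown order is close first, then term.

        void *ctx = zmq_ctx_new ();
        void *sock = zmq_socket (ctx, ZMQ_PUSH);
        ...
        zmq_close (sock);      //  the reaper thread destroys the socket
        zmq_ctx_term (ctx);    //  returns once all sockets have been reaped
*/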

int zmq::ctx_t::shutdown ()
{
    slot_sync.lock ();
    if (!starting && !terminating) {
        terminating = true;

        //  Send stop command to sockets so that any blocking calls
        //  can be interrupted. If there are no sockets we can ask reaper
        //  thread to stop.
        for (sockets_t::size_type i = 0; i != sockets.size (); i++)
            sockets [i]->stop ();
        if (sockets.empty ())
            reaper->stop ();
    }
    slot_sync.unlock ();

    return 0;
}
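
/*
    Illustrative sketch (not part of libzmq): shutdown () backs
    zmq_ctx_shutdown (). It stops all sockets without deallocating the
    context, so blocking calls on other threads fail with ETERM while the
    owning threads can still close their sockets and call zmq_ctx_term ().

        //  Thread A is blocked in zmq_recv (sock, buf, len, 0).
        //  Thread B:
        zmq_ctx_shutdown (ctx);    //  thread A's recv returns -1, errno == ETERM
        //  Thread A, after the failed recv:
        zmq_close (sock);
        //  Once all sockets are closed:
        zmq_ctx_term (ctx);
*/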

int zmq::ctx_t::set (int option_, int optval_)
{
    int rc = 0;
    if (option_ == ZMQ_MAX_SOCKETS
    &&  optval_ >= 1 && optval_ == clipped_maxsocket (optval_)) {
        opt_sync.lock ();
        max_sockets = optval_;
        opt_sync.unlock ();
    }
    else
    if (option_ == ZMQ_IO_THREADS && optval_ >= 0) {
        opt_sync.lock ();
        io_thread_count = optval_;
        opt_sync.unlock ();
    }
    else
    if (option_ == ZMQ_IPV6 && optval_ >= 0) {
        opt_sync.lock ();
        ipv6 = (optval_ != 0);
        opt_sync.unlock ();
    }
    else
    if (option_ == ZMQ_THREAD_PRIORITY && optval_ >= 0) {
        opt_sync.lock ();
        thread_priority = optval_;
        opt_sync.unlock ();
    }
    else
    if (option_ == ZMQ_THREAD_SCHED_POLICY && optval_ >= 0) {
        opt_sync.lock ();
        thread_sched_policy = optval_;
        opt_sync.unlock ();
    }
    else
    if (option_ == ZMQ_BLOCKY && optval_ >= 0) {
        opt_sync.lock ();
        blocky = (optval_ != 0);
        opt_sync.unlock ();
    }
    else
    if (option_ == ZMQ_MAX_MSGSZ && optval_ >= 0) {
        opt_sync.lock ();
        max_msgsz = optval_ < INT_MAX ? optval_ : INT_MAX;
        opt_sync.unlock ();
    }
    else {
        errno = EINVAL;
        rc = -1;
    }
    return rc;
}

int zmq::ctx_t::get (int option_)
{
    int rc = 0;
    if (option_ == ZMQ_MAX_SOCKETS)
        rc = max_sockets;
    else
    if (option_ == ZMQ_SOCKET_LIMIT)
        rc = clipped_maxsocket (65535);
    else
    if (option_ == ZMQ_IO_THREADS)
        rc = io_thread_count;
    else
    if (option_ == ZMQ_IPV6)
        rc = ipv6;
    else
    if (option_ == ZMQ_BLOCKY)
        rc = blocky;
    else
    if (option_ == ZMQ_MAX_MSGSZ)
        rc = max_msgsz;
    else {
        errno = EINVAL;
        rc = -1;
    }
    return rc;
}
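
/*
    Illustrative sketch (not part of libzmq): set () and get () back
    zmq_ctx_set () and zmq_ctx_get (). Context options are normally tuned
    right after zmq_ctx_new (), before the first socket forces the I/O
    threads into existence.

        #include <zmq.h>
        #include <assert.h>

        int main (void)
        {
            void *ctx = zmq_ctx_new ();
            zmq_ctx_set (ctx, ZMQ_IO_THREADS, 4);       //  default is 1
            zmq_ctx_set (ctx, ZMQ_MAX_MSGSZ, 1 << 20);  //  reject messages over 1 MiB
            zmq_ctx_set (ctx, ZMQ_BLOCKY, 0);           //  friendlier zmq_ctx_term ()
            assert (zmq_ctx_get (ctx, ZMQ_IO_THREADS) == 4);
            zmq_ctx_term (ctx);
            return 0;
        }
*/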

zmq::socket_base_t *zmq::ctx_t::create_socket (int type_)
{
    slot_sync.lock ();
    if (unlikely (starting)) {

        starting = false;
        //  Initialise the array of mailboxes. Additional two slots are for
        //  zmq_ctx_term thread and reaper thread.
        opt_sync.lock ();
        int mazmq = max_sockets;
        int ios = io_thread_count;
        opt_sync.unlock ();
        slot_count = mazmq + ios + 2;
        slots = (i_mailbox **) malloc (sizeof (i_mailbox*) * slot_count);
        alloc_assert (slots);

        //  Initialise the infrastructure for zmq_ctx_term thread.
        slots [term_tid] = &term_mailbox;

        //  Create the reaper thread.
        reaper = new (std::nothrow) reaper_t (this, reaper_tid);
        alloc_assert (reaper);
        slots [reaper_tid] = reaper->get_mailbox ();
        reaper->start ();

        //  Create I/O thread objects and launch them.
        for (int i = 2; i != ios + 2; i++) {
            io_thread_t *io_thread = new (std::nothrow) io_thread_t (this, i);
            alloc_assert (io_thread);
            io_threads.push_back (io_thread);
            slots [i] = io_thread->get_mailbox ();
            io_thread->start ();
        }

        //  In the unused part of the slot array, create a list of empty slots.
        for (int32_t i = (int32_t) slot_count - 1;
              i >= (int32_t) ios + 2; i--) {
            empty_slots.push_back (i);
            slots [i] = NULL;
        }
    }

    //  Once zmq_ctx_term() was called, we can't create new sockets.
    if (terminating) {
        slot_sync.unlock ();
        errno = ETERM;
        return NULL;
    }

    //  If max_sockets limit was reached, return error.
    if (empty_slots.empty ()) {
        slot_sync.unlock ();
        errno = EMFILE;
        return NULL;
    }

    //  Choose a slot for the socket.
    uint32_t slot = empty_slots.back ();
    empty_slots.pop_back ();

    //  Generate new unique socket ID.
    int sid = ((int) max_socket_id.add (1)) + 1;

    //  Create the socket and register its mailbox.
    socket_base_t *s = socket_base_t::create (type_, this, slot, sid);
    if (!s) {
        empty_slots.push_back (slot);
        slot_sync.unlock ();
        return NULL;
    }
    sockets.push_back (s);
    slots [slot] = s->get_mailbox ();

    slot_sync.unlock ();
    return s;
}
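
/*
    Illustrative sketch (not part of libzmq): create_socket () is reached
    through zmq_socket (). While the context is terminating it fails with
    ETERM; when every slot is taken it fails with EMFILE (assuming
    <zmq.h>, <assert.h> and <errno.h> are included).

        void *ctx = zmq_ctx_new ();
        zmq_ctx_set (ctx, ZMQ_MAX_SOCKETS, 1);
        void *a = zmq_socket (ctx, ZMQ_PAIR);    //  takes the only slot
        void *b = zmq_socket (ctx, ZMQ_PAIR);    //  NULL, errno == EMFILE
        assert (a && !b && errno == EMFILE);
        zmq_close (a);
        zmq_ctx_term (ctx);
*/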

void zmq::ctx_t::destroy_socket (class socket_base_t *socket_)
{
    slot_sync.lock ();

    //  Free the associated thread slot.
    uint32_t tid = socket_->get_tid ();
    empty_slots.push_back (tid);
    slots [tid] = NULL;

    //  Remove the socket from the list of sockets.
    sockets.erase (socket_);

    //  If zmq_ctx_term() was already called and there are no more sockets,
    //  we can ask the reaper thread to terminate.
    if (terminating && sockets.empty ())
        reaper->stop ();

    slot_sync.unlock ();
}

zmq::object_t *zmq::ctx_t::get_reaper ()
{
    return reaper;
}

void zmq::ctx_t::start_thread (thread_t &thread_, thread_fn *tfn_, void *arg_) const
{
    thread_.start (tfn_, arg_);
    thread_.setSchedulingParameters (thread_priority, thread_sched_policy);
}

void zmq::ctx_t::send_command (uint32_t tid_, const command_t &command_)
{
    slots [tid_]->send (command_);
}

zmq::io_thread_t *zmq::ctx_t::choose_io_thread (uint64_t affinity_)
{
    if (io_threads.empty ())
        return NULL;

    //  Find the I/O thread with minimum load.
    int min_load = -1;
    io_thread_t *selected_io_thread = NULL;
    for (io_threads_t::size_type i = 0; i != io_threads.size (); i++) {
        if (!affinity_ || (affinity_ & (uint64_t (1) << i))) {
            int load = io_threads [i]->get_load ();
            if (selected_io_thread == NULL || load < min_load) {
                min_load = load;
                selected_io_thread = io_threads [i];
            }
        }
    }
    return selected_io_thread;
}
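
/*
    Illustrative sketch (not part of libzmq): the affinity_ argument comes
    from the ZMQ_AFFINITY socket option, a bitmask in which bit N selects
    I/O thread N. Zero means "any thread", i.e. pick the least loaded one.

        void *ctx = zmq_ctx_new ();
        zmq_ctx_set (ctx, ZMQ_IO_THREADS, 4);
        void *sock = zmq_socket (ctx, ZMQ_PUB);
        uint64_t affinity = 3;    //  restrict this socket to I/O threads 0 and 1
        zmq_setsockopt (sock, ZMQ_AFFINITY, &affinity, sizeof affinity);
*/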

int zmq::ctx_t::register_endpoint (const char *addr_,
                                   const endpoint_t &endpoint_)
{
    endpoints_sync.lock ();

    const bool inserted = endpoints.insert (
        endpoints_t::value_type (std::string (addr_), endpoint_)).second;

    endpoints_sync.unlock ();

    if (!inserted) {
        errno = EADDRINUSE;
        return -1;
    }
    return 0;
}

int zmq::ctx_t::unregister_endpoint (
        const std::string &addr_, socket_base_t *socket_)
{
    endpoints_sync.lock ();

    const endpoints_t::iterator it = endpoints.find (addr_);
    if (it == endpoints.end () || it->second.socket != socket_) {
        endpoints_sync.unlock ();
        errno = ENOENT;
        return -1;
    }

    //  Remove endpoint.
    endpoints.erase (it);

    endpoints_sync.unlock ();

    return 0;
}

void zmq::ctx_t::unregister_endpoints (socket_base_t *socket_)
{
    endpoints_sync.lock ();

    endpoints_t::iterator it = endpoints.begin ();
    while (it != endpoints.end ()) {
        if (it->second.socket == socket_) {
            endpoints_t::iterator to_erase = it;
            ++it;
            endpoints.erase (to_erase);
            continue;
        }
        ++it;
    }

    endpoints_sync.unlock ();
}

zmq::endpoint_t zmq::ctx_t::find_endpoint (const char *addr_)
{
    endpoints_sync.lock ();

    endpoints_t::iterator it = endpoints.find (addr_);
    if (it == endpoints.end ()) {
        endpoints_sync.unlock ();
        errno = ECONNREFUSED;
        endpoint_t empty = {NULL, options_t ()};
        return empty;
    }
    endpoint_t endpoint = it->second;

    //  Increment the command sequence number of the peer so that it won't
    //  get deallocated until "bind" command is issued by the caller.
    //  The subsequent 'bind' has to be called with inc_seqnum parameter
    //  set to false, so that the seqnum isn't incremented twice.
    endpoint.socket->inc_seqnum ();

    endpoints_sync.unlock ();
    return endpoint;
}

void zmq::ctx_t::pend_connection (const std::string &addr_,
        const endpoint_t &endpoint_, pipe_t **pipes_)
{
    const pending_connection_t pending_connection =
        {endpoint_, pipes_ [0], pipes_ [1]};

    endpoints_sync.lock ();

    endpoints_t::iterator it = endpoints.find (addr_);
    if (it == endpoints.end ()) {
        //  Still no bind.
        endpoint_.socket->inc_seqnum ();
        pending_connections.insert (
            pending_connections_t::value_type (addr_, pending_connection));
    }
    else
        //  Bind has happened in the meantime, connect directly.
        connect_inproc_sockets (it->second.socket, it->second.options,
            pending_connection, connect_side);

    endpoints_sync.unlock ();
}

void zmq::ctx_t::connect_pending (const char *addr_, zmq::socket_base_t *bind_socket_)
{
    endpoints_sync.lock ();

    std::pair<pending_connections_t::iterator, pending_connections_t::iterator> pending =
        pending_connections.equal_range (addr_);

    for (pending_connections_t::iterator p = pending.first; p != pending.second; ++p)
        connect_inproc_sockets (bind_socket_, endpoints [addr_].options, p->second, bind_side);

    pending_connections.erase (pending.first, pending.second);

    endpoints_sync.unlock ();
}

void zmq::ctx_t::connect_inproc_sockets (zmq::socket_base_t *bind_socket_,
    options_t &bind_options, const pending_connection_t &pending_connection_, side side_)
{
    bind_socket_->inc_seqnum ();
    pending_connection_.bind_pipe->set_tid (bind_socket_->get_tid ());

    if (!bind_options.recv_identity) {
        msg_t msg;
        const bool ok = pending_connection_.bind_pipe->read (&msg);
        zmq_assert (ok);
        const int rc = msg.close ();
        errno_assert (rc == 0);
    }

    bool conflate = pending_connection_.endpoint.options.conflate &&
        (pending_connection_.endpoint.options.type == ZMQ_DEALER ||
         pending_connection_.endpoint.options.type == ZMQ_PULL ||
         pending_connection_.endpoint.options.type == ZMQ_PUSH ||
         pending_connection_.endpoint.options.type == ZMQ_PUB ||
         pending_connection_.endpoint.options.type == ZMQ_SUB);

    if (!conflate) {
        pending_connection_.connect_pipe->set_hwms_boost (bind_options.sndhwm, bind_options.rcvhwm);
        pending_connection_.bind_pipe->set_hwms_boost (pending_connection_.endpoint.options.sndhwm, pending_connection_.endpoint.options.rcvhwm);

        pending_connection_.connect_pipe->set_hwms (pending_connection_.endpoint.options.rcvhwm, pending_connection_.endpoint.options.sndhwm);
        pending_connection_.bind_pipe->set_hwms (bind_options.rcvhwm, bind_options.sndhwm);
    }
    else {
        pending_connection_.connect_pipe->set_hwms (-1, -1);
        pending_connection_.bind_pipe->set_hwms (-1, -1);
    }

    if (side_ == bind_side) {
        command_t cmd;
        cmd.type = command_t::bind;
        cmd.args.bind.pipe = pending_connection_.bind_pipe;
        bind_socket_->process_command (cmd);
        bind_socket_->send_inproc_connected (pending_connection_.endpoint.socket);
    }
    else
        pending_connection_.connect_pipe->send_bind (bind_socket_, pending_connection_.bind_pipe, false);

    if (pending_connection_.endpoint.options.recv_identity) {
        msg_t id;
        int rc = id.init_size (bind_options.identity_size);
        errno_assert (rc == 0);
        memcpy (id.data (), bind_options.identity, bind_options.identity_size);
        id.set_flags (msg_t::identity);
        bool written = pending_connection_.bind_pipe->write (&id);
        zmq_assert (written);
        pending_connection_.bind_pipe->flush ();
    }
}
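
/*
    Illustrative note (not part of libzmq): pend_connection (),
    connect_pending () and connect_inproc_sockets () are what let an
    inproc:// connect precede the matching bind. The connecting side parks
    its pipes in pending_connections; the later bind drains and wires them.

        void *ctx = zmq_ctx_new ();
        void *req = zmq_socket (ctx, ZMQ_REQ);
        zmq_connect (req, "inproc://svc");   //  pended, no endpoint registered yet
        void *rep = zmq_socket (ctx, ZMQ_REP);
        zmq_bind (rep, "inproc://svc");      //  connect_pending () completes the pipes
*/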

#ifdef ZMQ_HAVE_VMCI

int zmq::ctx_t::get_vmci_socket_family ()
{
    vmci_sync.lock ();

    if (vmci_fd == -1) {
        vmci_family = VMCISock_GetAFValueFd (&vmci_fd);

        if (vmci_fd != -1) {
#ifdef FD_CLOEXEC
            int rc = fcntl (vmci_fd, F_SETFD, FD_CLOEXEC);
            errno_assert (rc != -1);
#endif
        }
    }

    vmci_sync.unlock ();

    return vmci_family;
}

#endif

//  The last used socket ID, or 0 if no socket was used so far. Note that this
//  is a global variable. Thus, even sockets created in different contexts have
//  unique IDs.
zmq::atomic_counter_t zmq::ctx_t::max_socket_id;