32 #include "platform.hpp" 33 #ifdef ZMQ_HAVE_WINDOWS 52 #if defined (ZMQ_USE_TWEETNACL) 54 #elif defined (ZMQ_USE_LIBSODIUM) 59 #include <vmci_sockets.h> 62 #define ZMQ_CTX_TAG_VALUE_GOOD 0xabadcafe 63 #define ZMQ_CTX_TAG_VALUE_BAD 0xdeadbeef 67 if (max_requested >= zmq::poller_t::max_fds () && zmq::poller_t::max_fds () != -1)
69 max_requested = zmq::poller_t::max_fds () - 1;
98 #if defined (ZMQ_USE_TWEETNACL) 100 unsigned char tmpbytes[4];
101 randombytes(tmpbytes, 4);
// Guard on ZMQ_USE_LIBSODIUM, the same macro the earlier #elif near the top of
// this file tests; with the previous spelling (ZMQ_USE_SODIUM) this branch was
// skipped whenever only ZMQ_USE_LIBSODIUM was defined, so sodium_init () was
// never called. NOTE(review): confirm the build system defines ZMQ_USE_LIBSODIUM.
102 #elif defined (ZMQ_USE_LIBSODIUM) 103 int rc = sodium_init ();
121 for (io_threads_t::size_type i = 0; i !=
io_threads.size (); i++) {
126 for (io_threads_t::size_type i = 0; i !=
io_threads.size (); i++) {
140 #ifdef ZMQ_HAVE_CURVE 141 randombytes_close ();
157 for (pending_connections_t::iterator p = copy.begin (); p != copy.end (); ++p) {
159 s->
bind (p->first.c_str ());
167 if (pid != getpid ()) {
171 sockets [i]->get_mailbox ()->forked ();
197 if (rc == -1 && errno == EINTR)
209 VMCISock_ReleaseAFValueFd (vmci_fd);
257 if (option_ ==
ZMQ_IPV6 && optval_ >= 0) {
259 ipv6 = (optval_ != 0);
283 max_msgsz = optval_ < INT_MAX? optval_: INT_MAX;
346 for (
int i = 2; i != ios + 2; i++) {
356 i >= (int32_t) ios + 2; i--) {
402 uint32_t tid = socket_->
get_tid ();
424 thread_.
start(tfn_, arg_);
441 for (io_threads_t::size_type i = 0; i !=
io_threads.size (); i++) {
442 if (!affinity_ || (affinity_ & (uint64_t (1) << i))) {
444 if (selected_io_thread == NULL || load < min_load) {
450 return selected_io_thread;
459 endpoints_t::value_type (std::string (addr_), endpoint_)).second;
475 const endpoints_t::iterator it =
endpoints.find (addr_);
476 if (it ==
endpoints.end () || it->second.socket != socket_) {
494 endpoints_t::iterator it =
endpoints.begin ();
496 if (it->second.socket == socket_) {
497 endpoints_t::iterator to_erase = it;
512 endpoints_t::iterator it =
endpoints.find (addr_);
535 {endpoint_, pipes_ [0], pipes_ [1]};
539 endpoints_t::iterator it =
endpoints.find (addr_);
543 pending_connections.insert (pending_connections_t::value_type (addr_, pending_connection));
556 std::pair<pending_connections_t::iterator, pending_connections_t::iterator> pending =
pending_connections.equal_range(addr_);
558 for (pending_connections_t::iterator p = pending.first; p != pending.second; ++p)
575 const int rc = msg.
close ();
622 int zmq::ctx_t::get_vmci_socket_family ()
627 vmci_family = VMCISock_GetAFValueFd (&vmci_fd);
631 int rc = fcntl (vmci_fd, F_SETFD, FD_CLOEXEC);
endpoint_t find_endpoint(const char *addr_)
int unregister_endpoint(const std::string &addr_, socket_base_t *socket_)
void destroy_socket(zmq::socket_base_t *socket_)
#define ZMQ_THREAD_PRIORITY_DFLT
#define LIBZMQ_DELETE(p_object)
void process_command(zmq::command_t &cmd_)
#define ZMQ_MAX_SOCKETS_DFLT
std::vector< socket_base_t * >::size_type size_type
#define ZMQ_THREAD_SCHED_POLICY_DFLT
void set_hwms(int inhwm_, int outhwm_)
void pend_connection(const std::string &addr_, const endpoint_t &endpoint_, pipe_t **pipes_)
void set_tid(uint32_t id)
int bind(const char *addr_)
#define ZMQ_IO_THREADS_DFLT
zmq::socket_base_t * create_socket(int type_)
void start(thread_fn *tfn_, void *arg_)
int set(int option_, int optval_)
static atomic_counter_t max_socket_id
int recv(command_t *cmd_, int timeout_)
union zmq::command_t::args_t args
int init_size(size_t size_)
void connect_inproc_sockets(zmq::socket_base_t *bind_socket_, options_t &bind_options, const pending_connection_t &pending_connection_, side side_)
struct zmq::command_t::args_t::@5 bind
int clipped_maxsocket(int max_requested)
#define ZMQ_THREAD_PRIORITY
void setSchedulingParameters(int priority_, int schedulingPolicy_)
void set_hwms_boost(int inhwmboost_, int outhwmboost_)
std::multimap< std::string, pending_connection_t > pending_connections_t
void send_bind(zmq::own_t *destination_, zmq::pipe_t *pipe_, bool inc_seqnum_=true)
unsigned char identity[256]
mailbox_t * get_mailbox()
void start_thread(thread_t &thread_, thread_fn *tfn_, void *arg_) const
void send_inproc_connected(zmq::socket_base_t *socket_)
void send_command(uint32_t tid_, const command_t &command_)
static socket_base_t * create(int type_, zmq::ctx_t *parent_, uint32_t tid_, int sid_)
integer_t add(integer_t increment_)
#define ZMQ_THREAD_SCHED_POLICY
zmq::io_thread_t * choose_io_thread(uint64_t affinity_)
enum zmq::command_t::type_t type
virtual void send(const command_t &cmd_)=0
empty_slots_t empty_slots
#define ZMQ_CTX_TAG_VALUE_BAD
pending_connections_t pending_connections
void connect_pending(const char *addr_, zmq::socket_base_t *bind_socket_)
int register_endpoint(const char *addr_, const endpoint_t &endpoint_)
i_mailbox * get_mailbox()
unsigned char identity_size
zmq::object_t * get_reaper()
mailbox_t * get_mailbox()
void unregister_endpoints(zmq::socket_base_t *socket_)
#define ZMQ_CTX_TAG_VALUE_GOOD