36 #include "platform.hpp" 38 #if defined ZMQ_HAVE_WINDOWS 41 #if defined _WIN32_WCE 42 #include <cmnintrin.h> 63 #include "platform.hpp" 74 #if defined ZMQ_HAVE_VMCI 79 #ifdef ZMQ_HAVE_OPENPGM 102 #define ENTER_MUTEX() \ 106 #define EXIT_MUTEX(); \ 112 return tag == 0xbaddecaf;
116 uint32_t tid_,
int sid_)
121 s =
new (std::nothrow)
pair_t (parent_, tid_, sid_);
124 s =
new (std::nothrow)
pub_t (parent_, tid_, sid_);
127 s =
new (std::nothrow)
sub_t (parent_, tid_, sid_);
130 s =
new (std::nothrow)
req_t (parent_, tid_, sid_);
133 s =
new (std::nothrow)
rep_t (parent_, tid_, sid_);
136 s =
new (std::nothrow)
dealer_t (parent_, tid_, sid_);
139 s =
new (std::nothrow)
router_t (parent_, tid_, sid_);
142 s =
new (std::nothrow)
pull_t (parent_, tid_, sid_);
145 s =
new (std::nothrow)
push_t (parent_, tid_, sid_);
148 s =
new (std::nothrow)
xpub_t (parent_, tid_, sid_);
151 s =
new (std::nothrow)
xsub_t (parent_, tid_, sid_);
154 s =
new (std::nothrow)
stream_t (parent_, tid_, sid_);
157 s =
new (std::nothrow)
server_t (parent_, tid_, sid_);
160 s =
new (std::nothrow)
client_t (parent_, tid_, sid_);
163 s =
new (std::nothrow)
radio_t (parent_, tid_, sid_);
166 s =
new (std::nothrow)
dish_t (parent_, tid_, sid_);
169 s =
new (std::nothrow)
gather_t (parent_, tid_, sid_);
172 s =
new (std::nothrow)
scatter_t (parent_, tid_, sid_);
191 own_t (parent_, tid_),
249 std::string &protocol_, std::string &address_)
253 std::string uri (uri_);
254 std::string::size_type pos = uri.find (
"://");
255 if (pos == std::string::npos) {
259 protocol_ = uri.substr (0, pos);
260 address_ = uri.substr (pos + 3);
262 if (protocol_.empty () || address_.empty ()) {
272 if (protocol_ !=
"inproc" 273 #
if !defined ZMQ_HAVE_WINDOWS && !defined ZMQ_HAVE_OPENVMS
274 && protocol_ !=
"ipc" 276 && protocol_ !=
"tcp" 277 #
if defined ZMQ_HAVE_OPENPGM
279 && protocol_ !=
"pgm" 280 && protocol_ !=
"epgm" 282 #
if defined ZMQ_HAVE_TIPC
284 && protocol_ !=
"tipc" 286 #
if defined ZMQ_HAVE_NORM
287 && protocol_ !=
"norm" 289 #
if defined ZMQ_HAVE_VMCI
290 && protocol_ !=
"vmci" 292 && protocol_ !=
"udp") {
300 #if defined ZMQ_HAVE_OPENPGM || defined ZMQ_HAVE_NORM 301 if ((protocol_ ==
"pgm" || protocol_ ==
"epgm" || protocol_ ==
"norm") &&
354 int rc =
xsetsockopt (option_, optval_, optvallen_);
355 if (rc == 0 || errno != EINVAL) {
381 if (*optvallen_ <
sizeof (
int)) {
386 memset(optval_, 0, *optvallen_);
387 *((
int*) optval_) =
rcvmore ? 1 : 0;
388 *optvallen_ =
sizeof (
int);
394 if (*optvallen_ <
sizeof (
fd_t)) {
408 *optvallen_ =
sizeof(
fd_t);
415 if (*optvallen_ <
sizeof (
int)) {
421 if (rc != 0 && (errno == EINTR || errno ==
ETERM)) {
426 *((
int*) optval_) = 0;
431 *optvallen_ =
sizeof (
int);
449 if (*optvallen_ <
sizeof (
int)) {
454 memset(optval_, 0, *optvallen_);
456 *optvallen_ =
sizeof (
int);
470 int rc =
xjoin (group_);
538 std::string protocol;
545 if (protocol ==
"inproc") {
557 if (protocol ==
"pgm" || protocol ==
"epgm" || protocol ==
"norm" || protocol ==
"udp") {
576 if (protocol ==
"tcp") {
597 #if !defined ZMQ_HAVE_WINDOWS && !defined ZMQ_HAVE_OPENVMS 598 if (protocol ==
"ipc") {
619 #if defined ZMQ_HAVE_TIPC 620 if (protocol ==
"tipc") {
621 tipc_listener_t *listener =
new (std::nothrow) tipc_listener_t (
624 int rc = listener->set_address (address.c_str ());
641 #if defined ZMQ_HAVE_VMCI 642 if (protocol ==
"vmci") {
643 vmci_listener_t *listener =
new (std::nothrow) vmci_listener_t (
646 int rc = listener->set_address (address.c_str ());
686 std::string protocol;
693 if (protocol ==
"inproc") {
718 pipe_t *new_pipes [2] = {NULL, NULL};
727 int hwms [2] = {conflate? -1 : sndhwm, conflate? -1 : rcvhwm};
728 bool conflates [2] = {conflate, conflate};
729 rc =
pipepair (parents, new_pipes, hwms, conflates);
747 bool written = new_pipes [0]->
write (&
id);
749 new_pipes [0]->
flush ();
762 bool written = new_pipes [0]->
write (&
id);
764 new_pipes [0]->
flush ();
774 bool written = new_pipes [1]->
write (&
id);
776 new_pipes [1]->
flush ();
792 inprocs.insert (inprocs_t::value_type (std::string (addr_), new_pipes [0]));
802 const endpoints_t::iterator it =
endpoints.find (addr_);
824 if (protocol ==
"tcp") {
835 const char *check = address.c_str ();
836 if (isalnum (*check) || isxdigit (*check) || *check ==
'[') {
838 while (isalnum (*check)
840 || *check ==
'.' || *check ==
'-' || *check ==
':' || *check ==
'%' 841 || *check ==
';' || *check ==
']' || *check ==
'_' 851 check = strrchr (address.c_str (),
':');
854 if (*check && (isdigit (*check)))
867 #if !defined ZMQ_HAVE_WINDOWS && !defined ZMQ_HAVE_OPENVMS 869 if (protocol ==
"ipc") {
881 if (protocol ==
"udp") {
894 #ifdef ZMQ_HAVE_OPENPGM 895 if (protocol ==
"pgm" || protocol ==
"epgm") {
896 struct pgm_addrinfo_t *res = NULL;
897 uint16_t port_number = 0;
898 int rc = pgm_socket_t::init_address(address.c_str(), &res, &port_number);
900 pgm_freeaddrinfo (res);
901 if (rc != 0 || port_number == 0) {
907 #if defined ZMQ_HAVE_TIPC 909 if (protocol ==
"tipc") {
910 paddr->
resolved.tipc_addr =
new (std::nothrow) tipc_address_t ();
912 int rc = paddr->
resolved.tipc_addr->resolve (address.c_str());
920 #if defined ZMQ_HAVE_VMCI 922 if (protocol ==
"vmci") {
923 paddr->
resolved.vmci_addr =
new (std::nothrow) vmci_address_t (this->
get_ctx ());
925 int rc = paddr->
resolved.vmci_addr->resolve (address.c_str ());
941 bool subscribe_to_all = protocol ==
"pgm" || protocol ==
"epgm" || protocol ==
"norm" || protocol ==
"udp";
946 object_t *parents [2] = {
this, session};
947 pipe_t *new_pipes [2] = {NULL, NULL};
958 bool conflates [2] = {conflate, conflate};
959 rc =
pipepair (parents, new_pipes, hwms, conflates);
964 newpipe = new_pipes [0];
1012 std::string protocol;
1020 if (protocol ==
"inproc") {
1025 std::pair <inprocs_t::iterator, inprocs_t::iterator> range =
inprocs.equal_range (std::string (addr_));
1026 if (range.first == range.second) {
1032 for (inprocs_t::iterator it = range.first; it != range.second; ++it)
1033 it->second->terminate (
true);
1034 inprocs.erase (range.first, range.second);
1039 std::string resolved_addr = std::string (addr_);
1040 std::pair <endpoints_t::iterator, endpoints_t::iterator> range;
1047 if (protocol ==
"tcp") {
1048 range =
endpoints.equal_range (resolved_addr);
1049 if (range.first == range.second) {
1056 range =
endpoints.equal_range (resolved_addr);
1058 if (range.first == range.second) {
1070 range =
endpoints.equal_range (resolved_addr);
1071 if (range.first == range.second) {
1077 for (endpoints_t::iterator it = range.first; it != range.second; ++it) {
1079 if (it->second.second != NULL)
1080 it->second.second->terminate (
false);
1083 endpoints.erase (range.first, range.second);
1143 uint64_t end = timeout < 0 ? 0 : (
clock.
now_ms () + timeout);
1209 int rc =
xrecv (msg_);
1210 if (
unlikely (rc != 0 && errno != EAGAIN)) {
1247 uint64_t end = timeout < 0 ? 0 : (
clock.
now_ms () + timeout);
1251 bool block = (
ticks != 0);
1350 if (timeout_ != 0) {
1369 if (tsc && throttle_) {
1582 for (inprocs_t::iterator it =
inprocs.begin (); it !=
inprocs.end (); ++it)
1583 if (it->second == pipe_) {
1612 if (addr_ == NULL) {
1617 std::string protocol;
1623 if (protocol !=
"inproc") {
1665 monitor_event (ZMQ_EVENT_CONNECT_RETRIED, interval_, addr_);
1719 uint16_t
event = (uint16_t) event_;
1720 uint32_t value = (uint32_t) value_;
1721 memcpy (data + 0, &event,
sizeof(event));
1722 memcpy (data + 2, &value,
sizeof(value));
1727 memcpy (
zmq_msg_data (&msg), addr_.c_str (), addr_.size ());
#define ZMQ_EVENT_CONNECT_DELAYED
void pend_connection(const std::string &addr_, const endpoint_t &endpoint, pipe_t **pipes_)
void pipe_terminated(pipe_t *pipe_)
int process_commands(int timeout_, bool throttle_)
static session_base_t * create(zmq::io_thread_t *io_thread_, bool active_, zmq::socket_base_t *socket_, const options_t &options_, address_t *addr_)
virtual int to_string(std::string &addr_)
#define LIBZMQ_DELETE(p_object)
void process_command(zmq::command_t &cmd_)
ZMQ_EXPORT int zmq_setsockopt(void *s, int option, const void *optval, size_t optvallen)
#define ZMQ_EVENT_BIND_FAILED
#define ZMQ_LAST_ENDPOINT
void process_term(int linger_)
virtual void xpipe_terminated(pipe_t *pipe_)=0
void event_accept_failed(const std::string &addr_, int err_)
#define ZMQ_EVENT_CLOSE_FAILED
virtual void xhiccuped(pipe_t *pipe_)
std::vector< pipe_t * >::size_type size_type
virtual void xattach_pipe(zmq::pipe_t *pipe_, bool subscribe_to_all_=false)=0
void attach_pipe(zmq::pipe_t *pipe_)
#define ZMQ_EVENT_CONNECTED
int pipepair(zmq::object_t *parents_[2], zmq::pipe_t *pipes_[2], int hwms_[2], bool conflate_[2])
void monitor_event(int event_, int value_, const std::string &addr_)
int get_address(std::string &addr_)
int join(const char *group)
void write_activated(pipe_t *pipe_)
void start_reaping(poller_t *poller_)
int add_signaler(signaler_t *s)
virtual blob_t get_credential() const
int setsockopt(int option_, const void *optval_, size_t optvallen_)
int getsockopt(int option_, void *optval_, size_t *optvallen_) const
virtual void process_destroy()
zmq::io_thread_t * choose_io_thread(uint64_t affinity_)
void attach_pipe(zmq::pipe_t *pipe_, bool subscribe_to_all_=false)
void event_listening(const std::string &addr_, int fd_)
int resolve(const char *path_)
int bind(const char *addr_)
void register_term_acks(int count_)
ZMQ_EXPORT int zmq_errno(void)
virtual int recv(command_t *cmd_, int timeout_)=0
int term_endpoint(const char *addr_)
void event_connected(const std::string &addr_, int fd_)
int unregister_endpoint(const std::string &addr_, socket_base_t *socket_)
int get_address(std::string &addr_)
int set_address(const char *addr_)
virtual void xwrite_activated(pipe_t *pipe_)
int register_endpoint(const char *addr_, const zmq::endpoint_t &endpoint_)
ZMQ_EXPORT void * zmq_msg_data(zmq_msg_t *msg)
ZMQ_EXPORT void * zmq_socket(void *, int type)
signaler_t * reaper_signaler
void destroy_socket(zmq::socket_base_t *socket_)
virtual void xread_activated(pipe_t *pipe_)
void reset_flags(unsigned char flags_)
#define ZMQ_EVENT_CONNECT_RETRIED
void event_close_failed(const std::string &addr_, int fd_)
int resolve(const char *name_, bool local_, bool ipv6_, bool is_src_=false)
int check_protocol(const std::string &protocol_)
int set_address(const char *addr_)
void unregister_endpoints(zmq::socket_base_t *socket_)
int getsockopt(int option_, void *optval_, size_t *optvallen_)
void event_connect_delayed(const std::string &addr_, int err_)
void connect_pending(const char *addr_, zmq::socket_base_t *bind_socket_)
int send(zmq::msg_t *msg_, int flags_)
#define LIBZMQ_UNUSED(object)
void set_hwms_boost(int inhwmboost_, int outhwmboost_)
void stop_monitor(bool send_monitor_stopped_event_=true)
#define ZMQ_EVENT_DISCONNECTED
void update_pipe_options(int option_)
void send_bind(zmq::own_t *destination_, zmq::pipe_t *pipe_, bool inc_seqnum_=true)
unsigned char identity[256]
std::basic_string< unsigned char > blob_t
zmq::object_t * destination
ZMQ_EXPORT int zmq_close(void *s)
int monitor(const char *endpoint_, int events_)
ZMQ_EXPORT int zmq_sendmsg(void *s, zmq_msg_t *msg, int flags)
virtual int xleave(const char *group_)
void timer_event(int id_)
static socket_base_t * create(int type_, zmq::ctx_t *parent_, uint32_t tid_, int sid_)
void set_event_sink(i_pipe_events *sink_)
void event_closed(const std::string &addr_, int fd_)
void term_child(own_t *object_)
virtual int xsend(zmq::msg_t *msg_)
#define ZMQ_EVENT_ACCEPTED
socket_base_t(zmq::ctx_t *parent_, uint32_t tid_, int sid_, bool thread_safe_=false)
void event_connect_retried(const std::string &addr_, int interval_)
void set_flags(unsigned char flags_)
void hiccuped(pipe_t *pipe_)
#define ZMQ_EVENT_LISTENING
void terminate(bool delay_)
void launch_child(own_t *object_)
bool is_valid(int option_) const
void extract_flags(msg_t *msg_)
ZMQ_EXPORT int zmq_bind(void *s, const char *addr)
void event_disconnected(const std::string &addr_, int fd_)
#define ZMQ_EVENT_MONITOR_STOPPED
void event_bind_failed(const std::string &addr_, int err_)
void process_term(int linger_)
int remove_signaler(signaler_t *s)
int resolve(const char *name_, bool receiver_)
std::pair< own_t *, pipe_t * > endpoint_pipe_t
void add_endpoint(const char *addr_, own_t *endpoint_, pipe_t *pipe)
int setsockopt(int option_, const void *optval_, size_t optvallen_)
int leave(const char *group)
virtual int xrecv(zmq::msg_t *msg_)
void send_reap(zmq::socket_base_t *socket_)
ZMQ_EXPORT int zmq_msg_init_size(zmq_msg_t *msg, size_t size)
i_mailbox * get_mailbox()
std::string last_endpoint
void process_bind(zmq::pipe_t *pipe_)
poller_t::handle_t handle
int parse_uri(const char *uri_, std::string &protocol_, std::string &address_)
void event_accepted(const std::string &addr_, int fd_)
unsigned char identity_size
union zmq::address_t::@0 resolved
int to_string(std::string &addr_) const
zmq::endpoint_t find_endpoint(const char *addr_)
void read_activated(pipe_t *pipe_)
void unregister_term_ack()
#define ZMQ_EVENT_ACCEPT_FAILED
int recv(zmq::msg_t *msg_, int flags_)
virtual int xjoin(const char *group_)
int connect(const char *addr_)
virtual int xsetsockopt(int option_, const void *optval_, size_t optvallen_)