30 #define ZMQ_TYPE_UNSAFE 39 #if defined ZMQ_POLL_BASED_ON_POLL 44 #include "../include/zmq.h" 46 #if defined ZMQ_HAVE_WINDOWS 54 #if defined ZMQ_HAVE_UIO 84 #if defined ZMQ_HAVE_OPENPGM 85 #define __PGM_WININT_H__ 117 #if defined ZMQ_HAVE_OPENPGM 123 pgm_error_t *pgm_error = NULL;
124 const bool ok = pgm_init (&pgm_error);
129 if (pgm_error->domain == PGM_ERROR_DOMAIN_TIME && (
130 pgm_error->code == PGM_ERROR_FAILED)) {
133 pgm_error_free (pgm_error);
143 #ifdef ZMQ_HAVE_WINDOWS 148 WORD version_requested = MAKEWORD (2, 2);
150 int rc = WSAStartup (version_requested, &wsa_data);
152 zmq_assert (LOBYTE (wsa_data.wVersion) == 2 &&
153 HIBYTE (wsa_data.wVersion) == 2);
164 if (!ctx_ || !((
zmq::ctx_t *) ctx_)->check_tag ()) {
173 if (!rc || en != EINTR) {
174 #ifdef ZMQ_HAVE_WINDOWS 177 wsa_assert (rc != SOCKET_ERROR);
180 #if defined ZMQ_HAVE_OPENPGM 182 if (pgm_shutdown () != TRUE)
193 if (!ctx_ || !((
zmq::ctx_t *) ctx_)->check_tag ()) {
202 if (!ctx_ || !((
zmq::ctx_t *) ctx_)->check_tag ()) {
206 return ((
zmq::ctx_t *) ctx_)->set (option_, optval_);
211 if (!ctx_ || !((
zmq::ctx_t *) ctx_)->check_tag ()) {
222 if (io_threads_ >= 0) {
246 if (!ctx_ || !((
zmq::ctx_t *) ctx_)->check_tag ()) {
273 int result = s->
setsockopt (option_, optval_, optvallen_);
284 int result = s->
getsockopt (option_, optval_, optvallen_);
295 int result = s->
monitor (addr_, events_);
306 int result = s->
join (group_);
317 int result = s->
leave (group_);
328 int result = s->
bind (addr_);
339 int result = s->
connect (addr_);
375 size_t max_msgsz = INT_MAX;
378 return (
int) (sz < max_msgsz? sz: max_msgsz);
387 int zmq_send (
void *s_,
const void *buf_,
size_t len_,
int flags_)
455 if (
unlikely (count_ <= 0 || !a_)) {
464 for (
size_t i = 0; i < count_; ++i) {
497 return (
int) (sz < INT_MAX? sz: INT_MAX);
507 int zmq_recv (
void *s_,
void *buf_,
size_t len_,
int flags_)
518 int nbytes =
s_recvmsg (s, &msg, flags_);
528 size_t to_copy = size_t (nbytes) < len_ ? size_t (nbytes) : len_;
563 if (
unlikely (!count_ || *count_ <= 0 || !a_)) {
570 size_t count = *count_;
572 bool recvmore =
true;
576 for (
size_t i = 0; recvmore && i < count; ++i) {
582 int nbytes =
s_recvmsg (s, &msg, flags_);
598 memcpy(a_[i].iov_base,static_cast<char *> (
zmq_msg_data (&msg)),
620 return ((
zmq::msg_t*) msg_)->init_size (size_);
626 return ((
zmq::msg_t*) msg_)->init_data (data_, size_, ffn_, hint_);
636 int result =
s_sendmsg (s, msg_, flags_);
647 int result =
s_recvmsg (s, msg_, flags_);
683 const char* fd_string;
690 if (fd_string == NULL)
693 return atoi(fd_string);
712 return ((
zmq::msg_t *) msg_)->set_routing_id (routing_id_);
717 return ((
zmq::msg_t *) msg_)->get_routing_id ();
722 return ((
zmq::msg_t *) msg_)->set_group (group_);
735 const char *value = NULL;
737 value = metadata->
get (std::string (property_));
752 #if defined ZMQ_POLL_BASED_ON_POLL 760 #if defined ZMQ_HAVE_WINDOWS 761 Sleep (timeout_ > 0 ? timeout_ : INFINITE);
763 #elif defined ZMQ_HAVE_ANDROID 764 usleep (timeout_ * 1000);
767 return usleep (timeout_ * 1000);
780 pollfd *pollfds = spollfds;
783 pollfds = (pollfd*) malloc (nitems_ *
sizeof (pollfd));
788 for (
int i = 0; i != nitems_; i++) {
792 if (items_ [i].socket) {
795 &zmq_fd_size) == -1) {
796 if (pollfds != spollfds)
800 pollfds [i].events = items_ [i].
events ? POLLIN : 0;
805 pollfds [i].fd = items_ [i].
fd;
813 bool first_pass =
true;
829 int rc = poll (pollfds, nitems_, timeout);
830 if (rc == -1 && errno == EINTR) {
831 if (pollfds != spollfds)
839 for (
int i = 0; i != nitems_; i++) {
845 if (items_ [i].socket) {
846 size_t zmq_events_size =
sizeof (uint32_t);
849 &zmq_events_size) == -1) {
850 if (pollfds != spollfds)
855 (zmq_events & ZMQ_POLLOUT))
858 (zmq_events & ZMQ_POLLIN))
864 if (pollfds [i].revents & POLLIN)
866 if (pollfds [i].revents & POLLOUT)
868 if (pollfds [i].revents & POLLPRI)
870 if (pollfds [i].revents & ~(POLLIN | POLLOUT | POLLPRI))
874 if (items_ [i].revents)
900 end = now + timeout_;
913 if (pollfds != spollfds)
917 #elif defined ZMQ_POLL_BASED_ON_SELECT 926 #if defined ZMQ_HAVE_WINDOWS 927 Sleep (timeout_ > 0 ? timeout_ : INFINITE);
930 return usleep (timeout_ * 1000);
941 fd_set pollset_in = { 0 };
942 fd_set pollset_out = { 0 };
943 fd_set pollset_err = { 0 };
948 for (
int i = 0; i != nitems_; i++) {
952 if (items_ [i].socket) {
958 if (items_ [i].events) {
959 FD_SET (notify_fd, &pollset_in);
960 if (maxfd < notify_fd)
968 FD_SET (items_ [i].fd, &pollset_in);
970 FD_SET (items_ [i].fd, &pollset_out);
972 FD_SET (items_ [i].fd, &pollset_err);
973 if (maxfd < items_ [i].fd)
974 maxfd = items_ [i].
fd;
978 bool first_pass =
true;
980 fd_set inset, outset, errset;
996 timeout.tv_sec = (long) ((end - now) / 1000);
997 timeout.tv_usec = (long) ((end - now) % 1000 * 1000);
1003 memcpy (&inset, &pollset_in,
sizeof (fd_set));
1004 memcpy (&outset, &pollset_out,
sizeof (fd_set));
1005 memcpy (&errset, &pollset_err,
sizeof (fd_set));
1006 #if defined ZMQ_HAVE_WINDOWS 1007 int rc = select (0, &inset, &outset, &errset, ptimeout);
1008 if (
unlikely (rc == SOCKET_ERROR)) {
1009 errno = zmq::wsa_error_to_errno (WSAGetLastError ());
1014 int rc = select (maxfd + 1, &inset, &outset, &errset, ptimeout);
1024 for (
int i = 0; i != nitems_; i++) {
1030 if (items_ [i].socket) {
1031 size_t zmq_events_size =
sizeof (uint32_t);
1032 uint32_t zmq_events;
1034 &zmq_events_size) == -1)
1046 if (FD_ISSET (items_ [i].fd, &inset))
1048 if (FD_ISSET (items_ [i].fd, &outset))
1050 if (FD_ISSET (items_ [i].fd, &errset))
1054 if (items_ [i].revents)
1080 end = now + timeout_;
1113 void *poller = *poller_p_;
1141 int zmq_poller_add_fd (
void *poller_, SOCKET fd_,
void *user_data_,
short events_)
1228 memset (&e, 0,
sizeof (e));
1232 event->socket = e.
socket;
1235 event->events = e.
events;
1251 void *timers = *timers_p_;
1268 return ((
zmq::timers_t*)timers_)->add (interval_, handler_, arg_);
1288 return ((
zmq::timers_t*)timers_)->set_interval (timer_id_, interval_);
1323 int zmq_proxy (
void *frontend_,
void *backend_,
void *capture_)
1325 if (!frontend_ || !backend_) {
1337 if (!frontend_ || !backend_) {
1361 #if !defined (ZMQ_HAVE_WINDOWS) && !defined (ZMQ_HAVE_OPENVMS) 1362 if (strcmp (capability,
"ipc") == 0)
1365 #if defined (ZMQ_HAVE_OPENPGM) 1366 if (strcmp (capability,
"pgm") == 0)
1369 #if defined (ZMQ_HAVE_TIPC) 1370 if (strcmp (capability,
"tipc") == 0)
1373 #if defined (ZMQ_HAVE_NORM) 1374 if (strcmp (capability,
"norm") == 0)
1377 #if defined (ZMQ_HAVE_CURVE) 1378 if (strcmp (capability,
"curve") == 0)
1381 #if defined (HAVE_LIBGSSAPI_KRB5) 1382 if (strcmp (capability,
"gssapi") == 0)
1385 #if defined (ZMQ_HAVE_VMCI) 1386 if (strcmp (capability,
"vmci") == 0)
#define ZMQ_POLLITEMS_DFLT
int zmq_socket_monitor(void *s_, const char *addr_, int events_)
int zmq_msg_get(zmq_msg_t *msg_, int property_)
int zmq_msg_copy(zmq_msg_t *dest_, zmq_msg_t *src_)
void * zmq_timers_new(void)
int zmq_msg_set_routing_id(zmq_msg_t *msg_, uint32_t routing_id_)
int zmq_ctx_get(void *ctx_, int option_)
int zmq_msg_init_data(zmq_msg_t *msg_, void *data_, size_t size_, zmq_free_fn *ffn_, void *hint_)
int join(const char *group)
uint32_t zmq_msg_routing_id(zmq_msg_t *msg_)
#define ZMQ_VERSION_PATCH
int zmq_device(int, void *frontend_, void *backend_)
int zmq_poller_modify(void *poller_, void *s_, short events_)
int zmq_msg_init_size(zmq_msg_t *msg_, size_t size_)
int zmq_msg_recv(zmq_msg_t *msg_, void *s_, int flags_)
int setsockopt(int option_, const void *optval_, size_t optvallen_)
int zmq_ctx_shutdown(void *ctx_)
int zmq_msg_init(zmq_msg_t *msg_)
int zmq_poller_remove(void *poller_, void *s_)
char check_msg_t_size[sizeof(zmq::msg_t)==sizeof(zmq_msg_t)?1:-1]
int zmq_setsockopt(void *s_, int option_, const void *optval_, size_t optvallen_)
int zmq_recviov(void *s_, iovec *a_, size_t *count_, int flags_)
int zmq_has(const char *capability)
int zmq_msg_move(zmq_msg_t *dest_, zmq_msg_t *src_)
int bind(const char *addr_)
int zmq_ctx_set(void *ctx_, int option_, int optval_)
int zmq_timers_destroy(void **timers_p_)
static int s_recvmsg(zmq::socket_base_t *s_, zmq_msg_t *msg_, int flags_)
int term_endpoint(const char *addr_)
#define ZMQ_VERSION_MINOR
int zmq_leave(void *s_, const char *group_)
zmq::socket_base_t * create_socket(int type_)
int zmq_poller_add_fd(void *poller_, int fd_, void *user_data_, short events_)
void * zmq_socket(void *ctx_, int type_)
int zmq_recvmsg(void *s_, zmq_msg_t *msg_, int flags_)
const char * zmq_strerror(int errnum_)
int zmq_unbind(void *s_, const char *addr_)
void * zmq_poller_new(void)
const char * zmq_msg_group(zmq_msg_t *msg_)
int zmq_sendiov(void *s_, iovec *a_, size_t count_, int flags_)
int zmq_proxy(void *frontend_, void *backend_, void *capture_)
int getsockopt(int option_, void *optval_, size_t *optvallen_)
int send(zmq::msg_t *msg_, int flags_)
int zmq_msg_set(zmq_msg_t *, int, int)
int zmq_poll(zmq_pollitem_t *items_, int nitems_, long timeout_)
int monitor(const char *endpoint_, int events_)
int zmq_poller_remove_fd(void *poller_, int fd_)
int zmq_getsockopt(void *s_, int option_, void *optval_, size_t *optvallen_)
void * zmq_init(int io_threads_)
int zmq_msg_set_group(zmq_msg_t *msg_, const char *group_)
int proxy(class socket_base_t *frontend_, class socket_base_t *backend_, class socket_base_t *capture_, class socket_base_t *control_=NULL)
int zmq_timers_reset(void *timers_, int timer_id_)
#define ZMQ_VERSION_MAJOR
void( zmq_free_fn)(void *data, void *hint)
int zmq_timers_execute(void *timers_)
int zmq_sendmsg(void *s_, zmq_msg_t *msg_, int flags_)
size_t zmq_msg_size(zmq_msg_t *msg_)
const char * zmq_msg_gets(zmq_msg_t *msg_, const char *property_)
int zmq_recv(void *s_, void *buf_, size_t len_, int flags_)
int zmq_msg_send(zmq_msg_t *msg_, void *s_, int flags_)
int zmq_timers_cancel(void *timers_, int timer_id_)
long zmq_timers_timeout(void *timers_)
int zmq_msg_more(zmq_msg_t *msg_)
int zmq_join(void *s_, const char *group_)
int zmq_bind(void *s_, const char *addr_)
int zmq_send_const(void *s_, const void *buf_, size_t len_, int flags_)
int zmq_timers_set_interval(void *timers_, int timer_id_, size_t interval_)
int leave(const char *group)
int zmq_ctx_destroy(void *ctx_)
int zmq_ctx_term(void *ctx_)
int zmq_poller_modify_fd(void *poller_, int fd_, short events_)
int zmq_poller_add(void *poller_, void *s_, void *user_data_, short events_)
int zmq_poller_destroy(void **poller_p_)
const char * errno_to_string(int errno_)
int zmq_connect(void *s_, const char *addr_)
static int s_sendmsg(zmq::socket_base_t *s_, zmq_msg_t *msg_, int flags_)
void zmq_version(int *major_, int *minor_, int *patch_)
int zmq_send(void *s_, const void *buf_, size_t len_, int flags_)
int zmq_disconnect(void *s_, const char *addr_)
int zmq_timers_add(void *timers_, size_t interval_, zmq_timer_fn handler_, void *arg_)
int recv(zmq::msg_t *msg_, int flags_)
void( zmq_timer_fn)(int timer_id, void *arg)
int zmq_poller_wait(void *poller_, zmq_poller_event_t *event, long timeout_)
int zmq_proxy_steerable(void *frontend_, void *backend_, void *capture_, void *control_)
int connect(const char *addr_)
void * zmq_msg_data(zmq_msg_t *msg_)
int zmq_msg_close(zmq_msg_t *msg_)