Line data Source code
1 : /*
2 : Copyright (c) 2007-2016 Contributors as noted in the AUTHORS file
3 :
4 : This file is part of libzmq, the ZeroMQ core engine in C++.
5 :
6 : libzmq is free software; you can redistribute it and/or modify it under
7 : the terms of the GNU Lesser General Public License (LGPL) as published
8 : by the Free Software Foundation; either version 3 of the License, or
9 : (at your option) any later version.
10 :
11 : As a special exception, the Contributors give you permission to link
12 : this library with independent modules to produce an executable,
13 : regardless of the license terms of these independent modules, and to
14 : copy and distribute the resulting executable under terms of your choice,
15 : provided that you also meet, for each linked independent module, the
16 : terms and conditions of the license of that module. An independent
17 : module is a module which is not derived from or based on this library.
18 : If you modify this library, you must extend this exception to your
19 : version of the library.
20 :
21 : libzmq is distributed in the hope that it will be useful, but WITHOUT
22 : ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
23 : FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public
24 : License for more details.
25 :
26 : You should have received a copy of the GNU Lesser General Public License
27 : along with this program. If not, see <http://www.gnu.org/licenses/>.
28 : */
29 : #include "precompiled.hpp"
30 : #define ZMQ_TYPE_UNSAFE
31 :
32 : #include "macros.hpp"
33 : #include "poller.hpp"
34 :
35 : // On AIX platform, poll.h has to be included first to get consistent
36 : // definition of pollfd structure (AIX uses 'reqevents' and 'retnevents'
37 : // instead of 'events' and 'revents' and defines macros to map from POSIX-y
38 : // names to AIX-specific names).
39 : #if defined ZMQ_POLL_BASED_ON_POLL
40 : #include <poll.h>
41 : #endif
42 :
43 : // zmq.h must be included *after* poll.h for AIX to build properly
44 : #include "../include/zmq.h"
45 :
46 : #if defined ZMQ_HAVE_WINDOWS
47 : #include "windows.hpp"
48 : #else
49 : #include <unistd.h>
50 : #endif
51 :
52 :
53 : // XSI vector I/O
54 : #if defined ZMQ_HAVE_UIO
55 : #include <sys/uio.h>
56 : #else
57 : struct iovec {
58 : void *iov_base;
59 : size_t iov_len;
60 : };
61 : #endif
62 :
63 :
64 : #include <string.h>
65 : #include <stdlib.h>
66 : #include <new>
67 : #include <climits>
68 :
69 : #include "proxy.hpp"
70 : #include "socket_base.hpp"
71 : #include "stdint.hpp"
72 : #include "config.hpp"
73 : #include "likely.hpp"
74 : #include "clock.hpp"
75 : #include "ctx.hpp"
76 : #include "err.hpp"
77 : #include "msg.hpp"
78 : #include "fd.hpp"
79 : #include "metadata.hpp"
80 : #include "signaler.hpp"
81 : #include "socket_poller.hpp"
82 : #include "timers.hpp"
83 :
84 : #if defined ZMQ_HAVE_OPENPGM
85 : #define __PGM_WININT_H__
86 : #include <pgm/pgm.h>
87 : #endif
88 :
89 : // Compile time check whether msg_t fits into zmq_msg_t.
90 : typedef char check_msg_t_size
91 : [sizeof (zmq::msg_t) == sizeof (zmq_msg_t) ? 1 : -1];
92 :
93 :
94 3 : void zmq_version (int *major_, int *minor_, int *patch_)
95 : {
96 3 : *major_ = ZMQ_VERSION_MAJOR;
97 3 : *minor_ = ZMQ_VERSION_MINOR;
98 3 : *patch_ = ZMQ_VERSION_PATCH;
99 3 : }
100 :
101 :
102 3 : const char *zmq_strerror (int errnum_)
103 : {
104 3 : return zmq::errno_to_string (errnum_);
105 : }
106 :
107 3930 : int zmq_errno (void)
108 : {
109 3930 : return errno;
110 : }
111 :
112 :
113 : // New context API
114 :
115 423 : void *zmq_ctx_new (void)
116 : {
117 : #if defined ZMQ_HAVE_OPENPGM
118 :
119 : // Init PGM transport. Ensure threading and timer are enabled. Find PGM
120 : // protocol ID. Note that if you want to use gettimeofday and sleep for
121 : // openPGM timing, set environment variables PGM_TIMER to "GTOD" and
122 : // PGM_SLEEP to "USLEEP".
123 : pgm_error_t *pgm_error = NULL;
124 : const bool ok = pgm_init (&pgm_error);
125 : if (ok != TRUE) {
126 :
127 : // Invalid parameters don't set pgm_error_t
128 : zmq_assert (pgm_error != NULL);
129 : if (pgm_error->domain == PGM_ERROR_DOMAIN_TIME && (
130 : pgm_error->code == PGM_ERROR_FAILED)) {
131 :
132 : // Failed to access RTC or HPET device.
133 : pgm_error_free (pgm_error);
134 : errno = EINVAL;
135 : return NULL;
136 : }
137 :
138 : // PGM_ERROR_DOMAIN_ENGINE: WSAStartup errors or missing WSARecvMsg.
139 : zmq_assert (false);
140 : }
141 : #endif
142 :
143 : #ifdef ZMQ_HAVE_WINDOWS
144 : // Intialise Windows sockets. Note that WSAStartup can be called multiple
145 : // times given that WSACleanup will be called for each WSAStartup.
146 : // We do this before the ctx constructor since its embedded mailbox_t
147 : // object needs Winsock to be up and running.
148 : WORD version_requested = MAKEWORD (2, 2);
149 : WSADATA wsa_data;
150 : int rc = WSAStartup (version_requested, &wsa_data);
151 : zmq_assert (rc == 0);
152 : zmq_assert (LOBYTE (wsa_data.wVersion) == 2 &&
153 : HIBYTE (wsa_data.wVersion) == 2);
154 : #endif
155 :
156 : // Create 0MQ context.
157 423 : zmq::ctx_t *ctx = new (std::nothrow) zmq::ctx_t;
158 423 : alloc_assert (ctx);
159 423 : return ctx;
160 : }
161 :
162 423 : int zmq_ctx_term (void *ctx_)
163 : {
164 423 : if (!ctx_ || !((zmq::ctx_t *) ctx_)->check_tag ()) {
165 0 : errno = EFAULT;
166 0 : return -1;
167 : }
168 :
169 423 : int rc = ((zmq::ctx_t *) ctx_)->terminate ();
170 423 : int en = errno;
171 :
172 : // Shut down only if termination was not interrupted by a signal.
173 : if (!rc || en != EINTR) {
174 : #ifdef ZMQ_HAVE_WINDOWS
175 : // On Windows, uninitialise socket layer.
176 : rc = WSACleanup ();
177 : wsa_assert (rc != SOCKET_ERROR);
178 : #endif
179 :
180 : #if defined ZMQ_HAVE_OPENPGM
181 : // Shut down the OpenPGM library.
182 : if (pgm_shutdown () != TRUE)
183 : zmq_assert (false);
184 : #endif
185 : }
186 :
187 : errno = en;
188 423 : return rc;
189 : }
190 :
191 6 : int zmq_ctx_shutdown (void *ctx_)
192 : {
193 6 : if (!ctx_ || !((zmq::ctx_t *) ctx_)->check_tag ()) {
194 0 : errno = EFAULT;
195 0 : return -1;
196 : }
197 6 : return ((zmq::ctx_t *) ctx_)->shutdown ();
198 : }
199 :
200 42 : int zmq_ctx_set (void *ctx_, int option_, int optval_)
201 : {
202 42 : if (!ctx_ || !((zmq::ctx_t *) ctx_)->check_tag ()) {
203 0 : errno = EFAULT;
204 0 : return -1;
205 : }
206 42 : return ((zmq::ctx_t *) ctx_)->set (option_, optval_);
207 : }
208 :
209 18 : int zmq_ctx_get (void *ctx_, int option_)
210 : {
211 18 : if (!ctx_ || !((zmq::ctx_t *) ctx_)->check_tag ()) {
212 0 : errno = EFAULT;
213 0 : return -1;
214 : }
215 18 : return ((zmq::ctx_t *) ctx_)->get (option_);
216 : }
217 :
218 : // Stable/legacy context API
219 :
220 3 : void *zmq_init (int io_threads_)
221 : {
222 3 : if (io_threads_ >= 0) {
223 3 : void *ctx = zmq_ctx_new ();
224 3 : zmq_ctx_set (ctx, ZMQ_IO_THREADS, io_threads_);
225 3 : return ctx;
226 : }
227 0 : errno = EINVAL;
228 0 : return NULL;
229 : }
230 :
231 0 : int zmq_term (void *ctx_)
232 : {
233 0 : return zmq_ctx_term (ctx_);
234 : }
235 :
236 30 : int zmq_ctx_destroy (void *ctx_)
237 : {
238 30 : return zmq_ctx_term (ctx_);
239 : }
240 :
241 :
242 : // Sockets
243 :
244 11181 : void *zmq_socket (void *ctx_, int type_)
245 : {
246 11181 : if (!ctx_ || !((zmq::ctx_t *) ctx_)->check_tag ()) {
247 0 : errno = EFAULT;
248 0 : return NULL;
249 : }
250 11181 : zmq::ctx_t *ctx = (zmq::ctx_t *) ctx_;
251 11181 : zmq::socket_base_t *s = ctx->create_socket (type_);
252 11181 : return (void *) s;
253 : }
254 :
255 11115 : int zmq_close (void *s_)
256 : {
257 11115 : if (!s_ || !((zmq::socket_base_t*) s_)->check_tag ()) {
258 0 : errno = ENOTSOCK;
259 0 : return -1;
260 : }
261 11115 : ((zmq::socket_base_t*) s_)->close ();
262 11115 : return 0;
263 : }
264 :
265 2613 : int zmq_setsockopt (void *s_, int option_, const void *optval_,
266 : size_t optvallen_)
267 : {
268 2613 : if (!s_ || !((zmq::socket_base_t*) s_)->check_tag ()) {
269 0 : errno = ENOTSOCK;
270 0 : return -1;
271 : }
272 2613 : zmq::socket_base_t *s = (zmq::socket_base_t *) s_;
273 2613 : int result = s->setsockopt (option_, optval_, optvallen_);
274 2613 : return result;
275 : }
276 :
277 2654034 : int zmq_getsockopt (void *s_, int option_, void *optval_, size_t *optvallen_)
278 : {
279 2654034 : if (!s_ || !((zmq::socket_base_t*) s_)->check_tag ()) {
280 0 : errno = ENOTSOCK;
281 0 : return -1;
282 : }
283 2654034 : zmq::socket_base_t *s = (zmq::socket_base_t *) s_;
284 2654034 : int result = s->getsockopt (option_, optval_, optvallen_);
285 2654028 : return result;
286 : }
287 :
288 27 : int zmq_socket_monitor (void *s_, const char *addr_, int events_)
289 : {
290 27 : if (!s_ || !((zmq::socket_base_t*) s_)->check_tag ()) {
291 0 : errno = ENOTSOCK;
292 0 : return -1;
293 : }
294 27 : zmq::socket_base_t *s = (zmq::socket_base_t *) s_;
295 27 : int result = s->monitor (addr_, events_);
296 27 : return result;
297 : }
298 :
299 15 : int zmq_join (void *s_, const char* group_)
300 : {
301 15 : if (!s_ || !((zmq::socket_base_t*) s_)->check_tag ()) {
302 0 : errno = ENOTSOCK;
303 0 : return -1;
304 : }
305 15 : zmq::socket_base_t *s = (zmq::socket_base_t *) s_;
306 15 : int result = s->join (group_);
307 15 : return result;
308 : }
309 :
310 6 : int zmq_leave (void *s_, const char* group_)
311 : {
312 6 : if (!s_ || !((zmq::socket_base_t*) s_)->check_tag ()) {
313 0 : errno = ENOTSOCK;
314 0 : return -1;
315 : }
316 6 : zmq::socket_base_t *s = (zmq::socket_base_t *) s_;
317 6 : int result = s->leave (group_);
318 6 : return result;
319 : }
320 :
321 687 : int zmq_bind (void *s_, const char *addr_)
322 : {
323 687 : if (!s_ || !((zmq::socket_base_t*) s_)->check_tag ()) {
324 0 : errno = ENOTSOCK;
325 0 : return -1;
326 : }
327 687 : zmq::socket_base_t *s = (zmq::socket_base_t *) s_;
328 687 : int result = s->bind (addr_);
329 687 : return result;
330 : }
331 :
332 4326 : int zmq_connect (void *s_, const char *addr_)
333 : {
334 4326 : if (!s_ || !((zmq::socket_base_t*) s_)->check_tag ()) {
335 0 : errno = ENOTSOCK;
336 0 : return -1;
337 : }
338 4326 : zmq::socket_base_t *s = (zmq::socket_base_t *) s_;
339 4326 : int result = s->connect (addr_);
340 4326 : return result;
341 : }
342 :
343 54 : int zmq_unbind (void *s_, const char *addr_)
344 : {
345 54 : if (!s_ || !((zmq::socket_base_t*) s_)->check_tag ()) {
346 0 : errno = ENOTSOCK;
347 0 : return -1;
348 : }
349 54 : zmq::socket_base_t *s = (zmq::socket_base_t *) s_;
350 54 : return s->term_endpoint (addr_);
351 : }
352 :
353 30 : int zmq_disconnect (void *s_, const char *addr_)
354 : {
355 30 : if (!s_ || !((zmq::socket_base_t*) s_)->check_tag ()) {
356 0 : errno = ENOTSOCK;
357 0 : return -1;
358 : }
359 30 : zmq::socket_base_t *s = (zmq::socket_base_t *) s_;
360 30 : return s->term_endpoint (addr_);
361 : }
362 :
363 : // Sending functions.
364 :
365 : static inline int
366 864412 : s_sendmsg (zmq::socket_base_t *s_, zmq_msg_t *msg_, int flags_)
367 : {
368 864412 : size_t sz = zmq_msg_size (msg_);
369 864412 : int rc = s_->send ((zmq::msg_t *) msg_, flags_);
370 864413 : if (unlikely (rc < 0))
371 : return -1;
372 :
373 : // This is what I'd like to do, my C++ fu is too weak -- PH 2016/02/09
374 : // int max_msgsz = s_->parent->get (ZMQ_MAX_MSGSZ);
375 852466 : size_t max_msgsz = INT_MAX;
376 :
377 : // Truncate returned size to INT_MAX to avoid overflow to negative values
378 852466 : return (int) (sz < max_msgsz? sz: max_msgsz);
379 : }
380 :
381 : /* To be deprecated once zmq_msg_send() is stable */
382 108 : int zmq_sendmsg (void *s_, zmq_msg_t *msg_, int flags_)
383 : {
384 108 : return zmq_msg_send (msg_, s_, flags_);
385 : }
386 :
387 863879 : int zmq_send (void *s_, const void *buf_, size_t len_, int flags_)
388 : {
389 863879 : if (!s_ || !((zmq::socket_base_t*) s_)->check_tag ()) {
390 0 : errno = ENOTSOCK;
391 0 : return -1;
392 : }
393 : zmq_msg_t msg;
394 863879 : if (zmq_msg_init_size (&msg, len_))
395 : return -1;
396 :
397 : // We explicitly allow a send from NULL, size zero
398 863878 : if (len_) {
399 : assert (buf_);
400 602558 : memcpy (zmq_msg_data (&msg), buf_, len_);
401 : }
402 863875 : zmq::socket_base_t *s = (zmq::socket_base_t *) s_;
403 863875 : int rc = s_sendmsg (s, &msg, flags_);
404 863878 : if (unlikely (rc < 0)) {
405 11931 : int err = errno;
406 11931 : int rc2 = zmq_msg_close (&msg);
407 11931 : errno_assert (rc2 == 0);
408 11931 : errno = err;
409 11931 : return -1;
410 : }
411 : // Note the optimisation here. We don't close the msg object as it is
412 : // empty anyway. This may change when implementation of zmq_msg_t changes.
413 : return rc;
414 : }
415 :
416 174 : int zmq_send_const (void *s_, const void *buf_, size_t len_, int flags_)
417 : {
418 174 : if (!s_ || !((zmq::socket_base_t*) s_)->check_tag ()) {
419 0 : errno = ENOTSOCK;
420 0 : return -1;
421 : }
422 : zmq_msg_t msg;
423 174 : int rc = zmq_msg_init_data (&msg, (void *)buf_, len_, NULL, NULL);
424 174 : if (rc != 0)
425 : return -1;
426 :
427 174 : zmq::socket_base_t *s = (zmq::socket_base_t *) s_;
428 174 : rc = s_sendmsg (s, &msg, flags_);
429 174 : if (unlikely (rc < 0)) {
430 0 : int err = errno;
431 0 : int rc2 = zmq_msg_close (&msg);
432 0 : errno_assert (rc2 == 0);
433 0 : errno = err;
434 0 : return -1;
435 : }
436 : // Note the optimisation here. We don't close the msg object as it is
437 : // empty anyway. This may change when implementation of zmq_msg_t changes.
438 : return rc;
439 : }
440 :
441 :
442 : // Send multiple messages.
443 : // TODO: this function has no man page
444 : //
445 : // If flag bit ZMQ_SNDMORE is set the vector is treated as
446 : // a single multi-part message, i.e. the last message has
447 : // ZMQ_SNDMORE bit switched off.
448 : //
449 24 : int zmq_sendiov (void *s_, iovec *a_, size_t count_, int flags_)
450 : {
451 24 : if (!s_ || !((zmq::socket_base_t*) s_)->check_tag ()) {
452 6 : errno = ENOTSOCK;
453 6 : return -1;
454 : }
455 18 : if (unlikely (count_ <= 0 || !a_)) {
456 12 : errno = EINVAL;
457 12 : return -1;
458 : }
459 :
460 : int rc = 0;
461 : zmq_msg_t msg;
462 : zmq::socket_base_t *s = (zmq::socket_base_t *) s_;
463 :
464 60 : for (size_t i = 0; i < count_; ++i) {
465 60 : rc = zmq_msg_init_size (&msg, a_[i].iov_len);
466 60 : if (rc != 0) {
467 : rc = -1;
468 : break;
469 : }
470 60 : memcpy (zmq_msg_data (&msg), a_[i].iov_base, a_[i].iov_len);
471 60 : if (i == count_ - 1)
472 6 : flags_ = flags_ & ~ZMQ_SNDMORE;
473 60 : rc = s_sendmsg (s, &msg, flags_);
474 60 : if (unlikely (rc < 0)) {
475 0 : int err = errno;
476 0 : int rc2 = zmq_msg_close (&msg);
477 0 : errno_assert (rc2 == 0);
478 0 : errno = err;
479 0 : rc = -1;
480 0 : break;
481 : }
482 : }
483 6 : return rc;
484 : }
485 :
486 : // Receiving functions.
487 :
488 : static int
489 3971497 : s_recvmsg (zmq::socket_base_t *s_, zmq_msg_t *msg_, int flags_)
490 : {
491 3971497 : int rc = s_->recv ((zmq::msg_t *) msg_, flags_);
492 3975513 : if (unlikely (rc < 0))
493 : return -1;
494 :
495 : // Truncate returned size to INT_MAX to avoid overflow to negative values
496 851946 : size_t sz = zmq_msg_size (msg_);
497 851946 : return (int) (sz < INT_MAX? sz: INT_MAX);
498 : }
499 :
500 : /* To be deprecated once zmq_msg_recv() is stable */
501 3 : int zmq_recvmsg (void *s_, zmq_msg_t *msg_, int flags_)
502 : {
503 3 : return zmq_msg_recv (msg_, s_, flags_);
504 : }
505 :
506 :
507 3976632 : int zmq_recv (void *s_, void *buf_, size_t len_, int flags_)
508 : {
509 3976632 : if (!s_ || !((zmq::socket_base_t*) s_)->check_tag ()) {
510 0 : errno = ENOTSOCK;
511 0 : return -1;
512 : }
513 : zmq_msg_t msg;
514 3976106 : int rc = zmq_msg_init (&msg);
515 3970666 : errno_assert (rc == 0);
516 :
517 3970666 : zmq::socket_base_t *s = (zmq::socket_base_t *) s_;
518 3970666 : int nbytes = s_recvmsg (s, &msg, flags_);
519 3975277 : if (unlikely (nbytes < 0)) {
520 3124309 : int err = errno;
521 3126191 : rc = zmq_msg_close (&msg);
522 3125390 : errno_assert (rc == 0);
523 3125225 : errno = err;
524 3125225 : return -1;
525 : }
526 :
527 : // An oversized message is silently truncated.
528 850968 : size_t to_copy = size_t (nbytes) < len_ ? size_t (nbytes) : len_;
529 :
530 : // We explicitly allow a null buffer argument if len is zero
531 850968 : if (to_copy) {
532 : assert (buf_);
533 601578 : memcpy (buf_, zmq_msg_data (&msg), to_copy);
534 : }
535 850968 : rc = zmq_msg_close (&msg);
536 850968 : errno_assert (rc == 0);
537 :
538 850968 : return nbytes;
539 : }
540 :
541 : // Receive a multi-part message
542 : //
543 : // Receives up to *count_ parts of a multi-part message.
544 : // Sets *count_ to the actual number of parts read.
545 : // ZMQ_RCVMORE is set to indicate if a complete multi-part message was read.
546 : // Returns number of message parts read, or -1 on error.
547 : //
548 : // Note: even if -1 is returned, some parts of the message
549 : // may have been read. Therefore the client must consult
550 : // *count_ to retrieve message parts successfully read,
551 : // even if -1 is returned.
552 : //
553 : // The iov_base* buffers of each iovec *a_ filled in by this
554 : // function may be freed using free().
555 : // TODO: this function has no man page
556 : //
557 30 : int zmq_recviov (void *s_, iovec *a_, size_t *count_, int flags_)
558 : {
559 30 : if (!s_ || !((zmq::socket_base_t*) s_)->check_tag ()) {
560 6 : errno = ENOTSOCK;
561 6 : return -1;
562 : }
563 24 : if (unlikely (!count_ || *count_ <= 0 || !a_)) {
564 18 : errno = EINVAL;
565 18 : return -1;
566 : }
567 :
568 6 : zmq::socket_base_t *s = (zmq::socket_base_t *) s_;
569 :
570 6 : size_t count = *count_;
571 6 : int nread = 0;
572 6 : bool recvmore = true;
573 :
574 6 : *count_ = 0;
575 :
576 66 : for (size_t i = 0; recvmore && i < count; ++i) {
577 :
578 : zmq_msg_t msg;
579 60 : int rc = zmq_msg_init (&msg);
580 60 : errno_assert (rc == 0);
581 :
582 60 : int nbytes = s_recvmsg (s, &msg, flags_);
583 60 : if (unlikely (nbytes < 0)) {
584 0 : int err = errno;
585 0 : rc = zmq_msg_close (&msg);
586 0 : errno_assert (rc == 0);
587 0 : errno = err;
588 0 : nread = -1;
589 0 : break;
590 : }
591 :
592 60 : a_[i].iov_len = zmq_msg_size (&msg);
593 60 : a_[i].iov_base = static_cast<char *> (malloc(a_[i].iov_len));
594 60 : if (unlikely (!a_[i].iov_base)) {
595 0 : errno = ENOMEM;
596 0 : return -1;
597 : }
598 60 : memcpy(a_[i].iov_base,static_cast<char *> (zmq_msg_data (&msg)),
599 60 : a_[i].iov_len);
600 : // Assume zmq_socket ZMQ_RVCMORE is properly set.
601 60 : zmq::msg_t* p_msg = reinterpret_cast<zmq::msg_t*>(&msg);
602 60 : recvmore = p_msg->flags() & zmq::msg_t::more;
603 60 : rc = zmq_msg_close(&msg);
604 60 : errno_assert (rc == 0);
605 60 : ++*count_;
606 60 : ++nread;
607 : }
608 6 : return nread;
609 : }
610 :
611 : // Message manipulators.
612 :
613 3976978 : int zmq_msg_init (zmq_msg_t *msg_)
614 : {
615 3976978 : return ((zmq::msg_t*) msg_)->init ();
616 : }
617 :
618 864134 : int zmq_msg_init_size (zmq_msg_t *msg_, size_t size_)
619 : {
620 864134 : return ((zmq::msg_t*) msg_)->init_size (size_);
621 : }
622 :
623 195 : int zmq_msg_init_data (zmq_msg_t *msg_, void *data_, size_t size_,
624 : zmq_free_fn *ffn_, void *hint_)
625 : {
626 195 : return ((zmq::msg_t*) msg_)->init_data (data_, size_, ffn_, hint_);
627 : }
628 :
629 303 : int zmq_msg_send (zmq_msg_t *msg_, void *s_, int flags_)
630 : {
631 303 : if (!s_ || !((zmq::socket_base_t*) s_)->check_tag ()) {
632 0 : errno = ENOTSOCK;
633 0 : return -1;
634 : }
635 303 : zmq::socket_base_t *s = (zmq::socket_base_t *) s_;
636 303 : int result = s_sendmsg (s, msg_, flags_);
637 303 : return result;
638 : }
639 :
640 936 : int zmq_msg_recv (zmq_msg_t *msg_, void *s_, int flags_)
641 : {
642 936 : if (!s_ || !((zmq::socket_base_t*) s_)->check_tag ()) {
643 0 : errno = ENOTSOCK;
644 0 : return -1;
645 : }
646 936 : zmq::socket_base_t *s = (zmq::socket_base_t *) s_;
647 936 : int result = s_recvmsg (s, msg_, flags_);
648 936 : return result;
649 : }
650 :
651 3989718 : int zmq_msg_close (zmq_msg_t *msg_)
652 : {
653 3989718 : return ((zmq::msg_t*) msg_)->close ();
654 : }
655 :
656 0 : int zmq_msg_move (zmq_msg_t *dest_, zmq_msg_t *src_)
657 : {
658 0 : return ((zmq::msg_t*) dest_)->move (*(zmq::msg_t*) src_);
659 : }
660 :
661 54 : int zmq_msg_copy (zmq_msg_t *dest_, zmq_msg_t *src_)
662 : {
663 54 : return ((zmq::msg_t*) dest_)->copy (*(zmq::msg_t*) src_);
664 : }
665 :
666 1204561 : void *zmq_msg_data (zmq_msg_t *msg_)
667 : {
668 1204561 : return ((zmq::msg_t*) msg_)->data ();
669 : }
670 :
671 1715379 : size_t zmq_msg_size (zmq_msg_t *msg_)
672 : {
673 1715379 : return ((zmq::msg_t*) msg_)->size ();
674 : }
675 :
676 144 : int zmq_msg_more (zmq_msg_t *msg_)
677 : {
678 144 : return zmq_msg_get (msg_, ZMQ_MORE);
679 : }
680 :
681 156 : int zmq_msg_get (zmq_msg_t *msg_, int property_)
682 : {
683 : const char* fd_string;
684 :
685 156 : switch (property_) {
686 : case ZMQ_MORE:
687 144 : return (((zmq::msg_t*) msg_)->flags () & zmq::msg_t::more)? 1: 0;
688 : case ZMQ_SRCFD:
689 3 : fd_string = zmq_msg_gets(msg_, "__fd");
690 3 : if (fd_string == NULL)
691 : return (int)-1;
692 :
693 3 : return atoi(fd_string);
694 : case ZMQ_SHARED:
695 15 : return (((zmq::msg_t*) msg_)->is_cmsg ()) ||
696 18 : (((zmq::msg_t*) msg_)->flags () & zmq::msg_t::shared)? 1: 0;
697 : default:
698 0 : errno = EINVAL;
699 0 : return -1;
700 : }
701 : }
702 :
703 0 : int zmq_msg_set (zmq_msg_t *, int, int)
704 : {
705 : // No properties supported at present
706 0 : errno = EINVAL;
707 0 : return -1;
708 : }
709 :
710 9 : int zmq_msg_set_routing_id (zmq_msg_t *msg_, uint32_t routing_id_)
711 : {
712 9 : return ((zmq::msg_t *) msg_)->set_routing_id (routing_id_);
713 : }
714 :
715 18 : uint32_t zmq_msg_routing_id (zmq_msg_t *msg_)
716 : {
717 18 : return ((zmq::msg_t *) msg_)->get_routing_id ();
718 : }
719 :
720 18 : int zmq_msg_set_group (zmq_msg_t *msg_, const char *group_)
721 : {
722 18 : return ((zmq::msg_t *) msg_)->set_group (group_);
723 : }
724 :
725 12 : const char *zmq_msg_group (zmq_msg_t *msg_)
726 : {
727 12 : return ((zmq::msg_t *) msg_)->group ();
728 : }
729 :
730 : // Get message metadata string
731 :
732 27 : const char *zmq_msg_gets (zmq_msg_t *msg_, const char *property_)
733 : {
734 27 : zmq::metadata_t *metadata = ((zmq::msg_t *) msg_)->metadata ();
735 27 : const char *value = NULL;
736 27 : if (metadata)
737 54 : value = metadata->get (std::string (property_));
738 27 : if (value)
739 : return value;
740 : else {
741 3 : errno = EINVAL;
742 3 : return NULL;
743 : }
744 : }
745 :
746 : // Polling.
747 :
748 530747 : int zmq_poll (zmq_pollitem_t *items_, int nitems_, long timeout_)
749 : {
750 : // TODO: the function implementation can just call zmq_pollfd_poll with
751 : // pollfd as NULL, however pollfd is not yet stable.
752 : #if defined ZMQ_POLL_BASED_ON_POLL
753 530747 : if (unlikely (nitems_ < 0)) {
754 0 : errno = EINVAL;
755 0 : return -1;
756 : }
757 530747 : if (unlikely (nitems_ == 0)) {
758 301 : if (timeout_ == 0)
759 : return 0;
760 : #if defined ZMQ_HAVE_WINDOWS
761 : Sleep (timeout_ > 0 ? timeout_ : INFINITE);
762 : return 0;
763 : #elif defined ZMQ_HAVE_ANDROID
764 : usleep (timeout_ * 1000);
765 : return 0;
766 : #else
767 301 : return usleep (timeout_ * 1000);
768 : #endif
769 : }
770 :
771 530446 : if (!items_) {
772 0 : errno = EFAULT;
773 0 : return -1;
774 : }
775 :
776 530446 : zmq::clock_t clock;
777 530446 : uint64_t now = 0;
778 530446 : uint64_t end = 0;
779 : pollfd spollfds[ZMQ_POLLITEMS_DFLT];
780 530446 : pollfd *pollfds = spollfds;
781 :
782 530446 : if (nitems_ > ZMQ_POLLITEMS_DFLT) {
783 0 : pollfds = (pollfd*) malloc (nitems_ * sizeof (pollfd));
784 0 : alloc_assert (pollfds);
785 : }
786 :
787 : // Build pollset for poll () system call.
788 1856319 : for (int i = 0; i != nitems_; i++) {
789 :
790 : // If the poll item is a 0MQ socket, we poll on the file descriptor
791 : // retrieved by the ZMQ_FD socket option.
792 1325873 : if (items_ [i].socket) {
793 1325873 : size_t zmq_fd_size = sizeof (zmq::fd_t);
794 2651746 : if (zmq_getsockopt (items_ [i].socket, ZMQ_FD, &pollfds [i].fd,
795 1325873 : &zmq_fd_size) == -1) {
796 0 : if (pollfds != spollfds)
797 0 : free (pollfds);
798 0 : return -1;
799 : }
800 1325873 : pollfds [i].events = items_ [i].events ? POLLIN : 0;
801 : }
802 : // Else, the poll item is a raw file descriptor. Just convert the
803 : // events to normal POLLIN/POLLOUT for poll ().
804 : else {
805 0 : pollfds [i].fd = items_ [i].fd;
806 : pollfds [i].events =
807 : (items_ [i].events & ZMQ_POLLIN ? POLLIN : 0) |
808 0 : (items_ [i].events & ZMQ_POLLOUT ? POLLOUT : 0) |
809 0 : (items_ [i].events & ZMQ_POLLPRI ? POLLPRI : 0);
810 : }
811 : }
812 :
813 : bool first_pass = true;
814 : int nevents = 0;
815 :
816 : while (true) {
817 : // Compute the timeout for the subsequent poll.
818 : int timeout;
819 531035 : if (first_pass)
820 : timeout = 0;
821 : else
822 589 : if (timeout_ < 0)
823 : timeout = -1;
824 : else
825 487 : timeout = end - now;
826 :
827 : // Wait for events.
828 : while (true) {
829 1062072 : int rc = poll (pollfds, nitems_, timeout);
830 531037 : if (rc == -1 && errno == EINTR) {
831 0 : if (pollfds != spollfds)
832 0 : free (pollfds);
833 : return -1;
834 : }
835 531037 : errno_assert (rc >= 0);
836 : break;
837 : }
838 : // Check for the events.
839 1327151 : for (int i = 0; i != nitems_; i++) {
840 :
841 1327153 : items_ [i].revents = 0;
842 :
843 : // The poll item is a 0MQ socket. Retrieve pending events
844 : // using the ZMQ_EVENTS socket option.
845 1327153 : if (items_ [i].socket) {
846 1327153 : size_t zmq_events_size = sizeof (uint32_t);
847 : uint32_t zmq_events;
848 1327151 : if (zmq_getsockopt (items_ [i].socket, ZMQ_EVENTS, &zmq_events,
849 1327153 : &zmq_events_size) == -1) {
850 0 : if (pollfds != spollfds)
851 0 : free (pollfds);
852 0 : return -1;
853 : }
854 1857095 : if ((items_ [i].events & ZMQ_POLLOUT) &&
855 529944 : (zmq_events & ZMQ_POLLOUT))
856 100 : items_ [i].revents |= ZMQ_POLLOUT;
857 2124358 : if ((items_ [i].events & ZMQ_POLLIN) &&
858 797207 : (zmq_events & ZMQ_POLLIN))
859 265054 : items_ [i].revents |= ZMQ_POLLIN;
860 : }
861 : // Else, the poll item is a raw file descriptor, simply convert
862 : // the events to zmq_pollitem_t-style format.
863 : else {
864 0 : if (pollfds [i].revents & POLLIN)
865 0 : items_ [i].revents |= ZMQ_POLLIN;
866 0 : if (pollfds [i].revents & POLLOUT)
867 0 : items_ [i].revents |= ZMQ_POLLOUT;
868 0 : if (pollfds [i].revents & POLLPRI)
869 0 : items_ [i].revents |= ZMQ_POLLPRI;
870 0 : if (pollfds [i].revents & ~(POLLIN | POLLOUT | POLLPRI))
871 0 : items_ [i].revents |= ZMQ_POLLERR;
872 : }
873 :
874 1327151 : if (items_ [i].revents)
875 265154 : nevents++;
876 : }
877 :
878 : // If timeout is zero, exit immediately whether there are events or not.
879 531035 : if (timeout_ == 0)
880 : break;
881 :
882 : // If there are events to return, we can exit immediately.
883 266063 : if (nevents)
884 : break;
885 :
886 : // At this point we are meant to wait for events but there are none.
887 : // If timeout is infinite we can just loop until we get some events.
888 1022 : if (timeout_ < 0) {
889 102 : if (first_pass)
890 58 : first_pass = false;
891 : continue;
892 : }
893 :
894 : // The timeout is finite and there are no events. In the first pass
895 : // we get a timestamp of when the polling have begun. (We assume that
896 : // first pass have taken negligible time). We also compute the time
897 : // when the polling should time out.
898 920 : if (first_pass) {
899 484 : now = clock.now_ms ();
900 484 : end = now + timeout_;
901 484 : if (now == end)
902 : break;
903 : first_pass = false;
904 : continue;
905 : }
906 :
907 : // Find out whether timeout have expired.
908 436 : now = clock.now_ms ();
909 436 : if (now >= end)
910 : break;
911 : }
912 :
913 530446 : if (pollfds != spollfds)
914 0 : free (pollfds);
915 530446 : return nevents;
916 :
917 : #elif defined ZMQ_POLL_BASED_ON_SELECT
918 :
919 : if (unlikely (nitems_ < 0)) {
920 : errno = EINVAL;
921 : return -1;
922 : }
923 : if (unlikely (nitems_ == 0)) {
924 : if (timeout_ == 0)
925 : return 0;
926 : #if defined ZMQ_HAVE_WINDOWS
927 : Sleep (timeout_ > 0 ? timeout_ : INFINITE);
928 : return 0;
929 : #else
930 : return usleep (timeout_ * 1000);
931 : #endif
932 : }
933 : zmq::clock_t clock;
934 : uint64_t now = 0;
935 : uint64_t end = 0;
936 :
937 : // Ensure we do not attempt to select () on more than FD_SETSIZE
938 : // file descriptors.
939 : zmq_assert (nitems_ <= FD_SETSIZE);
940 :
941 : fd_set pollset_in = { 0 };
942 : fd_set pollset_out = { 0 };
943 : fd_set pollset_err = { 0 };
944 :
945 : zmq::fd_t maxfd = 0;
946 :
947 : // Build the fd_sets for passing to select ().
948 : for (int i = 0; i != nitems_; i++) {
949 :
950 : // If the poll item is a 0MQ socket we are interested in input on the
951 : // notification file descriptor retrieved by the ZMQ_FD socket option.
952 : if (items_ [i].socket) {
953 : size_t zmq_fd_size = sizeof (zmq::fd_t);
954 : zmq::fd_t notify_fd;
955 : if (zmq_getsockopt (items_ [i].socket, ZMQ_FD, ¬ify_fd,
956 : &zmq_fd_size) == -1)
957 : return -1;
958 : if (items_ [i].events) {
959 : FD_SET (notify_fd, &pollset_in);
960 : if (maxfd < notify_fd)
961 : maxfd = notify_fd;
962 : }
963 : }
964 : // Else, the poll item is a raw file descriptor. Convert the poll item
965 : // events to the appropriate fd_sets.
966 : else {
967 : if (items_ [i].events & ZMQ_POLLIN)
968 : FD_SET (items_ [i].fd, &pollset_in);
969 : if (items_ [i].events & ZMQ_POLLOUT)
970 : FD_SET (items_ [i].fd, &pollset_out);
971 : if (items_ [i].events & ZMQ_POLLERR)
972 : FD_SET (items_ [i].fd, &pollset_err);
973 : if (maxfd < items_ [i].fd)
974 : maxfd = items_ [i].fd;
975 : }
976 : }
977 :
978 : bool first_pass = true;
979 : int nevents = 0;
980 : fd_set inset, outset, errset;
981 :
982 : while (true) {
983 :
984 : // Compute the timeout for the subsequent poll.
985 : timeval timeout;
986 : timeval *ptimeout;
987 : if (first_pass) {
988 : timeout.tv_sec = 0;
989 : timeout.tv_usec = 0;
990 : ptimeout = &timeout;
991 : }
992 : else
993 : if (timeout_ < 0)
994 : ptimeout = NULL;
995 : else {
996 : timeout.tv_sec = (long) ((end - now) / 1000);
997 : timeout.tv_usec = (long) ((end - now) % 1000 * 1000);
998 : ptimeout = &timeout;
999 : }
1000 :
1001 : // Wait for events. Ignore interrupts if there's infinite timeout.
1002 : while (true) {
1003 : memcpy (&inset, &pollset_in, sizeof (fd_set));
1004 : memcpy (&outset, &pollset_out, sizeof (fd_set));
1005 : memcpy (&errset, &pollset_err, sizeof (fd_set));
1006 : #if defined ZMQ_HAVE_WINDOWS
1007 : int rc = select (0, &inset, &outset, &errset, ptimeout);
1008 : if (unlikely (rc == SOCKET_ERROR)) {
1009 : errno = zmq::wsa_error_to_errno (WSAGetLastError ());
1010 : wsa_assert (errno == ENOTSOCK);
1011 : return -1;
1012 : }
1013 : #else
1014 : int rc = select (maxfd + 1, &inset, &outset, &errset, ptimeout);
1015 : if (unlikely (rc == -1)) {
1016 : errno_assert (errno == EINTR || errno == EBADF);
1017 : return -1;
1018 : }
1019 : #endif
1020 : break;
1021 : }
1022 :
1023 : // Check for the events.
1024 : for (int i = 0; i != nitems_; i++) {
1025 :
1026 : items_ [i].revents = 0;
1027 :
1028 : // The poll item is a 0MQ socket. Retrieve pending events
1029 : // using the ZMQ_EVENTS socket option.
1030 : if (items_ [i].socket) {
1031 : size_t zmq_events_size = sizeof (uint32_t);
1032 : uint32_t zmq_events;
1033 : if (zmq_getsockopt (items_ [i].socket, ZMQ_EVENTS, &zmq_events,
1034 : &zmq_events_size) == -1)
1035 : return -1;
1036 : if ((items_ [i].events & ZMQ_POLLOUT) &&
1037 : (zmq_events & ZMQ_POLLOUT))
1038 : items_ [i].revents |= ZMQ_POLLOUT;
1039 : if ((items_ [i].events & ZMQ_POLLIN) &&
1040 : (zmq_events & ZMQ_POLLIN))
1041 : items_ [i].revents |= ZMQ_POLLIN;
1042 : }
1043 : // Else, the poll item is a raw file descriptor, simply convert
1044 : // the events to zmq_pollitem_t-style format.
1045 : else {
1046 : if (FD_ISSET (items_ [i].fd, &inset))
1047 : items_ [i].revents |= ZMQ_POLLIN;
1048 : if (FD_ISSET (items_ [i].fd, &outset))
1049 : items_ [i].revents |= ZMQ_POLLOUT;
1050 : if (FD_ISSET (items_ [i].fd, &errset))
1051 : items_ [i].revents |= ZMQ_POLLERR;
1052 : }
1053 :
1054 : if (items_ [i].revents)
1055 : nevents++;
1056 : }
1057 :
1058 : // If timeout is zero, exit immediately whether there are events or not.
1059 : if (timeout_ == 0)
1060 : break;
1061 :
1062 : // If there are events to return, we can exit immediately.
1063 : if (nevents)
1064 : break;
1065 :
1066 : // At this point we are meant to wait for events but there are none.
1067 : // If timeout is infinite we can just loop until we get some events.
1068 : if (timeout_ < 0) {
1069 : if (first_pass)
1070 : first_pass = false;
1071 : continue;
1072 : }
1073 :
1074 : // The timeout is finite and there are no events. In the first pass
1075 : // we get a timestamp of when the polling have begun. (We assume that
1076 : // first pass have taken negligible time). We also compute the time
1077 : // when the polling should time out.
1078 : if (first_pass) {
1079 : now = clock.now_ms ();
1080 : end = now + timeout_;
1081 : if (now == end)
1082 : break;
1083 : first_pass = false;
1084 : continue;
1085 : }
1086 :
1087 : // Find out whether timeout have expired.
1088 : now = clock.now_ms ();
1089 : if (now >= end)
1090 : break;
1091 : }
1092 :
1093 : return nevents;
1094 :
1095 : #else
1096 : // Exotic platforms that support neither poll() nor select().
1097 : errno = ENOTSUP;
1098 : return -1;
1099 : #endif
1100 : }
1101 :
1102 : // The poller functionality
1103 :
1104 3 : void *zmq_poller_new (void)
1105 : {
1106 3 : zmq::socket_poller_t *poller = new (std::nothrow) zmq::socket_poller_t;
1107 3 : alloc_assert (poller);
1108 3 : return poller;
1109 : }
1110 :
1111 3 : int zmq_poller_destroy (void **poller_p_)
1112 : {
1113 3 : void *poller = *poller_p_;
1114 3 : if (!poller || !((zmq::socket_poller_t*) poller)->check_tag ()) {
1115 0 : errno = EFAULT;
1116 0 : return -1;
1117 : }
1118 :
1119 3 : delete ((zmq::socket_poller_t*) poller);
1120 3 : *poller_p_ = NULL;
1121 3 : return 0;
1122 : }
1123 :
1124 6 : int zmq_poller_add (void *poller_, void *s_, void *user_data_, short events_)
1125 : {
1126 6 : if (!poller_ || !((zmq::socket_poller_t*)poller_)->check_tag ()) {
1127 0 : errno = EFAULT;
1128 0 : return -1;
1129 : }
1130 :
1131 6 : if (!s_ || !((zmq::socket_base_t*)s_)->check_tag ()) {
1132 0 : errno = ENOTSOCK;
1133 0 : return -1;
1134 : }
1135 6 : zmq::socket_base_t *socket = (zmq::socket_base_t*)s_;
1136 :
1137 6 : return ((zmq::socket_poller_t*)poller_)->add (socket, user_data_, events_);
1138 : }
1139 :
1140 : #if defined _WIN32
1141 : int zmq_poller_add_fd (void *poller_, SOCKET fd_, void *user_data_, short events_)
1142 : #else
1143 3 : int zmq_poller_add_fd (void *poller_, int fd_, void *user_data_, short events_)
1144 : #endif
1145 : {
1146 3 : if (!poller_ || !((zmq::socket_poller_t*)poller_)->check_tag ()) {
1147 0 : errno = EFAULT;
1148 0 : return -1;
1149 : }
1150 :
1151 3 : return ((zmq::socket_poller_t*)poller_)->add_fd (fd_, user_data_, events_);
1152 : }
1153 :
1154 :
1155 3 : int zmq_poller_modify (void *poller_, void *s_, short events_)
1156 : {
1157 3 : if (!poller_ || !((zmq::socket_poller_t*)poller_)->check_tag ()) {
1158 0 : errno = EFAULT;
1159 0 : return -1;
1160 : }
1161 :
1162 3 : if (!s_ || !((zmq::socket_base_t*)s_)->check_tag ()) {
1163 0 : errno = ENOTSOCK;
1164 0 : return -1;
1165 : }
1166 3 : zmq::socket_base_t *socket = (zmq::socket_base_t*)s_;
1167 :
1168 3 : return ((zmq::socket_poller_t*)poller_)->modify (socket, events_);
1169 : }
1170 :
1171 :
1172 : #if defined _WIN32
1173 : int zmq_poller_modify_fd (void *poller_, SOCKET fd_, short events_)
1174 : #else
1175 0 : int zmq_poller_modify_fd (void *poller_, int fd_, short events_)
1176 : #endif
1177 : {
1178 0 : if (!poller_ || !((zmq::socket_poller_t*)poller_)->check_tag ()) {
1179 0 : errno = EFAULT;
1180 0 : return -1;
1181 : }
1182 :
1183 0 : return ((zmq::socket_poller_t*)poller_)->modify_fd (fd_, events_);
1184 : }
1185 :
1186 :
1187 3 : int zmq_poller_remove (void *poller_, void *s_)
1188 : {
1189 3 : if (!poller_ || !((zmq::socket_poller_t*)poller_)->check_tag ()) {
1190 0 : errno = EFAULT;
1191 0 : return -1;
1192 : }
1193 :
1194 3 : if (!s_ || !((zmq::socket_base_t*)s_)->check_tag ()) {
1195 0 : errno = ENOTSOCK;
1196 0 : return -1;
1197 : }
1198 3 : zmq::socket_base_t *socket = (zmq::socket_base_t*)s_;
1199 :
1200 3 : return ((zmq::socket_poller_t*)poller_)->remove (socket);
1201 : }
1202 :
1203 : #if defined _WIN32
1204 : int zmq_poller_remove_fd (void *poller_, SOCKET fd_)
1205 : #else
1206 3 : int zmq_poller_remove_fd (void *poller_, int fd_)
1207 : #endif
1208 : {
1209 3 : if (!poller_ || !((zmq::socket_poller_t*)poller_)->check_tag ()) {
1210 0 : errno = EFAULT;
1211 0 : return -1;
1212 : }
1213 :
1214 3 : return ((zmq::socket_poller_t*)poller_)->remove_fd (fd_);
1215 : }
1216 :
1217 :
1218 18 : int zmq_poller_wait (void *poller_, zmq_poller_event_t *event, long timeout_)
1219 : {
1220 18 : if (!poller_ || !((zmq::socket_poller_t*)poller_)->check_tag ()) {
1221 0 : errno = EFAULT;
1222 0 : return -1;
1223 : }
1224 :
1225 18 : zmq_assert (event != NULL);
1226 :
1227 : zmq::socket_poller_t::event_t e;
1228 : memset (&e, 0, sizeof (e));
1229 :
1230 18 : int rc = ((zmq::socket_poller_t*)poller_)->wait (&e, timeout_);
1231 :
1232 18 : event->socket = e.socket;
1233 18 : event->fd = e.fd;
1234 18 : event->user_data = e.user_data;
1235 18 : event->events = e.events;
1236 :
1237 18 : return rc;
1238 : }
1239 :
1240 : // Timers
1241 :
1242 3 : void *zmq_timers_new (void)
1243 : {
1244 3 : zmq::timers_t *timers = new (std::nothrow) zmq::timers_t;
1245 3 : alloc_assert (timers);
1246 3 : return timers;
1247 : }
1248 :
1249 3 : int zmq_timers_destroy (void **timers_p_)
1250 : {
1251 3 : void *timers = *timers_p_;
1252 3 : if (!timers || !((zmq::timers_t *) timers)->check_tag ()) {
1253 0 : errno = EFAULT;
1254 0 : return -1;
1255 : }
1256 3 : delete ((zmq::timers_t *) timers);
1257 3 : *timers_p_ = NULL;
1258 3 : return 0;
1259 : }
1260 :
1261 3 : int zmq_timers_add (void *timers_, size_t interval_, zmq_timer_fn handler_, void *arg_)
1262 : {
1263 3 : if (!timers_ || !((zmq::timers_t*)timers_)->check_tag ()) {
1264 0 : errno = EFAULT;
1265 0 : return -1;
1266 : }
1267 :
1268 3 : return ((zmq::timers_t*)timers_)->add (interval_, handler_, arg_);
1269 : }
1270 :
1271 3 : int zmq_timers_cancel (void *timers_, int timer_id_)
1272 : {
1273 3 : if (!timers_ || !((zmq::timers_t*)timers_)->check_tag ()) {
1274 0 : errno = EFAULT;
1275 0 : return -1;
1276 : }
1277 :
1278 3 : return ((zmq::timers_t*)timers_)->cancel (timer_id_);
1279 : }
1280 :
1281 3 : int zmq_timers_set_interval (void *timers_, int timer_id_, size_t interval_)
1282 : {
1283 3 : if (!timers_ || !((zmq::timers_t*)timers_)->check_tag ()) {
1284 0 : errno = EFAULT;
1285 0 : return -1;
1286 : }
1287 :
1288 3 : return ((zmq::timers_t*)timers_)->set_interval (timer_id_, interval_);
1289 : }
1290 :
1291 3 : int zmq_timers_reset (void *timers_, int timer_id_)
1292 : {
1293 3 : if (!timers_ || !((zmq::timers_t*)timers_)->check_tag ()) {
1294 0 : errno = EFAULT;
1295 0 : return -1;
1296 : }
1297 :
1298 3 : return ((zmq::timers_t*)timers_)->reset (timer_id_);
1299 : }
1300 :
1301 27 : long zmq_timers_timeout (void *timers_)
1302 : {
1303 27 : if (!timers_ || !((zmq::timers_t*)timers_)->check_tag ()) {
1304 0 : errno = EFAULT;
1305 0 : return -1;
1306 : }
1307 :
1308 27 : return ((zmq::timers_t*)timers_)->timeout ();
1309 : }
1310 :
1311 24 : int zmq_timers_execute (void *timers_)
1312 : {
1313 24 : if (!timers_ || !((zmq::timers_t*)timers_)->check_tag ()) {
1314 0 : errno = EFAULT;
1315 0 : return -1;
1316 : }
1317 :
1318 24 : return ((zmq::timers_t*)timers_)->execute ();
1319 : }
1320 :
1321 : // The proxy functionality
1322 :
1323 0 : int zmq_proxy (void *frontend_, void *backend_, void *capture_)
1324 : {
1325 0 : if (!frontend_ || !backend_) {
1326 0 : errno = EFAULT;
1327 0 : return -1;
1328 : }
1329 : return zmq::proxy (
1330 : (zmq::socket_base_t*) frontend_,
1331 : (zmq::socket_base_t*) backend_,
1332 0 : (zmq::socket_base_t*) capture_);
1333 : }
1334 :
1335 9 : int zmq_proxy_steerable (void *frontend_, void *backend_, void *capture_, void *control_)
1336 : {
1337 9 : if (!frontend_ || !backend_) {
1338 0 : errno = EFAULT;
1339 0 : return -1;
1340 : }
1341 : return zmq::proxy (
1342 : (zmq::socket_base_t*) frontend_,
1343 : (zmq::socket_base_t*) backend_,
1344 : (zmq::socket_base_t*) capture_,
1345 9 : (zmq::socket_base_t*) control_);
1346 : }
1347 :
1348 : // The deprecated device functionality
1349 :
1350 0 : int zmq_device (int /* type */, void *frontend_, void *backend_)
1351 : {
1352 : return zmq::proxy (
1353 : (zmq::socket_base_t*) frontend_,
1354 0 : (zmq::socket_base_t*) backend_, NULL);
1355 : }
1356 :
1357 : // Probe library capabilities; for now, reports on transport and security
1358 :
1359 21 : int zmq_has (const char *capability)
1360 : {
1361 : #if !defined (ZMQ_HAVE_WINDOWS) && !defined (ZMQ_HAVE_OPENVMS)
1362 21 : if (strcmp (capability, "ipc") == 0)
1363 : return true;
1364 : #endif
1365 : #if defined (ZMQ_HAVE_OPENPGM)
1366 : if (strcmp (capability, "pgm") == 0)
1367 : return true;
1368 : #endif
1369 : #if defined (ZMQ_HAVE_TIPC)
1370 : if (strcmp (capability, "tipc") == 0)
1371 : return true;
1372 : #endif
1373 : #if defined (ZMQ_HAVE_NORM)
1374 : if (strcmp (capability, "norm") == 0)
1375 : return true;
1376 : #endif
1377 : #if defined (ZMQ_HAVE_CURVE)
1378 18 : if (strcmp (capability, "curve") == 0)
1379 : return true;
1380 : #endif
1381 : #if defined (HAVE_LIBGSSAPI_KRB5)
1382 : if (strcmp (capability, "gssapi") == 0)
1383 : return true;
1384 : #endif
1385 : #if defined (ZMQ_HAVE_VMCI)
1386 : if (strcmp (capability, "vmci") == 0)
1387 : return true;
1388 : #endif
1389 : // Whatever the application asked for, we don't have
1390 15 : return false;
1391 : }
|