libzmq  master
ZeroMQ C++ Core Engine (LIBZMQ)
test_poller.cpp
Go to the documentation of this file.
1 /*
2  Copyright (c) 2007-2016 Contributors as noted in the AUTHORS file
3 
4  This file is part of libzmq, the ZeroMQ core engine in C++.
5 
6  libzmq is free software; you can redistribute it and/or modify it under
7  the terms of the GNU Lesser General Public License (LGPL) as published
8  by the Free Software Foundation; either version 3 of the License, or
9  (at your option) any later version.
10 
11  As a special exception, the Contributors give you permission to link
12  this library with independent modules to produce an executable,
13  regardless of the license terms of these independent modules, and to
14  copy and distribute the resulting executable under terms of your choice,
15  provided that you also meet, for each linked independent module, the
16  terms and conditions of the license of that module. An independent
17  module is a module which is not derived from or based on this library.
18  If you modify this library, you must extend this exception to your
19  version of the library.
20 
21  libzmq is distributed in the hope that it will be useful, but WITHOUT
22  ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
23  FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public
24  License for more details.
25 
26  You should have received a copy of the GNU Lesser General Public License
27  along with this program. If not, see <http://www.gnu.org/licenses/>.
28 */
29 
30 #include "testutil.hpp"
31 
32 int main (void)
33 {
35 
36  void *ctx = zmq_ctx_new ();
37  assert (ctx);
38 
39  // Create few sockets
40  void *vent = zmq_socket (ctx, ZMQ_PUSH);
41  assert (vent);
42  int rc = zmq_bind (vent, "tcp://127.0.0.1:55556");
43  assert (rc == 0);
44 
45  void *sink = zmq_socket (ctx, ZMQ_PULL);
46  assert (sink);
47  rc = zmq_connect (sink, "tcp://127.0.0.1:55556");
48  assert (rc == 0);
49 
50  void *bowl = zmq_socket (ctx, ZMQ_PULL);
51  assert (bowl);
52 
53 #if defined(ZMQ_SERVER) && defined(ZMQ_CLIENT)
54  void *server = zmq_socket (ctx, ZMQ_SERVER);
55  assert (server);
56  rc = zmq_bind (server, "tcp://127.0.0.1:55557");
57  assert (rc == 0);
58 
59  void *client = zmq_socket (ctx, ZMQ_CLIENT);
60  assert (client);
61 #endif
62 
63  // Set up poller
64  void* poller = zmq_poller_new ();
65  zmq_poller_event_t event;
66 
67  // waiting on poller with no registered sockets should report error
68  rc = zmq_poller_wait(poller, &event, 0);
69  assert (rc == -1);
70  assert (errno == ETIMEDOUT);
71 
72  // register sink
73  rc = zmq_poller_add (poller, sink, sink, ZMQ_POLLIN);
74  assert (rc == 0);
75 
76  // Send a message
77  char data[1] = {'H'};
78  rc = zmq_send_const (vent, data, 1, 0);
79  assert (rc == 1);
80 
81  // We expect a message only on the sink
82  rc = zmq_poller_wait (poller, &event, -1);
83  assert (rc == 0);
84  assert (event.socket == sink);
85  assert (event.user_data == sink);
86  rc = zmq_recv (sink, data, 1, 0);
87  assert (rc == 1);
88 
89  // We expect timed out
90  rc = zmq_poller_wait (poller, &event, 0);
91  assert (rc == -1);
92  assert (errno == ETIMEDOUT);
93 
94  // Stop polling sink
95  rc = zmq_poller_remove (poller, sink);
96  assert (rc == 0);
97 
98  // Check we can poll an FD
99  rc = zmq_connect (bowl, "tcp://127.0.0.1:55556");
100  assert (rc == 0);
101 
102 #if defined _WIN32
103  SOCKET fd;
104  size_t fd_size = sizeof (SOCKET);
105 #else
106  int fd;
107  size_t fd_size = sizeof (int);
108 #endif
109 
110  rc = zmq_getsockopt (bowl, ZMQ_FD, &fd, &fd_size);
111  assert (rc == 0);
112  rc = zmq_poller_add_fd (poller, fd, bowl, ZMQ_POLLIN);
113  assert (rc == 0);
114  rc = zmq_poller_wait (poller, &event, 500);
115  assert (rc == 0);
116  assert (event.socket == NULL);
117  assert (event.fd == fd);
118  assert (event.user_data == bowl);
119  zmq_poller_remove_fd (poller, fd);
120 
121 #if defined(ZMQ_SERVER) && defined(ZMQ_CLIENT)
122  // Polling on thread safe sockets
123  rc = zmq_poller_add (poller, server, NULL, ZMQ_POLLIN);
124  assert (rc == 0);
125  rc = zmq_connect (client, "tcp://127.0.0.1:55557");
126  assert (rc == 0);
127  rc = zmq_send_const (client, data, 1, 0);
128  assert (rc == 1);
129  rc = zmq_poller_wait (poller, &event, 500);
130  assert (rc == 0);
131  assert (event.socket == server);
132  assert (event.user_data == NULL);
133  rc = zmq_recv (server, data, 1, 0);
134  assert (rc == 1);
135 
136  // Polling on pollout
137  rc = zmq_poller_modify (poller, server, ZMQ_POLLOUT | ZMQ_POLLIN);
138  assert (rc == 0);
139  rc = zmq_poller_wait (poller, &event, 0);
140  assert (rc == 0);
141  assert (event.socket == server);
142  assert (event.user_data == NULL);
143  assert (event.events == ZMQ_POLLOUT);
144 #endif
145 
146  // Destory sockets, poller and ctx
147  rc = zmq_close (sink);
148  assert (rc == 0);
149  rc = zmq_close (vent);
150  assert (rc == 0);
151  rc = zmq_close (bowl);
152  assert (rc == 0);
153 #if defined(ZMQ_SERVER) && defined(ZMQ_CLIENT)
154  rc = zmq_close (server);
155  assert (rc == 0);
156  rc = zmq_close (client);
157  assert (rc == 0);
158 #endif
159  rc = zmq_poller_destroy(&poller);
160  assert(rc == 0);
161  rc = zmq_ctx_term (ctx);
162  assert (rc == 0);
163 
164  return 0;
165 }
int zmq_poller_remove_fd(void *poller, int fd)
Definition: zmq.cpp:1206
int zmq_poller_add_fd(void *poller, int fd, void *user_data, short events)
Definition: zmq.cpp:1143
int zmq_poller_add(void *poller, void *socket, void *user_data, short events)
Definition: zmq.cpp:1124
int main(void)
Definition: test_poller.cpp:32
ZMQ_EXPORT void * zmq_ctx_new(void)
Definition: zmq.cpp:115
void setup_test_environment(void)
Definition: testutil.hpp:285
int zmq_poller_wait(void *poller, zmq_poller_event_t *event, long timeout)
Definition: zmq.cpp:1218
ZMQ_EXPORT int zmq_recv(void *s, void *buf, size_t len, int flags)
Definition: zmq.cpp:507
ZMQ_EXPORT int zmq_send_const(void *s, const void *buf, size_t len, int flags)
Definition: zmq.cpp:416
ZMQ_EXPORT void * zmq_socket(void *, int type)
Definition: zmq.cpp:244
#define ZMQ_PUSH
Definition: zmq.h:254
#define ZMQ_FD
Definition: zmq.h:273
ZMQ_EXPORT int zmq_getsockopt(void *s, int option, void *optval, size_t *optvallen)
Definition: zmq.cpp:277
ZMQ_EXPORT int zmq_connect(void *s, const char *addr)
Definition: zmq.cpp:332
ZMQ_EXPORT int zmq_close(void *s)
Definition: zmq.cpp:255
#define ETIMEDOUT
Definition: zmq.h:157
int zmq_poller_modify(void *poller, void *socket, short events)
Definition: zmq.cpp:1155
#define ZMQ_SERVER
int zmq_poller_remove(void *poller, void *socket)
Definition: zmq.cpp:1187
#define ZMQ_POLLOUT
Definition: zmq.h:411
ZMQ_EXPORT int zmq_bind(void *s, const char *addr)
Definition: zmq.cpp:321
ZMQ_EXPORT int zmq_ctx_term(void *context)
Definition: zmq.cpp:162
#define ZMQ_POLLIN
Definition: zmq.h:410
int zmq_poller_destroy(void **poller_p)
Definition: zmq.cpp:1111
void * zmq_poller_new(void)
Definition: zmq.cpp:1104
#define ZMQ_PULL
Definition: zmq.h:253
#define ZMQ_CLIENT