libzmq  master
ZeroMQ C++ Core Engine (LIBZMQ)
test_xpub_manual.cpp
Go to the documentation of this file.
1 /*
2  Copyright (c) 2007-2016 Contributors as noted in the AUTHORS file
3 
4  This file is part of libzmq, the ZeroMQ core engine in C++.
5 
6  libzmq is free software; you can redistribute it and/or modify it under
7  the terms of the GNU Lesser General Public License (LGPL) as published
8  by the Free Software Foundation; either version 3 of the License, or
9  (at your option) any later version.
10 
11  As a special exception, the Contributors give you permission to link
12  this library with independent modules to produce an executable,
13  regardless of the license terms of these independent modules, and to
14  copy and distribute the resulting executable under terms of your choice,
15  provided that you also meet, for each linked independent module, the
16  terms and conditions of the license of that module. An independent
17  module is a module which is not derived from or based on this library.
18  If you modify this library, you must extend this exception to your
19  version of the library.
20 
21  libzmq is distributed in the hope that it will be useful, but WITHOUT
22  ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
23  FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public
24  License for more details.
25 
26  You should have received a copy of the GNU Lesser General Public License
27  along with this program. If not, see <http://www.gnu.org/licenses/>.
28 */
29 
30 #include "testutil.hpp"
31 
33 {
34  void *ctx = zmq_ctx_new ();
35  assert (ctx);
36 
37  // Create a publisher
38  void *pub = zmq_socket (ctx, ZMQ_XPUB);
39  assert (pub);
40  int rc = zmq_bind (pub, "inproc://soname");
41  assert (rc == 0);
42 
43  // set pub socket options
44  int manual = 1;
45  rc = zmq_setsockopt(pub, ZMQ_XPUB_MANUAL, &manual, 4);
46  assert (rc == 0);
47 
48  // Create a subscriber
49  void *sub = zmq_socket (ctx, ZMQ_XSUB);
50  assert (sub);
51  rc = zmq_connect (sub, "inproc://soname");
52  assert (rc == 0);
53 
54  // Subscribe for A
55  char subscription[2] = { 1, 'A'};
56  rc = zmq_send_const(sub, subscription, 2, 0);
57  assert (rc == 2);
58 
59  char buffer[2];
60 
61  // Receive subscriptions from subscriber
62  rc = zmq_recv(pub, buffer, 2, 0);
63  assert(rc == 2);
64  assert(buffer[0] == 1);
65  assert(buffer[1] == 'A');
66 
67  // Subscribe socket for B instead
68  rc = zmq_setsockopt(pub, ZMQ_SUBSCRIBE, "B", 1);
69  assert(rc == 0);
70 
71  // Sending A message and B Message
72  rc = zmq_send_const(pub, "A", 1, 0);
73  assert(rc == 1);
74 
75  rc = zmq_send_const(pub, "B", 1, 0);
76  assert(rc == 1);
77 
78  rc = zmq_recv(sub, buffer, 1, ZMQ_DONTWAIT);
79  assert(rc == 1);
80  assert(buffer[0] == 'B');
81 
82  // Clean up.
83  rc = zmq_close (pub);
84  assert (rc == 0);
85  rc = zmq_close (sub);
86  assert (rc == 0);
87  rc = zmq_ctx_term (ctx);
88  assert (rc == 0);
89 
90  return 0 ;
91 }
92 
94  const char *backend)
95 {
96  assert (frontend && backend);
97 
98  const char* topic = "1";
99  const char* payload = "X";
100 
101  int manual = 1;
102 
103  void *ctx = zmq_ctx_new ();
104  assert (ctx);
105 
106  // proxy frontend
107  void *xsub_proxy = zmq_socket (ctx, ZMQ_XSUB);
108  assert (xsub_proxy);
109  assert (zmq_bind (xsub_proxy, frontend) == 0);
110 
111  // proxy backend
112  void *xpub_proxy = zmq_socket (ctx, ZMQ_XPUB);
113  assert (xpub_proxy);
114  assert (zmq_setsockopt (xpub_proxy, ZMQ_XPUB_MANUAL, &manual, 4) == 0);
115  assert (zmq_bind (xpub_proxy, backend) == 0);
116 
117  // publisher
118  void *pub = zmq_socket (ctx, ZMQ_PUB);
119  assert (zmq_connect (pub, frontend) == 0);
120 
121  // first subscriber subscribes
122  void *sub1 = zmq_socket (ctx, ZMQ_SUB);
123  assert (sub1);
124  assert (zmq_connect (sub1, backend) == 0);
125  assert (zmq_setsockopt (sub1, ZMQ_SUBSCRIBE, topic, 1) == 0);
126 
127  // wait
129 
130  // proxy reroutes and confirms subscriptions
131  char sub_buff[2];
132  assert (zmq_recv (xpub_proxy, sub_buff, 2, ZMQ_DONTWAIT) == 2);
133  assert (sub_buff [0] == 1);
134  assert (sub_buff [1] == *topic);
135  assert (zmq_setsockopt (xpub_proxy, ZMQ_SUBSCRIBE, topic, 1) == 0);
136  assert (zmq_send (xsub_proxy, sub_buff, 2, 0) == 2);
137 
138  // second subscriber subscribes
139  void *sub2 = zmq_socket (ctx, ZMQ_SUB);
140  assert (sub2);
141  assert (zmq_connect (sub2, backend) == 0);
142  assert (zmq_setsockopt (sub2, ZMQ_SUBSCRIBE, topic, 1) == 0);
143 
144  // wait
146 
147  // proxy reroutes
148  assert (zmq_recv (xpub_proxy, sub_buff, 2, ZMQ_DONTWAIT) == 2);
149  assert (sub_buff [0] == 1);
150  assert (sub_buff [1] == *topic);
151  assert (zmq_setsockopt (xpub_proxy, ZMQ_SUBSCRIBE, topic, 1) == 0);
152  assert (zmq_send (xsub_proxy, sub_buff, 2, 0) == 2);
153 
154  // wait
156 
157  // let publisher send a msg
158  assert (zmq_send (pub, topic, 1, ZMQ_SNDMORE) == 1);
159  assert (zmq_send (pub, payload, 1, 0) == 1);
160 
161  // wait
163 
164  // proxy reroutes data messages to subscribers
165  char topic_buff[1];
166  char data_buff[1];
167  assert (zmq_recv (xsub_proxy, topic_buff, 1, ZMQ_DONTWAIT) == 1);
168  assert (topic_buff [0] == *topic);
169  assert (zmq_recv (xsub_proxy, data_buff, 1, ZMQ_DONTWAIT) == 1);
170  assert (data_buff [0] == *payload);
171  assert (zmq_send (xpub_proxy, topic_buff, 1, ZMQ_SNDMORE) == 1);
172  assert (zmq_send (xpub_proxy, data_buff, 1, 0) == 1);
173 
174  // wait
176 
177  // each subscriber should now get a message
178  assert (zmq_recv (sub2, topic_buff, 1, ZMQ_DONTWAIT) == 1);
179  assert (topic_buff [0] == *topic);
180  assert (zmq_recv (sub2, data_buff, 1, ZMQ_DONTWAIT) == 1);
181  assert (data_buff [0] == *payload);
182 
183  assert (zmq_recv (sub1, topic_buff, 1, ZMQ_DONTWAIT) == 1);
184  assert (topic_buff [0] == *topic);
185  assert (zmq_recv (sub1, data_buff, 1, ZMQ_DONTWAIT) == 1);
186  assert (data_buff [0] == *payload);
187 
188  // Disconnect both subscribers
189  assert (zmq_close (sub1) == 0);
190  assert (zmq_close (sub2) == 0);
191 
192  // wait
194 
195  // unsubscribe messages are passed from proxy to publisher
196  assert (zmq_recv (xpub_proxy, sub_buff, 2, 0) == 2);
197  assert (sub_buff [0] == 0);
198  assert (sub_buff [1] == *topic);
199  assert (zmq_setsockopt (xpub_proxy, ZMQ_UNSUBSCRIBE, topic, 1) == 0);
200  assert (zmq_send (xsub_proxy, sub_buff, 2, 0) == 2);
201 
202  // should receive another unsubscribe msg
203  assert (zmq_recv (xpub_proxy, sub_buff, 2, 0) == 2
204  && "Should receive the second unsubscribe message.");
205  assert (sub_buff [0] == 0);
206  assert (sub_buff [1] == *topic);
207  assert (zmq_setsockopt (xpub_proxy, ZMQ_UNSUBSCRIBE, topic, 1) == 0);
208  assert (zmq_send (xsub_proxy, sub_buff, 2, 0) == 2);
209 
210  // wait
212 
213  // let publisher send a msg
214  assert (zmq_send (pub, topic, 1, ZMQ_SNDMORE) == 1);
215  assert (zmq_send (pub, payload, 1, 0) == 1);
216 
217  // wait
219 
220  // nothing should come to the proxy
221  assert (zmq_recv (xsub_proxy, topic_buff, 1, ZMQ_DONTWAIT) == -1);
222  assert (errno == EAGAIN);
223 
224  assert (zmq_close (pub) == 0);
225  assert (zmq_close (xpub_proxy) == 0);
226  assert (zmq_close (xsub_proxy) == 0);
227  assert (zmq_ctx_term (ctx) == 0);
228 
229  return 0;
230 }
231 
232 int test_missing_subscriptions(const char *frontend, const char *backend)
233 {
234  assert (frontend && backend);
235 
236  const char* topic1 = "1";
237  const char* topic2 = "2";
238  const char* payload = "X";
239 
240  int manual = 1;
241 
242  void *ctx = zmq_ctx_new ();
243  assert (ctx);
244 
245  // proxy frontend
246  void *xsub_proxy = zmq_socket (ctx, ZMQ_XSUB);
247  assert (xsub_proxy);
248  assert (zmq_bind (xsub_proxy, frontend) == 0);
249 
250  // proxy backend
251  void *xpub_proxy = zmq_socket (ctx, ZMQ_XPUB);
252  assert (xpub_proxy);
253  assert (zmq_setsockopt (xpub_proxy, ZMQ_XPUB_MANUAL, &manual, 4) == 0);
254  assert (zmq_bind (xpub_proxy, backend) == 0);
255 
256  // publisher
257  void *pub = zmq_socket (ctx, ZMQ_PUB);
258  assert (zmq_connect (pub, frontend) == 0);
259 
260  // Here's the problem: because subscribers subscribe in quick succession,
261  // the proxy is unable to confirm the first subscription before receiving
262  // the second. This causes the first subscription to get lost.
263 
264  // first subscriber
265  void *sub1 = zmq_socket (ctx, ZMQ_SUB);
266  assert (sub1);
267  assert (zmq_connect (sub1, backend) == 0);
268  assert (zmq_setsockopt (sub1, ZMQ_SUBSCRIBE, topic1, 1) == 0);
269 
270  // second subscriber
271  void *sub2 = zmq_socket (ctx, ZMQ_SUB);
272  assert (sub2);
273  assert (zmq_connect (sub2, backend) == 0);
274  assert (zmq_setsockopt (sub2, ZMQ_SUBSCRIBE, topic2, 1) == 0);
275 
276  // wait
278 
279  // proxy now reroutes and confirms subscriptions
280  char buffer[2];
281  assert (zmq_recv (xpub_proxy, buffer, 2, ZMQ_DONTWAIT) == 2);
282  assert (buffer [0] == 1);
283  assert (buffer [1] == *topic1);
284  assert (zmq_setsockopt (xpub_proxy, ZMQ_SUBSCRIBE, topic1, 1) == 0);
285  assert (zmq_send (xsub_proxy, buffer, 2, 0) == 2);
286 
287  assert (zmq_recv (xpub_proxy, buffer, 2, ZMQ_DONTWAIT) == 2);
288  assert (buffer [0] == 1);
289  assert (buffer [1] == *topic2);
290  assert (zmq_setsockopt (xpub_proxy, ZMQ_SUBSCRIBE, topic2, 1) == 0);
291  assert (zmq_send (xsub_proxy, buffer, 2, 0) == 2);
292 
293  // wait
295 
296  // let publisher send 2 msgs, each with its own topic
297  assert (zmq_send (pub, topic1, 1, ZMQ_SNDMORE) == 1);
298  assert (zmq_send (pub, payload, 1, 0) == 1);
299  assert (zmq_send (pub, topic2, 1, ZMQ_SNDMORE) == 1);
300  assert (zmq_send (pub, payload, 1, 0) == 1);
301 
302  // wait
304 
305  // proxy reroutes data messages to subscribers
306  char topic_buff [1];
307  char data_buff [1];
308  assert (zmq_recv (xsub_proxy, topic_buff, 1, ZMQ_DONTWAIT) == 1);
309  assert (topic_buff [0] == *topic1);
310  assert (zmq_recv (xsub_proxy, data_buff, 1, ZMQ_DONTWAIT) == 1);
311  assert (data_buff [0] == *payload);
312  assert (zmq_send (xpub_proxy, topic_buff, 1, ZMQ_SNDMORE) == 1);
313  assert (zmq_send (xpub_proxy, data_buff, 1, 0) == 1);
314 
315  assert (zmq_recv (xsub_proxy, topic_buff, 1, ZMQ_DONTWAIT) == 1);
316  assert (topic_buff [0] == *topic2);
317  assert (zmq_recv (xsub_proxy, data_buff, 1, ZMQ_DONTWAIT) == 1);
318  assert (data_buff [0] == *payload);
319  assert (zmq_send (xpub_proxy, topic_buff, 1, ZMQ_SNDMORE) == 1);
320  assert (zmq_send (xpub_proxy, data_buff, 1, 0) == 1);
321 
322  // wait
324 
325  // each subscriber should now get a message
326  assert (zmq_recv (sub2, topic_buff, 1, ZMQ_DONTWAIT) == 1);
327  assert (topic_buff [0] == *topic2);
328  assert (zmq_recv (sub2, data_buff, 1, ZMQ_DONTWAIT) == 1);
329  assert (data_buff [0] == *payload);
330 
331  assert (zmq_recv (sub1, topic_buff, 1, ZMQ_DONTWAIT) == 1);
332  assert (topic_buff [0] == *topic1);
333  assert (zmq_recv (sub1, data_buff, 1, ZMQ_DONTWAIT) == 1);
334  assert (data_buff [0] == *payload);
335 
336  // Clean up
337  assert (zmq_close (sub1) == 0);
338  assert (zmq_close (sub2) == 0);
339  assert (zmq_close (pub) == 0);
340  assert (zmq_close (xpub_proxy) == 0);
341  assert (zmq_close (xsub_proxy) == 0);
342  assert (zmq_ctx_term (ctx) == 0);
343 
344  return 0;
345 }
346 
347 
348 int main(void)
349 {
351  test_basic ();
352 
353  const char *frontend;
354  const char *backend;
355 
356 #if !defined ZMQ_HAVE_WINDOWS && !defined ZMQ_HAVE_OPENVMS
357  frontend = "ipc://frontend";
358  backend = "ipc://backend";
359  test_xpub_proxy_unsubscribe_on_disconnect (frontend, backend);
360  test_missing_subscriptions (frontend, backend);
361 #endif
362  frontend = "tcp://127.0.0.1:5560";
363  backend = "tcp://127.0.0.1:5561";
364  test_xpub_proxy_unsubscribe_on_disconnect (frontend, backend);
365  test_missing_subscriptions (frontend, backend);
366 
367  return 0;
368 }
void msleep(int milliseconds)
Definition: testutil.hpp:316
ZMQ_EXPORT int zmq_setsockopt(void *s, int option, const void *optval, size_t optvallen)
Definition: zmq.cpp:265
int test_basic()
ZMQ_EXPORT void * zmq_ctx_new(void)
Definition: zmq.cpp:115
#define ZMQ_SNDMORE
Definition: zmq.h:346
#define ZMQ_SUB
Definition: zmq.h:248
#define ZMQ_XPUB
Definition: zmq.h:255
#define ZMQ_SUBSCRIBE
Definition: zmq.h:266
#define ZMQ_XSUB
Definition: zmq.h:256
void setup_test_environment(void)
Definition: testutil.hpp:285
#define SETTLE_TIME
Definition: testutil.hpp:44
ZMQ_EXPORT int zmq_recv(void *s, void *buf, size_t len, int flags)
Definition: zmq.cpp:507
ZMQ_EXPORT int zmq_send_const(void *s, const void *buf, size_t len, int flags)
Definition: zmq.cpp:416
#define ZMQ_PUB
Definition: zmq.h:247
ZMQ_EXPORT void * zmq_socket(void *, int type)
Definition: zmq.cpp:244
#define ZMQ_UNSUBSCRIBE
Definition: zmq.h:267
int main(void)
ZMQ_EXPORT int zmq_connect(void *s, const char *addr)
Definition: zmq.cpp:332
int test_missing_subscriptions(const char *frontend, const char *backend)
ZMQ_EXPORT int zmq_close(void *s)
Definition: zmq.cpp:255
ZMQ_EXPORT int zmq_send(void *s, const void *buf, size_t len, int flags)
Definition: zmq.cpp:387
int test_xpub_proxy_unsubscribe_on_disconnect(const char *frontend, const char *backend)
ZMQ_EXPORT int zmq_bind(void *s, const char *addr)
Definition: zmq.cpp:321
ZMQ_EXPORT int zmq_ctx_term(void *context)
Definition: zmq.cpp:162
#define ZMQ_DONTWAIT
Definition: zmq.h:345
#define ZMQ_XPUB_MANUAL
Definition: zmq.h:322