libzmq  master
ZeroMQ C++ Core Engine (LIBZMQ)
test_inproc_connect.cpp
Go to the documentation of this file.
1 /*
2  Copyright (c) 2007-2016 Contributors as noted in the AUTHORS file
3 
4  This file is part of libzmq, the ZeroMQ core engine in C++.
5 
6  libzmq is free software; you can redistribute it and/or modify it under
7  the terms of the GNU Lesser General Public License (LGPL) as published
8  by the Free Software Foundation; either version 3 of the License, or
9  (at your option) any later version.
10 
11  As a special exception, the Contributors give you permission to link
12  this library with independent modules to produce an executable,
13  regardless of the license terms of these independent modules, and to
14  copy and distribute the resulting executable under terms of your choice,
15  provided that you also meet, for each linked independent module, the
16  terms and conditions of the license of that module. An independent
17  module is a module which is not derived from or based on this library.
18  If you modify this library, you must extend this exception to your
19  version of the library.
20 
21  libzmq is distributed in the hope that it will be useful, but WITHOUT
22  ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
23  FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public
24  License for more details.
25 
26  You should have received a copy of the GNU Lesser General Public License
27  along with this program. If not, see <http://www.gnu.org/licenses/>.
28 */
29 
30 #include "testutil.hpp"
31 
32 static void pusher (void *ctx)
33 {
34  // Connect first
35  void *connectSocket = zmq_socket (ctx, ZMQ_PAIR);
36  assert (connectSocket);
37  int rc = zmq_connect (connectSocket, "inproc://sink");
38  assert (rc == 0);
39 
40  // Queue up some data
41  rc = zmq_send_const (connectSocket, "foobar", 6, 0);
42  assert (rc == 6);
43 
44  // Cleanup
45  rc = zmq_close (connectSocket);
46  assert (rc == 0);
47 }
48 
49 static void simult_conn (void *payload)
50 {
51  // Pull out arguments - context followed by endpoint string
52  void* ctx = (void*)((void**)payload)[0];
53  char* endpt = (char*)((void**)payload)[1];
54 
55  // Connect
56  void *connectSocket = zmq_socket (ctx, ZMQ_SUB);
57  assert (connectSocket);
58  int rc = zmq_connect (connectSocket, endpt);
59  assert (rc == 0);
60 
61  // Cleanup
62  rc = zmq_close (connectSocket);
63  assert (rc == 0);
64 }
65 
66 static void simult_bind (void *payload)
67 {
68  // Pull out arguments - context followed by endpoint string
69  void* ctx = (void*)((void**)payload)[0];
70  char* endpt = (char*)((void**)payload)[1];
71 
72  // Bind
73  void *bindSocket = zmq_socket (ctx, ZMQ_PUB);
74  assert (bindSocket);
75  int rc = zmq_bind (bindSocket, endpt);
76  assert (rc == 0);
77 
78  // Cleanup
79  rc = zmq_close (bindSocket);
80  assert (rc == 0);
81 }
82 
84 {
85  void *ctx = zmq_ctx_new ();
86  assert (ctx);
87 
88  // Bind first
89  void *bindSocket = zmq_socket (ctx, ZMQ_PAIR);
90  assert (bindSocket);
91  int rc = zmq_bind (bindSocket, "inproc://bbc");
92  assert (rc == 0);
93 
94  // Now connect
95  void *connectSocket = zmq_socket (ctx, ZMQ_PAIR);
96  assert (connectSocket);
97  rc = zmq_connect (connectSocket, "inproc://bbc");
98  assert (rc == 0);
99 
100  // Queue up some data
101  rc = zmq_send_const (connectSocket, "foobar", 6, 0);
102  assert (rc == 6);
103 
104  // Read pending message
105  zmq_msg_t msg;
106  rc = zmq_msg_init (&msg);
107  assert (rc == 0);
108  rc = zmq_msg_recv (&msg, bindSocket, 0);
109  assert (rc == 6);
110  void *data = zmq_msg_data (&msg);
111  assert (memcmp ("foobar", data, 6) == 0);
112 
113  // Cleanup
114  rc = zmq_close (connectSocket);
115  assert (rc == 0);
116 
117  rc = zmq_close (bindSocket);
118  assert (rc == 0);
119 
120  rc = zmq_ctx_term (ctx);
121  assert (rc == 0);
122 }
123 
125 {
126  void *ctx = zmq_ctx_new ();
127  assert (ctx);
128 
129  // Connect first
130  void *connectSocket = zmq_socket (ctx, ZMQ_PAIR);
131  assert (connectSocket);
132  int rc = zmq_connect (connectSocket, "inproc://cbb");
133  assert (rc == 0);
134 
135  // Queue up some data
136  rc = zmq_send_const (connectSocket, "foobar", 6, 0);
137  assert (rc == 6);
138 
139  // Now bind
140  void *bindSocket = zmq_socket (ctx, ZMQ_PAIR);
141  assert (bindSocket);
142  rc = zmq_bind (bindSocket, "inproc://cbb");
143  assert (rc == 0);
144 
145  // Read pending message
146  zmq_msg_t msg;
147  rc = zmq_msg_init (&msg);
148  assert (rc == 0);
149  rc = zmq_msg_recv (&msg, bindSocket, 0);
150  assert (rc == 6);
151  void *data = zmq_msg_data (&msg);
152  assert (memcmp ("foobar", data, 6) == 0);
153 
154  // Cleanup
155  rc = zmq_close (connectSocket);
156  assert (rc == 0);
157 
158  rc = zmq_close (bindSocket);
159  assert (rc == 0);
160 
161  rc = zmq_ctx_term (ctx);
162  assert (rc == 0);
163 }
164 
166 {
167  void *ctx = zmq_ctx_new ();
168  assert (ctx);
169 
170  // Connect first
171  void *connectSocket = zmq_socket (ctx, ZMQ_PUB);
172  assert (connectSocket);
173  int rc = zmq_connect (connectSocket, "inproc://cbbps");
174  assert (rc == 0);
175 
176  // Queue up some data, this will be dropped
177  rc = zmq_send_const (connectSocket, "before", 6, 0);
178  assert (rc == 6);
179 
180  // Now bind
181  void *bindSocket = zmq_socket (ctx, ZMQ_SUB);
182  assert (bindSocket);
183  rc = zmq_setsockopt (bindSocket, ZMQ_SUBSCRIBE, "", 0);
184  assert (rc == 0);
185  rc = zmq_bind (bindSocket, "inproc://cbbps");
186  assert (rc == 0);
187 
188  // Wait for pub-sub connection to happen
190 
191  // Queue up some data, this not will be dropped
192  rc = zmq_send_const (connectSocket, "after", 6, 0);
193  assert (rc == 6);
194 
195  // Read pending message
196  zmq_msg_t msg;
197  rc = zmq_msg_init (&msg);
198  assert (rc == 0);
199  rc = zmq_msg_recv (&msg, bindSocket, 0);
200  assert (rc == 6);
201  void *data = zmq_msg_data (&msg);
202  assert (memcmp ("after", data, 5) == 0);
203 
204  // Cleanup
205  rc = zmq_close (connectSocket);
206  assert (rc == 0);
207 
208  rc = zmq_close (bindSocket);
209  assert (rc == 0);
210 
211  rc = zmq_ctx_term (ctx);
212  assert (rc == 0);
213 }
214 
216 {
217  const unsigned int no_of_connects = 10;
218  void *ctx = zmq_ctx_new ();
219  assert (ctx);
220 
221  int rc;
222  void *connectSocket [no_of_connects];
223 
224  // Connect first
225  for (unsigned int i = 0; i < no_of_connects; ++i)
226  {
227  connectSocket [i] = zmq_socket (ctx, ZMQ_PUSH);
228  assert (connectSocket [i]);
229  rc = zmq_connect (connectSocket [i], "inproc://multiple");
230  assert (rc == 0);
231 
232  // Queue up some data
233  rc = zmq_send_const (connectSocket [i], "foobar", 6, 0);
234  assert (rc == 6);
235  }
236 
237  // Now bind
238  void *bindSocket = zmq_socket (ctx, ZMQ_PULL);
239  assert (bindSocket);
240  rc = zmq_bind (bindSocket, "inproc://multiple");
241  assert (rc == 0);
242 
243  for (unsigned int i = 0; i < no_of_connects; ++i)
244  {
245  // Read pending message
246  zmq_msg_t msg;
247  rc = zmq_msg_init (&msg);
248  assert (rc == 0);
249  rc = zmq_msg_recv (&msg, bindSocket, 0);
250  assert (rc == 6);
251  void *data = zmq_msg_data (&msg);
252  assert (memcmp ("foobar", data, 6) == 0);
253  }
254 
255  // Cleanup
256  for (unsigned int i = 0; i < no_of_connects; ++i)
257  {
258  rc = zmq_close (connectSocket [i]);
259  assert (rc == 0);
260  }
261 
262  rc = zmq_close (bindSocket);
263  assert (rc == 0);
264 
265  rc = zmq_ctx_term (ctx);
266  assert (rc == 0);
267 }
268 
270 {
271  const unsigned int no_of_threads = 30;
272  void *ctx = zmq_ctx_new ();
273  assert (ctx);
274 
275  int rc;
276  void *threads [no_of_threads];
277 
278  // Connect first
279  for (unsigned int i = 0; i < no_of_threads; ++i)
280  {
281  threads [i] = zmq_threadstart (&pusher, ctx);
282  }
283 
284  // Now bind
285  void *bindSocket = zmq_socket (ctx, ZMQ_PULL);
286  assert (bindSocket);
287  rc = zmq_bind (bindSocket, "inproc://sink");
288  assert (rc == 0);
289 
290  for (unsigned int i = 0; i < no_of_threads; ++i)
291  {
292  // Read pending message
293  zmq_msg_t msg;
294  rc = zmq_msg_init (&msg);
295  assert (rc == 0);
296  rc = zmq_msg_recv (&msg, bindSocket, 0);
297  assert (rc == 6);
298  void *data = zmq_msg_data (&msg);
299  assert (memcmp ("foobar", data, 6) == 0);
300  }
301 
302  // Cleanup
303  for (unsigned int i = 0; i < no_of_threads; ++i)
304  {
305  zmq_threadclose (threads [i]);
306  }
307 
308  rc = zmq_close (bindSocket);
309  assert (rc == 0);
310 
311  rc = zmq_ctx_term (ctx);
312  assert (rc == 0);
313 }
314 
316 {
317  const unsigned int no_of_times = 50;
318  void *ctx = zmq_ctx_new ();
319  assert (ctx);
320 
321  void *threads[no_of_times*2];
322  void *thr_args[no_of_times][2];
323  char endpts[no_of_times][20];
324 
325  // Set up thread arguments: context followed by endpoint string
326  for (unsigned int i = 0; i < no_of_times; ++i)
327  {
328  thr_args[i][0] = (void*) ctx;
329  thr_args[i][1] = (void*) endpts[i];
330  sprintf (endpts[i], "inproc://foo_%d", i);
331  }
332 
333  // Spawn all threads as simultaneously as possible
334  for (unsigned int i = 0; i < no_of_times; ++i)
335  {
336  threads[i*2+0] = zmq_threadstart (&simult_conn, (void*)thr_args[i]);
337  threads[i*2+1] = zmq_threadstart (&simult_bind, (void*)thr_args[i]);
338  }
339 
340  // Close all threads
341  for (unsigned int i = 0; i < no_of_times; ++i)
342  {
343  zmq_threadclose (threads[i*2+0]);
344  zmq_threadclose (threads[i*2+1]);
345  }
346 
347  int rc = zmq_ctx_term (ctx);
348  assert (rc == 0);
349 }
350 
352 {
353  // Create the infrastructure
354  void *ctx = zmq_ctx_new ();
355  assert (ctx);
356 
357  void *sc = zmq_socket (ctx, ZMQ_DEALER);
358  assert (sc);
359 
360  int rc = zmq_connect (sc, "inproc://identity");
361  assert (rc == 0);
362 
363  void *sb = zmq_socket (ctx, ZMQ_ROUTER);
364  assert (sb);
365 
366  rc = zmq_bind (sb, "inproc://identity");
367  assert (rc == 0);
368 
369  // Send 2-part message.
370  rc = zmq_send (sc, "A", 1, ZMQ_SNDMORE);
371  assert (rc == 1);
372  rc = zmq_send (sc, "B", 1, 0);
373  assert (rc == 1);
374 
375  // Identity comes first.
376  zmq_msg_t msg;
377  rc = zmq_msg_init (&msg);
378  assert (rc == 0);
379  rc = zmq_msg_recv (&msg, sb, 0);
380  assert (rc >= 0);
381  int more = zmq_msg_more (&msg);
382  assert (more == 1);
383 
384  // Then the first part of the message body.
385  rc = zmq_msg_recv (&msg, sb, 0);
386  assert (rc == 1);
387  more = zmq_msg_more (&msg);
388  assert (more == 1);
389 
390  // And finally, the second part of the message body.
391  rc = zmq_msg_recv (&msg, sb, 0);
392  assert (rc == 1);
393  more = zmq_msg_more (&msg);
394  assert (more == 0);
395 
396  // Deallocate the infrastructure.
397  rc = zmq_close (sc);
398  assert (rc == 0);
399 
400  rc = zmq_close (sb);
401  assert (rc == 0);
402 
403  rc = zmq_ctx_term (ctx);
404  assert (rc == 0);
405 }
406 
408 {
409  void *ctx = zmq_ctx_new ();
410  assert (ctx);
411 
412  void *connectSocket = zmq_socket (ctx, ZMQ_PUSH);
413  assert (connectSocket);
414  int rc = zmq_connect (connectSocket, "inproc://a");
415  assert (rc == 0);
416 
417  rc = zmq_close (connectSocket);
418  assert (rc == 0);
419 
420  rc = zmq_ctx_term (ctx);
421  assert (rc == 0);
422 }
423 
424 
425 void test_unbind ()
426 {
427  void *ctx = zmq_ctx_new ();
428  assert (ctx);
429 
430  // Bind and unbind socket 1
431  void *bindSocket1 = zmq_socket (ctx, ZMQ_PAIR);
432  assert (bindSocket1);
433  int rc = zmq_bind (bindSocket1, "inproc://unbind");
434  assert (rc == 0);
435  zmq_unbind (bindSocket1, "inproc://unbind");
436  assert (rc == 0);
437 
438  // Bind socket 2
439  void *bindSocket2 = zmq_socket (ctx, ZMQ_PAIR);
440  assert (bindSocket2);
441  rc = zmq_bind (bindSocket2, "inproc://unbind");
442  assert (rc == 0);
443 
444  // Now connect
445  void *connectSocket = zmq_socket (ctx, ZMQ_PAIR);
446  assert (connectSocket);
447  rc = zmq_connect (connectSocket, "inproc://unbind");
448  assert (rc == 0);
449 
450  // Queue up some data
451  rc = zmq_send_const (connectSocket, "foobar", 6, 0);
452  assert (rc == 6);
453 
454  // Read pending message
455  zmq_msg_t msg;
456  rc = zmq_msg_init (&msg);
457  assert (rc == 0);
458  rc = zmq_msg_recv (&msg, bindSocket2, 0);
459  assert (rc == 6);
460  void *data = zmq_msg_data (&msg);
461  assert (memcmp ("foobar", data, 6) == 0);
462 
463  // Cleanup
464  rc = zmq_close (connectSocket);
465  assert (rc == 0);
466  rc = zmq_close (bindSocket1);
467  assert (rc == 0);
468  rc = zmq_close (bindSocket2);
469  assert (rc == 0);
470  rc = zmq_ctx_term (ctx);
471  assert (rc == 0);
472 }
473 
475 {
476  void *ctx = zmq_ctx_new ();
477  assert (ctx);
478 
479  // Connect first
480  void *connectSocket = zmq_socket (ctx, ZMQ_PAIR);
481  assert (connectSocket);
482  int rc = zmq_connect (connectSocket, "inproc://cbb");
483  assert (rc == 0);
484 
485  zmq_ctx_shutdown (ctx);
486 
487  // Cleanup
488  rc = zmq_close (connectSocket);
489  assert (rc == 0);
490 
491  rc = zmq_ctx_term (ctx);
492  assert (rc == 0);
493 }
494 
495 int main (void)
496 {
498 
505  test_identity ();
507  test_unbind ();
509 
510  return 0;
511 }
void msleep(int milliseconds)
Definition: testutil.hpp:316
ZMQ_EXPORT int zmq_setsockopt(void *s, int option, const void *optval, size_t optvallen)
Definition: zmq.cpp:265
ZMQ_EXPORT int zmq_unbind(void *s, const char *addr)
Definition: zmq.cpp:343
static void simult_bind(void *payload)
ZMQ_EXPORT void * zmq_ctx_new(void)
Definition: zmq.cpp:115
#define ZMQ_SNDMORE
Definition: zmq.h:346
static void pusher(void *ctx)
#define ZMQ_DEALER
Definition: zmq.h:251
#define ZMQ_SUB
Definition: zmq.h:248
#define ZMQ_SUBSCRIBE
Definition: zmq.h:266
void setup_test_environment(void)
Definition: testutil.hpp:285
#define SETTLE_TIME
Definition: testutil.hpp:44
ZMQ_EXPORT int zmq_send_const(void *s, const void *buf, size_t len, int flags)
Definition: zmq.cpp:416
#define ZMQ_PUB
Definition: zmq.h:247
void test_multiple_threads()
#define ZMQ_ROUTER
Definition: zmq.h:252
ZMQ_EXPORT void * zmq_msg_data(zmq_msg_t *msg)
Definition: zmq.cpp:666
ZMQ_EXPORT void * zmq_socket(void *, int type)
Definition: zmq.cpp:244
#define ZMQ_PUSH
Definition: zmq.h:254
void test_connect_only()
void test_unbind()
ZMQ_EXPORT void zmq_threadclose(void *thread)
Definition: zmq_utils.cpp:85
void test_connect_before_bind()
ZMQ_EXPORT int zmq_connect(void *s, const char *addr)
Definition: zmq.cpp:332
ZMQ_EXPORT int zmq_close(void *s)
Definition: zmq.cpp:255
ZMQ_EXPORT int zmq_ctx_shutdown(void *context)
Definition: zmq.cpp:191
ZMQ_EXPORT int zmq_msg_recv(zmq_msg_t *msg, void *s, int flags)
Definition: zmq.cpp:640
void test_connect_before_bind_pub_sub()
ZMQ_EXPORT int zmq_msg_more(zmq_msg_t *msg)
Definition: zmq.cpp:676
Definition: zmq.h:221
void test_shutdown_during_pend()
int main(void)
void test_bind_before_connect()
ZMQ_EXPORT int zmq_send(void *s, const void *buf, size_t len, int flags)
Definition: zmq.cpp:387
ZMQ_EXPORT int zmq_bind(void *s, const char *addr)
Definition: zmq.cpp:321
ZMQ_EXPORT int zmq_ctx_term(void *context)
Definition: zmq.cpp:162
void test_identity()
#define ZMQ_PAIR
Definition: zmq.h:246
ZMQ_EXPORT void * zmq_threadstart(zmq_thread_fn *func, void *arg)
Definition: zmq_utils.cpp:78
void test_simultaneous_connect_bind_threads()
void test_multiple_connects()
ZMQ_EXPORT int zmq_msg_init(zmq_msg_t *msg)
Definition: zmq.cpp:613
static void simult_conn(void *payload)
#define ZMQ_PULL
Definition: zmq.h:253