libzmq  master
ZeroMQ C++ Core Engine (LIBZMQ)
test_spec_pushpull.cpp
Go to the documentation of this file.
1 /*
2  Copyright (c) 2007-2016 Contributors as noted in the AUTHORS file
3 
4  This file is part of libzmq, the ZeroMQ core engine in C++.
5 
6  libzmq is free software; you can redistribute it and/or modify it under
7  the terms of the GNU Lesser General Public License (LGPL) as published
8  by the Free Software Foundation; either version 3 of the License, or
9  (at your option) any later version.
10 
11  As a special exception, the Contributors give you permission to link
12  this library with independent modules to produce an executable,
13  regardless of the license terms of these independent modules, and to
14  copy and distribute the resulting executable under terms of your choice,
15  provided that you also meet, for each linked independent module, the
16  terms and conditions of the license of that module. An independent
17  module is a module which is not derived from or based on this library.
18  If you modify this library, you must extend this exception to your
19  version of the library.
20 
21  libzmq is distributed in the hope that it will be useful, but WITHOUT
22  ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
23  FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public
24  License for more details.
25 
26  You should have received a copy of the GNU Lesser General Public License
27  along with this program. If not, see <http://www.gnu.org/licenses/>.
28 */
29 
30 #include "testutil.hpp"
31 
32 const char *bind_address = 0;
33 const char *connect_address = 0;
34 
35 void test_push_round_robin_out (void *ctx)
36 {
37  void *push = zmq_socket (ctx, ZMQ_PUSH);
38  assert (push);
39 
40  int rc = zmq_bind (push, bind_address);
41  assert (rc == 0);
42 
43  const size_t services = 5;
44  void *pulls [services];
45  for (size_t peer = 0; peer < services; ++peer) {
46  pulls [peer] = zmq_socket (ctx, ZMQ_PULL);
47  assert (pulls [peer]);
48 
49  int timeout = 250;
50  rc = zmq_setsockopt (pulls [peer], ZMQ_RCVTIMEO, &timeout, sizeof (int));
51  assert (rc == 0);
52 
53  rc = zmq_connect (pulls [peer], connect_address);
54  assert (rc == 0);
55  }
56 
57  // Wait for connections.
59 
60  // Send 2N messages
61  for (size_t peer = 0; peer < services; ++peer)
62  s_send_seq (push, "ABC", SEQ_END);
63  for (size_t peer = 0; peer < services; ++peer)
64  s_send_seq (push, "DEF", SEQ_END);
65 
66  // Expect every PULL got one of each
67  for (size_t peer = 0; peer < services; ++peer) {
68  s_recv_seq (pulls [peer], "ABC", SEQ_END);
69  s_recv_seq (pulls [peer], "DEF", SEQ_END);
70  }
71 
72  close_zero_linger (push);
73 
74  for (size_t peer = 0; peer < services; ++peer)
75  close_zero_linger (pulls [peer]);
76 
77  // Wait for disconnects.
79 }
80 
81 void test_pull_fair_queue_in (void *ctx)
82 {
83  void *pull = zmq_socket (ctx, ZMQ_PULL);
84  assert (pull);
85 
86  int rc = zmq_bind (pull, bind_address);
87  assert (rc == 0);
88 
89  const size_t services = 5;
90  void *pushs [services];
91  for (size_t peer = 0; peer < services; ++peer)
92  {
93  pushs [peer] = zmq_socket (ctx, ZMQ_PUSH);
94  assert (pushs [peer]);
95 
96  rc = zmq_connect (pushs [peer], connect_address);
97  assert (rc == 0);
98  }
99 
100  // Wait for connections.
102 
103  int first_half = 0;
104  int second_half = 0;
105 
106  // Send 2N messages
107  for (size_t peer = 0; peer < services; ++peer) {
108  char *str = strdup("A");
109 
110  str [0] += peer;
111  s_send_seq (pushs [peer], str, SEQ_END);
112  first_half += str [0];
113 
114  str [0] += services;
115  s_send_seq (pushs [peer], str, SEQ_END);
116  second_half += str [0];
117 
118  free (str);
119  }
120 
121  // Wait for data.
123 
124  zmq_msg_t msg;
125  rc = zmq_msg_init (&msg);
126  assert (rc == 0);
127 
128  // Expect to pull one from each first
129  for (size_t peer = 0; peer < services; ++peer) {
130  rc = zmq_msg_recv (&msg, pull, 0);
131  assert (rc == 2);
132  const char *str = (const char *)zmq_msg_data (&msg);
133  first_half -= str [0];
134  }
135  assert (first_half == 0);
136 
137  // And then get the second batch
138  for (size_t peer = 0; peer < services; ++peer) {
139  rc = zmq_msg_recv (&msg, pull, 0);
140  assert (rc == 2);
141  const char *str = (const char *)zmq_msg_data (&msg);
142  second_half -= str [0];
143  }
144  assert (second_half == 0);
145 
146  rc = zmq_msg_close (&msg);
147  assert (rc == 0);
148 
149  close_zero_linger (pull);
150 
151  for (size_t peer = 0; peer < services; ++peer)
152  close_zero_linger (pushs [peer]);
153 
154  // Wait for disconnects.
156 }
157 
159 {
160  void *sc = zmq_socket (ctx, ZMQ_PUSH);
161  assert (sc);
162 
163  int timeout = 250;
164  int rc = zmq_setsockopt (sc, ZMQ_SNDTIMEO, &timeout, sizeof (timeout));
165  assert (rc == 0);
166 
167  rc = zmq_send (sc, 0, 0, ZMQ_DONTWAIT);
168  assert (rc == -1);
169  assert (errno == EAGAIN);
170 
171  rc = zmq_send (sc, 0, 0, 0);
172  assert (rc == -1);
173  assert (errno == EAGAIN);
174 
175  rc = zmq_close (sc);
176  assert (rc == 0);
177 }
178 
180 {
181  void *A = zmq_socket (ctx, ZMQ_PUSH);
182  assert (A);
183 
184  int hwm = 1;
185  int rc = zmq_setsockopt (A, ZMQ_SNDHWM, &hwm, sizeof (hwm));
186  assert (rc == 0);
187 
188  rc = zmq_bind (A, bind_address);
189  assert (rc == 0);
190 
191  void *B = zmq_socket (ctx, ZMQ_PULL);
192  assert (B);
193 
194  rc = zmq_setsockopt (B, ZMQ_RCVHWM, &hwm, sizeof (hwm));
195  assert (rc == 0);
196 
197  rc = zmq_connect (B, connect_address);
198  assert (rc == 0);
199 
200  // Send two messages, one should be stuck in A's outgoing queue, the other
201  // arrives at B.
202  s_send_seq (A, "ABC", SEQ_END);
203  s_send_seq (A, "DEF", SEQ_END);
204 
205  // Both queues should now be full, indicated by A blocking on send.
206  rc = zmq_send (A, 0, 0, ZMQ_DONTWAIT);
207  assert (rc == -1);
208  assert (errno == EAGAIN);
209 
211  assert (rc == 0);
212 
213  // Disconnect may take time and need command processing.
214  zmq_pollitem_t poller [2] = { { A, 0, 0, 0 }, { B, 0, 0, 0 } };
215  rc = zmq_poll (poller, 2, 100);
216  assert (rc == 0);
217  rc = zmq_poll (poller, 2, 100);
218  assert (rc == 0);
219 
220  zmq_msg_t msg;
221  rc = zmq_msg_init (&msg);
222  assert (rc == 0);
223 
224  // Can't receive old data on B.
225  rc = zmq_msg_recv (&msg, B, ZMQ_DONTWAIT);
226  assert (rc == -1);
227  assert (errno == EAGAIN);
228 
229  // Sending fails.
230  rc = zmq_send (A, 0, 0, ZMQ_DONTWAIT);
231  assert (rc == -1);
232  assert (errno == EAGAIN);
233 
234  // Reconnect B
235  rc = zmq_connect (B, connect_address);
236  assert (rc == 0);
237 
238  // Still can't receive old data on B.
239  rc = zmq_msg_recv (&msg, B, ZMQ_DONTWAIT);
240  assert (rc == -1);
241  assert (errno == EAGAIN);
242 
243  // two messages should be sendable before the queues are filled up.
244  s_send_seq (A, "ABC", SEQ_END);
245  s_send_seq (A, "DEF", SEQ_END);
246 
247  rc = zmq_send (A, 0, 0, ZMQ_DONTWAIT);
248  assert (rc == -1);
249  assert (errno == EAGAIN);
250 
251  rc = zmq_msg_close (&msg);
252  assert (rc == 0);
253 
254  close_zero_linger (A);
255  close_zero_linger (B);
256 
257  // Wait for disconnects.
259 }
260 
261 int main (void)
262 {
264  void *ctx = zmq_ctx_new ();
265  assert (ctx);
266 
267  const char *binds [] = { "inproc://a", "tcp://127.0.0.1:5555" };
268  const char *connects [] = { "inproc://a", "tcp://localhost:5555" };
269 
270  for (int transport = 0; transport < 2; ++transport) {
271  bind_address = binds [transport];
272  connect_address = connects [transport];
273 
274  // PUSH: SHALL route outgoing messages to connected peers using a
275  // round-robin strategy.
277 
278  // PULL: SHALL receive incoming messages from its peers using a fair-queuing
279  // strategy.
281 
282  // PUSH: SHALL block on sending, or return a suitable error, when it has no
283  // available peers.
285 
286  // PUSH and PULL: SHALL create this queue when a peer connects to it. If
287  // this peer disconnects, the socket SHALL destroy its queue and SHALL
288  // discard any messages it contains.
289  // *** Test disabled until libzmq does this properly ***
290  // test_destroy_queue_on_disconnect (ctx);
291  }
292 
293  int rc = zmq_ctx_term (ctx);
294  assert (rc == 0);
295 
296  return 0 ;
297 }
void msleep(int milliseconds)
Definition: testutil.hpp:316
ZMQ_EXPORT int zmq_setsockopt(void *s, int option, const void *optval, size_t optvallen)
Definition: zmq.cpp:265
#define ZMQ_RCVHWM
Definition: zmq.h:282
ZMQ_EXPORT void * zmq_ctx_new(void)
Definition: zmq.cpp:115
void setup_test_environment(void)
Definition: testutil.hpp:285
#define SETTLE_TIME
Definition: testutil.hpp:44
void test_pull_fair_queue_in(void *ctx)
ZMQ_EXPORT void * zmq_msg_data(zmq_msg_t *msg)
Definition: zmq.cpp:666
ZMQ_EXPORT void * zmq_socket(void *, int type)
Definition: zmq.cpp:244
#define ZMQ_PUSH
Definition: zmq.h:254
#define ZMQ_SNDHWM
Definition: zmq.h:281
#define ZMQ_SNDTIMEO
Definition: zmq.h:285
ZMQ_EXPORT int zmq_connect(void *s, const char *addr)
Definition: zmq.cpp:332
ZMQ_EXPORT int zmq_close(void *s)
Definition: zmq.cpp:255
const char * bind_address
ZMQ_EXPORT int zmq_msg_recv(zmq_msg_t *msg, void *s, int flags)
Definition: zmq.cpp:640
Definition: zmq.h:221
ZMQ_EXPORT int zmq_poll(zmq_pollitem_t *items, int nitems, long timeout)
Definition: zmq.cpp:748
ZMQ_EXPORT int zmq_send(void *s, const void *buf, size_t len, int flags)
Definition: zmq.cpp:387
void close_zero_linger(void *socket)
Definition: testutil.hpp:275
ZMQ_EXPORT int zmq_bind(void *s, const char *addr)
Definition: zmq.cpp:321
void s_recv_seq(void *socket,...)
Definition: testutil.hpp:236
ZMQ_EXPORT int zmq_ctx_term(void *context)
Definition: zmq.cpp:162
const char * SEQ_END
Definition: testutil.hpp:198
ZMQ_EXPORT int zmq_msg_close(zmq_msg_t *msg)
Definition: zmq.cpp:651
void s_send_seq(void *socket,...)
Definition: testutil.hpp:205
#define ZMQ_RCVTIMEO
Definition: zmq.h:284
void test_push_round_robin_out(void *ctx)
ZMQ_EXPORT int zmq_disconnect(void *s, const char *addr)
Definition: zmq.cpp:353
void test_destroy_queue_on_disconnect(void *ctx)
void test_push_block_on_send_no_peers(void *ctx)
#define ZMQ_DONTWAIT
Definition: zmq.h:345
int main(void)
ZMQ_EXPORT int zmq_msg_init(zmq_msg_t *msg)
Definition: zmq.cpp:613
#define ZMQ_PULL
Definition: zmq.h:253
const char * connect_address