libzmq  master
ZeroMQ C++ Core Engine (LIBZMQ)
test_hwm.cpp
Go to the documentation of this file.
1 /*
2  Copyright (c) 2007-2016 Contributors as noted in the AUTHORS file
3 
4  This file is part of libzmq, the ZeroMQ core engine in C++.
5 
6  libzmq is free software; you can redistribute it and/or modify it under
7  the terms of the GNU Lesser General Public License (LGPL) as published
8  by the Free Software Foundation; either version 3 of the License, or
9  (at your option) any later version.
10 
11  As a special exception, the Contributors give you permission to link
12  this library with independent modules to produce an executable,
13  regardless of the license terms of these independent modules, and to
14  copy and distribute the resulting executable under terms of your choice,
15  provided that you also meet, for each linked independent module, the
16  terms and conditions of the license of that module. An independent
17  module is a module which is not derived from or based on this library.
18  If you modify this library, you must extend this exception to your
19  version of the library.
20 
21  libzmq is distributed in the hope that it will be useful, but WITHOUT
22  ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
23  FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public
24  License for more details.
25 
26  You should have received a copy of the GNU Lesser General Public License
27  along with this program. If not, see <http://www.gnu.org/licenses/>.
28 */
29 
30 #include "testutil.hpp"
31 
// Upper bound on send attempts, so the send loops in the tests below always
// terminate, even when a high-water mark of 0 is configured.
const int MAX_SENDS = 10000;

// Selects which end of the inproc pair count_msg () sets up first.
enum TestType
{
    BIND_FIRST,
    CONNECT_FIRST
};

37 {
38  void *ctx = zmq_ctx_new ();
39  assert (ctx);
40  int rc;
41 
42  // Set up bind socket
43  void *bind_socket = zmq_socket (ctx, ZMQ_PULL);
44  assert (bind_socket);
45  rc = zmq_bind (bind_socket, "inproc://a");
46  assert (rc == 0);
47 
48  // Set up connect socket
49  void *connect_socket = zmq_socket (ctx, ZMQ_PUSH);
50  assert (connect_socket);
51  rc = zmq_connect (connect_socket, "inproc://a");
52  assert (rc == 0);
53 
54  // Send until we block
55  int send_count = 0;
56  while (send_count < MAX_SENDS && zmq_send (connect_socket, NULL, 0, ZMQ_DONTWAIT) == 0)
57  ++send_count;
58 
59  // Now receive all sent messages
60  int recv_count = 0;
61  while (zmq_recv (bind_socket, NULL, 0, ZMQ_DONTWAIT) == 0)
62  ++recv_count;
63 
64  assert (send_count == recv_count);
65 
66  // Clean up
67  rc = zmq_close (connect_socket);
68  assert (rc == 0);
69 
70  rc = zmq_close (bind_socket);
71  assert (rc == 0);
72 
73  rc = zmq_ctx_term (ctx);
74  assert (rc == 0);
75 
76  return send_count;
77 }
78 
79 int count_msg (int send_hwm, int recv_hwm, TestType testType)
80 {
81  void *ctx = zmq_ctx_new ();
82  assert (ctx);
83  int rc;
84 
85  void *bind_socket;
86  void *connect_socket;
87  if (testType == BIND_FIRST)
88  {
89  // Set up bind socket
90  bind_socket = zmq_socket (ctx, ZMQ_PULL);
91  assert (bind_socket);
92  rc = zmq_setsockopt (bind_socket, ZMQ_RCVHWM, &recv_hwm, sizeof (recv_hwm));
93  assert (rc == 0);
94  rc = zmq_bind (bind_socket, "inproc://a");
95  assert (rc == 0);
96 
97  // Set up connect socket
98  connect_socket = zmq_socket (ctx, ZMQ_PUSH);
99  assert (connect_socket);
100  rc = zmq_setsockopt (connect_socket, ZMQ_SNDHWM, &send_hwm, sizeof (send_hwm));
101  assert (rc == 0);
102  rc = zmq_connect (connect_socket, "inproc://a");
103  assert (rc == 0);
104  }
105  else
106  {
107  // Set up connect socket
108  connect_socket = zmq_socket (ctx, ZMQ_PUSH);
109  assert (connect_socket);
110  rc = zmq_setsockopt (connect_socket, ZMQ_SNDHWM, &send_hwm, sizeof (send_hwm));
111  assert (rc == 0);
112  rc = zmq_connect (connect_socket, "inproc://a");
113  assert (rc == 0);
114 
115  // Set up bind socket
116  bind_socket = zmq_socket (ctx, ZMQ_PULL);
117  assert (bind_socket);
118  rc = zmq_setsockopt (bind_socket, ZMQ_RCVHWM, &recv_hwm, sizeof (recv_hwm));
119  assert (rc == 0);
120  rc = zmq_bind (bind_socket, "inproc://a");
121  assert (rc == 0);
122  }
123 
124  // Send until we block
125  int send_count = 0;
126  while (send_count < MAX_SENDS && zmq_send (connect_socket, NULL, 0, ZMQ_DONTWAIT) == 0)
127  ++send_count;
128 
129  // Now receive all sent messages
130  int recv_count = 0;
131  while (zmq_recv (bind_socket, NULL, 0, ZMQ_DONTWAIT) == 0)
132  ++recv_count;
133 
134  assert (send_count == recv_count);
135 
136  // Now it should be possible to send one more.
137  rc = zmq_send (connect_socket, NULL, 0, 0);
138  assert (rc == 0);
139 
140  // Consume the remaining message.
141  rc = zmq_recv (bind_socket, NULL, 0, 0);
142  assert (rc == 0);
143 
144  // Clean up
145  rc = zmq_close (connect_socket);
146  assert (rc == 0);
147 
148  rc = zmq_close (bind_socket);
149  assert (rc == 0);
150 
151  rc = zmq_ctx_term (ctx);
152  assert (rc == 0);
153 
154  return send_count;
155 }
156 
157 int test_inproc_bind_first (int send_hwm, int recv_hwm)
158 {
159  return count_msg(send_hwm, recv_hwm, BIND_FIRST);
160 }
161 
162 int test_inproc_connect_first (int send_hwm, int recv_hwm)
163 {
164  return count_msg(send_hwm, recv_hwm, CONNECT_FIRST);
165 }
166 
167 int test_inproc_connect_and_close_first (int send_hwm, int recv_hwm)
168 {
169  void *ctx = zmq_ctx_new ();
170  assert (ctx);
171  int rc;
172 
173  // Set up connect socket
174  void *connect_socket = zmq_socket (ctx, ZMQ_PUSH);
175  assert (connect_socket);
176  rc = zmq_setsockopt (connect_socket, ZMQ_SNDHWM, &send_hwm, sizeof (send_hwm));
177  assert (rc == 0);
178  rc = zmq_connect (connect_socket, "inproc://a");
179  assert (rc == 0);
180 
181  // Send until we block
182  int send_count = 0;
183  while (send_count < MAX_SENDS && zmq_send (connect_socket, NULL, 0, ZMQ_DONTWAIT) == 0)
184  ++send_count;
185 
186  // Close connect
187  rc = zmq_close (connect_socket);
188  assert (rc == 0);
189 
190  // Set up bind socket
191  void *bind_socket = zmq_socket (ctx, ZMQ_PULL);
192  assert (bind_socket);
193  rc = zmq_setsockopt (bind_socket, ZMQ_RCVHWM, &recv_hwm, sizeof (recv_hwm));
194  assert (rc == 0);
195  rc = zmq_bind (bind_socket, "inproc://a");
196  assert (rc == 0);
197 
198  // Now receive all sent messages
199  int recv_count = 0;
200  while (zmq_recv (bind_socket, NULL, 0, ZMQ_DONTWAIT) == 0)
201  ++recv_count;
202 
203  assert (send_count == recv_count);
204 
205  // Clean up
206  rc = zmq_close (bind_socket);
207  assert (rc == 0);
208 
209  rc = zmq_ctx_term (ctx);
210  assert (rc == 0);
211 
212  return send_count;
213 }
214 
215 int test_inproc_bind_and_close_first (int send_hwm, int /* recv_hwm */)
216 {
217  void *ctx = zmq_ctx_new ();
218  assert (ctx);
219  int rc;
220 
221  // Set up bind socket
222  void *bind_socket = zmq_socket (ctx, ZMQ_PUSH);
223  assert (bind_socket);
224  rc = zmq_setsockopt (bind_socket, ZMQ_SNDHWM, &send_hwm, sizeof (send_hwm));
225  assert (rc == 0);
226  rc = zmq_bind (bind_socket, "inproc://a");
227  assert (rc == 0);
228 
229  // Send until we block
230  int send_count = 0;
231  while (send_count < MAX_SENDS && zmq_send (bind_socket, NULL, 0, ZMQ_DONTWAIT) == 0)
232  ++send_count;
233 
234  // Close bind
235  rc = zmq_close (bind_socket);
236  assert (rc == 0);
237 
238  /* Can't currently do connect without then wiring up a bind as things hang, this needs top be fixed.
239  // Set up connect socket
240  void *connect_socket = zmq_socket (ctx, ZMQ_PULL);
241  assert (connect_socket);
242  rc = zmq_setsockopt (connect_socket, ZMQ_RCVHWM, &recv_hwm, sizeof (recv_hwm));
243  assert (rc == 0);
244  rc = zmq_connect (connect_socket, "inproc://a");
245  assert (rc == 0);
246 
247  // Now receive all sent messages
248  int recv_count = 0;
249  while (zmq_recv (connect_socket, NULL, 0, ZMQ_DONTWAIT) == 0)
250  ++recv_count;
251 
252  assert (send_count == recv_count);
253  */
254 
255  // Clean up
256  //rc = zmq_close (connect_socket);
257  //assert (rc == 0);
258 
259  rc = zmq_ctx_term (ctx);
260  assert (rc == 0);
261 
262  return send_count;
263 }
264 
265 int main (void)
266 {
268 
269  int count;
270 
271  // Default values are 1000 on send and 1000 one receive, so 2000 total
272  count = test_defaults ();
273  assert (count == 2000);
274 
275  // Infinite send and receive buffer
276  count = test_inproc_bind_first (0, 0);
277  assert (count == MAX_SENDS);
278  count = test_inproc_connect_first (0, 0);
279  assert (count == MAX_SENDS);
280 
281  // Infinite receive buffer
282  count = test_inproc_bind_first (1, 0);
283  assert (count == MAX_SENDS);
284  count = test_inproc_connect_first (1, 0);
285  assert (count == MAX_SENDS);
286 
287  // Infinite send buffer
288  count = test_inproc_bind_first (0, 1);
289  assert (count == MAX_SENDS);
290  count = test_inproc_connect_first (0, 1);
291  assert (count == MAX_SENDS);
292 
293  // Send and recv buffers hwm 1, so total that can be queued is 2
294  count = test_inproc_bind_first (1, 1);
295  assert (count == 2);
296  count = test_inproc_connect_first (1, 1);
297  assert (count == 2);
298 
299  // Send hwm of 1, send before bind so total that can be queued is 1
301  assert (count == 1);
302 
303  // Send hwm of 1, send from bind side before connect so total that can be queued should be 1,
304  // however currently all messages get thrown away before the connect. BUG?
305  count = test_inproc_bind_and_close_first (1, 0);
306  //assert (count == 1);
307 
308  return 0;
309 }
ZMQ_EXPORT int zmq_setsockopt(void *s, int option, const void *optval, size_t optvallen)
Definition: zmq.cpp:265
#define ZMQ_RCVHWM
Definition: zmq.h:282
ZMQ_EXPORT void * zmq_ctx_new(void)
Definition: zmq.cpp:115
int main(void)
Definition: test_hwm.cpp:265
void setup_test_environment(void)
Definition: testutil.hpp:285
ZMQ_EXPORT int zmq_recv(void *s, void *buf, size_t len, int flags)
Definition: zmq.cpp:507
int test_inproc_connect_and_close_first(int send_hwm, int recv_hwm)
Definition: test_hwm.cpp:167
ZMQ_EXPORT void * zmq_socket(void *, int type)
Definition: zmq.cpp:244
#define ZMQ_PUSH
Definition: zmq.h:254
#define ZMQ_SNDHWM
Definition: zmq.h:281
int test_defaults()
Definition: test_hwm.cpp:36
int test_inproc_connect_first(int send_hwm, int recv_hwm)
Definition: test_hwm.cpp:162
const int MAX_SENDS
Definition: test_hwm.cpp:32
ZMQ_EXPORT int zmq_connect(void *s, const char *addr)
Definition: zmq.cpp:332
ZMQ_EXPORT int zmq_close(void *s)
Definition: zmq.cpp:255
TestType
Definition: test_hwm.cpp:34
int test_inproc_bind_first(int send_hwm, int recv_hwm)
Definition: test_hwm.cpp:157
ZMQ_EXPORT int zmq_send(void *s, const void *buf, size_t len, int flags)
Definition: zmq.cpp:387
ZMQ_EXPORT int zmq_bind(void *s, const char *addr)
Definition: zmq.cpp:321
ZMQ_EXPORT int zmq_ctx_term(void *context)
Definition: zmq.cpp:162
int count_msg(int send_hwm, int recv_hwm, TestType testType)
Definition: test_hwm.cpp:79
#define ZMQ_DONTWAIT
Definition: zmq.h:345
int test_inproc_bind_and_close_first(int send_hwm, int)
Definition: test_hwm.cpp:215
#define ZMQ_PULL
Definition: zmq.h:253