libzmq  master
ZeroMQ C++ Core Engine (LIBZMQ)
test_hwm_pubsub.cpp
Go to the documentation of this file.
1 /*
2  Copyright (c) 2007-2016 Contributors as noted in the AUTHORS file
3 
4  This file is part of libzmq, the ZeroMQ core engine in C++.
5 
6  libzmq is free software; you can redistribute it and/or modify it under
7  the terms of the GNU Lesser General Public License (LGPL) as published
8  by the Free Software Foundation; either version 3 of the License, or
9  (at your option) any later version.
10 
11  As a special exception, the Contributors give you permission to link
12  this library with independent modules to produce an executable,
13  regardless of the license terms of these independent modules, and to
14  copy and distribute the resulting executable under terms of your choice,
15  provided that you also meet, for each linked independent module, the
16  terms and conditions of the license of that module. An independent
17  module is a module which is not derived from or based on this library.
18  If you modify this library, you must extend this exception to your
19  version of the library.
20 
21  libzmq is distributed in the hope that it will be useful, but WITHOUT
22  ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
23  FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public
24  License for more details.
25 
26  You should have received a copy of the GNU Lesser General Public License
27  along with this program. If not, see <http://www.gnu.org/licenses/>.
28 */
29 
30 #include "testutil.hpp"
31 
32 // const int MAX_SENDS = 10000;
33 
34 int test_defaults (int send_hwm, int msgCnt)
35 {
36  void *ctx = zmq_ctx_new ();
37  assert (ctx);
38  int rc;
39 
40  // Set up bind socket
41  void *pub_socket = zmq_socket (ctx, ZMQ_PUB);
42  assert (pub_socket);
43  rc = zmq_bind (pub_socket, "inproc://a");
44  assert (rc == 0);
45 
46  // Set up connect socket
47  void *sub_socket = zmq_socket (ctx, ZMQ_SUB);
48  assert (sub_socket);
49  rc = zmq_connect (sub_socket, "inproc://a");
50  assert (rc == 0);
51 
52  //set a hwm on publisher
53  rc = zmq_setsockopt (pub_socket, ZMQ_SNDHWM, &send_hwm, sizeof (send_hwm));
54  rc = zmq_setsockopt( sub_socket, ZMQ_SUBSCRIBE, 0, 0);
55 
56  // Send until we block
57  int send_count = 0;
58  while (send_count < msgCnt && zmq_send (pub_socket, NULL, 0, ZMQ_DONTWAIT) == 0)
59  ++send_count;
60 
61  // Now receive all sent messages
62  int recv_count = 0;
63  while (0 == zmq_recv (sub_socket, NULL, 0, ZMQ_DONTWAIT))
64  {
65  ++recv_count;
66  }
67 
68  assert (send_hwm == recv_count);
69 
70  // Clean up
71  rc = zmq_close (sub_socket);
72  assert (rc == 0);
73 
74  rc = zmq_close (pub_socket);
75  assert (rc == 0);
76 
77  rc = zmq_ctx_term (ctx);
78  assert (rc == 0);
79 
80  return recv_count;
81 }
82 
83 int receive( void* socket)
84 {
85  int recv_count = 0;
86  // Now receive all sent messages
87  while (0 == zmq_recv (socket, NULL, 0, ZMQ_DONTWAIT))
88  {
89  ++recv_count;
90  }
91 
92  return recv_count;
93 
94 }
95 
96 
97 int test_blocking (int send_hwm, int msgCnt)
98 {
99  void *ctx = zmq_ctx_new ();
100  assert (ctx);
101  int rc;
102 
103  // Set up bind socket
104  void *pub_socket = zmq_socket (ctx, ZMQ_PUB);
105  assert (pub_socket);
106  rc = zmq_bind (pub_socket, "inproc://a");
107  assert (rc == 0);
108 
109  // Set up connect socket
110  void *sub_socket = zmq_socket (ctx, ZMQ_SUB);
111  assert (sub_socket);
112  rc = zmq_connect (sub_socket, "inproc://a");
113  assert (rc == 0);
114 
115  //set a hwm on publisher
116  rc = zmq_setsockopt (pub_socket, ZMQ_SNDHWM, &send_hwm, sizeof (send_hwm));
117  int wait = 1;
118  rc = zmq_setsockopt (pub_socket, ZMQ_XPUB_NODROP, &wait, sizeof(wait));
119  rc = zmq_setsockopt( sub_socket, ZMQ_SUBSCRIBE, 0, 0);
120 
121  // Send until we block
122  int send_count = 0;
123  int recv_count = 0;
124  while (send_count < msgCnt )
125  {
126  rc = zmq_send (pub_socket, NULL, 0, ZMQ_DONTWAIT);
127  if( rc == 0)
128  {
129  ++send_count;
130  }
131  else if( -1 == rc)
132  {
133  assert(EAGAIN == errno);
134  recv_count += receive(sub_socket);
135  assert(recv_count == send_count);
136  }
137  }
138 
139  recv_count += receive(sub_socket);
140 
141  // Clean up
142  rc = zmq_close (sub_socket);
143  assert (rc == 0);
144 
145  rc = zmq_close (pub_socket);
146  assert (rc == 0);
147 
148  rc = zmq_ctx_term (ctx);
149  assert (rc == 0);
150 
151  return recv_count;
152 }
153 
154 // with hwm 11024: send 9999 msg, receive 9999, send 1100, receive 1100
156 {
157  int first_count = 9999;
158  int second_count = 1100;
159  int hwm = 11024;
160 
161  void *ctx = zmq_ctx_new ();
162  assert (ctx);
163  int rc;
164 
165  // Set up bind socket
166  void *pub_socket = zmq_socket (ctx, ZMQ_PUB);
167  assert (pub_socket);
168  rc = zmq_setsockopt (pub_socket, ZMQ_SNDHWM, &hwm, sizeof (hwm));
169  assert (rc == 0);
170  rc = zmq_bind (pub_socket, "tcp://127.0.0.1:1234");
171  assert (rc == 0);
172 
173  // Set up connect socket
174  void *sub_socket = zmq_socket (ctx, ZMQ_SUB);
175  assert (sub_socket);
176  rc = zmq_setsockopt (sub_socket, ZMQ_RCVHWM, &hwm, sizeof (hwm));
177  assert (rc == 0);
178  rc = zmq_connect (sub_socket, "tcp://127.0.0.1:1234");
179  assert (rc == 0);
180  rc = zmq_setsockopt( sub_socket, ZMQ_SUBSCRIBE, 0, 0);
181  assert (rc == 0);
182 
184 
185  // Send messages
186  int send_count = 0;
187  while (send_count < first_count && zmq_send (pub_socket, NULL, 0, ZMQ_DONTWAIT) == 0)
188  ++send_count;
189  assert (first_count == send_count);
190 
192 
193  // Now receive all sent messages
194  int recv_count = 0;
195  while (0 == zmq_recv (sub_socket, NULL, 0, ZMQ_DONTWAIT))
196  {
197  ++recv_count;
198  }
199  assert (first_count == recv_count);
200 
202 
203  // Send messages
204  send_count = 0;
205  while (send_count < second_count && zmq_send (pub_socket, NULL, 0, ZMQ_DONTWAIT) == 0)
206  ++send_count;
207  assert (second_count == send_count);
208 
210 
211  // Now receive all sent messages
212  recv_count = 0;
213  while (0 == zmq_recv (sub_socket, NULL, 0, ZMQ_DONTWAIT))
214  {
215  ++recv_count;
216  }
217  assert (second_count == recv_count);
218 
219  // Clean up
220  rc = zmq_close (sub_socket);
221  assert (rc == 0);
222 
223  rc = zmq_close (pub_socket);
224  assert (rc == 0);
225 
226  rc = zmq_ctx_term (ctx);
227  assert (rc == 0);
228 }
229 
230 int main (void)
231 {
233 
234  int count;
235 
236  // send 1000 msg on hwm 1000, receive 1000
237  count = test_defaults (1000,1000);
238  assert (count == 1000);
239 
240  // send 6000 msg on hwm 2000, drops above hwm, only receive hwm
241  count = test_blocking (2000,6000);
242  assert (count == 6000);
243 
244  // hwm should apply to the messages that have already been received
245  test_reset_hwm ();
246 
247  return 0;
248 }
void msleep(int milliseconds)
Definition: testutil.hpp:316
ZMQ_EXPORT int zmq_setsockopt(void *s, int option, const void *optval, size_t optvallen)
Definition: zmq.cpp:265
int main(void)
#define ZMQ_RCVHWM
Definition: zmq.h:282
ZMQ_EXPORT void * zmq_ctx_new(void)
Definition: zmq.cpp:115
#define ZMQ_SUB
Definition: zmq.h:248
#define ZMQ_SUBSCRIBE
Definition: zmq.h:266
void setup_test_environment(void)
Definition: testutil.hpp:285
#define SETTLE_TIME
Definition: testutil.hpp:44
ZMQ_EXPORT int zmq_recv(void *s, void *buf, size_t len, int flags)
Definition: zmq.cpp:507
int test_blocking(int send_hwm, int msgCnt)
#define ZMQ_PUB
Definition: zmq.h:247
ZMQ_EXPORT void * zmq_socket(void *, int type)
Definition: zmq.cpp:244
#define ZMQ_SNDHWM
Definition: zmq.h:281
int test_defaults(int send_hwm, int msgCnt)
ZMQ_EXPORT int zmq_connect(void *s, const char *addr)
Definition: zmq.cpp:332
ZMQ_EXPORT int zmq_close(void *s)
Definition: zmq.cpp:255
int receive(void *socket)
ZMQ_EXPORT int zmq_send(void *s, const void *buf, size_t len, int flags)
Definition: zmq.cpp:387
ZMQ_EXPORT int zmq_bind(void *s, const char *addr)
Definition: zmq.cpp:321
ZMQ_EXPORT int zmq_ctx_term(void *context)
Definition: zmq.cpp:162
void test_reset_hwm()
#define ZMQ_DONTWAIT
Definition: zmq.h:345
#define ZMQ_XPUB_NODROP
Definition: zmq.h:318