libzmq  master
ZeroMQ C++ Core Engine (LIBZMQ)
test_sockopt_hwm.cpp
Go to the documentation of this file.
1 /*
2  Copyright (c) 2007-2016 Contributors as noted in the AUTHORS file
3 
4  This file is part of libzmq, the ZeroMQ core engine in C++.
5 
6  libzmq is free software; you can redistribute it and/or modify it under
7  the terms of the GNU Lesser General Public License (LGPL) as published
8  by the Free Software Foundation; either version 3 of the License, or
9  (at your option) any later version.
10 
11  As a special exception, the Contributors give you permission to link
12  this library with independent modules to produce an executable,
13  regardless of the license terms of these independent modules, and to
14  copy and distribute the resulting executable under terms of your choice,
15  provided that you also meet, for each linked independent module, the
16  terms and conditions of the license of that module. An independent
17  module is a module which is not derived from or based on this library.
18  If you modify this library, you must extend this exception to your
19  version of the library.
20 
21  libzmq is distributed in the hope that it will be useful, but WITHOUT
22  ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
23  FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public
24  License for more details.
25 
26  You should have received a copy of the GNU Lesser General Public License
27  along with this program. If not, see <http://www.gnu.org/licenses/>.
28 */
29 
30 #include "testutil.hpp"
31 
32 const int MAX_SENDS = 10000;
33 
35 {
36  int rc;
37  void *ctx = zmq_ctx_new();
38 
39  void *bind_socket = zmq_socket(ctx, ZMQ_PUSH);
40  void *connect_socket = zmq_socket(ctx, ZMQ_PULL);
41 
42  int val = 2;
43  rc = zmq_setsockopt(connect_socket, ZMQ_RCVHWM, &val, sizeof(val));
44  assert(rc == 0);
45  rc = zmq_setsockopt(bind_socket, ZMQ_SNDHWM, &val, sizeof(val));
46  assert(rc == 0);
47 
48  zmq_connect(connect_socket, "inproc://a");
49  zmq_bind(bind_socket, "inproc://a");
50 
51  size_t placeholder = sizeof(val);
52  val = 0;
53  rc = zmq_getsockopt(bind_socket, ZMQ_SNDHWM, &val, &placeholder);
54  assert(rc == 0);
55  assert(val == 2);
56 
57  int send_count = 0;
58  while (send_count < MAX_SENDS && zmq_send(bind_socket, NULL, 0, ZMQ_DONTWAIT) == 0)
59  ++send_count;
60 
61  assert(send_count == 4);
62 
63  zmq_close(bind_socket);
64  zmq_close(connect_socket);
65  zmq_ctx_term(ctx);
66 }
67 
69 {
70  int rc;
71  void *ctx = zmq_ctx_new();
72 
73  void *bind_socket = zmq_socket(ctx, ZMQ_PUSH);
74  void *connect_socket = zmq_socket(ctx, ZMQ_PULL);
75 
76  int val = 1;
77  rc = zmq_setsockopt(connect_socket, ZMQ_RCVHWM, &val, sizeof(val));
78  assert(rc == 0);
79  rc = zmq_setsockopt(bind_socket, ZMQ_SNDHWM, &val, sizeof(val));
80  assert(rc == 0);
81 
82  zmq_connect(connect_socket, "inproc://a");
83  zmq_bind(bind_socket, "inproc://a");
84 
85  val = 5;
86  rc = zmq_setsockopt(bind_socket, ZMQ_SNDHWM, &val, sizeof(val));
87  assert(rc == 0);
88 
89  size_t placeholder = sizeof(val);
90  val = 0;
91  rc = zmq_getsockopt(bind_socket, ZMQ_SNDHWM, &val, &placeholder);
92  assert(rc == 0);
93  assert(val == 5);
94 
95  int send_count = 0;
96  while (send_count < MAX_SENDS && zmq_send(bind_socket, NULL, 0, ZMQ_DONTWAIT) == 0)
97  ++send_count;
98 
99  assert(send_count == 6);
100 
101  zmq_close(bind_socket);
102  zmq_close(connect_socket);
103  zmq_ctx_term(ctx);
104 }
105 
107 {
108  int rc;
109  void *ctx = zmq_ctx_new();
110 
111  void *bind_socket = zmq_socket(ctx, ZMQ_PUSH);
112  void *connect_socket = zmq_socket(ctx, ZMQ_PULL);
113 
114  int val = 1;
115  rc = zmq_setsockopt(connect_socket, ZMQ_RCVHWM, &val, sizeof(val));
116  assert(rc == 0);
117 
118  val = 100;
119  rc = zmq_setsockopt(bind_socket, ZMQ_SNDHWM, &val, sizeof(val));
120  assert(rc == 0);
121 
122  zmq_bind(bind_socket, "inproc://a");
123  zmq_connect(connect_socket, "inproc://a");
124 
125  // Fill up to hwm
126  int send_count = 0;
127  while (send_count < MAX_SENDS && zmq_send(bind_socket, &send_count, sizeof(send_count), ZMQ_DONTWAIT) == sizeof(send_count))
128  ++send_count;
129  assert(send_count == 101);
130 
131  // Descrease snd hwm
132  val = 70;
133  rc = zmq_setsockopt(bind_socket, ZMQ_SNDHWM, &val, sizeof(val));
134  assert(rc == 0);
135 
136  size_t placeholder = sizeof(val);
137  val = 0;
138  rc = zmq_getsockopt(bind_socket, ZMQ_SNDHWM, &val, &placeholder);
139  assert(rc == 0);
140  assert(val == 70);
141 
142  // Read out all data (should get up to previous hwm worth so none were dropped)
143  int read_count = 0;
144  int read_data = 0;
145  while (read_count < MAX_SENDS && zmq_recv(connect_socket, &read_data, sizeof(read_data), ZMQ_DONTWAIT) == sizeof(read_data)) {
146  assert(read_count == read_data);
147  ++read_count;
148  }
149 
150  assert(read_count == 101);
151 
152  // Give io thread some time to catch up
154 
155  // Fill up to new hwm
156  send_count = 0;
157  while (send_count < MAX_SENDS && zmq_send(bind_socket, &send_count, sizeof(send_count), ZMQ_DONTWAIT) == sizeof(send_count))
158  ++send_count;
159 
160  // Really this should be 71, but the lwm stuff kicks in doesn't seem quite right
161  assert(send_count > 0);
162 
163  zmq_close(bind_socket);
164  zmq_close(connect_socket);
165  zmq_ctx_term(ctx);
166 }
167 
168 
169 int main()
170 {
174 }
void msleep(int milliseconds)
Definition: testutil.hpp:316
ZMQ_EXPORT int zmq_setsockopt(void *s, int option, const void *optval, size_t optvallen)
Definition: zmq.cpp:265
#define ZMQ_RCVHWM
Definition: zmq.h:282
ZMQ_EXPORT void * zmq_ctx_new(void)
Definition: zmq.cpp:115
#define SETTLE_TIME
Definition: testutil.hpp:44
ZMQ_EXPORT int zmq_recv(void *s, void *buf, size_t len, int flags)
Definition: zmq.cpp:507
int main()
ZMQ_EXPORT void * zmq_socket(void *, int type)
Definition: zmq.cpp:244
#define ZMQ_PUSH
Definition: zmq.h:254
#define ZMQ_SNDHWM
Definition: zmq.h:281
ZMQ_EXPORT int zmq_getsockopt(void *s, int option, void *optval, size_t *optvallen)
Definition: zmq.cpp:277
const int MAX_SENDS
void test_change_after_connected()
ZMQ_EXPORT int zmq_connect(void *s, const char *addr)
Definition: zmq.cpp:332
ZMQ_EXPORT int zmq_close(void *s)
Definition: zmq.cpp:255
void test_change_before_connected()
void test_decrease_when_full()
ZMQ_EXPORT int zmq_send(void *s, const void *buf, size_t len, int flags)
Definition: zmq.cpp:387
ZMQ_EXPORT int zmq_bind(void *s, const char *addr)
Definition: zmq.cpp:321
ZMQ_EXPORT int zmq_ctx_term(void *context)
Definition: zmq.cpp:162
#define ZMQ_DONTWAIT
Definition: zmq.h:345
#define ZMQ_PULL
Definition: zmq.h:253