libzmq  master
ZeroMQ C++ Core Engine (LIBZMQ)
test_spec_dealer.cpp
Go to the documentation of this file.
1 /*
2  Copyright (c) 2007-2016 Contributors as noted in the AUTHORS file
3 
4  This file is part of libzmq, the ZeroMQ core engine in C++.
5 
6  libzmq is free software; you can redistribute it and/or modify it under
7  the terms of the GNU Lesser General Public License (LGPL) as published
8  by the Free Software Foundation; either version 3 of the License, or
9  (at your option) any later version.
10 
11  As a special exception, the Contributors give you permission to link
12  this library with independent modules to produce an executable,
13  regardless of the license terms of these independent modules, and to
14  copy and distribute the resulting executable under terms of your choice,
15  provided that you also meet, for each linked independent module, the
16  terms and conditions of the license of that module. An independent
17  module is a module which is not derived from or based on this library.
18  If you modify this library, you must extend this exception to your
19  version of the library.
20 
21  libzmq is distributed in the hope that it will be useful, but WITHOUT
22  ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
23  FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public
24  License for more details.
25 
26  You should have received a copy of the GNU Lesser General Public License
27  along with this program. If not, see <http://www.gnu.org/licenses/>.
28 */
29 
30 #include "testutil.hpp"
31 
// Endpoint handed to zmq_bind () by the tests below. main () assigns it
// once per transport before the tests are run.
const char *bind_address = 0;
// Endpoint handed to zmq_connect () by the tests below. main () assigns it
// together with bind_address.
const char *connect_address = 0;
34 
35 void test_round_robin_out (void *ctx)
36 {
37  void *dealer = zmq_socket (ctx, ZMQ_DEALER);
38  assert (dealer);
39 
40  int rc = zmq_bind (dealer, bind_address);
41  assert (rc == 0);
42 
43  const size_t services = 5;
44  void *rep [services];
45  for (size_t peer = 0; peer < services; ++peer) {
46  rep [peer] = zmq_socket (ctx, ZMQ_REP);
47  assert (rep [peer]);
48 
49  int timeout = 250;
50  rc = zmq_setsockopt (rep [peer], ZMQ_RCVTIMEO, &timeout, sizeof (int));
51  assert (rc == 0);
52 
53  rc = zmq_connect (rep [peer], connect_address);
54  assert (rc == 0);
55  }
56 
57  // Wait for connections.
59 
60  // Send all requests
61  for (size_t i = 0; i < services; ++i)
62  s_send_seq (dealer, 0, "ABC", SEQ_END);
63 
64  // Expect every REP got one message
65  zmq_msg_t msg;
66  zmq_msg_init (&msg);
67 
68  for (size_t peer = 0; peer < services; ++peer)
69  s_recv_seq (rep [peer], "ABC", SEQ_END);
70 
71  rc = zmq_msg_close (&msg);
72  assert (rc == 0);
73 
74  close_zero_linger (dealer);
75 
76  for (size_t peer = 0; peer < services; ++peer)
77  close_zero_linger (rep [peer]);
78 
79  // Wait for disconnects.
81 }
82 
83 void test_fair_queue_in (void *ctx)
84 {
85  void *receiver = zmq_socket (ctx, ZMQ_DEALER);
86  assert (receiver);
87 
88  int timeout = 250;
89  int rc = zmq_setsockopt (receiver, ZMQ_RCVTIMEO, &timeout, sizeof (int));
90  assert (rc == 0);
91 
92  rc = zmq_bind (receiver, bind_address);
93  assert (rc == 0);
94 
95  const size_t services = 5;
96  void *senders [services];
97  for (size_t peer = 0; peer < services; ++peer) {
98  senders [peer] = zmq_socket (ctx, ZMQ_DEALER);
99  assert (senders [peer]);
100 
101  rc = zmq_setsockopt (senders [peer], ZMQ_RCVTIMEO, &timeout, sizeof (int));
102  assert (rc == 0);
103 
104  rc = zmq_connect (senders [peer], connect_address);
105  assert (rc == 0);
106  }
107 
108  zmq_msg_t msg;
109  rc = zmq_msg_init (&msg);
110  assert (rc == 0);
111 
112  s_send_seq (senders [0], "A", SEQ_END);
113  s_recv_seq (receiver, "A", SEQ_END);
114 
115  s_send_seq (senders [0], "A", SEQ_END);
116  s_recv_seq (receiver, "A", SEQ_END);
117 
118  // send our requests
119  for (size_t peer = 0; peer < services; ++peer)
120  s_send_seq (senders [peer], "B", SEQ_END);
121 
122  // Wait for data.
124 
125  // handle the requests
126  for (size_t peer = 0; peer < services; ++peer)
127  s_recv_seq (receiver, "B", SEQ_END);
128 
129  rc = zmq_msg_close (&msg);
130  assert (rc == 0);
131 
132  close_zero_linger (receiver);
133 
134  for (size_t peer = 0; peer < services; ++peer)
135  close_zero_linger (senders [peer]);
136 
137  // Wait for disconnects.
139 }
140 
142 {
143  void *A = zmq_socket (ctx, ZMQ_DEALER);
144  assert (A);
145 
146  int rc = zmq_bind (A, bind_address);
147  assert (rc == 0);
148 
149  void *B = zmq_socket (ctx, ZMQ_DEALER);
150  assert (B);
151 
152  rc = zmq_connect (B, connect_address);
153  assert (rc == 0);
154 
155  // Send a message in both directions
156  s_send_seq (A, "ABC", SEQ_END);
157  s_send_seq (B, "DEF", SEQ_END);
158 
160  assert (rc == 0);
161 
162  // Disconnect may take time and need command processing.
163  zmq_pollitem_t poller [2] = { { A, 0, 0, 0 }, { B, 0, 0, 0 } };
164  rc = zmq_poll (poller, 2, 100);
165  assert (rc == 0);
166  rc = zmq_poll (poller, 2, 100);
167  assert (rc == 0);
168 
169  // No messages should be available, sending should fail.
170  zmq_msg_t msg;
171  zmq_msg_init (&msg);
172 
173  rc = zmq_send (A, 0, 0, ZMQ_DONTWAIT);
174  assert (rc == -1);
175  assert (errno == EAGAIN);
176 
177  rc = zmq_msg_recv (&msg, A, ZMQ_DONTWAIT);
178  assert (rc == -1);
179  assert (errno == EAGAIN);
180 
181  // After a reconnect of B, the messages should still be gone
182  rc = zmq_connect (B, connect_address);
183  assert (rc == 0);
184 
185  rc = zmq_msg_recv (&msg, A, ZMQ_DONTWAIT);
186  assert (rc == -1);
187  assert (errno == EAGAIN);
188 
189  rc = zmq_msg_recv (&msg, B, ZMQ_DONTWAIT);
190  assert (rc == -1);
191  assert (errno == EAGAIN);
192 
193  rc = zmq_msg_close (&msg);
194  assert (rc == 0);
195 
196  close_zero_linger (A);
197  close_zero_linger (B);
198 
199  // Wait for disconnects.
201 }
202 
204 {
205  void *sc = zmq_socket (ctx, ZMQ_DEALER);
206  assert (sc);
207 
208  int timeout = 250;
209  int rc = zmq_setsockopt (sc, ZMQ_SNDTIMEO, &timeout, sizeof (timeout));
210  assert (rc == 0);
211 
212  rc = zmq_send (sc, 0, 0, ZMQ_DONTWAIT);
213  assert (rc == -1);
214  assert (errno == EAGAIN);
215 
216  rc = zmq_send (sc, 0, 0, 0);
217  assert (rc == -1);
218  assert (errno == EAGAIN);
219 
220  rc = zmq_close (sc);
221  assert (rc == 0);
222 }
223 
224 int main (void)
225 {
227  void *ctx = zmq_ctx_new ();
228  assert (ctx);
229 
230  const char *binds [] = { "inproc://a", "tcp://127.0.0.1:5555" };
231  const char *connects [] = { "inproc://a", "tcp://localhost:5555" };
232 
233  for (int transports = 0; transports < 2; ++transports) {
234  bind_address = binds [transports];
235  connect_address = connects [transports];
236 
237  // SHALL route outgoing messages to available peers using a round-robin
238  // strategy.
239  test_round_robin_out (ctx);
240 
241  // SHALL receive incoming messages from its peers using a fair-queuing
242  // strategy.
243  test_fair_queue_in (ctx);
244 
245  // SHALL block on sending, or return a suitable error, when it has no connected peers.
247 
248  // SHALL create a double queue when a peer connects to it. If this peer
249  // disconnects, the DEALER socket SHALL destroy its double queue and SHALL
250  // discard any messages it contains.
251  // *** Test disabled until libzmq does this properly ***
252  // test_destroy_queue_on_disconnect (ctx);
253  }
254 
255  int rc = zmq_ctx_term (ctx);
256  assert (rc == 0);
257 
258  return 0 ;
259 }
void msleep(int milliseconds)
Definition: testutil.hpp:316
ZMQ_EXPORT int zmq_setsockopt(void *s, int option, const void *optval, size_t optvallen)
Definition: zmq.cpp:265
ZMQ_EXPORT void * zmq_ctx_new(void)
Definition: zmq.cpp:115
#define ZMQ_DEALER
Definition: zmq.h:251
#define ZMQ_REP
Definition: zmq.h:250
void setup_test_environment(void)
Definition: testutil.hpp:285
#define SETTLE_TIME
Definition: testutil.hpp:44
ZMQ_EXPORT void * zmq_socket(void *, int type)
Definition: zmq.cpp:244
#define ZMQ_SNDTIMEO
Definition: zmq.h:285
const char * bind_address
void test_block_on_send_no_peers(void *ctx)
ZMQ_EXPORT int zmq_connect(void *s, const char *addr)
Definition: zmq.cpp:332
ZMQ_EXPORT int zmq_close(void *s)
Definition: zmq.cpp:255
static void receiver(void *socket)
ZMQ_EXPORT int zmq_msg_recv(zmq_msg_t *msg, void *s, int flags)
Definition: zmq.cpp:640
zmq_msg_t
Definition: zmq.h:221
void test_round_robin_out(void *ctx)
ZMQ_EXPORT int zmq_poll(zmq_pollitem_t *items, int nitems, long timeout)
Definition: zmq.cpp:748
ZMQ_EXPORT int zmq_send(void *s, const void *buf, size_t len, int flags)
Definition: zmq.cpp:387
void close_zero_linger(void *socket)
Definition: testutil.hpp:275
ZMQ_EXPORT int zmq_bind(void *s, const char *addr)
Definition: zmq.cpp:321
void s_recv_seq(void *socket,...)
Definition: testutil.hpp:236
ZMQ_EXPORT int zmq_ctx_term(void *context)
Definition: zmq.cpp:162
void test_destroy_queue_on_disconnect(void *ctx)
const char * SEQ_END
Definition: testutil.hpp:198
ZMQ_EXPORT int zmq_msg_close(zmq_msg_t *msg)
Definition: zmq.cpp:651
const char * connect_address
int main(void)
void s_send_seq(void *socket,...)
Definition: testutil.hpp:205
void test_fair_queue_in(void *ctx)
#define ZMQ_RCVTIMEO
Definition: zmq.h:284
ZMQ_EXPORT int zmq_disconnect(void *s, const char *addr)
Definition: zmq.cpp:353
#define ZMQ_DONTWAIT
Definition: zmq.h:345
ZMQ_EXPORT int zmq_msg_init(zmq_msg_t *msg)
Definition: zmq.cpp:613