libzmq  master
ZeroMQ C++ Core Engine (LIBZMQ)
test_spec_req.cpp
Go to the documentation of this file.
1 /*
2  Copyright (c) 2007-2016 Contributors as noted in the AUTHORS file
3 
4  This file is part of libzmq, the ZeroMQ core engine in C++.
5 
6  libzmq is free software; you can redistribute it and/or modify it under
7  the terms of the GNU Lesser General Public License (LGPL) as published
8  by the Free Software Foundation; either version 3 of the License, or
9  (at your option) any later version.
10 
11  As a special exception, the Contributors give you permission to link
12  this library with independent modules to produce an executable,
13  regardless of the license terms of these independent modules, and to
14  copy and distribute the resulting executable under terms of your choice,
15  provided that you also meet, for each linked independent module, the
16  terms and conditions of the license of that module. An independent
17  module is a module which is not derived from or based on this library.
18  If you modify this library, you must extend this exception to your
19  version of the library.
20 
21  libzmq is distributed in the hope that it will be useful, but WITHOUT
22  ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
23  FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public
24  License for more details.
25 
26  You should have received a copy of the GNU Lesser General Public License
27  along with this program. If not, see <http://www.gnu.org/licenses/>.
28 */
29 
30 #include "testutil.hpp"
31 
32 const char *bind_address = 0;
33 const char *connect_address = 0;
34 
35 void test_round_robin_out (void *ctx)
36 {
37  void *req = zmq_socket (ctx, ZMQ_REQ);
38  assert (req);
39 
40  int rc = zmq_bind (req, bind_address);
41  assert (rc == 0);
42 
43  const size_t services = 5;
44  void *rep [services];
45  for (size_t peer = 0; peer < services; peer++) {
46  rep [peer] = zmq_socket (ctx, ZMQ_REP);
47  assert (rep [peer]);
48 
49  int timeout = 250;
50  rc = zmq_setsockopt (rep [peer], ZMQ_RCVTIMEO, &timeout, sizeof (int));
51  assert (rc == 0);
52 
53  rc = zmq_connect (rep [peer], connect_address);
54  assert (rc == 0);
55  }
56  // We have to give the connects time to finish otherwise the requests
57  // will not properly round-robin. We could alternatively connect the
58  // REQ sockets to the REP sockets.
60 
61  // Send our peer-replies, and expect every REP it used once in order
62  for (size_t peer = 0; peer < services; peer++) {
63  s_send_seq (req, "ABC", SEQ_END);
64  s_recv_seq (rep [peer], "ABC", SEQ_END);
65  s_send_seq (rep [peer], "DEF", SEQ_END);
66  s_recv_seq (req, "DEF", SEQ_END);
67  }
68 
69  close_zero_linger (req);
70  for (size_t peer = 0; peer < services; peer++)
71  close_zero_linger (rep [peer]);
72 
73  // Wait for disconnects.
75 }
76 
78 {
79  void *req = zmq_socket (ctx, ZMQ_REQ);
80  assert (req);
81 
82  int rc = zmq_setsockopt(req, ZMQ_IDENTITY, "A", 2);
83  assert (rc == 0);
84 
85  rc = zmq_bind (req, bind_address);
86  assert (rc == 0);
87 
88  const size_t services = 3;
89  void *router [services];
90 
91  for (size_t i = 0; i < services; ++i) {
92  router [i] = zmq_socket (ctx, ZMQ_ROUTER);
93  assert (router [i]);
94 
95  int timeout = 250;
96  rc = zmq_setsockopt (router [i], ZMQ_RCVTIMEO, &timeout, sizeof (timeout));
97  assert (rc == 0);
98 
99  int enabled = 1;
100  rc = zmq_setsockopt (router [i], ZMQ_ROUTER_MANDATORY, &enabled, sizeof (enabled));
101  assert (rc == 0);
102 
103  rc = zmq_connect (router [i], connect_address);
104  assert (rc == 0);
105  }
106 
107  // Wait for connects to finish.
109 
110  for (size_t i = 0; i < services; ++i) {
111  // There still is a race condition when a stale peer's message
112  // arrives at the REQ just after a request was sent to that peer.
113  // To avoid that happening in the test, sleep for a bit.
114  rc = zmq_poll (0, 0, 10);
115  assert (rc == 0);
116 
117  s_send_seq (req, "ABC", SEQ_END);
118 
119  // Receive on router i
120  s_recv_seq (router [i], "A", 0, "ABC", SEQ_END);
121 
122  // Send back replies on all routers
123  for (size_t j = 0; j < services; ++j) {
124  const char *replies [] = { "WRONG", "GOOD" };
125  const char *reply = replies [i == j ? 1 : 0];
126  s_send_seq (router [j], "A", 0, reply, SEQ_END);
127  }
128 
129  // Receive only the good reply
130  s_recv_seq (req, "GOOD", SEQ_END);
131  }
132 
133  close_zero_linger (req);
134  for (size_t i = 0; i < services; ++i)
135  close_zero_linger (router [i]);
136 
137  // Wait for disconnects.
139 }
140 
141 void test_req_message_format (void *ctx)
142 {
143  void *req = zmq_socket (ctx, ZMQ_REQ);
144  assert (req);
145 
146  void *router = zmq_socket (ctx, ZMQ_ROUTER);
147  assert (router);
148 
149  int rc = zmq_bind (req, bind_address);
150  assert (rc == 0);
151 
152  rc = zmq_connect (router, connect_address);
153  assert (rc == 0);
154 
155  // Send a multi-part request.
156  s_send_seq (req, "ABC", "DEF", SEQ_END);
157 
158  zmq_msg_t msg;
159  zmq_msg_init (&msg);
160 
161  // Receive peer identity
162  rc = zmq_msg_recv (&msg, router, 0);
163  assert (rc != -1);
164  assert (zmq_msg_size (&msg) > 0);
165  zmq_msg_t peer_id_msg;
166  zmq_msg_init (&peer_id_msg);
167  zmq_msg_copy (&peer_id_msg, &msg);
168 
169  int more = 0;
170  size_t more_size = sizeof (more);
171  rc = zmq_getsockopt (router, ZMQ_RCVMORE, &more, &more_size);
172  assert (rc == 0);
173  assert (more);
174 
175  // Receive the rest.
176  s_recv_seq (router, 0, "ABC", "DEF", SEQ_END);
177 
178  // Send back a single-part reply.
179  rc = zmq_msg_send (&peer_id_msg, router, ZMQ_SNDMORE);
180  assert (rc != -1);
181  s_send_seq (router, 0, "GHI", SEQ_END);
182 
183  // Receive reply.
184  s_recv_seq (req, "GHI", SEQ_END);
185 
186  rc = zmq_msg_close (&msg);
187  assert (rc == 0);
188 
189  rc = zmq_msg_close (&peer_id_msg);
190  assert (rc == 0);
191 
192  close_zero_linger (req);
193  close_zero_linger (router);
194 
195  // Wait for disconnects.
197 }
198 
200 {
201  void *sc = zmq_socket (ctx, ZMQ_REQ);
202  assert (sc);
203 
204  int timeout = 250;
205  int rc = zmq_setsockopt (sc, ZMQ_SNDTIMEO, &timeout, sizeof (timeout));
206  assert (rc == 0);
207 
208  rc = zmq_send (sc, 0, 0, ZMQ_DONTWAIT);
209  assert (rc == -1);
210  assert (errno == EAGAIN);
211 
212  rc = zmq_send (sc, 0, 0, 0);
213  assert (rc == -1);
214  assert (errno == EAGAIN);
215 
216  rc = zmq_close (sc);
217  assert (rc == 0);
218 }
219 
220 int main (void)
221 {
223  void *ctx = zmq_ctx_new ();
224  assert (ctx);
225 
226  const char *binds [] = { "inproc://a", "tcp://127.0.0.1:5555" };
227  const char *connects [] = { "inproc://a", "tcp://localhost:5555" };
228 
229  for (int transport = 0; transport < 2; transport++) {
230  bind_address = binds [transport];
231  connect_address = connects [transport];
232 
233  // SHALL route outgoing messages to connected peers using a round-robin
234  // strategy.
235  test_round_robin_out (ctx);
236 
237  // The request and reply messages SHALL have this format on the wire:
238  // * A delimiter, consisting of an empty frame, added by the REQ socket.
239  // * One or more data frames, comprising the message visible to the
240  // application.
242 
243  // SHALL block on sending, or return a suitable error, when it has no
244  // connected peers.
246 
247  // SHALL accept an incoming message only from the last peer that it sent a
248  // request to.
249  // SHALL discard silently any messages received from other peers.
250  // PH: this test is still failing; disabled for now to allow build to
251  // complete.
252  // test_req_only_listens_to_current_peer (ctx);
253  }
254 
255  int rc = zmq_ctx_term (ctx);
256  assert (rc == 0);
257 
258  return 0 ;
259 }
void msleep(int milliseconds)
Definition: testutil.hpp:316
void test_req_message_format(void *ctx)
ZMQ_EXPORT int zmq_setsockopt(void *s, int option, const void *optval, size_t optvallen)
Definition: zmq.cpp:265
ZMQ_EXPORT void * zmq_ctx_new(void)
Definition: zmq.cpp:115
#define ZMQ_SNDMORE
Definition: zmq.h:346
const char * bind_address
#define ZMQ_REP
Definition: zmq.h:250
void setup_test_environment(void)
Definition: testutil.hpp:285
ZMQ_EXPORT int zmq_msg_copy(zmq_msg_t *dest, zmq_msg_t *src)
Definition: zmq.cpp:661
#define SETTLE_TIME
Definition: testutil.hpp:44
#define ZMQ_ROUTER
Definition: zmq.h:252
ZMQ_EXPORT void * zmq_socket(void *, int type)
Definition: zmq.cpp:244
void test_req_only_listens_to_current_peer(void *ctx)
void test_block_on_send_no_peers(void *ctx)
#define ZMQ_SNDTIMEO
Definition: zmq.h:285
#define ZMQ_ROUTER_MANDATORY
Definition: zmq.h:287
#define ZMQ_IDENTITY
Definition: zmq.h:265
ZMQ_EXPORT int zmq_getsockopt(void *s, int option, void *optval, size_t *optvallen)
Definition: zmq.cpp:277
#define ZMQ_REQ
Definition: zmq.h:249
void test_round_robin_out(void *ctx)
int main(void)
ZMQ_EXPORT int zmq_msg_send(zmq_msg_t *msg, void *s, int flags)
Definition: zmq.cpp:629
ZMQ_EXPORT int zmq_connect(void *s, const char *addr)
Definition: zmq.cpp:332
ZMQ_EXPORT int zmq_close(void *s)
Definition: zmq.cpp:255
ZMQ_EXPORT int zmq_msg_recv(zmq_msg_t *msg, void *s, int flags)
Definition: zmq.cpp:640
Definition: zmq.h:221
#define ZMQ_RCVMORE
Definition: zmq.h:272
ZMQ_EXPORT int zmq_poll(zmq_pollitem_t *items, int nitems, long timeout)
Definition: zmq.cpp:748
const char * connect_address
ZMQ_EXPORT int zmq_send(void *s, const void *buf, size_t len, int flags)
Definition: zmq.cpp:387
void close_zero_linger(void *socket)
Definition: testutil.hpp:275
ZMQ_EXPORT int zmq_bind(void *s, const char *addr)
Definition: zmq.cpp:321
void s_recv_seq(void *socket,...)
Definition: testutil.hpp:236
ZMQ_EXPORT int zmq_ctx_term(void *context)
Definition: zmq.cpp:162
const char * SEQ_END
Definition: testutil.hpp:198
ZMQ_EXPORT int zmq_msg_close(zmq_msg_t *msg)
Definition: zmq.cpp:651
void s_send_seq(void *socket,...)
Definition: testutil.hpp:205
ZMQ_EXPORT size_t zmq_msg_size(zmq_msg_t *msg)
Definition: zmq.cpp:671
#define ZMQ_RCVTIMEO
Definition: zmq.h:284
#define ZMQ_DONTWAIT
Definition: zmq.h:345
ZMQ_EXPORT int zmq_msg_init(zmq_msg_t *msg)
Definition: zmq.cpp:613