libzmq  master
ZeroMQ C++ Core Engine (LIBZMQ)
test_immediate.cpp
Go to the documentation of this file.
1 /*
2  Copyright (c) 2007-2016 Contributors as noted in the AUTHORS file
3 
4  This file is part of libzmq, the ZeroMQ core engine in C++.
5 
6  libzmq is free software; you can redistribute it and/or modify it under
7  the terms of the GNU Lesser General Public License (LGPL) as published
8  by the Free Software Foundation; either version 3 of the License, or
9  (at your option) any later version.
10 
11  As a special exception, the Contributors give you permission to link
12  this library with independent modules to produce an executable,
13  regardless of the license terms of these independent modules, and to
14  copy and distribute the resulting executable under terms of your choice,
15  provided that you also meet, for each linked independent module, the
16  terms and conditions of the license of that module. An independent
17  module is a module which is not derived from or based on this library.
18  If you modify this library, you must extend this exception to your
19  version of the library.
20 
21  libzmq is distributed in the hope that it will be useful, but WITHOUT
22  ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
23  FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public
24  License for more details.
25 
26  You should have received a copy of the GNU Lesser General Public License
27  along with this program. If not, see <http://www.gnu.org/licenses/>.
28 */
29 
30 #include "testutil.hpp"
31 
32 int main (void)
33 {
35  int val;
36  int rc;
37  char buffer[16];
38  // TEST 1.
39  // First we're going to attempt to send messages to two
40  // pipes, one connected, the other not. We should see
41  // the PUSH load balancing to both pipes, and hence half
42  // of the messages getting queued, as connect() creates a
43  // pipe immediately.
44 
45  void *context = zmq_ctx_new();
46  assert (context);
47  void *to = zmq_socket(context, ZMQ_PULL);
48  assert (to);
49 
50  // Bind the one valid receiver
51  val = 0;
52  rc = zmq_setsockopt(to, ZMQ_LINGER, &val, sizeof(val));
53  assert (rc == 0);
54  rc = zmq_bind (to, "tcp://127.0.0.1:6555");
55  assert (rc == 0);
56 
57  // Create a socket pushing to two endpoints - only 1 message should arrive.
58  void *from = zmq_socket (context, ZMQ_PUSH);
59  assert(from);
60 
61  val = 0;
62  zmq_setsockopt (from, ZMQ_LINGER, &val, sizeof (val));
63  // This pipe will not connect
64  rc = zmq_connect (from, "tcp://localhost:5556");
65  assert (rc == 0);
66  // This pipe will
67  rc = zmq_connect (from, "tcp://localhost:6555");
68  assert (rc == 0);
69 
71 
72  // We send 10 messages, 5 should just get stuck in the queue
73  // for the not-yet-connected pipe
74  for (int i = 0; i < 10; ++i) {
75  rc = zmq_send (from, "Hello", 5, 0);
76  assert (rc == 5);
77  }
78 
79  // We now consume from the connected pipe
80  // - we should see just 5
81  int timeout = 250;
82  rc = zmq_setsockopt (to, ZMQ_RCVTIMEO, &timeout, sizeof (int));
83  assert (rc == 0);
84 
85  int seen = 0;
86  while (true) {
87  rc = zmq_recv (to, &buffer, sizeof (buffer), 0);
88  if (rc == -1)
89  break; // Break when we didn't get a message
90  seen++;
91  }
92  assert (seen == 5);
93 
94  rc = zmq_close (from);
95  assert (rc == 0);
96 
97  rc = zmq_close (to);
98  assert (rc == 0);
99 
100  rc = zmq_ctx_term (context);
101  assert (rc == 0);
102 
103  // TEST 2
104  // This time we will do the same thing, connect two pipes,
105  // one of which will succeed in connecting to a bound
106  // receiver, the other of which will fail. However, we will
107  // also set the delay attach on connect flag, which should
108  // cause the pipe attachment to be delayed until the connection
109  // succeeds.
110  context = zmq_ctx_new();
111 
112  // Bind the valid socket
113  to = zmq_socket (context, ZMQ_PULL);
114  assert (to);
115  rc = zmq_bind (to, "tcp://127.0.0.1:5560");
116  assert (rc == 0);
117 
118  val = 0;
119  rc = zmq_setsockopt (to, ZMQ_LINGER, &val, sizeof(val));
120  assert (rc == 0);
121 
122  // Create a socket pushing to two endpoints - all messages should arrive.
123  from = zmq_socket (context, ZMQ_PUSH);
124  assert (from);
125 
126  val = 0;
127  rc = zmq_setsockopt (from, ZMQ_LINGER, &val, sizeof(val));
128  assert (rc == 0);
129 
130  // Set the key flag
131  val = 1;
132  rc = zmq_setsockopt (from, ZMQ_IMMEDIATE, &val, sizeof(val));
133  assert (rc == 0);
134 
135  // Connect to the invalid socket
136  rc = zmq_connect (from, "tcp://localhost:5561");
137  assert (rc == 0);
138  // Connect to the valid socket
139  rc = zmq_connect (from, "tcp://localhost:5560");
140  assert (rc == 0);
141 
142  // Send 10 messages, all should be routed to the connected pipe
143  for (int i = 0; i < 10; ++i) {
144  rc = zmq_send (from, "Hello", 5, 0);
145  assert (rc == 5);
146  }
147  rc = zmq_setsockopt (to, ZMQ_RCVTIMEO, &timeout, sizeof (int));
148  assert (rc == 0);
149 
150  seen = 0;
151  while (true) {
152  rc = zmq_recv (to, &buffer, sizeof (buffer), 0);
153  if (rc == -1)
154  break; // Break when we didn't get a message
155  seen++;
156  }
157  assert (seen == 10);
158 
159  rc = zmq_close (from);
160  assert (rc == 0);
161 
162  rc = zmq_close (to);
163  assert (rc == 0);
164 
165  rc = zmq_ctx_term (context);
166  assert (rc == 0);
167 
168  // TEST 3
169  // This time we want to validate that the same blocking behaviour
170  // occurs with an existing connection that is broken. We will send
171  // messages to a connected pipe, disconnect and verify the messages
172  // block. Then we reconnect and verify messages flow again.
173  context = zmq_ctx_new ();
174 
175  void *backend = zmq_socket (context, ZMQ_DEALER);
176  assert (backend);
177  void *frontend = zmq_socket (context, ZMQ_DEALER);
178  assert (frontend);
179  int zero = 0;
180  rc = zmq_setsockopt (backend, ZMQ_LINGER, &zero, sizeof (zero));
181  assert (rc == 0);
182  rc = zmq_setsockopt (frontend, ZMQ_LINGER, &zero, sizeof (zero));
183  assert (rc == 0);
184 
185  // Frontend connects to backend using IMMEDIATE
186  int on = 1;
187  rc = zmq_setsockopt (frontend, ZMQ_IMMEDIATE, &on, sizeof (on));
188  assert (rc == 0);
189  rc = zmq_bind (backend, "tcp://127.0.0.1:5560");
190  assert (rc == 0);
191  rc = zmq_connect (frontend, "tcp://localhost:5560");
192  assert (rc == 0);
193 
194  // Ping backend to frontend so we know when the connection is up
195  rc = zmq_send (backend, "Hello", 5, 0);
196  assert (rc == 5);
197  rc = zmq_recv (frontend, buffer, 255, 0);
198  assert (rc == 5);
199 
200  // Send message from frontend to backend
201  rc = zmq_send (frontend, "Hello", 5, ZMQ_DONTWAIT);
202  assert (rc == 5);
203 
204  rc = zmq_close (backend);
205  assert (rc == 0);
206 
207  // Give time to process disconnect
208  msleep (SETTLE_TIME * 10);
209 
210  // Send a message, should fail
211  rc = zmq_send (frontend, "Hello", 5, ZMQ_DONTWAIT);
212  assert (rc == -1);
213 
214  // Recreate backend socket
215  backend = zmq_socket (context, ZMQ_DEALER);
216  assert (backend);
217  rc = zmq_setsockopt (backend, ZMQ_LINGER, &zero, sizeof (zero));
218  assert (rc == 0);
219  rc = zmq_bind (backend, "tcp://127.0.0.1:5560");
220  assert (rc == 0);
221 
222  // Ping backend to frontend so we know when the connection is up
223  rc = zmq_send (backend, "Hello", 5, 0);
224  assert (rc == 5);
225  rc = zmq_recv (frontend, buffer, 255, 0);
226  assert (rc == 5);
227 
228  // After the reconnect, should succeed
229  rc = zmq_send (frontend, "Hello", 5, ZMQ_DONTWAIT);
230  assert (rc == 5);
231 
232  rc = zmq_close (backend);
233  assert (rc == 0);
234 
235  rc = zmq_close (frontend);
236  assert (rc == 0);
237 
238  rc = zmq_ctx_term (context);
239  assert (rc == 0);
240 }
void msleep(int milliseconds)
Definition: testutil.hpp:316
ZMQ_EXPORT int zmq_setsockopt(void *s, int option, const void *optval, size_t optvallen)
Definition: zmq.cpp:265
ZMQ_EXPORT void * zmq_ctx_new(void)
Definition: zmq.cpp:115
#define ZMQ_DEALER
Definition: zmq.h:251
void setup_test_environment(void)
Definition: testutil.hpp:285
#define SETTLE_TIME
Definition: testutil.hpp:44
ZMQ_EXPORT int zmq_recv(void *s, void *buf, size_t len, int flags)
Definition: zmq.cpp:507
ZMQ_EXPORT void * zmq_socket(void *, int type)
Definition: zmq.cpp:244
#define ZMQ_PUSH
Definition: zmq.h:254
#define ZMQ_LINGER
Definition: zmq.h:276
int main(void)
ZMQ_EXPORT int zmq_connect(void *s, const char *addr)
Definition: zmq.cpp:332
ZMQ_EXPORT int zmq_close(void *s)
Definition: zmq.cpp:255
ZMQ_EXPORT int zmq_send(void *s, const void *buf, size_t len, int flags)
Definition: zmq.cpp:387
ZMQ_EXPORT int zmq_bind(void *s, const char *addr)
Definition: zmq.cpp:321
ZMQ_EXPORT int zmq_ctx_term(void *context)
Definition: zmq.cpp:162
#define ZMQ_RCVTIMEO
Definition: zmq.h:284
#define ZMQ_IMMEDIATE
Definition: zmq.h:292
#define ZMQ_DONTWAIT
Definition: zmq.h:345
#define ZMQ_PULL
Definition: zmq.h:253