libzmq  master
ZeroMQ C++ Core Engine (LIBZMQ)
test_connect_delay_tipc.cpp
Go to the documentation of this file.
1 /*
2  Copyright (c) 2007-2016 Contributors as noted in the AUTHORS file
3 
4  This file is part of libzmq, the ZeroMQ core engine in C++.
5 
6  libzmq is free software; you can redistribute it and/or modify it under
7  the terms of the GNU Lesser General Public License (LGPL) as published
8  by the Free Software Foundation; either version 3 of the License, or
9  (at your option) any later version.
10 
11  As a special exception, the Contributors give you permission to link
12  this library with independent modules to produce an executable,
13  regardless of the license terms of these independent modules, and to
14  copy and distribute the resulting executable under terms of your choice,
15  provided that you also meet, for each linked independent module, the
16  terms and conditions of the license of that module. An independent
17  module is a module which is not derived from or based on this library.
18  If you modify this library, you must extend this exception to your
19  version of the library.
20 
21  libzmq is distributed in the hope that it will be useful, but WITHOUT
22  ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
23  FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public
24  License for more details.
25 
26  You should have received a copy of the GNU Lesser General Public License
27  along with this program. If not, see <http://www.gnu.org/licenses/>.
28 */
29 
30 #include "testutil.hpp"
31 
32 int main (void)
33 {
34  int val;
35  int rc;
36  char buffer[16];
37  // TEST 1.
38  // First we're going to attempt to send messages to two
39  // pipes, one connected, the other not. We should see
40  // the PUSH load balancing to both pipes, and hence half
41  // of the messages getting queued, as connect() creates a
42  // pipe immediately.
43 
44  void *context = zmq_ctx_new();
45  assert (context);
46  void *to = zmq_socket(context, ZMQ_PULL);
47  assert (to);
48 
49  // Bind the one valid receiver
50  val = 0;
51  rc = zmq_setsockopt(to, ZMQ_LINGER, &val, sizeof(val));
52  assert (rc == 0);
53  rc = zmq_bind (to, "tipc://{6555,0,0}");
54  assert (rc == 0);
55 
56  // Create a socket pushing to two endpoints - only 1 message should arrive.
57  void *from = zmq_socket (context, ZMQ_PUSH);
58  assert(from);
59 
60  val = 0;
61  zmq_setsockopt (from, ZMQ_LINGER, &val, sizeof (val));
62  // This pipe will not connect
63  rc = zmq_connect (from, "tipc://{5556,0}@0.0.0");
64  assert (rc == 0);
65  // This pipe will
66  rc = zmq_connect (from, "tipc://{6555,0}@0.0.0");
67  assert (rc == 0);
68 
69  // We send 10 messages, 5 should just get stuck in the queue
70  // for the not-yet-connected pipe
71  for (int i = 0; i < 10; ++i) {
72  rc = zmq_send (from, "Hello", 5, 0);
73  assert (rc == 5);
74  }
75 
76  // We now consume from the connected pipe
77  // - we should see just 5
78  int timeout = 250;
79  rc = zmq_setsockopt (to, ZMQ_RCVTIMEO, &timeout, sizeof (int));
80  assert (rc == 0);
81 
82  int seen = 0;
83  while (true) {
84  rc = zmq_recv (to, &buffer, sizeof (buffer), 0);
85  if (rc == -1)
86  break; // Break when we didn't get a message
87  seen++;
88  }
89  assert (seen == 5);
90 
91  rc = zmq_close (from);
92  assert (rc == 0);
93 
94  rc = zmq_close (to);
95  assert (rc == 0);
96 
97  rc = zmq_ctx_term (context);
98  assert (rc == 0);
99 
100  // TEST 2
101  // This time we will do the same thing, connect two pipes,
102  // one of which will succeed in connecting to a bound
103  // receiver, the other of which will fail. However, we will
104  // also set the delay attach on connect flag, which should
105  // cause the pipe attachment to be delayed until the connection
106  // succeeds.
107  context = zmq_ctx_new();
108 
109  // Bind the valid socket
110  to = zmq_socket (context, ZMQ_PULL);
111  assert (to);
112  rc = zmq_bind (to, "tipc://{5560,0,0}");
113  assert (rc == 0);
114 
115  val = 0;
116  rc = zmq_setsockopt (to, ZMQ_LINGER, &val, sizeof(val));
117  assert (rc == 0);
118 
119  // Create a socket pushing to two endpoints - all messages should arrive.
120  from = zmq_socket (context, ZMQ_PUSH);
121  assert (from);
122 
123  val = 0;
124  rc = zmq_setsockopt (from, ZMQ_LINGER, &val, sizeof(val));
125  assert (rc == 0);
126 
127  // Set the key flag
128  val = 1;
129  rc = zmq_setsockopt (from, ZMQ_DELAY_ATTACH_ON_CONNECT, &val, sizeof(val));
130  assert (rc == 0);
131 
132  // Connect to the invalid socket
133  rc = zmq_connect (from, "tipc://{5561,0}@0.0.0");
134  assert (rc == 0);
135  // Connect to the valid socket
136  rc = zmq_connect (from, "tipc://{5560,0}@0.0.0");
137  assert (rc == 0);
138 
139  // Send 10 messages, all should be routed to the connected pipe
140  for (int i = 0; i < 10; ++i) {
141  rc = zmq_send (from, "Hello", 5, 0);
142  assert (rc == 5);
143  }
144  rc = zmq_setsockopt (to, ZMQ_RCVTIMEO, &timeout, sizeof (int));
145  assert (rc == 0);
146 
147  seen = 0;
148  while (true) {
149  rc = zmq_recv (to, &buffer, sizeof (buffer), 0);
150  if (rc == -1)
151  break; // Break when we didn't get a message
152  seen++;
153  }
154  assert (seen == 10);
155 
156  rc = zmq_close (from);
157  assert (rc == 0);
158 
159  rc = zmq_close (to);
160  assert (rc == 0);
161 
162  rc = zmq_ctx_term (context);
163  assert (rc == 0);
164 
165  // TEST 3
166  // This time we want to validate that the same blocking behaviour
167  // occurs with an existing connection that is broken. We will send
168  // messages to a connected pipe, disconnect and verify the messages
169  // block. Then we reconnect and verify messages flow again.
170  context = zmq_ctx_new ();
171 
172  void *backend = zmq_socket (context, ZMQ_DEALER);
173  assert (backend);
174  void *frontend = zmq_socket (context, ZMQ_DEALER);
175  assert (frontend);
176  int zero = 0;
177  rc = zmq_setsockopt (backend, ZMQ_LINGER, &zero, sizeof (zero));
178  assert (rc == 0);
179  rc = zmq_setsockopt (frontend, ZMQ_LINGER, &zero, sizeof (zero));
180  assert (rc == 0);
181 
182  // Frontend connects to backend using DELAY_ATTACH_ON_CONNECT
183  int on = 1;
184  rc = zmq_setsockopt (frontend, ZMQ_DELAY_ATTACH_ON_CONNECT, &on, sizeof (on));
185  assert (rc == 0);
186  rc = zmq_bind (backend, "tipc://{5560,0,0}");
187  assert (rc == 0);
188  rc = zmq_connect (frontend, "tipc://{5560,0}@0.0.0");
189  assert (rc == 0);
190 
191  // Ping backend to frontend so we know when the connection is up
192  rc = zmq_send (backend, "Hello", 5, 0);
193  assert (rc == 5);
194  rc = zmq_recv (frontend, buffer, 255, 0);
195  assert (rc == 5);
196 
197  // Send message from frontend to backend
198  rc = zmq_send (frontend, "Hello", 5, ZMQ_DONTWAIT);
199  assert (rc == 5);
200 
201  rc = zmq_close (backend);
202  assert (rc == 0);
203 
204  // Give time to process disconnect
206 
207  // Send a message, should fail
208  rc = zmq_send (frontend, "Hello", 5, ZMQ_DONTWAIT);
209  assert (rc == -1);
210 
211  // Recreate backend socket
212  backend = zmq_socket (context, ZMQ_DEALER);
213  assert (backend);
214  rc = zmq_setsockopt (backend, ZMQ_LINGER, &zero, sizeof (zero));
215  assert (rc == 0);
216  rc = zmq_bind (backend, "tipc://{5560,0,0}");
217  assert (rc == 0);
218 
219  // Ping backend to frontend so we know when the connection is up
220  rc = zmq_send (backend, "Hello", 5, 0);
221  assert (rc == 5);
222  rc = zmq_recv (frontend, buffer, 255, 0);
223  assert (rc == 5);
224 
225  // After the reconnect, should succeed
226  rc = zmq_send (frontend, "Hello", 5, ZMQ_DONTWAIT);
227  assert (rc == 5);
228 
229  rc = zmq_close (backend);
230  assert (rc == 0);
231 
232  rc = zmq_close (frontend);
233  assert (rc == 0);
234 
235  rc = zmq_ctx_term (context);
236  assert (rc == 0);
237 }
238 
void msleep(int milliseconds)
Definition: testutil.hpp:316
ZMQ_EXPORT int zmq_setsockopt(void *s, int option, const void *optval, size_t optvallen)
Definition: zmq.cpp:265
ZMQ_EXPORT void * zmq_ctx_new(void)
Definition: zmq.cpp:115
#define ZMQ_DEALER
Definition: zmq.h:251
#define SETTLE_TIME
Definition: testutil.hpp:44
ZMQ_EXPORT int zmq_recv(void *s, void *buf, size_t len, int flags)
Definition: zmq.cpp:507
#define ZMQ_DELAY_ATTACH_ON_CONNECT
Definition: zmq.h:363
ZMQ_EXPORT void * zmq_socket(void *, int type)
Definition: zmq.cpp:244
#define ZMQ_PUSH
Definition: zmq.h:254
#define ZMQ_LINGER
Definition: zmq.h:276
int main(void)
ZMQ_EXPORT int zmq_connect(void *s, const char *addr)
Definition: zmq.cpp:332
ZMQ_EXPORT int zmq_close(void *s)
Definition: zmq.cpp:255
ZMQ_EXPORT int zmq_send(void *s, const void *buf, size_t len, int flags)
Definition: zmq.cpp:387
ZMQ_EXPORT int zmq_bind(void *s, const char *addr)
Definition: zmq.cpp:321
ZMQ_EXPORT int zmq_ctx_term(void *context)
Definition: zmq.cpp:162
#define ZMQ_RCVTIMEO
Definition: zmq.h:284
#define ZMQ_DONTWAIT
Definition: zmq.h:345
#define ZMQ_PULL
Definition: zmq.h:253