libzmq  master
ZeroMQ C++ Core Engine (LIBZMQ)
test_stream_disconnect.cpp
Go to the documentation of this file.
1 /*
2  Copyright (c) 2007-2016 Contributors as noted in the AUTHORS file
3 
4  This file is part of libzmq, the ZeroMQ core engine in C++.
5 
6  libzmq is free software; you can redistribute it and/or modify it under
7  the terms of the GNU Lesser General Public License (LGPL) as published
8  by the Free Software Foundation; either version 3 of the License, or
9  (at your option) any later version.
10 
11  As a special exception, the Contributors give you permission to link
12  this library with independent modules to produce an executable,
13  regardless of the license terms of these independent modules, and to
14  copy and distribute the resulting executable under terms of your choice,
15  provided that you also meet, for each linked independent module, the
16  terms and conditions of the license of that module. An independent
17  module is a module which is not derived from or based on this library.
18  If you modify this library, you must extend this exception to your
19  version of the library.
20 
21  libzmq is distributed in the hope that it will be useful, but WITHOUT
22  ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
23  FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public
24  License for more details.
25 
26  You should have received a copy of the GNU Lesser General Public License
27  along with this program. If not, see <http://www.gnu.org/licenses/>.
28 */
29 
30 #include "testutil.hpp"
31 
// Indices of the two endpoints in the socket array. The same values tag
// which endpoint sends each line of the scripted dialog below.
static const int SERVER = 0;
static const int CLIENT = 1;

// One line of the scripted exchange.
struct test_message_t {
    // Endpoint that sends this line (SERVER or CLIENT).
    int turn;
    // Payload bytes, stored as a C string; only strlen (text) bytes are sent.
    const char * text;
};

// NOTE: messages are sent without null terminator.
// The last entry is empty: the server is expected to observe it as a
// zero-length frame, which the test treats as the disconnect notification.
const test_message_t dialog [] = {
    {CLIENT, "i can haz cheez burger?"},
    {SERVER, "y u no disonnect?"},
    {CLIENT, ""},
};

// Number of lines in the dialog.
const int steps = sizeof (dialog) / sizeof (dialog [0]);
47 
48 bool has_more (void* socket)
49 {
50  int more = 0;
51  size_t more_size = sizeof(more);
52  int rc = zmq_getsockopt (socket, ZMQ_RCVMORE, &more, &more_size);
53  if (rc != 0)
54  return false;
55  return more != 0;
56 }
57 
58 bool get_identity (void* socket, char* data, size_t* size)
59 {
60  int rc = zmq_getsockopt (socket, ZMQ_IDENTITY, data, size);
61  return rc == 0;
62 }
63 
64 int main(int, char**)
65 {
67 
68  void *context = zmq_ctx_new ();
69  void *sockets [2];
70  int rc = 0;
71 
72  sockets [SERVER] = zmq_socket (context, ZMQ_STREAM);
73  int enabled = 1;
74  rc = zmq_setsockopt (sockets [SERVER], ZMQ_STREAM_NOTIFY, &enabled, sizeof (enabled));
75  assert (rc == 0);
76  rc = zmq_bind (sockets [SERVER], "tcp://0.0.0.0:6666");
77  assert (rc == 0);
78 
79  sockets [CLIENT] = zmq_socket (context, ZMQ_STREAM);
80  rc = zmq_setsockopt (sockets [CLIENT], ZMQ_STREAM_NOTIFY, &enabled, sizeof (enabled));
81  assert (rc == 0);
82  rc = zmq_connect (sockets [CLIENT], "tcp://localhost:6666");
83  assert (rc == 0);
84 
85  // wait for connect notification
86  // Server: Grab the 1st frame (peer identity).
87  zmq_msg_t peer_frame;
88  rc = zmq_msg_init (&peer_frame);
89  assert (rc == 0);
90  rc = zmq_msg_recv (&peer_frame, sockets [SERVER], 0);
91  assert (rc != -1);
92  assert(zmq_msg_size (&peer_frame) > 0);
93  assert (has_more (sockets [SERVER]));
94  rc = zmq_msg_close (&peer_frame);
95  assert (rc == 0);
96 
97  // Server: Grab the 2nd frame (actual payload).
98  zmq_msg_t data_frame;
99  rc = zmq_msg_init (&data_frame);
100  assert (rc == 0);
101  rc = zmq_msg_recv (&data_frame, sockets [SERVER], 0);
102  assert (rc != -1);
103  assert(zmq_msg_size (&data_frame) == 0);
104  rc = zmq_msg_close (&data_frame);
105  assert (rc == 0);
106 
107  // Client: Grab the 1st frame (peer identity).
108  rc = zmq_msg_init (&peer_frame);
109  assert (rc == 0);
110  rc = zmq_msg_recv (&peer_frame, sockets [CLIENT], 0);
111  assert (rc != -1);
112  assert(zmq_msg_size (&peer_frame) > 0);
113  assert (has_more (sockets [CLIENT]));
114  rc = zmq_msg_close (&peer_frame);
115  assert (rc == 0);
116 
117  // Client: Grab the 2nd frame (actual payload).
118  rc = zmq_msg_init (&data_frame);
119  assert (rc == 0);
120  rc = zmq_msg_recv (&data_frame, sockets [CLIENT], 0);
121  assert (rc != -1);
122  assert(zmq_msg_size (&data_frame) == 0);
123  rc = zmq_msg_close (&data_frame);
124  assert (rc == 0);
125 
126  // Send initial message.
127  char blob_data [256];
128  size_t blob_size = sizeof(blob_data);
129  rc = zmq_getsockopt (sockets [CLIENT], ZMQ_IDENTITY, blob_data, &blob_size);
130  assert (rc != -1);
131  assert(blob_size > 0);
132  zmq_msg_t msg;
133  rc = zmq_msg_init_size (&msg, blob_size);
134  assert (rc == 0);
135  memcpy (zmq_msg_data (&msg), blob_data, blob_size);
136  rc = zmq_msg_send (&msg, sockets [dialog [0].turn], ZMQ_SNDMORE);
137  assert (rc != -1);
138  rc = zmq_msg_close (&msg);
139  assert (rc == 0);
140  rc = zmq_msg_init_size (&msg, strlen(dialog [0].text));
141  assert (rc == 0);
142  memcpy (zmq_msg_data (&msg), dialog [0].text, strlen(dialog [0].text));
143  rc = zmq_msg_send (&msg, sockets [dialog [0].turn], ZMQ_SNDMORE);
144  assert (rc != -1);
145  rc = zmq_msg_close (&msg);
146  assert (rc == 0);
147 
148  // TODO: make sure this loop doesn't loop forever if something is wrong
149  // with the test (or the implementation).
150 
151  int step = 0;
152  while (step < steps) {
153  // Wait until something happens.
154  zmq_pollitem_t items [] = {
155  { sockets [SERVER], 0, ZMQ_POLLIN, 0 },
156  { sockets [CLIENT], 0, ZMQ_POLLIN, 0 },
157  };
158  int rc = zmq_poll (items, 2, 100);
159  assert (rc >= 0);
160 
161  // Check for data received by the server.
162  if (items [SERVER].revents & ZMQ_POLLIN) {
163  assert (dialog [step].turn == CLIENT);
164 
165  // Grab the 1st frame (peer identity).
166  zmq_msg_t peer_frame;
167  rc = zmq_msg_init (&peer_frame);
168  assert (rc == 0);
169  rc = zmq_msg_recv (&peer_frame, sockets [SERVER], 0);
170  assert (rc != -1);
171  assert(zmq_msg_size (&peer_frame) > 0);
172  assert (has_more (sockets [SERVER]));
173 
174  // Grab the 2nd frame (actual payload).
175  zmq_msg_t data_frame;
176  rc = zmq_msg_init (&data_frame);
177  assert (rc == 0);
178  rc = zmq_msg_recv (&data_frame, sockets [SERVER], 0);
179  assert (rc != -1);
180 
181  // Make sure payload matches what we expect.
182  const char * const data = (const char*)zmq_msg_data (&data_frame);
183  const int size = zmq_msg_size (&data_frame);
184  // 0-length frame is a disconnection notification. The server
185  // should receive it as the last step in the dialogue.
186  if (size == 0) {
187  ++step;
188  assert (step == steps);
189  }
190  else {
191  assert ((size_t) size == strlen (dialog [step].text));
192  int cmp = memcmp (dialog [step].text, data, size);
193  assert (cmp == 0);
194 
195  ++step;
196 
197  assert (step < steps);
198 
199  // Prepare the response.
200  rc = zmq_msg_close (&data_frame);
201  assert (rc == 0);
202  rc = zmq_msg_init_size (&data_frame,
203  strlen (dialog [step].text));
204  assert (rc == 0);
205  memcpy (zmq_msg_data (&data_frame), dialog [step].text,
206  zmq_msg_size (&data_frame));
207 
208  // Send the response.
209  rc = zmq_msg_send (&peer_frame, sockets [SERVER], ZMQ_SNDMORE);
210  assert (rc != -1);
211  rc = zmq_msg_send (&data_frame, sockets [SERVER], ZMQ_SNDMORE);
212  assert (rc != -1);
213  }
214 
215  // Release resources.
216  rc = zmq_msg_close (&peer_frame);
217  assert (rc == 0);
218  rc = zmq_msg_close (&data_frame);
219  assert (rc == 0);
220  }
221 
222  // Check for data received by the client.
223  if (items [CLIENT].revents & ZMQ_POLLIN) {
224  assert (dialog [step].turn == SERVER);
225 
226  // Grab the 1st frame (peer identity).
227  zmq_msg_t peer_frame;
228  rc = zmq_msg_init (&peer_frame);
229  assert (rc == 0);
230  rc = zmq_msg_recv (&peer_frame, sockets [CLIENT], 0);
231  assert (rc != -1);
232  assert(zmq_msg_size (&peer_frame) > 0);
233  assert (has_more (sockets [CLIENT]));
234 
235  // Grab the 2nd frame (actual payload).
236  zmq_msg_t data_frame;
237  rc = zmq_msg_init (&data_frame);
238  assert (rc == 0);
239  rc = zmq_msg_recv (&data_frame, sockets [CLIENT], 0);
240  assert (rc != -1);
241  assert(zmq_msg_size (&data_frame) > 0);
242 
243  // Make sure payload matches what we expect.
244  const char * const data = (const char*)zmq_msg_data (&data_frame);
245  const int size = zmq_msg_size (&data_frame);
246  assert ((size_t)size == strlen(dialog [step].text));
247  int cmp = memcmp(dialog [step].text, data, size);
248  assert (cmp == 0);
249 
250  ++step;
251 
252  // Prepare the response (next line in the dialog).
253  assert (step < steps);
254  rc = zmq_msg_close (&data_frame);
255  assert (rc == 0);
256  rc = zmq_msg_init_size (&data_frame, strlen (dialog [step].text));
257  assert (rc == 0);
258  memcpy (zmq_msg_data (&data_frame), dialog [step].text, zmq_msg_size (&data_frame));
259 
260  // Send the response.
261  rc = zmq_msg_send (&peer_frame, sockets [CLIENT], ZMQ_SNDMORE);
262  assert (rc != -1);
263  rc = zmq_msg_send (&data_frame, sockets [CLIENT], ZMQ_SNDMORE);
264  assert (rc != -1);
265 
266  // Release resources.
267  rc = zmq_msg_close (&peer_frame);
268  assert (rc == 0);
269  rc = zmq_msg_close (&data_frame);
270  assert (rc == 0);
271  }
272  }
273  assert (step == steps);
274  rc = zmq_close (sockets [CLIENT]);
275  assert (rc == 0);
276  rc = zmq_close (sockets [SERVER]);
277  assert (rc == 0);
278  rc = zmq_ctx_term (context);
279  assert (rc == 0);
280  return 0;
281 }
#define size
ZMQ_EXPORT int zmq_setsockopt(void *s, int option, const void *optval, size_t optvallen)
Definition: zmq.cpp:265
#define ZMQ_STREAM
Definition: zmq.h:257
ZMQ_EXPORT void * zmq_ctx_new(void)
Definition: zmq.cpp:115
#define ZMQ_SNDMORE
Definition: zmq.h:346
void setup_test_environment(void)
Definition: testutil.hpp:285
bool get_identity(void *socket, char *data, size_t *size)
#define ZMQ_STREAM_NOTIFY
Definition: zmq.h:324
ZMQ_EXPORT void * zmq_msg_data(zmq_msg_t *msg)
Definition: zmq.cpp:666
ZMQ_EXPORT void * zmq_socket(void *, int type)
Definition: zmq.cpp:244
#define ZMQ_IDENTITY
Definition: zmq.h:265
static const int CLIENT
static const int SERVER
ZMQ_EXPORT int zmq_getsockopt(void *s, int option, void *optval, size_t *optvallen)
Definition: zmq.cpp:277
bool has_more(void *socket)
ZMQ_EXPORT int zmq_msg_send(zmq_msg_t *msg, void *s, int flags)
Definition: zmq.cpp:629
ZMQ_EXPORT int zmq_connect(void *s, const char *addr)
Definition: zmq.cpp:332
ZMQ_EXPORT int zmq_close(void *s)
Definition: zmq.cpp:255
const test_message_t dialog[]
ZMQ_EXPORT int zmq_msg_recv(zmq_msg_t *msg, void *s, int flags)
Definition: zmq.cpp:640
Definition: zmq.h:221
#define ZMQ_RCVMORE
Definition: zmq.h:272
int main(int, char **)
ZMQ_EXPORT int zmq_poll(zmq_pollitem_t *items, int nitems, long timeout)
Definition: zmq.cpp:748
ZMQ_EXPORT int zmq_bind(void *s, const char *addr)
Definition: zmq.cpp:321
ZMQ_EXPORT int zmq_ctx_term(void *context)
Definition: zmq.cpp:162
ZMQ_EXPORT int zmq_msg_close(zmq_msg_t *msg)
Definition: zmq.cpp:651
const int steps
ZMQ_EXPORT int zmq_msg_init_size(zmq_msg_t *msg, size_t size)
Definition: zmq.cpp:618
#define ZMQ_POLLIN
Definition: zmq.h:410
ZMQ_EXPORT size_t zmq_msg_size(zmq_msg_t *msg)
Definition: zmq.cpp:671
ZMQ_EXPORT int zmq_msg_init(zmq_msg_t *msg)
Definition: zmq.cpp:613