libzmq  master
ZeroMQ C++ Core Engine (LIBZMQ)
test_stream_timeout.cpp
Go to the documentation of this file.
1 /*
2  Copyright (c) 2007-2016 Contributors as noted in the AUTHORS file
3 
4  This file is part of libzmq, the ZeroMQ core engine in C++.
5 
6  libzmq is free software; you can redistribute it and/or modify it under
7  the terms of the GNU Lesser General Public License (LGPL) as published
8  by the Free Software Foundation; either version 3 of the License, or
9  (at your option) any later version.
10 
11  As a special exception, the Contributors give you permission to link
12  this library with independent modules to produce an executable,
13  regardless of the license terms of these independent modules, and to
14  copy and distribute the resulting executable under terms of your choice,
15  provided that you also meet, for each linked independent module, the
16  terms and conditions of the license of that module. An independent
17  module is a module which is not derived from or based on this library.
18  If you modify this library, you must extend this exception to your
19  version of the library.
20 
21  libzmq is distributed in the hope that it will be useful, but WITHOUT
22  ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
23  FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public
24  License for more details.
25 
26  You should have received a copy of the GNU Lesser General Public License
27  along with this program. If not, see <http://www.gnu.org/licenses/>.
28 */
29 
30 #include "testutil.hpp"
31 
32 // Read one event off the monitor socket; return value and address
33 // by reference, if not null, and event number by value. Returns -1
34 // in case of error.
35 
36 static int
37 get_monitor_event (void *monitor, int *value, char **address)
38 {
39  // First frame in message contains event number and value
40  zmq_msg_t msg;
41  zmq_msg_init (&msg);
42  if (zmq_msg_recv (&msg, monitor, 0) == -1)
43  return -1; // Interruped, presumably
44  assert (zmq_msg_more (&msg));
45 
46  uint8_t *data = (uint8_t *) zmq_msg_data (&msg);
47  uint16_t event = *(uint16_t *) (data);
48  if (value)
49  *value = *(uint32_t *) (data + 2);
50 
51  // Second frame in message contains event address
52  zmq_msg_init (&msg);
53  if (zmq_msg_recv (&msg, monitor, 0) == -1)
54  return -1; // Interruped, presumably
55  assert (!zmq_msg_more (&msg));
56 
57  if (address) {
58  uint8_t *data = (uint8_t *) zmq_msg_data (&msg);
59  size_t size = zmq_msg_size (&msg);
60  *address = (char *) malloc (size + 1);
61  memcpy (*address, data, size);
62  *address [size] = 0;
63  }
64  return event;
65 }
66 
// Handshake-timeout test, accepting side: a ZMQ_STREAM socket connects to
// tcp://localhost:5557 and sends nothing, while a ZMQ_DEALER socket whose
// ZMQ_HANDSHAKE_IVL has been lowered from 30000 to 100 binds to
// tcp://127.0.0.1:5557. The test passes when the dealer's monitor reports
// ZMQ_EVENT_ACCEPTED followed by ZMQ_EVENT_DISCONNECTED.
// NOTE(review): the declarator line carrying this function's name (listing
// line 68) is absent from this listing; the cross-reference index at the
// end of the page lists test_stream_handshake_timeout_accept, which
// presumably is this function - confirm against the original file.
67 static void
69 {
70  int rc;
71 
72  // Set up our context and sockets
73  void *ctx = zmq_ctx_new ();
74  assert (ctx);
75 
76  // We use this socket in raw mode, to make a connection and send nothing
77  void *stream = zmq_socket (ctx, ZMQ_STREAM);
78  assert (stream);
79 
80  int zero = 0;
81  rc = zmq_setsockopt (stream, ZMQ_LINGER, &zero, sizeof (zero));
82  assert (rc == 0);
83  rc = zmq_connect (stream, "tcp://localhost:5557");
84  assert (rc == 0);
85 
86  // We'll be using this socket to test TCP stream handshake timeout
87  void *dealer = zmq_socket (ctx, ZMQ_DEALER);
88  assert (dealer);
89  rc = zmq_setsockopt (dealer, ZMQ_LINGER, &zero, sizeof (zero));
90  assert (rc == 0);
91  int val, tenth = 100;
92  size_t vsize = sizeof(val);
93 
94  // check for the expected default handshake timeout value - 30 sec
95  rc = zmq_getsockopt (dealer, ZMQ_HANDSHAKE_IVL, &val, &vsize);
96  assert (rc == 0);
97  assert (vsize == sizeof(val));
98  assert (val == 30000);
99  // make handshake timeout faster - 1/10 sec
100  rc = zmq_setsockopt (dealer, ZMQ_HANDSHAKE_IVL, &tenth, sizeof (tenth));
101  assert (rc == 0);
// zmq_getsockopt takes the size by pointer, so reset it before reuse
102  vsize = sizeof(val);
103  // make sure zmq_setsockopt changed the value
104  rc = zmq_getsockopt (dealer, ZMQ_HANDSHAKE_IVL, &val, &vsize);
105  assert (rc == 0);
106  assert (vsize == sizeof(val));
107  assert (val == tenth);
108 
109  // Create and connect a socket for collecting monitor events on dealer
110  void *dealer_mon = zmq_socket (ctx, ZMQ_PAIR);
111  assert (dealer_mon);
112 
// NOTE(review): the continuation line holding the event-mask argument of
// this call (listing line 114) is absent from this listing; restore it
// from the original file.
113  rc = zmq_socket_monitor (dealer, "inproc://monitor-dealer",
115  assert (rc == 0);
116 
117  // Connect to the inproc endpoint so we'll get events
118  rc = zmq_connect (dealer_mon, "inproc://monitor-dealer");
119  assert (rc == 0);
120 
121  // bind dealer socket to accept connection from non-sending stream socket
122  rc = zmq_bind (dealer, "tcp://127.0.0.1:5557");
123  assert (rc == 0);
124 
125  // we should get ZMQ_EVENT_ACCEPTED and then ZMQ_EVENT_DISCONNECTED
// value and address are not needed here, so NULL is passed for both
126  int event = get_monitor_event (dealer_mon, NULL, NULL);
127  assert (event == ZMQ_EVENT_ACCEPTED);
128  event = get_monitor_event (dealer_mon, NULL, NULL);
129  assert (event == ZMQ_EVENT_DISCONNECTED);
130 
// Close every socket, then terminate the context
131  rc = zmq_close (dealer);
132  assert (rc == 0);
133 
134  rc = zmq_close (dealer_mon);
135  assert (rc == 0);
136 
137  rc = zmq_close (stream);
138  assert (rc == 0);
139 
140  rc = zmq_ctx_term (ctx);
141  assert (rc == 0);
142 }
143 
// Handshake-timeout test, connecting side: a ZMQ_STREAM socket binds to
// tcp://127.0.0.1:5556 and sends nothing, while a ZMQ_DEALER socket whose
// ZMQ_HANDSHAKE_IVL has been lowered from 30000 to 100 connects to
// tcp://localhost:5556. The test passes when the dealer's monitor reports
// ZMQ_EVENT_CONNECTED followed by ZMQ_EVENT_DISCONNECTED.
// NOTE(review): the declarator line carrying this function's name (listing
// line 145) is absent from this listing; the cross-reference index at the
// end of the page lists test_stream_handshake_timeout_connect, which
// presumably is this function - confirm against the original file.
144 static void
146 {
147  int rc;
148 
149  // Set up our context and sockets
150  void *ctx = zmq_ctx_new ();
151  assert (ctx);
152 
153  // We use this socket in raw mode, to accept a connection and send nothing
154  void *stream = zmq_socket (ctx, ZMQ_STREAM);
155  assert (stream);
156 
157  int zero = 0;
158  rc = zmq_setsockopt (stream, ZMQ_LINGER, &zero, sizeof (zero));
159  assert (rc == 0);
160  rc = zmq_bind (stream, "tcp://127.0.0.1:5556");
161  assert (rc == 0);
162 
163  // We'll be using this socket to test TCP stream handshake timeout
164  void *dealer = zmq_socket (ctx, ZMQ_DEALER);
165  assert (dealer);
166  rc = zmq_setsockopt (dealer, ZMQ_LINGER, &zero, sizeof (zero));
167  assert (rc == 0);
168  int val, tenth = 100;
169  size_t vsize = sizeof(val);
170 
171  // check for the expected default handshake timeout value - 30 sec
172  rc = zmq_getsockopt (dealer, ZMQ_HANDSHAKE_IVL, &val, &vsize);
173  assert (rc == 0);
174  assert (vsize == sizeof(val));
175  assert (val == 30000);
176  // make handshake timeout faster - 1/10 sec
177  rc = zmq_setsockopt (dealer, ZMQ_HANDSHAKE_IVL, &tenth, sizeof (tenth));
178  assert (rc == 0);
// zmq_getsockopt takes the size by pointer, so reset it before reuse
179  vsize = sizeof(val);
180  // make sure zmq_setsockopt changed the value
181  rc = zmq_getsockopt (dealer, ZMQ_HANDSHAKE_IVL, &val, &vsize);
182  assert (rc == 0);
183  assert (vsize == sizeof(val));
184  assert (val == tenth);
185 
186  // Create and connect a socket for collecting monitor events on dealer
187  void *dealer_mon = zmq_socket (ctx, ZMQ_PAIR);
188  assert (dealer_mon);
189 
// NOTE(review): the continuation line holding the event-mask argument of
// this call (listing line 191) is absent from this listing; restore it
// from the original file.
190  rc = zmq_socket_monitor (dealer, "inproc://monitor-dealer",
192  assert (rc == 0);
193 
194  // Connect to the inproc endpoint so we'll get events
195  rc = zmq_connect (dealer_mon, "inproc://monitor-dealer");
196  assert (rc == 0);
197 
198  // connect dealer socket to non-sending stream socket
199  rc = zmq_connect (dealer, "tcp://localhost:5556");
200  assert (rc == 0);
201 
202  // we should get ZMQ_EVENT_CONNECTED and then ZMQ_EVENT_DISCONNECTED
// value and address are not needed here, so NULL is passed for both
203  int event = get_monitor_event (dealer_mon, NULL, NULL);
204  assert (event == ZMQ_EVENT_CONNECTED);
205  event = get_monitor_event (dealer_mon, NULL, NULL);
206  assert (event == ZMQ_EVENT_DISCONNECTED);
207 
// Close every socket, then terminate the context
208  rc = zmq_close (dealer);
209  assert (rc == 0);
210 
211  rc = zmq_close (dealer_mon);
212  assert (rc == 0);
213 
214  rc = zmq_close (stream);
215  assert (rc == 0);
216 
217  rc = zmq_ctx_term (ctx);
218  assert (rc == 0);
219 }
220 
// Program entry point of the test.
// NOTE(review): the body (listing lines 223-225) is absent from this
// listing. The cross-reference index at the end of the page lists
// setup_test_environment and the two test_stream_handshake_timeout_*
// functions, which are presumably called here - confirm against the
// original file.
221 int main (void)
222 {
226 }
#define size
ZMQ_EXPORT int zmq_setsockopt(void *s, int option, const void *optval, size_t optvallen)
Definition: zmq.cpp:265
#define ZMQ_STREAM
Definition: zmq.h:257
static void test_stream_handshake_timeout_connect(void)
#define ZMQ_EVENT_CONNECTED
Definition: zmq.h:377
ZMQ_EXPORT void * zmq_ctx_new(void)
Definition: zmq.cpp:115
#define ZMQ_DEALER
Definition: zmq.h:251
void setup_test_environment(void)
Definition: testutil.hpp:285
ZMQ_EXPORT void * zmq_msg_data(zmq_msg_t *msg)
Definition: zmq.cpp:666
ZMQ_EXPORT void * zmq_socket(void *, int type)
Definition: zmq.cpp:244
#define ZMQ_LINGER
Definition: zmq.h:276
ZMQ_EXPORT int zmq_socket_monitor(void *s, const char *addr, int events)
Definition: zmq.cpp:288
ZMQ_EXPORT int zmq_getsockopt(void *s, int option, void *optval, size_t *optvallen)
Definition: zmq.cpp:277
#define ZMQ_EVENT_DISCONNECTED
Definition: zmq.h:386
static void test_stream_handshake_timeout_accept(void)
ZMQ_EXPORT int zmq_connect(void *s, const char *addr)
Definition: zmq.cpp:332
ZMQ_EXPORT int zmq_close(void *s)
Definition: zmq.cpp:255
ZMQ_EXPORT int zmq_msg_recv(zmq_msg_t *msg, void *s, int flags)
Definition: zmq.cpp:640
ZMQ_EXPORT int zmq_msg_more(zmq_msg_t *msg)
Definition: zmq.cpp:676
Definition: zmq.h:221
#define ZMQ_EVENT_ACCEPTED
Definition: zmq.h:382
static int get_monitor_event(void *monitor, int *value, char **address)
ZMQ_EXPORT int zmq_bind(void *s, const char *addr)
Definition: zmq.cpp:321
ZMQ_EXPORT int zmq_ctx_term(void *context)
Definition: zmq.cpp:162
#define ZMQ_PAIR
Definition: zmq.h:246
ZMQ_EXPORT size_t zmq_msg_size(zmq_msg_t *msg)
Definition: zmq.cpp:671
const char * address
Definition: test_fork.cpp:32
#define ZMQ_HANDSHAKE_IVL
Definition: zmq.h:316
ZMQ_EXPORT int zmq_msg_init(zmq_msg_t *msg)
Definition: zmq.cpp:613
int main(void)