libzmq  master
ZeroMQ C++ Core Engine (LIBZMQ)
test_heartbeats.cpp
Go to the documentation of this file.
1 /*
2  Copyright (c) 2007-2016 Contributors as noted in the AUTHORS file
3 
4  This file is part of 0MQ.
5 
6  0MQ is free software; you can redistribute it and/or modify it under
7  the terms of the GNU Lesser General Public License as published by
8  the Free Software Foundation; either version 3 of the License, or
9  (at your option) any later version.
10 
11  0MQ is distributed in the hope that it will be useful,
12  but WITHOUT ANY WARRANTY; without even the implied warranty of
13  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14  GNU Lesser General Public License for more details.
15 
16  You should have received a copy of the GNU Lesser General Public License
17  along with this program. If not, see <http://www.gnu.org/licenses/>.
18 */
19 
20 #include "testutil.hpp"
21 #if defined (ZMQ_HAVE_WINDOWS)
22 # include <winsock2.h>
23 # include <ws2tcpip.h>
24 # include <stdexcept>
25 # define close closesocket
26 #else
27 # include <arpa/inet.h>
28 #endif
29 
30 // Read one event off the monitor socket; return value and address
31 // by reference, if not null, and event number by value. Returns -1
32 // in case of error.
33 
34 static int
35 get_monitor_event (void *monitor)
36 {
37  for(int i = 0; i < 2; i++) {
38  // First frame in message contains event number and value
39  zmq_msg_t msg;
40  zmq_msg_init (&msg);
41  if (zmq_msg_recv (&msg, monitor, ZMQ_DONTWAIT) == -1) {
43  continue; // Interruped, presumably
44  }
45  assert (zmq_msg_more (&msg));
46 
47  uint8_t *data = (uint8_t *) zmq_msg_data (&msg);
48  uint16_t event = *(uint16_t *) (data);
49 
50  // Second frame in message contains event address
51  zmq_msg_init (&msg);
52  if (zmq_msg_recv (&msg, monitor, 0) == -1) {
53  return -1; // Interruped, presumably
54  }
55  assert (!zmq_msg_more (&msg));
56 
57  return event;
58  }
59  return -1;
60 }
61 
62 static void
63 recv_with_retry (int fd, char *buffer, int bytes) {
64  int received = 0;
65  while (true) {
66  int rc = recv(fd, buffer + received, bytes - received, 0);
67  assert(rc > 0);
68  received += rc;
69  assert(received <= bytes);
70  if (received == bytes) break;
71  }
72 }
73 
74 static void
75 mock_handshake (int fd) {
76  const uint8_t zmtp_greeting[33] = { 0xff, 0, 0, 0, 0, 0, 0, 0, 0, 0x7f, 3, 0, 'N', 'U', 'L', 'L', 0 };
77  char buffer[128];
78  memset(buffer, 0, sizeof(buffer));
79  memcpy(buffer, zmtp_greeting, sizeof(zmtp_greeting));
80 
81  int rc = send(fd, buffer, 64, 0);
82  assert(rc == 64);
83 
84  recv_with_retry(fd, buffer, 64);
85 
86  const uint8_t zmtp_ready[43] = {
87  4, 41, 5, 'R', 'E', 'A', 'D', 'Y', 11, 'S', 'o', 'c', 'k', 'e', 't', '-', 'T', 'y', 'p', 'e',
88  0, 0, 0, 6, 'D', 'E', 'A', 'L', 'E', 'R', 8, 'I', 'd', 'e', 'n', 't', 'i', 't', 'y',
89  0, 0, 0, 0
90  };
91 
92  memset(buffer, 0, sizeof(buffer));
93  memcpy(buffer, zmtp_ready, 43);
94  rc = send(fd, buffer, 43, 0);
95  assert(rc == 43);
96 
97  recv_with_retry(fd, buffer, 43);
98 }
99 
100 static void
101 setup_curve(void * socket, int is_server) {
102  const char *secret_key;
103  const char *public_key;
104  const char *server_key;
105 
106  if(is_server) {
107  secret_key = "JTKVSB%%)wK0E.X)V>+}o?pNmC{O&4W4b!Ni{Lh6";
108  public_key = "rq:rM>}U?@Lns47E1%kR.o@n%FcmmsL/@{H8]yf7";
109  server_key = NULL;
110  }
111  else {
112  secret_key = "D:)Q[IlAW!ahhC2ac:9*A}h:p?([4%wOTJ%JR%cs";
113  public_key = "Yne@$w-vo<fVvi]a<NY6T1ed:M$fCG*[IaLV{hID";
114  server_key = "rq:rM>}U?@Lns47E1%kR.o@n%FcmmsL/@{H8]yf7";
115  }
116 
117  zmq_setsockopt(socket, ZMQ_CURVE_SECRETKEY, secret_key, strlen(secret_key));
118  zmq_setsockopt(socket, ZMQ_CURVE_PUBLICKEY, public_key, strlen(public_key));
119  if(is_server)
120  zmq_setsockopt(socket, ZMQ_CURVE_SERVER, &is_server, sizeof(is_server));
121  else
122  zmq_setsockopt(socket, ZMQ_CURVE_SERVERKEY, server_key, strlen(server_key));
123 }
124 
125 static void
126 prep_server_socket(void * ctx, int set_heartbeats, int is_curve, void ** server_out, void ** mon_out)
127 {
128  int rc;
129  // We'll be using this socket in raw mode
130  void *server = zmq_socket (ctx, ZMQ_ROUTER);
131  assert (server);
132 
133  int value = 0;
134  rc = zmq_setsockopt (server, ZMQ_LINGER, &value, sizeof (value));
135  assert (rc == 0);
136 
137  if(set_heartbeats) {
138  value = 50;
139  rc = zmq_setsockopt (server, ZMQ_HEARTBEAT_IVL, &value, sizeof(value));
140  assert (rc == 0);
141  }
142 
143  if(is_curve)
144  setup_curve(server, 1);
145 
146  rc = zmq_bind (server, "tcp://127.0.0.1:5556");
147  assert (rc == 0);
148 
149  // Create and connect a socket for collecting monitor events on dealer
150  void *server_mon = zmq_socket (ctx, ZMQ_PAIR);
151  assert (server_mon);
152 
153  rc = zmq_socket_monitor (server, "inproc://monitor-dealer",
155  assert (rc == 0);
156 
157  // Connect to the inproc endpoint so we'll get events
158  rc = zmq_connect (server_mon, "inproc://monitor-dealer");
159  assert (rc == 0);
160 
161  *server_out = server;
162  *mon_out = server_mon;
163 }
164 
165 // This checks for a broken TCP connection (or, in this case a stuck one
166 // where the peer never responds to PINGS). There should be an accepted event
167 // then a disconnect event.
168 static void
170 {
171  int rc;
172 
173  // Set up our context and sockets
174  void *ctx = zmq_ctx_new ();
175  assert (ctx);
176 
177  void * server, * server_mon;
178  prep_server_socket(ctx, 1, 0, &server, &server_mon);
179 
180  struct sockaddr_in ip4addr;
181  int s;
182 
183  ip4addr.sin_family = AF_INET;
184  ip4addr.sin_port = htons (5556);
185 #if defined (ZMQ_HAVE_WINDOWS) && (_WIN32_WINNT < 0x0600)
186  ip4addr.sin_addr.s_addr = inet_addr ("127.0.0.1");
187 #else
188  inet_pton(AF_INET, "127.0.0.1", &ip4addr.sin_addr);
189 #endif
190 
191  s = socket (AF_INET, SOCK_STREAM, IPPROTO_TCP);
192  rc = connect (s, (struct sockaddr*) &ip4addr, sizeof ip4addr);
193  assert (rc > -1);
194 
195  // Mock a ZMTP 3 client so we can forcibly time out a connection
196  mock_handshake(s);
197 
198  // By now everything should report as connected
199  rc = get_monitor_event(server_mon);
200  assert(rc == ZMQ_EVENT_ACCEPTED);
201 
202  // We should have been disconnected
203  rc = get_monitor_event(server_mon);
204  assert(rc == ZMQ_EVENT_DISCONNECTED);
205 
206  close(s);
207 
208  rc = zmq_close (server);
209  assert (rc == 0);
210 
211  rc = zmq_close (server_mon);
212  assert (rc == 0);
213 
214  rc = zmq_ctx_term (ctx);
215  assert (rc == 0);
216 }
217 
218 // This checks that peers respect the TTL value in ping messages
219 // We set up a mock ZMTP 3 client and send a ping message with a TLL
220 // to a server that is not doing any heartbeating. Then we sleep,
221 // if the server disconnects the client, then we know the TTL did
222 // its thing correctly.
223 static void
225 {
226  int rc, value;
227 
228  // Set up our context and sockets
229  void *ctx = zmq_ctx_new ();
230  assert (ctx);
231 
232  void * server, * server_mon, *client;
233  prep_server_socket(ctx, 0, 0, &server, &server_mon);
234 
235  client = zmq_socket(ctx, ZMQ_DEALER);
236  assert(client != NULL);
237 
238  // Set the heartbeat TTL to 0.1 seconds
239  value = 100;
240  zmq_setsockopt(client, ZMQ_HEARTBEAT_TTL, &value, sizeof(value));
241 
242  // Set the heartbeat interval to much longer than the TTL so that
243  // the socket times out oon the remote side.
244  value = 250;
245  zmq_setsockopt(client, ZMQ_HEARTBEAT_IVL, &value, sizeof(value));
246 
247  rc = zmq_connect(client, "tcp://localhost:5556");
248  assert(rc == 0);
249 
250  // By now everything should report as connected
251  rc = get_monitor_event(server_mon);
252  assert(rc == ZMQ_EVENT_ACCEPTED);
253 
255 
256  // We should have been disconnected
257  rc = get_monitor_event(server_mon);
258  assert(rc == ZMQ_EVENT_DISCONNECTED);
259 
260  rc = zmq_close (server);
261  assert (rc == 0);
262 
263  rc = zmq_close (server_mon);
264  assert (rc == 0);
265 
266  rc = zmq_close (client);
267  assert (rc == 0);
268 
269  rc = zmq_ctx_term (ctx);
270  assert (rc == 0);
271 }
272 
273 // This checks for normal operation - that is pings and pongs being
274 // exchanged normally. There should be an accepted event on the server,
275 // and then no event afterwards.
276 static void
278 {
279  int rc;
280 
281  // Set up our context and sockets
282  void *ctx = zmq_ctx_new ();
283  assert (ctx);
284 
285  void * server, * server_mon;
286  prep_server_socket(ctx, 1, is_curve, &server, &server_mon);
287 
288  void * client = zmq_socket(ctx, ZMQ_DEALER);
289  if(is_curve)
290  setup_curve(client, 0);
291  rc = zmq_connect(client, "tcp://127.0.0.1:5556");
292 
293  // Give it a sec to connect and handshake
295 
296  // By now everything should report as connected
297  rc = get_monitor_event(server_mon);
298  assert(rc == ZMQ_EVENT_ACCEPTED);
299 
300  // We should still be connected because pings and pongs are happenin'
301  rc = get_monitor_event(server_mon);
302  assert(rc == -1);
303 
304  rc = zmq_close (client);
305  assert (rc == 0);
306 
307  rc = zmq_close (server);
308  assert (rc == 0);
309 
310  rc = zmq_close (server_mon);
311  assert (rc == 0);
312 
313  rc = zmq_ctx_term (ctx);
314  assert (rc == 0);
315 }
316 
317 int main (void)
318 {
322  // Run this test without curve
324  // Then rerun it with curve
326 }
void msleep(int milliseconds)
Definition: testutil.hpp:316
ZMQ_EXPORT int zmq_setsockopt(void *s, int option, const void *optval, size_t optvallen)
Definition: zmq.cpp:265
#define ZMQ_CURVE_SERVERKEY
Definition: zmq.h:303
#define ZMQ_EVENT_CONNECTED
Definition: zmq.h:377
ZMQ_EXPORT void * zmq_ctx_new(void)
Definition: zmq.cpp:115
#define ZMQ_CURVE_SECRETKEY
Definition: zmq.h:302
#define ZMQ_DEALER
Definition: zmq.h:251
#define ZMQ_HEARTBEAT_IVL
Definition: zmq.h:326
void setup_test_environment(void)
Definition: testutil.hpp:285
#define SETTLE_TIME
Definition: testutil.hpp:44
static void recv_with_retry(int fd, char *buffer, int bytes)
#define ZMQ_CURVE_PUBLICKEY
Definition: zmq.h:301
static void test_heartbeat_ttl(void)
static void setup_curve(void *socket, int is_server)
static void prep_server_socket(void *ctx, int set_heartbeats, int is_curve, void **server_out, void **mon_out)
#define ZMQ_ROUTER
Definition: zmq.h:252
ZMQ_EXPORT void * zmq_msg_data(zmq_msg_t *msg)
Definition: zmq.cpp:666
ZMQ_EXPORT void * zmq_socket(void *, int type)
Definition: zmq.cpp:244
#define ZMQ_LINGER
Definition: zmq.h:276
ZMQ_EXPORT int zmq_socket_monitor(void *s, const char *addr, int events)
Definition: zmq.cpp:288
#define ZMQ_EVENT_DISCONNECTED
Definition: zmq.h:386
static void test_heartbeat_notimeout(int is_curve)
ZMQ_EXPORT int zmq_connect(void *s, const char *addr)
Definition: zmq.cpp:332
#define ZMQ_HEARTBEAT_TTL
Definition: zmq.h:327
ZMQ_EXPORT int zmq_close(void *s)
Definition: zmq.cpp:255
ZMQ_EXPORT int zmq_msg_recv(zmq_msg_t *msg, void *s, int flags)
Definition: zmq.cpp:640
ZMQ_EXPORT int zmq_msg_more(zmq_msg_t *msg)
Definition: zmq.cpp:676
Definition: zmq.h:221
#define ZMQ_EVENT_ACCEPTED
Definition: zmq.h:382
#define ZMQ_CURVE_SERVER
Definition: zmq.h:300
static void mock_handshake(int fd)
ZMQ_EXPORT int zmq_bind(void *s, const char *addr)
Definition: zmq.cpp:321
ZMQ_EXPORT int zmq_ctx_term(void *context)
Definition: zmq.cpp:162
static int get_monitor_event(void *monitor)
#define ZMQ_PAIR
Definition: zmq.h:246
static void test_heartbeat_timeout(void)
int main(void)
#define ZMQ_DONTWAIT
Definition: zmq.h:345
ZMQ_EXPORT int zmq_msg_init(zmq_msg_t *msg)
Definition: zmq.cpp:613