libzmq  master
ZeroMQ C++ Core Engine (LIBZMQ)
test_monitor.cpp
Go to the documentation of this file.
1 /*
2  Copyright (c) 2007-2016 Contributors as noted in the AUTHORS file
3 
4  This file is part of libzmq, the ZeroMQ core engine in C++.
5 
6  libzmq is free software; you can redistribute it and/or modify it under
7  the terms of the GNU Lesser General Public License (LGPL) as published
8  by the Free Software Foundation; either version 3 of the License, or
9  (at your option) any later version.
10 
11  As a special exception, the Contributors give you permission to link
12  this library with independent modules to produce an executable,
13  regardless of the license terms of these independent modules, and to
14  copy and distribute the resulting executable under terms of your choice,
15  provided that you also meet, for each linked independent module, the
16  terms and conditions of the license of that module. An independent
17  module is a module which is not derived from or based on this library.
18  If you modify this library, you must extend this exception to your
19  version of the library.
20 
21  libzmq is distributed in the hope that it will be useful, but WITHOUT
22  ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
23  FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public
24  License for more details.
25 
26  You should have received a copy of the GNU Lesser General Public License
27  along with this program. If not, see <http://www.gnu.org/licenses/>.
28 */
29 
30 #include "testutil.hpp"
31 
32 // Read one event off the monitor socket; return value and address
33 // by reference, if not null, and event number by value. Returns -1
34 // in case of error.
35 
36 static int
37 get_monitor_event (void *monitor, int *value, char **address)
38 {
39  // First frame in message contains event number and value
40  zmq_msg_t msg;
41  zmq_msg_init (&msg);
42  if (zmq_msg_recv (&msg, monitor, 0) == -1)
43  return -1; // Interruped, presumably
44  assert (zmq_msg_more (&msg));
45 
46  uint8_t *data = (uint8_t *) zmq_msg_data (&msg);
47  uint16_t event = *(uint16_t *) (data);
48  if (value)
49  *value = *(uint32_t *) (data + 2);
50 
51  // Second frame in message contains event address
52  zmq_msg_init (&msg);
53  if (zmq_msg_recv (&msg, monitor, 0) == -1)
54  return -1; // Interruped, presumably
55  assert (!zmq_msg_more (&msg));
56 
57  if (address) {
58  uint8_t *data = (uint8_t *) zmq_msg_data (&msg);
59  size_t size = zmq_msg_size (&msg);
60  *address = (char *) malloc (size + 1);
61  memcpy (*address, data, size);
62  *address [size] = 0;
63  }
64  return event;
65 }
66 
67 int main (void)
68 {
70 
71  void *ctx = zmq_ctx_new ();
72  assert (ctx);
73 
74  // We'll monitor these two sockets
75  void *client = zmq_socket (ctx, ZMQ_DEALER);
76  assert (client);
77  void *server = zmq_socket (ctx, ZMQ_DEALER);
78  assert (server);
79 
80  // Socket monitoring only works over inproc://
81  int rc = zmq_socket_monitor (client, "tcp://127.0.0.1:9999", 0);
82  assert (rc == -1);
83  assert (zmq_errno () == EPROTONOSUPPORT);
84 
85  // Monitor all events on client and server sockets
86  rc = zmq_socket_monitor (client, "inproc://monitor-client", ZMQ_EVENT_ALL);
87  assert (rc == 0);
88  rc = zmq_socket_monitor (server, "inproc://monitor-server", ZMQ_EVENT_ALL);
89  assert (rc == 0);
90 
91  // Create two sockets for collecting monitor events
92  void *client_mon = zmq_socket (ctx, ZMQ_PAIR);
93  assert (client_mon);
94  void *server_mon = zmq_socket (ctx, ZMQ_PAIR);
95  assert (server_mon);
96 
97  // Connect these to the inproc endpoints so they'll get events
98  rc = zmq_connect (client_mon, "inproc://monitor-client");
99  assert (rc == 0);
100  rc = zmq_connect (server_mon, "inproc://monitor-server");
101  assert (rc == 0);
102 
103  // Now do a basic ping test
104  rc = zmq_bind (server, "tcp://127.0.0.1:9998");
105  assert (rc == 0);
106  rc = zmq_connect (client, "tcp://127.0.0.1:9998");
107  assert (rc == 0);
108  bounce (server, client);
109 
110  // Close client and server
111  close_zero_linger (client);
112  close_zero_linger (server);
113 
114  // Now collect and check events from both sockets
115  int event = get_monitor_event (client_mon, NULL, NULL);
116  if (event == ZMQ_EVENT_CONNECT_DELAYED)
117  event = get_monitor_event (client_mon, NULL, NULL);
118  assert (event == ZMQ_EVENT_CONNECTED);
119  event = get_monitor_event (client_mon, NULL, NULL);
120  assert (event == ZMQ_EVENT_MONITOR_STOPPED);
121 
122  // This is the flow of server events
123  event = get_monitor_event (server_mon, NULL, NULL);
124  assert (event == ZMQ_EVENT_LISTENING);
125  event = get_monitor_event (server_mon, NULL, NULL);
126  assert (event == ZMQ_EVENT_ACCEPTED);
127  event = get_monitor_event (server_mon, NULL, NULL);
128  // Sometimes the server sees the client closing before it gets closed.
129  if (event != ZMQ_EVENT_DISCONNECTED) {
130  assert (event == ZMQ_EVENT_CLOSED);
131  event = get_monitor_event (server_mon, NULL, NULL);
132  }
133  if (event != ZMQ_EVENT_DISCONNECTED) {
134  assert (event == ZMQ_EVENT_MONITOR_STOPPED);
135  }
136 
137  // Close down the sockets
138  close_zero_linger (client_mon);
139  close_zero_linger (server_mon);
140  zmq_ctx_term (ctx);
141 
142  return 0 ;
143 }
#define ZMQ_EVENT_CONNECT_DELAYED
Definition: zmq.h:378
#define size
#define ZMQ_EVENT_CONNECTED
Definition: zmq.h:377
ZMQ_EXPORT void * zmq_ctx_new(void)
Definition: zmq.cpp:115
#define ZMQ_DEALER
Definition: zmq.h:251
void setup_test_environment(void)
Definition: testutil.hpp:285
#define ZMQ_EVENT_CLOSED
Definition: zmq.h:384
ZMQ_EXPORT int zmq_errno(void)
Definition: zmq.cpp:107
ZMQ_EXPORT void * zmq_msg_data(zmq_msg_t *msg)
Definition: zmq.cpp:666
ZMQ_EXPORT void * zmq_socket(void *, int type)
Definition: zmq.cpp:244
ZMQ_EXPORT int zmq_socket_monitor(void *s, const char *addr, int events)
Definition: zmq.cpp:288
int main(void)
#define ZMQ_EVENT_DISCONNECTED
Definition: zmq.h:386
ZMQ_EXPORT int zmq_connect(void *s, const char *addr)
Definition: zmq.cpp:332
ZMQ_EXPORT int zmq_msg_recv(zmq_msg_t *msg, void *s, int flags)
Definition: zmq.cpp:640
ZMQ_EXPORT int zmq_msg_more(zmq_msg_t *msg)
Definition: zmq.cpp:676
Definition: zmq.h:221
#define ZMQ_EVENT_ACCEPTED
Definition: zmq.h:382
#define ZMQ_EVENT_LISTENING
Definition: zmq.h:380
#define EPROTONOSUPPORT
Definition: zmq.h:115
void close_zero_linger(void *socket)
Definition: testutil.hpp:275
void bounce(void *server, void *client)
Definition: testutil.hpp:73
ZMQ_EXPORT int zmq_bind(void *s, const char *addr)
Definition: zmq.cpp:321
#define ZMQ_EVENT_MONITOR_STOPPED
Definition: zmq.h:387
ZMQ_EXPORT int zmq_ctx_term(void *context)
Definition: zmq.cpp:162
#define ZMQ_PAIR
Definition: zmq.h:246
static int get_monitor_event(void *monitor, int *value, char **address)
ZMQ_EXPORT size_t zmq_msg_size(zmq_msg_t *msg)
Definition: zmq.cpp:671
#define ZMQ_EVENT_ALL
Definition: zmq.h:388
const char * address
Definition: test_fork.cpp:32
ZMQ_EXPORT int zmq_msg_init(zmq_msg_t *msg)
Definition: zmq.cpp:613