libzmq  master
ZeroMQ C++ Core Engine (LIBZMQ)
test_proxy.cpp
Go to the documentation of this file.
1 /*
2  Copyright (c) 2007-2016 Contributors as noted in the AUTHORS file
3 
4  This file is part of libzmq, the ZeroMQ core engine in C++.
5 
6  libzmq is free software; you can redistribute it and/or modify it under
7  the terms of the GNU Lesser General Public License (LGPL) as published
8  by the Free Software Foundation; either version 3 of the License, or
9  (at your option) any later version.
10 
11  As a special exception, the Contributors give you permission to link
12  this library with independent modules to produce an executable,
13  regardless of the license terms of these independent modules, and to
14  copy and distribute the resulting executable under terms of your choice,
15  provided that you also meet, for each linked independent module, the
16  terms and conditions of the license of that module. An independent
17  module is a module which is not derived from or based on this library.
18  If you modify this library, you must extend this exception to your
19  version of the library.
20 
21  libzmq is distributed in the hope that it will be useful, but WITHOUT
22  ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
23  FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public
24  License for more details.
25 
26  You should have received a copy of the GNU Lesser General Public License
27  along with this program. If not, see <http://www.gnu.org/licenses/>.
28 */
29 
30 #include "testutil.hpp"
31 
32 // Asynchronous client-to-server (DEALER to ROUTER) - pure libzmq
33 //
34 // While this example runs in a single process, that is to make
35 // it easier to start and stop the example. Each task may have its own
36 // context and conceptually acts as a separate process. To have this
37 // behaviour, it is necessary to replace the inproc transport of the
38 // control socket by a tcp transport.
39 
40 // This is our client task
41 // It connects to the server, and then sends a request roughly every 200 ms
42 // It collects responses as they arrive, and it prints them out. We will
43 // run several client tasks in parallel, each with a different random ID.
44 
45 #define CONTENT_SIZE 13 // bytes per request/reply: "request #NNN" plus the trailing '\0'
46 #define CONTENT_SIZE_MAX 32 // capacity of the content buffers; larger than CONTENT_SIZE so received sizes can be checked
47 #define ID_SIZE 10 // bytes per client identity: "XXXX-XXXX" plus the trailing '\0'
48 #define ID_SIZE_MAX 32 // capacity of the worker's identity receive buffer
49 #define QT_WORKERS 5 // number of server_worker threads started by server_task
50 #define QT_CLIENTS 3 // number of client_task threads started by main
51 #define is_verbose 0 // set to non-zero to printf every message and command received
52 
53 static void
54 client_task (void *ctx)
55 {
56  void *client = zmq_socket (ctx, ZMQ_DEALER);
57  assert (client);
58 
59  // Control socket receives terminate command from main over inproc
60  void *control = zmq_socket (ctx, ZMQ_SUB);
61  assert (control);
62  int rc = zmq_setsockopt (control, ZMQ_SUBSCRIBE, "", 0);
63  assert (rc == 0);
64  int linger = 0;
65  rc = zmq_setsockopt (control, ZMQ_LINGER, &linger, sizeof (linger));
66  assert (rc == 0);
67  rc = zmq_connect (control, "inproc://control");
68  assert (rc == 0);
69 
70  char content [CONTENT_SIZE_MAX];
71  // Set random identity to make tracing easier
72  char identity [ID_SIZE];
73  sprintf (identity, "%04X-%04X", rand() % 0xFFFF, rand() % 0xFFFF);
74  rc = zmq_setsockopt (client, ZMQ_IDENTITY, identity, ID_SIZE); // includes '\0' as an helper for printf
75  assert (rc == 0);
76  linger = 0;
77  rc = zmq_setsockopt (client, ZMQ_LINGER, &linger, sizeof (linger));
78  assert (rc == 0);
79  rc = zmq_connect (client, "tcp://127.0.0.1:5563");
80  assert (rc == 0);
81 
82  zmq_pollitem_t items [] = { { client, 0, ZMQ_POLLIN, 0 }, { control, 0, ZMQ_POLLIN, 0 } };
83  int request_nbr = 0;
84  bool run = true;
85  while (run) {
86  // Tick once per 200 ms, pulling in arriving messages
87  int centitick;
88  for (centitick = 0; centitick < 20; centitick++) {
89  zmq_poll (items, 2, 10);
90  if (items [0].revents & ZMQ_POLLIN) {
91  int rcvmore;
92  size_t sz = sizeof (rcvmore);
93  rc = zmq_recv (client, content, CONTENT_SIZE_MAX, 0);
94  assert (rc == CONTENT_SIZE);
95  if (is_verbose) printf("client receive - identity = %s content = %s\n", identity, content);
96  // Check that message is still the same
97  assert (memcmp (content, "request #", 9) == 0);
98  rc = zmq_getsockopt (client, ZMQ_RCVMORE, &rcvmore, &sz);
99  assert (rc == 0);
100  assert (!rcvmore);
101  }
102  if (items [1].revents & ZMQ_POLLIN) {
103  rc = zmq_recv (control, content, CONTENT_SIZE_MAX, 0);
104  if (is_verbose) printf("client receive - identity = %s command = %s\n", identity, content);
105  if (memcmp (content, "TERMINATE", 9) == 0) {
106  run = false;
107  break;
108  }
109  }
110  }
111  sprintf(content, "request #%03d", ++request_nbr); // CONTENT_SIZE
112  rc = zmq_send (client, content, CONTENT_SIZE, 0);
113  assert (rc == CONTENT_SIZE);
114  }
115 
116  rc = zmq_close (client);
117  assert (rc == 0);
118  rc = zmq_close (control);
119  assert (rc == 0);
120 }
121 
122 // This is our server task.
123 // It uses the multithreaded server model to deal requests out to a pool
124 // of workers and route replies back to clients. One worker can handle
125 // one request at a time but one client can talk to multiple workers at
126 // once.
127 
128 static void server_worker (void *ctx);
129 
130 void
131 server_task (void *ctx)
132 {
133  // Frontend socket talks to clients over TCP
134  void *frontend = zmq_socket (ctx, ZMQ_ROUTER);
135  assert (frontend);
136  int linger = 0;
137  int rc = zmq_setsockopt (frontend, ZMQ_LINGER, &linger, sizeof (linger));
138  assert (rc == 0);
139  rc = zmq_bind (frontend, "tcp://127.0.0.1:5563");
140  assert (rc == 0);
141 
142  // Backend socket talks to workers over inproc
143  void *backend = zmq_socket (ctx, ZMQ_DEALER);
144  assert (backend);
145  rc = zmq_setsockopt (backend, ZMQ_LINGER, &linger, sizeof (linger));
146  assert (rc == 0);
147  rc = zmq_bind (backend, "inproc://backend");
148  assert (rc == 0);
149 
150  // Control socket receives terminate command from main over inproc
151  void *control = zmq_socket (ctx, ZMQ_SUB);
152  assert (control);
153  rc = zmq_setsockopt (control, ZMQ_SUBSCRIBE, "", 0);
154  assert (rc == 0);
155  rc = zmq_setsockopt (control, ZMQ_LINGER, &linger, sizeof (linger));
156  assert (rc == 0);
157  rc = zmq_connect (control, "inproc://control");
158  assert (rc == 0);
159 
160  // Launch pool of worker threads, precise number is not critical
161  int thread_nbr;
162  void* threads [5];
163  for (thread_nbr = 0; thread_nbr < QT_WORKERS; thread_nbr++)
164  threads[thread_nbr] = zmq_threadstart (&server_worker, ctx);
165 
166  // Connect backend to frontend via a proxy
167  rc = zmq_proxy_steerable (frontend, backend, NULL, control);
168  assert (rc == 0);
169 
170  for (thread_nbr = 0; thread_nbr < QT_WORKERS; thread_nbr++)
171  zmq_threadclose (threads[thread_nbr]);
172 
173  rc = zmq_close (frontend);
174  assert (rc == 0);
175  rc = zmq_close (backend);
176  assert (rc == 0);
177  rc = zmq_close (control);
178  assert (rc == 0);
179 }
180 
181 // Each worker task works on one request at a time and sends a random number
182 // of replies back, with random delays between replies:
183 // The comment markers in the first column, if removed, make it a poller version
184 
185 static void
186 server_worker (void *ctx)
187 {
188  void *worker = zmq_socket (ctx, ZMQ_DEALER);
189  assert (worker);
190  int linger = 0;
191  int rc = zmq_setsockopt (worker, ZMQ_LINGER, &linger, sizeof (linger));
192  assert (rc == 0);
193  rc = zmq_connect (worker, "inproc://backend");
194  assert (rc == 0);
195 
196  // Control socket receives terminate command from main over inproc
197  void *control = zmq_socket (ctx, ZMQ_SUB);
198  assert (control);
199  rc = zmq_setsockopt (control, ZMQ_SUBSCRIBE, "", 0);
200  assert (rc == 0);
201  rc = zmq_setsockopt (control, ZMQ_LINGER, &linger, sizeof (linger));
202  assert (rc == 0);
203  rc = zmq_connect (control, "inproc://control");
204  assert (rc == 0);
205 
206  char content [CONTENT_SIZE_MAX]; // bigger than what we need to check that
207  char identity [ID_SIZE_MAX]; // the size received is the size sent
208 
209  bool run = true;
210  while (run) {
211  rc = zmq_recv (control, content, CONTENT_SIZE_MAX, ZMQ_DONTWAIT); // usually, rc == -1 (no message)
212  if (rc > 0) {
213  if (is_verbose)
214  printf("server_worker receives command = %s\n", content);
215  if (memcmp (content, "TERMINATE", 9) == 0)
216  run = false;
217  }
218  // The DEALER socket gives us the reply envelope and message
219  // if we don't poll, we have to use ZMQ_DONTWAIT, if we poll, we can block-receive with 0
220  rc = zmq_recv (worker, identity, ID_SIZE_MAX, ZMQ_DONTWAIT);
221  if (rc == ID_SIZE) {
222  rc = zmq_recv (worker, content, CONTENT_SIZE_MAX, 0);
223  assert (rc == CONTENT_SIZE);
224  if (is_verbose)
225  printf ("server receive - identity = %s content = %s\n", identity, content);
226 
227  // Send 0..4 replies back
228  int reply, replies = rand() % 5;
229  for (reply = 0; reply < replies; reply++) {
230  // Sleep for some fraction of a second
231  msleep (rand () % 10 + 1);
232  // Send message from server to client
233  rc = zmq_send (worker, identity, ID_SIZE, ZMQ_SNDMORE);
234  assert (rc == ID_SIZE);
235  rc = zmq_send (worker, content, CONTENT_SIZE, 0);
236  assert (rc == CONTENT_SIZE);
237  }
238  }
239  }
240  rc = zmq_close (worker);
241  assert (rc == 0);
242  rc = zmq_close (control);
243  assert (rc == 0);
244 }
245 
246 // The main thread simply starts several clients and a server, lets them run
247 // for 500 ms, then publishes TERMINATE and waits for all threads to finish.
248 
249 int main (void)
250 {
252 
253  void *ctx = zmq_ctx_new ();
254  assert (ctx);
255  // Control socket receives terminate command from main over inproc
256  void *control = zmq_socket (ctx, ZMQ_PUB);
257  assert (control);
258  int linger = 0;
259  int rc = zmq_setsockopt (control, ZMQ_LINGER, &linger, sizeof (linger));
260  assert (rc == 0);
261  rc = zmq_bind (control, "inproc://control");
262  assert (rc == 0);
263 
264  void *threads [QT_CLIENTS + 1];
265  for (int i = 0; i < QT_CLIENTS; i++)
266  threads[i] = zmq_threadstart (&client_task, ctx);
267  threads[QT_CLIENTS] = zmq_threadstart (&server_task, ctx);
268  msleep (500); // Run for 500 ms then quit
269 
270  rc = zmq_send (control, "TERMINATE", 9, 0);
271  assert (rc == 9);
272 
273  rc = zmq_close (control);
274  assert (rc == 0);
275 
276  for (int i = 0; i < QT_CLIENTS + 1; i++)
277  zmq_threadclose (threads[i]);
278 
279  rc = zmq_ctx_term (ctx);
280  assert (rc == 0);
281  return 0;
282 }
void msleep(int milliseconds)
Definition: testutil.hpp:316
ZMQ_EXPORT int zmq_setsockopt(void *s, int option, const void *optval, size_t optvallen)
Definition: zmq.cpp:265
#define ID_SIZE_MAX
Definition: test_proxy.cpp:48
void server_task(void *ctx)
Definition: test_proxy.cpp:131
#define CONTENT_SIZE_MAX
Definition: test_proxy.cpp:46
ZMQ_EXPORT void * zmq_ctx_new(void)
Definition: zmq.cpp:115
#define ZMQ_SNDMORE
Definition: zmq.h:346
#define ZMQ_DEALER
Definition: zmq.h:251
#define ZMQ_SUB
Definition: zmq.h:248
static void client_task(void *ctx)
Definition: test_proxy.cpp:54
#define ZMQ_SUBSCRIBE
Definition: zmq.h:266
#define QT_WORKERS
Definition: test_proxy.cpp:49
void setup_test_environment(void)
Definition: testutil.hpp:285
ZMQ_EXPORT int zmq_recv(void *s, void *buf, size_t len, int flags)
Definition: zmq.cpp:507
static void worker(void *s)
#define ZMQ_PUB
Definition: zmq.h:247
int main(void)
Definition: test_proxy.cpp:249
#define ZMQ_ROUTER
Definition: zmq.h:252
ZMQ_EXPORT void * zmq_socket(void *, int type)
Definition: zmq.cpp:244
#define ZMQ_LINGER
Definition: zmq.h:276
#define QT_CLIENTS
Definition: test_proxy.cpp:50
#define ZMQ_IDENTITY
Definition: zmq.h:265
#define is_verbose
Definition: test_proxy.cpp:51
ZMQ_EXPORT void zmq_threadclose(void *thread)
Definition: zmq_utils.cpp:85
static void server_worker(void *ctx)
Definition: test_proxy.cpp:186
ZMQ_EXPORT int zmq_getsockopt(void *s, int option, void *optval, size_t *optvallen)
Definition: zmq.cpp:277
#define ID_SIZE
Definition: test_proxy.cpp:47
#define CONTENT_SIZE
Definition: test_proxy.cpp:45
ZMQ_EXPORT int zmq_connect(void *s, const char *addr)
Definition: zmq.cpp:332
ZMQ_EXPORT int zmq_close(void *s)
Definition: zmq.cpp:255
#define ZMQ_RCVMORE
Definition: zmq.h:272
ZMQ_EXPORT int zmq_poll(zmq_pollitem_t *items, int nitems, long timeout)
Definition: zmq.cpp:748
ZMQ_EXPORT int zmq_send(void *s, const void *buf, size_t len, int flags)
Definition: zmq.cpp:387
ZMQ_EXPORT int zmq_bind(void *s, const char *addr)
Definition: zmq.cpp:321
ZMQ_EXPORT int zmq_ctx_term(void *context)
Definition: zmq.cpp:162
ZMQ_EXPORT void * zmq_threadstart(zmq_thread_fn *func, void *arg)
Definition: zmq_utils.cpp:78
#define ZMQ_POLLIN
Definition: zmq.h:410
#define ZMQ_DONTWAIT
Definition: zmq.h:345
ZMQ_EXPORT int zmq_proxy_steerable(void *frontend, void *backend, void *capture, void *control)
Definition: zmq.cpp:1335