Line data Source code
1 : /*
2 : Copyright (c) 2007-2016 Contributors as noted in the AUTHORS file
3 :
4 : This file is part of libzmq, the ZeroMQ core engine in C++.
5 :
6 : libzmq is free software; you can redistribute it and/or modify it under
7 : the terms of the GNU Lesser General Public License (LGPL) as published
8 : by the Free Software Foundation; either version 3 of the License, or
9 : (at your option) any later version.
10 :
11 : As a special exception, the Contributors give you permission to link
12 : this library with independent modules to produce an executable,
13 : regardless of the license terms of these independent modules, and to
14 : copy and distribute the resulting executable under terms of your choice,
15 : provided that you also meet, for each linked independent module, the
16 : terms and conditions of the license of that module. An independent
17 : module is a module which is not derived from or based on this library.
18 : If you modify this library, you must extend this exception to your
19 : version of the library.
20 :
21 : libzmq is distributed in the hope that it will be useful, but WITHOUT
22 : ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
23 : FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public
24 : License for more details.
25 :
26 : You should have received a copy of the GNU Lesser General Public License
27 : along with this program. If not, see <http://www.gnu.org/licenses/>.
28 : */
29 :
30 : #include "precompiled.hpp"
31 : #include "macros.hpp"
32 : #include "router.hpp"
33 : #include "pipe.hpp"
34 : #include "wire.hpp"
35 : #include "random.hpp"
36 : #include "likely.hpp"
37 : #include "err.hpp"
38 :
39 237 : zmq::router_t::router_t (class ctx_t *parent_, uint32_t tid_, int sid_) :
40 : socket_base_t (parent_, tid_, sid_),
41 : prefetched (false),
42 : identity_sent (false),
43 : current_in (NULL),
44 : terminate_current_in (false),
45 : more_in (false),
46 : current_out (NULL),
47 : more_out (false),
48 237 : next_rid (generate_random ()),
49 : mandatory (false),
50 : // raw_socket functionality in ROUTER is deprecated
51 : raw_socket (false),
52 : probe_router (false),
53 948 : handover (false)
54 : {
55 237 : options.type = ZMQ_ROUTER;
56 237 : options.recv_identity = true;
57 237 : options.raw_socket = false;
58 :
59 237 : prefetched_id.init ();
60 237 : prefetched_msg.init ();
61 237 : }
62 :
63 1035 : zmq::router_t::~router_t ()
64 : {
65 474 : zmq_assert (anonymous_pipes.empty ());;
66 474 : zmq_assert (outpipes.empty ());
67 237 : prefetched_id.close ();
68 237 : prefetched_msg.close ();
69 324 : }
70 :
71 591 : void zmq::router_t::xattach_pipe (pipe_t *pipe_, bool subscribe_to_all_)
72 : {
73 : LIBZMQ_UNUSED (subscribe_to_all_);
74 :
75 591 : zmq_assert (pipe_);
76 :
77 591 : if (probe_router) {
78 : msg_t probe_msg_;
79 3 : int rc = probe_msg_.init ();
80 3 : errno_assert (rc == 0);
81 :
82 3 : rc = pipe_->write (&probe_msg_);
83 : // zmq_assert (rc) is not applicable here, since it is not a bug.
84 3 : pipe_->flush ();
85 :
86 3 : rc = probe_msg_.close ();
87 3 : errno_assert (rc == 0);
88 : }
89 :
90 591 : bool identity_ok = identify_peer (pipe_);
91 591 : if (identity_ok)
92 457 : fq.attach (pipe_);
93 : else
94 134 : anonymous_pipes.insert (pipe_);
95 591 : }
96 :
97 300 : int zmq::router_t::xsetsockopt (int option_, const void *optval_,
98 : size_t optvallen_)
99 : {
100 300 : bool is_int = (optvallen_ == sizeof (int));
101 300 : int value = 0;
102 300 : if (is_int) memcpy(&value, optval_, sizeof (int));
103 :
104 300 : switch (option_) {
105 : case ZMQ_CONNECT_RID:
106 6 : if (optval_ && optvallen_) {
107 6 : connect_rid.assign ((char *) optval_, optvallen_);
108 : return 0;
109 : }
110 : break;
111 :
112 : case ZMQ_ROUTER_RAW:
113 0 : if (is_int && value >= 0) {
114 0 : raw_socket = (value != 0);
115 0 : if (raw_socket) {
116 0 : options.recv_identity = false;
117 0 : options.raw_socket = true;
118 : }
119 : return 0;
120 : }
121 : break;
122 :
123 : case ZMQ_ROUTER_MANDATORY:
124 9 : if (is_int && value >= 0) {
125 9 : mandatory = (value != 0);
126 9 : return 0;
127 : }
128 : break;
129 :
130 : case ZMQ_PROBE_ROUTER:
131 3 : if (is_int && value >= 0) {
132 3 : probe_router = (value != 0);
133 3 : return 0;
134 : }
135 : break;
136 :
137 : case ZMQ_ROUTER_HANDOVER:
138 3 : if (is_int && value >= 0) {
139 3 : handover = (value != 0);
140 3 : return 0;
141 : }
142 : break;
143 :
144 : default:
145 : break;
146 : }
147 279 : errno = EINVAL;
148 279 : return -1;
149 : }
150 :
151 :
152 591 : void zmq::router_t::xpipe_terminated (pipe_t *pipe_)
153 : {
154 1182 : std::set <pipe_t*>::iterator it = anonymous_pipes.find (pipe_);
155 1182 : if (it != anonymous_pipes.end ())
156 0 : anonymous_pipes.erase (it);
157 : else {
158 1773 : outpipes_t::iterator iter = outpipes.find (pipe_->get_identity ());
159 1182 : zmq_assert (iter != outpipes.end ());
160 591 : outpipes.erase (iter);
161 591 : fq.pipe_terminated (pipe_);
162 591 : if (pipe_ == current_out)
163 36 : current_out = NULL;
164 : }
165 591 : }
166 :
167 289 : void zmq::router_t::xread_activated (pipe_t *pipe_)
168 : {
169 578 : std::set <pipe_t*>::iterator it = anonymous_pipes.find (pipe_);
170 578 : if (it == anonymous_pipes.end ())
171 155 : fq.activated (pipe_);
172 : else {
173 134 : bool identity_ok = identify_peer (pipe_);
174 134 : if (identity_ok) {
175 134 : anonymous_pipes.erase (it);
176 134 : fq.attach (pipe_);
177 : }
178 : }
179 289 : }
180 :
181 3 : void zmq::router_t::xwrite_activated (pipe_t *pipe_)
182 : {
183 : outpipes_t::iterator it;
184 12 : for (it = outpipes.begin (); it != outpipes.end (); ++it)
185 3 : if (it->second.pipe == pipe_)
186 : break;
187 :
188 6 : zmq_assert (it != outpipes.end ());
189 3 : zmq_assert (!it->second.active);
190 3 : it->second.active = true;
191 3 : }
192 :
193 1489 : int zmq::router_t::xsend (msg_t *msg_)
194 : {
195 : // If this is the first part of the message it's the ID of the
196 : // peer to send the message to.
197 1489 : if (!more_out) {
198 571 : zmq_assert (!current_out);
199 :
200 : // If we have malformed message (prefix with no subsequent message)
201 : // then just silently ignore it.
202 : // TODO: The connections should be killed instead.
203 571 : if (msg_->flags () & msg_t::more) {
204 :
205 568 : more_out = true;
206 :
207 : // Find the pipe associated with the identity stored in the prefix.
208 : // If there's no such pipe just silently ignore the message, unless
209 : // router_mandatory is set.
210 1136 : blob_t identity ((unsigned char*) msg_->data (), msg_->size ());
211 1136 : outpipes_t::iterator it = outpipes.find (identity);
212 :
213 1136 : if (it != outpipes.end ()) {
214 561 : current_out = it->second.pipe;
215 561 : if (!current_out->check_write ()) {
216 6 : it->second.active = false;
217 6 : current_out = NULL;
218 6 : if (mandatory) {
219 6 : more_out = false;
220 6 : errno = EAGAIN;
221 6 : return -1;
222 : }
223 : }
224 : }
225 : else
226 7 : if (mandatory) {
227 4 : more_out = false;
228 4 : errno = EHOSTUNREACH;
229 4 : return -1;
230 : }
231 : }
232 :
233 561 : int rc = msg_->close ();
234 561 : errno_assert (rc == 0);
235 561 : rc = msg_->init ();
236 561 : errno_assert (rc == 0);
237 : return 0;
238 : }
239 :
240 : // Ignore the MORE flag for raw-sock or assert?
241 918 : if (options.raw_socket)
242 0 : msg_->reset_flags (msg_t::more);
243 :
244 : // Check whether this is the last part of the message.
245 918 : more_out = msg_->flags () & msg_t::more ? true : false;
246 :
247 : // Push the message into the pipe. If there's no out pipe, just drop it.
248 918 : if (current_out) {
249 :
250 : // Close the remote connection if user has asked to do so
251 : // by sending zero length message.
252 : // Pending messages in the pipe will be dropped (on receiving term- ack)
253 915 : if (raw_socket && msg_->size() == 0) {
254 0 : current_out->terminate (false);
255 0 : int rc = msg_->close ();
256 0 : errno_assert (rc == 0);
257 0 : rc = msg_->init ();
258 0 : errno_assert (rc == 0);
259 0 : current_out = NULL;
260 0 : return 0;
261 : }
262 :
263 915 : bool ok = current_out->write (msg_);
264 915 : if (unlikely (!ok)) {
265 : // Message failed to send - we must close it ourselves.
266 0 : int rc = msg_->close ();
267 0 : errno_assert (rc == 0);
268 0 : current_out = NULL;
269 : } else {
270 915 : if (!more_out) {
271 519 : current_out->flush ();
272 519 : current_out = NULL;
273 : }
274 : }
275 : }
276 : else {
277 3 : int rc = msg_->close ();
278 3 : errno_assert (rc == 0);
279 : }
280 :
281 : // Detach the message from the data buffer.
282 918 : int rc = msg_->init ();
283 918 : errno_assert (rc == 0);
284 :
285 : return 0;
286 : }
287 :
288 1483 : int zmq::router_t::xrecv (msg_t *msg_)
289 : {
290 1483 : if (prefetched) {
291 312 : if (!identity_sent) {
292 24 : int rc = msg_->move (prefetched_id);
293 24 : errno_assert (rc == 0);
294 24 : identity_sent = true;
295 : }
296 : else {
297 288 : int rc = msg_->move (prefetched_msg);
298 288 : errno_assert (rc == 0);
299 288 : prefetched = false;
300 : }
301 312 : more_in = msg_->flags () & msg_t::more ? true : false;
302 :
303 312 : if (!more_in) {
304 87 : if (terminate_current_in) {
305 0 : current_in->terminate (true);
306 0 : terminate_current_in = false;
307 : }
308 87 : current_in = NULL;
309 : }
310 : return 0;
311 : }
312 :
313 1171 : pipe_t *pipe = NULL;
314 1171 : int rc = fq.recvpipe (msg_, &pipe);
315 :
316 : // It's possible that we receive peer's identity. That happens
317 : // after reconnection. The current implementation assumes that
318 : // the peer always uses the same identity.
319 1177 : while (rc == 0 && msg_->is_identity ())
320 6 : rc = fq.recvpipe (msg_, &pipe);
321 :
322 1171 : if (rc != 0)
323 : return -1;
324 :
325 699 : zmq_assert (pipe != NULL);
326 :
327 : // If we are in the middle of reading a message, just return the next part.
328 699 : if (more_in) {
329 426 : more_in = msg_->flags () & msg_t::more ? true : false;
330 :
331 426 : if (!more_in) {
332 201 : if (terminate_current_in) {
333 0 : current_in->terminate (true);
334 0 : terminate_current_in = false;
335 : }
336 201 : current_in = NULL;
337 : }
338 : }
339 : else {
340 : // We are at the beginning of a message.
341 : // Keep the message part we have in the prefetch buffer
342 : // and return the ID of the peer instead.
343 273 : rc = prefetched_msg.move (*msg_);
344 273 : errno_assert (rc == 0);
345 273 : prefetched = true;
346 273 : current_in = pipe;
347 :
348 273 : blob_t identity = pipe->get_identity ();
349 273 : rc = msg_->init_size (identity.size ());
350 273 : errno_assert (rc == 0);
351 273 : memcpy (msg_->data (), identity.data (), identity.size ());
352 273 : msg_->set_flags (msg_t::more);
353 273 : if (prefetched_msg.metadata())
354 174 : msg_->set_metadata(prefetched_msg.metadata());
355 273 : identity_sent = true;
356 : }
357 :
358 : return 0;
359 : }
360 :
361 0 : int zmq::router_t::rollback (void)
362 : {
363 0 : if (current_out) {
364 0 : current_out->rollback ();
365 0 : current_out = NULL;
366 0 : more_out = false;
367 : }
368 0 : return 0;
369 : }
370 :
371 232 : bool zmq::router_t::xhas_in ()
372 : {
373 : // If we are in the middle of reading the messages, there are
374 : // definitely more parts available.
375 232 : if (more_in)
376 : return true;
377 :
378 : // We may already have a message pre-fetched.
379 232 : if (prefetched)
380 : return true;
381 :
382 : // Try to read the next message.
383 : // The message, if read, is kept in the pre-fetch buffer.
384 208 : pipe_t *pipe = NULL;
385 208 : int rc = fq.recvpipe (&prefetched_msg, &pipe);
386 :
387 : // It's possible that we receive peer's identity. That happens
388 : // after reconnection. The current implementation assumes that
389 : // the peer always uses the same identity.
390 : // TODO: handle the situation when the peer changes its identity.
391 208 : while (rc == 0 && prefetched_msg.is_identity ())
392 0 : rc = fq.recvpipe (&prefetched_msg, &pipe);
393 :
394 208 : if (rc != 0)
395 : return false;
396 :
397 24 : zmq_assert (pipe != NULL);
398 :
399 24 : blob_t identity = pipe->get_identity ();
400 24 : rc = prefetched_id.init_size (identity.size ());
401 24 : errno_assert (rc == 0);
402 24 : memcpy (prefetched_id.data (), identity.data (), identity.size ());
403 24 : prefetched_id.set_flags (msg_t::more);
404 :
405 24 : prefetched = true;
406 24 : identity_sent = false;
407 24 : current_in = pipe;
408 :
409 24 : return true;
410 : }
411 :
412 178 : bool zmq::router_t::xhas_out ()
413 : {
414 : // In theory, ROUTER socket is always ready for writing. Whether actual
415 : // attempt to write succeeds depends on which pipe the message is going
416 : // to be routed to.
417 178 : return true;
418 : }
419 :
420 0 : zmq::blob_t zmq::router_t::get_credential () const
421 : {
422 0 : return fq.get_credential ();
423 : }
424 :
425 725 : bool zmq::router_t::identify_peer (pipe_t *pipe_)
426 : {
427 : msg_t msg;
428 : blob_t identity;
429 : bool ok;
430 :
431 1450 : if (connect_rid.length()) {
432 12 : identity = blob_t ((unsigned char*) connect_rid.c_str (),
433 : connect_rid.length());
434 6 : connect_rid.clear ();
435 12 : outpipes_t::iterator it = outpipes.find (identity);
436 12 : if (it != outpipes.end ())
437 0 : zmq_assert(false); // Not allowed to duplicate an existing rid
438 : }
439 : else
440 719 : if (options.raw_socket) { // Always assign identity for raw-socket
441 : unsigned char buf [5];
442 0 : buf [0] = 0;
443 0 : put_uint32 (buf + 1, next_rid++);
444 0 : identity = blob_t (buf, sizeof buf);
445 : }
446 : else
447 719 : if (!options.raw_socket) {
448 : // Pick up handshake cases and also case where next identity is set
449 719 : msg.init ();
450 719 : ok = pipe_->read (&msg);
451 719 : if (!ok)
452 : return false;
453 :
454 585 : if (msg.size () == 0) {
455 : // Fall back on the auto-generation
456 : unsigned char buf [5];
457 228 : buf [0] = 0;
458 228 : put_uint32 (buf + 1, next_rid++);
459 228 : identity = blob_t (buf, sizeof buf);
460 228 : msg.close ();
461 : }
462 : else {
463 714 : identity = blob_t ((unsigned char*) msg.data (), msg.size ());
464 714 : outpipes_t::iterator it = outpipes.find (identity);
465 357 : msg.close ();
466 :
467 714 : if (it != outpipes.end ()) {
468 3 : if (!handover)
469 : // Ignore peers with duplicate ID
470 : return false;
471 : else {
472 : // We will allow the new connection to take over this
473 : // identity. Temporarily assign a new identity to the
474 : // existing pipe so we can terminate it asynchronously.
475 : unsigned char buf [5];
476 3 : buf [0] = 0;
477 3 : put_uint32 (buf + 1, next_rid++);
478 3 : blob_t new_identity = blob_t (buf, sizeof buf);
479 :
480 3 : it->second.pipe->set_identity (new_identity);
481 : outpipe_t existing_outpipe =
482 6 : {it->second.pipe, it->second.active};
483 :
484 : ok = outpipes.insert (outpipes_t::value_type (
485 9 : new_identity, existing_outpipe)).second;
486 3 : zmq_assert (ok);
487 :
488 : // Remove the existing identity entry to allow the new
489 : // connection to take the identity.
490 3 : outpipes.erase (it);
491 :
492 3 : if (existing_outpipe.pipe == current_in)
493 0 : terminate_current_in = true;
494 : else
495 3 : existing_outpipe.pipe->terminate (true);
496 : }
497 : }
498 : }
499 : }
500 :
501 591 : pipe_->set_identity (identity);
502 : // Add the record into output pipes lookup table
503 591 : outpipe_t outpipe = {pipe_, true};
504 1773 : ok = outpipes.insert (outpipes_t::value_type (identity, outpipe)).second;
505 591 : zmq_assert (ok);
506 :
507 : return true;
508 : }
|