libzmq  master
ZeroMQ C++ Core Engine (LIBZMQ)
router.cpp
Go to the documentation of this file.
1 /*
2  Copyright (c) 2007-2016 Contributors as noted in the AUTHORS file
3 
4  This file is part of libzmq, the ZeroMQ core engine in C++.
5 
6  libzmq is free software; you can redistribute it and/or modify it under
7  the terms of the GNU Lesser General Public License (LGPL) as published
8  by the Free Software Foundation; either version 3 of the License, or
9  (at your option) any later version.
10 
11  As a special exception, the Contributors give you permission to link
12  this library with independent modules to produce an executable,
13  regardless of the license terms of these independent modules, and to
14  copy and distribute the resulting executable under terms of your choice,
15  provided that you also meet, for each linked independent module, the
16  terms and conditions of the license of that module. An independent
17  module is a module which is not derived from or based on this library.
18  If you modify this library, you must extend this exception to your
19  version of the library.
20 
21  libzmq is distributed in the hope that it will be useful, but WITHOUT
22  ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
23  FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public
24  License for more details.
25 
26  You should have received a copy of the GNU Lesser General Public License
27  along with this program. If not, see <http://www.gnu.org/licenses/>.
28 */
29 
30 #include "precompiled.hpp"
31 #include "macros.hpp"
32 #include "router.hpp"
33 #include "pipe.hpp"
34 #include "wire.hpp"
35 #include "random.hpp"
36 #include "likely.hpp"
37 #include "err.hpp"
38 
39 zmq::router_t::router_t (class ctx_t *parent_, uint32_t tid_, int sid_) :
40  socket_base_t (parent_, tid_, sid_),
41  prefetched (false),
42  identity_sent (false),
43  current_in (NULL),
44  terminate_current_in (false),
45  more_in (false),
46  current_out (NULL),
47  more_out (false),
48  next_rid (generate_random ()),
49  mandatory (false),
50  // raw_socket functionality in ROUTER is deprecated
51  raw_socket (false),
52  probe_router (false),
53  handover (false)
54 {
56  options.recv_identity = true;
57  options.raw_socket = false;
58 
61 }
62 
64 {
65  zmq_assert (anonymous_pipes.empty ());;
66  zmq_assert (outpipes.empty ());
69 }
70 
71 void zmq::router_t::xattach_pipe (pipe_t *pipe_, bool subscribe_to_all_)
72 {
73  LIBZMQ_UNUSED (subscribe_to_all_);
74 
75  zmq_assert (pipe_);
76 
77  if (probe_router) {
78  msg_t probe_msg_;
79  int rc = probe_msg_.init ();
80  errno_assert (rc == 0);
81 
82  rc = pipe_->write (&probe_msg_);
83  // zmq_assert (rc) is not applicable here, since it is not a bug.
84  pipe_->flush ();
85 
86  rc = probe_msg_.close ();
87  errno_assert (rc == 0);
88  }
89 
90  bool identity_ok = identify_peer (pipe_);
91  if (identity_ok)
92  fq.attach (pipe_);
93  else
94  anonymous_pipes.insert (pipe_);
95 }
96 
97 int zmq::router_t::xsetsockopt (int option_, const void *optval_,
98  size_t optvallen_)
99 {
100  bool is_int = (optvallen_ == sizeof (int));
101  int value = 0;
102  if (is_int) memcpy(&value, optval_, sizeof (int));
103 
104  switch (option_) {
105  case ZMQ_CONNECT_RID:
106  if (optval_ && optvallen_) {
107  connect_rid.assign ((char *) optval_, optvallen_);
108  return 0;
109  }
110  break;
111 
112  case ZMQ_ROUTER_RAW:
113  if (is_int && value >= 0) {
114  raw_socket = (value != 0);
115  if (raw_socket) {
116  options.recv_identity = false;
117  options.raw_socket = true;
118  }
119  return 0;
120  }
121  break;
122 
124  if (is_int && value >= 0) {
125  mandatory = (value != 0);
126  return 0;
127  }
128  break;
129 
130  case ZMQ_PROBE_ROUTER:
131  if (is_int && value >= 0) {
132  probe_router = (value != 0);
133  return 0;
134  }
135  break;
136 
137  case ZMQ_ROUTER_HANDOVER:
138  if (is_int && value >= 0) {
139  handover = (value != 0);
140  return 0;
141  }
142  break;
143 
144  default:
145  break;
146  }
147  errno = EINVAL;
148  return -1;
149 }
150 
151 
153 {
154  std::set <pipe_t*>::iterator it = anonymous_pipes.find (pipe_);
155  if (it != anonymous_pipes.end ())
156  anonymous_pipes.erase (it);
157  else {
158  outpipes_t::iterator iter = outpipes.find (pipe_->get_identity ());
159  zmq_assert (iter != outpipes.end ());
160  outpipes.erase (iter);
161  fq.pipe_terminated (pipe_);
162  if (pipe_ == current_out)
163  current_out = NULL;
164  }
165 }
166 
168 {
169  std::set <pipe_t*>::iterator it = anonymous_pipes.find (pipe_);
170  if (it == anonymous_pipes.end ())
171  fq.activated (pipe_);
172  else {
173  bool identity_ok = identify_peer (pipe_);
174  if (identity_ok) {
175  anonymous_pipes.erase (it);
176  fq.attach (pipe_);
177  }
178  }
179 }
180 
182 {
183  outpipes_t::iterator it;
184  for (it = outpipes.begin (); it != outpipes.end (); ++it)
185  if (it->second.pipe == pipe_)
186  break;
187 
188  zmq_assert (it != outpipes.end ());
189  zmq_assert (!it->second.active);
190  it->second.active = true;
191 }
192 
194 {
195  // If this is the first part of the message it's the ID of the
196  // peer to send the message to.
197  if (!more_out) {
199 
200  // If we have malformed message (prefix with no subsequent message)
201  // then just silently ignore it.
202  // TODO: The connections should be killed instead.
203  if (msg_->flags () & msg_t::more) {
204 
205  more_out = true;
206 
207  // Find the pipe associated with the identity stored in the prefix.
208  // If there's no such pipe just silently ignore the message, unless
209  // router_mandatory is set.
210  blob_t identity ((unsigned char*) msg_->data (), msg_->size ());
211  outpipes_t::iterator it = outpipes.find (identity);
212 
213  if (it != outpipes.end ()) {
214  current_out = it->second.pipe;
215  if (!current_out->check_write ()) {
216  it->second.active = false;
217  current_out = NULL;
218  if (mandatory) {
219  more_out = false;
220  errno = EAGAIN;
221  return -1;
222  }
223  }
224  }
225  else
226  if (mandatory) {
227  more_out = false;
228  errno = EHOSTUNREACH;
229  return -1;
230  }
231  }
232 
233  int rc = msg_->close ();
234  errno_assert (rc == 0);
235  rc = msg_->init ();
236  errno_assert (rc == 0);
237  return 0;
238  }
239 
240  // Ignore the MORE flag for raw-sock or assert?
241  if (options.raw_socket)
242  msg_->reset_flags (msg_t::more);
243 
244  // Check whether this is the last part of the message.
245  more_out = msg_->flags () & msg_t::more ? true : false;
246 
247  // Push the message into the pipe. If there's no out pipe, just drop it.
248  if (current_out) {
249 
250  // Close the remote connection if user has asked to do so
251  // by sending zero length message.
252  // Pending messages in the pipe will be dropped (on receiving term- ack)
253  if (raw_socket && msg_->size() == 0) {
254  current_out->terminate (false);
255  int rc = msg_->close ();
256  errno_assert (rc == 0);
257  rc = msg_->init ();
258  errno_assert (rc == 0);
259  current_out = NULL;
260  return 0;
261  }
262 
263  bool ok = current_out->write (msg_);
264  if (unlikely (!ok)) {
265  // Message failed to send - we must close it ourselves.
266  int rc = msg_->close ();
267  errno_assert (rc == 0);
268  current_out = NULL;
269  } else {
270  if (!more_out) {
271  current_out->flush ();
272  current_out = NULL;
273  }
274  }
275  }
276  else {
277  int rc = msg_->close ();
278  errno_assert (rc == 0);
279  }
280 
281  // Detach the message from the data buffer.
282  int rc = msg_->init ();
283  errno_assert (rc == 0);
284 
285  return 0;
286 }
287 
289 {
290  if (prefetched) {
291  if (!identity_sent) {
292  int rc = msg_->move (prefetched_id);
293  errno_assert (rc == 0);
294  identity_sent = true;
295  }
296  else {
297  int rc = msg_->move (prefetched_msg);
298  errno_assert (rc == 0);
299  prefetched = false;
300  }
301  more_in = msg_->flags () & msg_t::more ? true : false;
302 
303  if (!more_in) {
304  if (terminate_current_in) {
305  current_in->terminate (true);
306  terminate_current_in = false;
307  }
308  current_in = NULL;
309  }
310  return 0;
311  }
312 
313  pipe_t *pipe = NULL;
314  int rc = fq.recvpipe (msg_, &pipe);
315 
316  // It's possible that we receive peer's identity. That happens
317  // after reconnection. The current implementation assumes that
318  // the peer always uses the same identity.
319  while (rc == 0 && msg_->is_identity ())
320  rc = fq.recvpipe (msg_, &pipe);
321 
322  if (rc != 0)
323  return -1;
324 
325  zmq_assert (pipe != NULL);
326 
327  // If we are in the middle of reading a message, just return the next part.
328  if (more_in) {
329  more_in = msg_->flags () & msg_t::more ? true : false;
330 
331  if (!more_in) {
332  if (terminate_current_in) {
333  current_in->terminate (true);
334  terminate_current_in = false;
335  }
336  current_in = NULL;
337  }
338  }
339  else {
340  // We are at the beginning of a message.
341  // Keep the message part we have in the prefetch buffer
342  // and return the ID of the peer instead.
343  rc = prefetched_msg.move (*msg_);
344  errno_assert (rc == 0);
345  prefetched = true;
346  current_in = pipe;
347 
348  blob_t identity = pipe->get_identity ();
349  rc = msg_->init_size (identity.size ());
350  errno_assert (rc == 0);
351  memcpy (msg_->data (), identity.data (), identity.size ());
352  msg_->set_flags (msg_t::more);
353  if (prefetched_msg.metadata())
355  identity_sent = true;
356  }
357 
358  return 0;
359 }
360 
362 {
363  if (current_out) {
364  current_out->rollback ();
365  current_out = NULL;
366  more_out = false;
367  }
368  return 0;
369 }
370 
372 {
373  // If we are in the middle of reading the messages, there are
374  // definitely more parts available.
375  if (more_in)
376  return true;
377 
378  // We may already have a message pre-fetched.
379  if (prefetched)
380  return true;
381 
382  // Try to read the next message.
383  // The message, if read, is kept in the pre-fetch buffer.
384  pipe_t *pipe = NULL;
385  int rc = fq.recvpipe (&prefetched_msg, &pipe);
386 
387  // It's possible that we receive peer's identity. That happens
388  // after reconnection. The current implementation assumes that
389  // the peer always uses the same identity.
390  // TODO: handle the situation when the peer changes its identity.
391  while (rc == 0 && prefetched_msg.is_identity ())
392  rc = fq.recvpipe (&prefetched_msg, &pipe);
393 
394  if (rc != 0)
395  return false;
396 
397  zmq_assert (pipe != NULL);
398 
399  blob_t identity = pipe->get_identity ();
400  rc = prefetched_id.init_size (identity.size ());
401  errno_assert (rc == 0);
402  memcpy (prefetched_id.data (), identity.data (), identity.size ());
404 
405  prefetched = true;
406  identity_sent = false;
407  current_in = pipe;
408 
409  return true;
410 }
411 
413 {
414  // In theory, ROUTER socket is always ready for writing. Whether actual
415  // attempt to write succeeds depends on which pipe the message is going
416  // to be routed to.
417  return true;
418 }
419 
421 {
422  return fq.get_credential ();
423 }
424 
426 {
427  msg_t msg;
428  blob_t identity;
429  bool ok;
430 
431  if (connect_rid.length()) {
432  identity = blob_t ((unsigned char*) connect_rid.c_str (),
433  connect_rid.length());
434  connect_rid.clear ();
435  outpipes_t::iterator it = outpipes.find (identity);
436  if (it != outpipes.end ())
437  zmq_assert(false); // Not allowed to duplicate an existing rid
438  }
439  else
440  if (options.raw_socket) { // Always assign identity for raw-socket
441  unsigned char buf [5];
442  buf [0] = 0;
443  put_uint32 (buf + 1, next_rid++);
444  identity = blob_t (buf, sizeof buf);
445  }
446  else
447  if (!options.raw_socket) {
448  // Pick up handshake cases and also case where next identity is set
449  msg.init ();
450  ok = pipe_->read (&msg);
451  if (!ok)
452  return false;
453 
454  if (msg.size () == 0) {
455  // Fall back on the auto-generation
456  unsigned char buf [5];
457  buf [0] = 0;
458  put_uint32 (buf + 1, next_rid++);
459  identity = blob_t (buf, sizeof buf);
460  msg.close ();
461  }
462  else {
463  identity = blob_t ((unsigned char*) msg.data (), msg.size ());
464  outpipes_t::iterator it = outpipes.find (identity);
465  msg.close ();
466 
467  if (it != outpipes.end ()) {
468  if (!handover)
469  // Ignore peers with duplicate ID
470  return false;
471  else {
472  // We will allow the new connection to take over this
473  // identity. Temporarily assign a new identity to the
474  // existing pipe so we can terminate it asynchronously.
475  unsigned char buf [5];
476  buf [0] = 0;
477  put_uint32 (buf + 1, next_rid++);
478  blob_t new_identity = blob_t (buf, sizeof buf);
479 
480  it->second.pipe->set_identity (new_identity);
481  outpipe_t existing_outpipe =
482  {it->second.pipe, it->second.active};
483 
484  ok = outpipes.insert (outpipes_t::value_type (
485  new_identity, existing_outpipe)).second;
486  zmq_assert (ok);
487 
488  // Remove the existing identity entry to allow the new
489  // connection to take the identity.
490  outpipes.erase (it);
491 
492  if (existing_outpipe.pipe == current_in)
493  terminate_current_in = true;
494  else
495  existing_outpipe.pipe->terminate (true);
496  }
497  }
498  }
499  }
500 
501  pipe_->set_identity (identity);
502  // Add the record into output pipes lookup table
503  outpipe_t outpipe = {pipe_, true};
504  ok = outpipes.insert (outpipes_t::value_type (identity, outpipe)).second;
505  zmq_assert (ok);
506 
507  return true;
508 }
bool check_write()
Definition: pipe.cpp:206
bool read(msg_t *msg_)
Definition: pipe.cpp:169
bool probe_router
Definition: router.hpp:133
int close()
Definition: msg.cpp:217
blob_t get_credential() const
Definition: fq.cpp:158
#define ZMQ_ROUTER_HANDOVER
Definition: zmq.h:309
bool write(msg_t *msg_)
Definition: pipe.cpp:221
bool handover
Definition: router.hpp:138
msg_t prefetched_msg
Definition: router.hpp:93
bool terminate_current_in
Definition: router.hpp:99
#define zmq_assert(x)
Definition: err.hpp:119
bool xhas_in()
Definition: router.cpp:371
zmq::pipe_t * current_in
Definition: router.hpp:96
int move(msg_t &src_)
Definition: msg.cpp:274
bool recv_identity
Definition: options.hpp:146
bool prefetched
Definition: router.hpp:83
bool is_identity() const
Definition: msg.cpp:417
router_t(zmq::ctx_t *parent_, uint32_t tid_, int sid)
Definition: router.cpp:39
int recvpipe(msg_t *msg_, pipe_t **pipe_)
Definition: fq.cpp:88
void pipe_terminated(pipe_t *pipe_)
Definition: fq.cpp:56
void xpipe_terminated(zmq::pipe_t *pipe_)
Definition: router.cpp:152
#define ZMQ_ROUTER
Definition: zmq.h:252
metadata_t * metadata
Definition: msg.hpp:175
void put_uint32(unsigned char *buffer_, uint32_t value)
Definition: wire.hpp:64
int xsend(zmq::msg_t *msg_)
Definition: router.cpp:193
bool xhas_out()
Definition: router.cpp:412
unsigned char size
Definition: msg.hpp:188
#define ZMQ_ROUTER_MANDATORY
Definition: zmq.h:287
int init_size(size_t size_)
Definition: msg.cpp:93
void rollback()
Definition: pipe.cpp:235
void reset_flags(unsigned char flags_)
Definition: msg.cpp:389
bool raw_socket
Definition: router.hpp:130
uint32_t next_rid
Definition: router.hpp:125
bool identity_sent
Definition: router.hpp:87
bool more_out
Definition: router.hpp:121
#define ZMQ_PROBE_ROUTER
Definition: zmq.h:304
#define LIBZMQ_UNUSED(object)
Definition: macros.hpp:6
int xrecv(zmq::msg_t *msg_)
Definition: router.cpp:288
#define unlikely(x)
Definition: likely.hpp:38
int init()
Definition: msg.cpp:82
uint32_t generate_random()
Definition: random.cpp:54
#define EHOSTUNREACH
Definition: zmq.h:160
void attach(pipe_t *pipe_)
Definition: fq.cpp:49
void xattach_pipe(zmq::pipe_t *pipe_, bool subscribe_to_all_)
Definition: router.cpp:71
std::basic_string< unsigned char > blob_t
Definition: blob.hpp:134
void flush()
Definition: pipe.cpp:248
int rollback()
Definition: router.cpp:361
std::string connect_rid
msg_t prefetched_id
Definition: router.hpp:90
void set_flags(unsigned char flags_)
Definition: msg.cpp:384
void terminate(bool delay_)
Definition: pipe.cpp:385
std::set< pipe_t * > anonymous_pipes
Definition: router.hpp:111
void activated(pipe_t *pipe_)
Definition: fq.cpp:76
#define errno_assert(x)
Definition: err.hpp:129
void xwrite_activated(zmq::pipe_t *pipe_)
Definition: router.cpp:181
zmq::pipe_t * pipe
Definition: router.hpp:106
bool more_in
Definition: router.hpp:102
unsigned char data[max_vsm_size]
Definition: msg.hpp:187
void set_identity(const blob_t &identity_)
Definition: pipe.cpp:128
#define ZMQ_ROUTER_RAW
Definition: zmq.h:294
options_t options
Definition: own.hpp:109
zmq::pipe_t * current_out
Definition: router.hpp:118
bool mandatory
Definition: router.hpp:129
void xread_activated(zmq::pipe_t *pipe_)
Definition: router.cpp:167
int xsetsockopt(int option_, const void *optval_, size_t optvallen_)
Definition: router.cpp:97
unsigned char flags
Definition: msg.hpp:181
#define ZMQ_CONNECT_RID
Definition: zmq.h:311
outpipes_t outpipes
Definition: router.hpp:115
blob_t get_credential() const
Definition: router.cpp:420
bool identify_peer(pipe_t *pipe_)
Definition: router.cpp:425
blob_t get_identity()
Definition: pipe.cpp:133
void set_metadata(metadata_t *metadata_)
Definition: msg.cpp:399