libzmq  master
ZeroMQ C++ Core Engine (LIBZMQ)
stream_engine.cpp
Go to the documentation of this file.
1 /*
2  Copyright (c) 2007-2016 Contributors as noted in the AUTHORS file
3 
4  This file is part of libzmq, the ZeroMQ core engine in C++.
5 
6  libzmq is free software; you can redistribute it and/or modify it under
7  the terms of the GNU Lesser General Public License (LGPL) as published
8  by the Free Software Foundation; either version 3 of the License, or
9  (at your option) any later version.
10 
11  As a special exception, the Contributors give you permission to link
12  this library with independent modules to produce an executable,
13  regardless of the license terms of these independent modules, and to
14  copy and distribute the resulting executable under terms of your choice,
15  provided that you also meet, for each linked independent module, the
16  terms and conditions of the license of that module. An independent
17  module is a module which is not derived from or based on this library.
18  If you modify this library, you must extend this exception to your
19  version of the library.
20 
21  libzmq is distributed in the hope that it will be useful, but WITHOUT
22  ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
23  FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public
24  License for more details.
25 
26  You should have received a copy of the GNU Lesser General Public License
27  along with this program. If not, see <http://www.gnu.org/licenses/>.
28 */
29 
30 #include "precompiled.hpp"
31 #include "macros.hpp"
32 #include "platform.hpp"
33 
34 #include <string.h>
35 #include <new>
36 #include <sstream>
37 
38 #include "stream_engine.hpp"
39 #include "io_thread.hpp"
40 #include "session_base.hpp"
41 #include "v1_encoder.hpp"
42 #include "v1_decoder.hpp"
43 #include "v2_encoder.hpp"
44 #include "v2_decoder.hpp"
45 #include "null_mechanism.hpp"
46 #include "plain_client.hpp"
47 #include "plain_server.hpp"
48 #include "gssapi_client.hpp"
49 #include "gssapi_server.hpp"
50 #include "curve_client.hpp"
51 #include "curve_server.hpp"
52 #include "raw_decoder.hpp"
53 #include "raw_encoder.hpp"
54 #include "config.hpp"
55 #include "err.hpp"
56 #include "ip.hpp"
57 #include "tcp.hpp"
58 #include "likely.hpp"
59 #include "wire.hpp"
60 
62  const std::string &endpoint_) :
63  s (fd_),
64  as_server(false),
65  handle(NULL),
66  inpos (NULL),
67  insize (0),
68  decoder (NULL),
69  outpos (NULL),
70  outsize (0),
71  encoder (NULL),
72  metadata (NULL),
73  handshaking (true),
74  greeting_size (v2_greeting_size),
75  greeting_bytes_read (0),
76  session (NULL),
77  options (options_),
78  endpoint (endpoint_),
79  plugged (false),
80  next_msg (&stream_engine_t::identity_msg),
81  process_msg (&stream_engine_t::process_identity_msg),
82  io_error (false),
83  subscription_required (false),
84  mechanism (NULL),
85  input_stopped (false),
86  output_stopped (false),
87  has_handshake_timer (false),
88  has_ttl_timer (false),
89  has_timeout_timer (false),
90  has_heartbeat_timer (false),
91  heartbeat_timeout (0),
92  socket (NULL)
93 {
94  int rc = tx_msg.init ();
95  errno_assert (rc == 0);
96 
97  // Put the socket into non-blocking mode.
98  unblock_socket (s);
99 
100  int family = get_peer_ip_address (s, peer_address);
101  if (family == 0)
102  peer_address.clear();
103 #if defined ZMQ_HAVE_SO_PEERCRED
104  else
105  if (family == PF_UNIX) {
106  struct ucred cred;
107  socklen_t size = sizeof (cred);
108  if (!getsockopt (s, SOL_SOCKET, SO_PEERCRED, &cred, &size)) {
109  std::ostringstream buf;
110  buf << ":" << cred.uid << ":" << cred.gid << ":" << cred.pid;
111  peer_address += buf.str ();
112  }
113  }
114 #elif defined ZMQ_HAVE_LOCAL_PEERCRED
115  else
116  if (family == PF_UNIX) {
117  struct xucred cred;
118  socklen_t size = sizeof (cred);
119  if (!getsockopt (s, 0, LOCAL_PEERCRED, &cred, &size)
120  && cred.cr_version == XUCRED_VERSION) {
121  std::ostringstream buf;
122  buf << ":" << cred.cr_uid << ":";
123  if (cred.cr_ngroups > 0)
124  buf << cred.cr_groups[0];
125  buf << ":";
126  peer_address += buf.str ();
127  }
128  }
129 #endif
130 
131 #ifdef SO_NOSIGPIPE
132  // Make sure that SIGPIPE signal is not generated when writing to a
133  // connection that was already closed by the peer.
134  int set = 1;
135  rc = setsockopt (s, SOL_SOCKET, SO_NOSIGPIPE, &set, sizeof (int));
136  errno_assert (rc == 0);
137 #endif
138  if(options.heartbeat_interval > 0) {
140  if(heartbeat_timeout == -1)
142  }
143 }
144 
146 {
147  zmq_assert (!plugged);
148 
149  if (s != retired_fd) {
150 #ifdef ZMQ_HAVE_WINDOWS
151  int rc = closesocket (s);
152  wsa_assert (rc != SOCKET_ERROR);
153 #else
154  int rc = close (s);
155  errno_assert (rc == 0);
156 #endif
157  s = retired_fd;
158  }
159 
160  int rc = tx_msg.close ();
161  errno_assert (rc == 0);
162 
163  // Drop reference to metadata and destroy it if we are
164  // the only user.
165  if (metadata != NULL) {
166  if (metadata->drop_ref ()) {
168  }
169  }
170 
174 }
175 
177  session_base_t *session_)
178 {
179  zmq_assert (!plugged);
180  plugged = true;
181 
182  // Connect to session object.
183  zmq_assert (!session);
184  zmq_assert (session_);
185  session = session_;
186  socket = session-> get_socket ();
187 
188  // Connect to I/O threads poller object.
189  io_object_t::plug (io_thread_);
190  handle = add_fd (s);
191  io_error = false;
192 
193  if (options.raw_socket) {
194  // no handshaking for raw sock, instantiate raw encoder and decoders
195  encoder = new (std::nothrow) raw_encoder_t (out_batch_size);
197 
198  decoder = new (std::nothrow) raw_decoder_t (in_batch_size);
200 
201  // disable handshaking for raw socket
202  handshaking = false;
203 
206 
207  properties_t properties;
208  if (init_properties(properties)) {
209  // Compile metadata.
210  zmq_assert (metadata == NULL);
211  metadata = new (std::nothrow) metadata_t (properties);
212  }
213 
214  if (options.raw_notify) {
215  // For raw sockets, send an initial 0-length message to the
216  // application so that it knows a peer has connected.
217  msg_t connector;
218  connector.init();
219  push_raw_msg_to_session (&connector);
220  connector.close();
221  session->flush ();
222  }
223  }
224  else {
225  // start optional timer, to prevent handshake hanging on no input
227 
228  // Send the 'length' and 'flags' fields of the identity message.
229  // The 'length' field is encoded in the long format.
231  outpos [outsize++] = 0xff;
233  outsize += 8;
234  outpos [outsize++] = 0x7f;
235  }
236 
237  set_pollin (handle);
239  // Flush all the data that may have been already received downstream.
240  in_event ();
241 }
242 
244 {
246  plugged = false;
247 
248  // Cancel all timers.
249  if (has_handshake_timer) {
251  has_handshake_timer = false;
252  }
253 
254  if (has_ttl_timer) {
256  has_ttl_timer = false;
257  }
258 
259  if (has_timeout_timer) {
261  has_timeout_timer = false;
262  }
263 
264  if (has_heartbeat_timer) {
266  has_heartbeat_timer = false;
267  }
268  // Cancel all fd subscriptions.
269  if (!io_error)
270  rm_fd (handle);
271 
272  // Disconnect from I/O threads poller object.
274 
275  session = NULL;
276 }
277 
279 {
280  unplug ();
281  delete this;
282 }
283 
285 {
286  zmq_assert (!io_error);
287 
288  // If still handshaking, receive and process the greeting message.
289  if (unlikely (handshaking))
290  if (!handshake ())
291  return;
292 
294 
295  // If there has been an I/O error, stop polling.
296  if (input_stopped) {
297  rm_fd (handle);
298  io_error = true;
299  return;
300  }
301 
302  // If there's no data to process in the buffer...
303  if (!insize) {
304 
305  // Retrieve the buffer and read as much data as possible.
306  // Note that buffer can be arbitrarily large. However, we assume
307  // the underlying TCP layer has fixed buffer size and thus the
308  // number of bytes read will be always limited.
309  size_t bufsize = 0;
310  decoder->get_buffer (&inpos, &bufsize);
311 
312  const int rc = tcp_read (s, inpos, bufsize);
313 
314  if (rc == 0) {
316  return;
317  }
318  if (rc == -1) {
319  if (errno != EAGAIN)
321  return;
322  }
323 
324  // Adjust input size
325  insize = static_cast <size_t> (rc);
326  // Adjust buffer size to received bytes
328  }
329 
330  int rc = 0;
331  size_t processed = 0;
332 
333  while (insize > 0) {
334  rc = decoder->decode (inpos, insize, processed);
335  zmq_assert (processed <= insize);
336  inpos += processed;
337  insize -= processed;
338  if (rc == 0 || rc == -1)
339  break;
340  rc = (this->*process_msg) (decoder->msg ());
341  if (rc == -1)
342  break;
343  }
344 
345  // Tear down the connection if we have failed to decode input data
346  // or the session has rejected the message.
347  if (rc == -1) {
348  if (errno != EAGAIN) {
350  return;
351  }
352  input_stopped = true;
354  }
355 
356  session->flush ();
357 }
358 
360 {
361  zmq_assert (!io_error);
362 
363  // If write buffer is empty, try to read new data from the encoder.
364  if (!outsize) {
365 
366  // Even when we stop polling as soon as there is no
367  // data to send, the poller may invoke out_event one
368  // more time due to 'speculative write' optimisation.
369  if (unlikely (encoder == NULL)) {
371  return;
372  }
373 
374  outpos = NULL;
375  outsize = encoder->encode (&outpos, 0);
376 
377  while (outsize < (size_t) out_batch_size) {
378  if ((this->*next_msg) (&tx_msg) == -1)
379  break;
380  encoder->load_msg (&tx_msg);
381  unsigned char *bufptr = outpos + outsize;
382  size_t n = encoder->encode (&bufptr, out_batch_size - outsize);
383  zmq_assert (n > 0);
384  if (outpos == NULL)
385  outpos = bufptr;
386  outsize += n;
387  }
388 
389  // If there is no data to send, stop polling for output.
390  if (outsize == 0) {
391  output_stopped = true;
393  return;
394  }
395  }
396 
397  // If there are any data to write in write buffer, write as much as
398  // possible to the socket. Note that amount of data to write can be
399  // arbitrarily large. However, we assume that underlying TCP layer has
400  // limited transmission buffer and thus the actual number of bytes
401  // written should be reasonably modest.
402  const int nbytes = tcp_write (s, outpos, outsize);
403 
404  // IO error has occurred. We stop waiting for output events.
405  // The engine is not terminated until we detect input error;
406  // this is necessary to prevent losing incoming messages.
407  if (nbytes == -1) {
409  return;
410  }
411 
412  outpos += nbytes;
413  outsize -= nbytes;
414 
415  // If we are still handshaking and there are no data
416  // to send, stop polling for output.
417  if (unlikely (handshaking))
418  if (outsize == 0)
420 }
421 
423 {
424  if (unlikely (io_error))
425  return;
426 
427  if (likely (output_stopped)) {
429  output_stopped = false;
430  }
431 
432  // Speculative write: The assumption is that at the moment new message
433  // was sent by the user the socket is probably available for writing.
434  // Thus we try to write the data to socket avoiding polling for POLLOUT.
435  // Consequently, the latency should be better in request/reply scenarios.
436  out_event ();
437 }
438 
440 {
442  zmq_assert (session != NULL);
443  zmq_assert (decoder != NULL);
444 
445  int rc = (this->*process_msg) (decoder->msg ());
446  if (rc == -1) {
447  if (errno == EAGAIN)
448  session->flush ();
449  else
451  return;
452  }
453 
454  while (insize > 0) {
455  size_t processed = 0;
456  rc = decoder->decode (inpos, insize, processed);
457  zmq_assert (processed <= insize);
458  inpos += processed;
459  insize -= processed;
460  if (rc == 0 || rc == -1)
461  break;
462  rc = (this->*process_msg) (decoder->msg ());
463  if (rc == -1)
464  break;
465  }
466 
467  if (rc == -1 && errno == EAGAIN)
468  session->flush ();
469  else
470  if (io_error)
472  else
473  if (rc == -1)
475  else {
476  input_stopped = false;
477  set_pollin (handle);
478  session->flush ();
479 
480  // Speculative read.
481  in_event ();
482  }
483 }
484 
486 {
489  // Receive the greeting.
491  const int n = tcp_read (s, greeting_recv + greeting_bytes_read,
493  if (n == 0) {
495  return false;
496  }
497  if (n == -1) {
498  if (errno != EAGAIN)
500  return false;
501  }
502 
503  greeting_bytes_read += n;
504 
505  // We have received at least one byte from the peer.
506  // If the first byte is not 0xff, we know that the
507  // peer is using unversioned protocol.
508  if (greeting_recv [0] != 0xff)
509  break;
510 
512  continue;
513 
514  // Inspect the right-most bit of the 10th byte (which coincides
515  // with the 'flags' field if a regular message was sent).
516  // Zero indicates this is a header of identity message
517  // (i.e. the peer is using the unversioned protocol).
518  if (!(greeting_recv [9] & 0x01))
519  break;
520 
521  // The peer is using versioned protocol.
522  // Send the major version number.
524  if (outsize == 0)
526  outpos [outsize++] = 3; // Major version number
527  }
528 
530  if (outpos + outsize == greeting_send + signature_size + 1) {
531  if (outsize == 0)
533 
534  // Use ZMTP/2.0 to talk to older peers.
535  if (greeting_recv [10] == ZMTP_1_0
536  || greeting_recv [10] == ZMTP_2_0)
537  outpos [outsize++] = options.type;
538  else {
539  outpos [outsize++] = 0; // Minor version number
540  memset (outpos + outsize, 0, 20);
541 
546 
547  if (options.mechanism == ZMQ_NULL)
548  memcpy (outpos + outsize, "NULL", 4);
549  else
550  if (options.mechanism == ZMQ_PLAIN)
551  memcpy (outpos + outsize, "PLAIN", 5);
552  else
554  memcpy (outpos + outsize, "GSSAPI", 6);
555  else
556  if (options.mechanism == ZMQ_CURVE)
557  memcpy (outpos + outsize, "CURVE", 5);
558  outsize += 20;
559  memset (outpos + outsize, 0, 32);
560  outsize += 32;
562  }
563  }
564  }
565  }
566 
567  // Position of the revision field in the greeting.
568  const size_t revision_pos = 10;
569 
570  // Is the peer using ZMTP/1.0 with no revision number?
571  // If so, we send and receive rest of identity message
572  if (greeting_recv [0] != 0xff || !(greeting_recv [9] & 0x01)) {
573  if (session->zap_enabled ()) {
574  // reject ZMTP 1.0 connections if ZAP is enabled
576  return false;
577  }
578 
579  encoder = new (std::nothrow) v1_encoder_t (out_batch_size);
581 
582  decoder = new (std::nothrow) v1_decoder_t (in_batch_size, options.maxmsgsize);
584 
585  // We have already sent the message header.
586  // Since there is no way to tell the encoder to
587  // skip the message header, we simply throw that
588  // header data away.
589  const size_t header_size = options.identity_size + 1 >= 255 ? 10 : 2;
590  unsigned char tmp [10], *bufferp = tmp;
591 
592  // Prepare the identity message and load it into encoder.
593  // Then consume bytes we have already sent to the peer.
594  const int rc = tx_msg.init_size (options.identity_size);
595  zmq_assert (rc == 0);
597  encoder->load_msg (&tx_msg);
598  size_t buffer_size = encoder->encode (&bufferp, header_size);
599  zmq_assert (buffer_size == header_size);
600 
601  // Make sure the decoder sees the data we have already received.
604 
605  // To allow for interoperability with peers that do not forward
606  // their subscriptions, we inject a phantom subscription message
607  // message into the incoming message stream.
608  if (options.type == ZMQ_PUB || options.type == ZMQ_XPUB)
609  subscription_required = true;
610 
611  // We are sending our identity now and the next message
612  // will come from the socket.
614 
615  // We are expecting identity message.
617  }
618  else
619  if (greeting_recv [revision_pos] == ZMTP_1_0) {
620  if (session->zap_enabled ()) {
621  // reject ZMTP 1.0 connections if ZAP is enabled
623  return false;
624  }
625 
626  encoder = new (std::nothrow) v1_encoder_t (out_batch_size);
628 
629  decoder = new (std::nothrow) v1_decoder_t (
632  }
633  else
634  if (greeting_recv [revision_pos] == ZMTP_2_0) {
635  if (session->zap_enabled ()) {
636  // reject ZMTP 2.0 connections if ZAP is enabled
638  return false;
639  }
640 
641  encoder = new (std::nothrow) v2_encoder_t (out_batch_size);
643 
644  decoder = new (std::nothrow) v2_decoder_t (
647  }
648  else {
649  encoder = new (std::nothrow) v2_encoder_t (out_batch_size);
651 
652  decoder = new (std::nothrow) v2_decoder_t (
655 
656  if (options.mechanism == ZMQ_NULL
657  && memcmp (greeting_recv + 12, "NULL\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0", 20) == 0) {
658  mechanism = new (std::nothrow)
661  }
662  else
664  && memcmp (greeting_recv + 12, "PLAIN\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0", 20) == 0) {
665  if (options.as_server)
666  mechanism = new (std::nothrow)
668  else
669  mechanism = new (std::nothrow)
672  }
673 #ifdef ZMQ_HAVE_CURVE
674  else
676  && memcmp (greeting_recv + 12, "CURVE\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0", 20) == 0) {
677  if (options.as_server)
678  mechanism = new (std::nothrow)
679  curve_server_t (session, peer_address, options);
680  else
681  mechanism = new (std::nothrow) curve_client_t (options);
683  }
684 #endif
685 #ifdef HAVE_LIBGSSAPI_KRB5
686  else
688  && memcmp (greeting_recv + 12, "GSSAPI\0\0\0\0\0\0\0\0\0\0\0\0\0\0", 20) == 0) {
689  if (options.as_server)
690  mechanism = new (std::nothrow)
691  gssapi_server_t (session, peer_address, options);
692  else
693  mechanism = new (std::nothrow) gssapi_client_t (options);
695  }
696 #endif
697  else {
699  return false;
700  }
703  }
704 
705  // Start polling for output if necessary.
706  if (outsize == 0)
708 
709  // Handshaking was successful.
710  // Switch into the normal message flow.
711  handshaking = false;
712 
713  if (has_handshake_timer) {
715  has_handshake_timer = false;
716  }
717 
718  return true;
719 }
720 
722 {
723  int rc = msg_->init_size (options.identity_size);
724  errno_assert (rc == 0);
725  if (options.identity_size > 0)
726  memcpy (msg_->data (), options.identity, options.identity_size);
728  return 0;
729 }
730 
732 {
733  if (options.recv_identity) {
734  msg_->set_flags (msg_t::identity);
735  int rc = session->push_msg (msg_);
736  errno_assert (rc == 0);
737  }
738  else {
739  int rc = msg_->close ();
740  errno_assert (rc == 0);
741  rc = msg_->init ();
742  errno_assert (rc == 0);
743  }
744 
747  else
749 
750  return 0;
751 }
752 
754 {
755  zmq_assert (mechanism != NULL);
756 
757  if (mechanism->status () == mechanism_t::ready) {
758  mechanism_ready ();
759  return pull_and_encode (msg_);
760  }
761  else
762  if (mechanism->status () == mechanism_t::error) {
763  errno = EPROTO;
764  return -1;
765  }
766  else {
767  const int rc = mechanism->next_handshake_command (msg_);
768  if (rc == 0)
769  msg_->set_flags (msg_t::command);
770  return rc;
771  }
772 }
773 
775 {
776  zmq_assert (mechanism != NULL);
777  const int rc = mechanism->process_handshake_command (msg_);
778  if (rc == 0) {
780  mechanism_ready ();
781  else
782  if (mechanism->status () == mechanism_t::error) {
783  errno = EPROTO;
784  return -1;
785  }
786  if (output_stopped)
787  restart_output ();
788  }
789 
790  return rc;
791 }
792 
794 {
795  zmq_assert (mechanism != NULL);
796 
797  const int rc = mechanism->zap_msg_available ();
798  if (rc == -1) {
800  return;
801  }
802  if (input_stopped)
803  restart_input ();
804  if (output_stopped)
805  restart_output ();
806 }
807 
809 {
810  if (options.heartbeat_interval > 0) {
812  has_heartbeat_timer = true;
813  }
814 
815  if (options.recv_identity) {
816  msg_t identity;
817  mechanism->peer_identity (&identity);
818  const int rc = session->push_msg (&identity);
819  if (rc == -1 && errno == EAGAIN) {
820  // If the write is failing at this stage with
821  // an EAGAIN the pipe must be being shut down,
822  // so we can just bail out of the identity set.
823  return;
824  }
825  errno_assert (rc == 0);
826  session->flush ();
827  }
828 
831 
832  // Compile metadata.
833  properties_t properties;
834  init_properties(properties);
835 
836  // Add ZAP properties.
837  const properties_t& zap_properties = mechanism->get_zap_properties ();
838  properties.insert(zap_properties.begin (), zap_properties.end ());
839 
840  // Add ZMTP properties.
841  const properties_t& zmtp_properties = mechanism->get_zmtp_properties ();
842  properties.insert(zmtp_properties.begin (), zmtp_properties.end ());
843 
844  zmq_assert (metadata == NULL);
845  if (!properties.empty ())
846  metadata = new (std::nothrow) metadata_t (properties);
847 }
848 
850 {
851  return session->pull_msg (msg_);
852 }
853 
855 {
856  return session->push_msg (msg_);
857 }
858 
860  if (metadata && metadata != msg_->metadata())
861  msg_->set_metadata(metadata);
862  return push_msg_to_session(msg_);
863 }
864 
866 {
867  zmq_assert (mechanism != NULL);
868  zmq_assert (session != NULL);
869 
870  const blob_t credential = mechanism->get_user_id ();
871  if (credential.size () > 0) {
872  msg_t msg;
873  int rc = msg.init_size (credential.size ());
874  zmq_assert (rc == 0);
875  memcpy (msg.data (), credential.data (), credential.size ());
877  rc = session->push_msg (&msg);
878  if (rc == -1) {
879  rc = msg.close ();
880  errno_assert (rc == 0);
881  return -1;
882  }
883  }
885  return decode_and_push (msg_);
886 }
887 
889 {
890  zmq_assert (mechanism != NULL);
891 
892  if (session->pull_msg (msg_) == -1)
893  return -1;
894  if (mechanism->encode (msg_) == -1)
895  return -1;
896  return 0;
897 }
898 
900 {
901  zmq_assert (mechanism != NULL);
902 
903  if (mechanism->decode (msg_) == -1)
904  return -1;
905 
906  if(has_timeout_timer) {
907  has_timeout_timer = false;
909  }
910 
911  if(has_ttl_timer) {
912  has_ttl_timer = false;
914  }
915 
916  if(msg_->flags() & msg_t::command) {
917  uint8_t cmd_id = *((uint8_t*)msg_->data());
918  if(cmd_id == 4)
920  }
921 
922  if (metadata)
923  msg_->set_metadata (metadata);
924  if (session->push_msg (msg_) == -1) {
925  if (errno == EAGAIN)
927  return -1;
928  }
929  return 0;
930 }
931 
933 {
934  const int rc = session->push_msg (msg_);
935  if (rc == 0)
937  return rc;
938 }
939 
941 {
942  msg_t subscription;
943 
944  // Inject the subscription message, so that also
945  // ZMQ 2.x peers receive published messages.
946  int rc = subscription.init_size (1);
947  errno_assert (rc == 0);
948  *(unsigned char*) subscription.data () = 1;
949  rc = session->push_msg (&subscription);
950  if (rc == -1)
951  return -1;
952 
954  return push_msg_to_session (msg_);
955 }
956 
958 {
960  // For raw sockets, send a final 0-length message to the application
961  // so that it knows the peer has been disconnected.
962  msg_t terminator;
963  terminator.init();
964  (this->*process_msg) (&terminator);
965  terminator.close();
966  }
969  session->flush ();
970  session->engine_error (reason);
971  unplug ();
972  delete this;
973 }
974 
976 {
978 
979  if (!options.raw_socket && options.handshake_ivl > 0) {
981  has_handshake_timer = true;
982  }
983 }
984 
986  if (peer_address.empty()) return false;
987  properties.insert (std::make_pair("Peer-Address", peer_address));
988 
989  // Private property to support deprecated SRCFD
990  std::ostringstream stream;
991  stream << (int)s;
992  std::string fd_string = stream.str();
993  properties.insert(std::make_pair("__fd", fd_string));
994  return true;
995 }
996 
998 {
999  if(id_ == handshake_timer_id) {
1000  has_handshake_timer = false;
1001  // handshake timer expired before handshake completed, so engine fail
1002  error (timeout_error);
1003  }
1004  else if(id_ == heartbeat_ivl_timer_id) {
1006  out_event();
1008  }
1009  else if(id_ == heartbeat_ttl_timer_id) {
1010  has_ttl_timer = false;
1012  }
1013  else if(id_ == heartbeat_timeout_timer_id) {
1014  has_timeout_timer = false;
1016  }
1017  else
1018  // There are no other valid timer ids!
1019  assert(false);
1020 }
1021 
1023 {
1024  int rc = 0;
1025  zmq_assert (mechanism != NULL);
1026 
1027  // 16-bit TTL + \4PING == 7
1028  rc = msg_->init_size(7);
1029  errno_assert(rc == 0);
1030  msg_->set_flags(msg_t::command);
1031  // Copy in the command message
1032  memcpy(msg_->data(), "\4PING", 5);
1033 
1034  uint16_t ttl_val = htons(options.heartbeat_ttl);
1035  memcpy(((uint8_t*)msg_->data()) + 5, &ttl_val, sizeof(ttl_val));
1036 
1037  rc = mechanism->encode (msg_);
1039  if(!has_timeout_timer && heartbeat_timeout > 0) {
1041  has_timeout_timer = true;
1042  }
1043  return rc;
1044 }
1045 
1047 {
1048  int rc = 0;
1049  zmq_assert (mechanism != NULL);
1050 
1051  rc = msg_->init_size(5);
1052  errno_assert(rc == 0);
1053  msg_->set_flags(msg_t::command);
1054 
1055  memcpy(msg_->data(), "\4PONG", 5);
1056 
1057  rc = mechanism->encode (msg_);
1059  return rc;
1060 }
1061 
1063 {
1064  if(memcmp(msg_->data(), "\4PING", 5) == 0) {
1065  uint16_t remote_heartbeat_ttl;
1066  // Get the remote heartbeat TTL to setup the timer
1067  memcpy(&remote_heartbeat_ttl, (uint8_t*)msg_->data() + 5, 2);
1068  remote_heartbeat_ttl = ntohs(remote_heartbeat_ttl);
1069  // The remote heartbeat is in 10ths of a second
1070  // so we multiply it by 100 to get the timer interval in ms.
1071  remote_heartbeat_ttl *= 100;
1072 
1073  if(!has_ttl_timer && remote_heartbeat_ttl > 0) {
1074  add_timer(remote_heartbeat_ttl, heartbeat_ttl_timer_id);
1075  has_ttl_timer = true;
1076  }
1077 
1079  out_event();
1080  }
1081 
1082  return 0;
1083 }
#define size
#define LIBZMQ_DELETE(p_object)
Definition: macros.hpp:7
static const size_t v3_greeting_size
int process_handshake_command(msg_t *msg)
unsigned char greeting_send[v3_greeting_size]
int close()
Definition: msg.cpp:217
int heartbeat_timeout
Definition: options.hpp:225
void set_pollout(handle_t handle_)
Definition: io_object.cpp:84
#define ZMQ_PLAIN
Definition: zmq.h:350
int produce_ping_message(msg_t *msg_)
mechanism_t * mechanism
int process_identity_msg(msg_t *msg_)
void plug(zmq::io_thread_t *io_thread_)
Definition: io_object.cpp:46
void unblock_socket(fd_t s_)
Definition: ip.cpp:84
#define zmq_assert(x)
Definition: err.hpp:119
virtual int push_msg(msg_t *msg_)
virtual msg_t * msg()=0
#define ZMQ_XPUB
Definition: zmq.h:255
void cancel_timer(int id_)
Definition: io_object.cpp:99
bool recv_identity
Definition: options.hpp:146
int64_t maxmsgsize
Definition: options.hpp:124
int pull_msg_from_session(msg_t *msg_)
unsigned char * outpos
#define ZMQ_CURVE
Definition: zmq.h:351
int decode_and_push(msg_t *msg_)
bool drop_ref()
Definition: metadata.cpp:53
int produce_pong_message(msg_t *msg_)
void engine_error(zmq::stream_engine_t::error_reason_t reason)
virtual int process_handshake_command(msg_t *msg_)=0
#define ZMQ_PUB
Definition: zmq.h:247
zmq::socket_base_t * socket
stream_engine_t(fd_t fd_, const options_t &options_, const std::string &endpoint)
void plug(zmq::io_thread_t *io_thread_, zmq::session_base_t *session_)
metadata_t * metadata
Definition: msg.hpp:175
unsigned char * inpos
int init_size(size_t size_)
Definition: msg.cpp:93
virtual status_t status() const =0
int(stream_engine_t::* process_msg)(msg_t *msg_)
void set_pollin(handle_t handle_)
Definition: io_object.cpp:74
int(stream_engine_t::* next_msg)(msg_t *msg_)
void error(error_reason_t reason)
int write_credential(msg_t *msg_)
unsigned int greeting_bytes_read
int get_peer_ip_address(fd_t sockfd_, std::string &ip_addr_)
Definition: ip.cpp:123
#define unlikely(x)
Definition: likely.hpp:38
virtual void get_buffer(unsigned char **data_, size_t *size_)=0
int init()
Definition: msg.cpp:82
#define ZMQ_NULL
Definition: zmq.h:349
virtual int encode(msg_t *)
Definition: mechanism.hpp:66
unsigned char identity[256]
Definition: options.hpp:74
std::basic_string< unsigned char > blob_t
Definition: blob.hpp:134
virtual int pull_msg(msg_t *msg_)
metadata_t::dict_t properties_t
#define EPROTO
Definition: err.hpp:58
void reset_pollout(handle_t handle_)
Definition: io_object.cpp:89
virtual int decode(const unsigned char *data_, size_t size_, size_t &processed)=0
void add_timer(int timout_, int id_)
Definition: io_object.cpp:94
int next_handshake_command(msg_t *msg)
bool init_properties(properties_t &properties)
void peer_identity(msg_t *msg_)
Definition: mechanism.cpp:53
#define ZMQ_GSSAPI
Definition: zmq.h:352
int identity_msg(msg_t *msg_)
void put_uint64(unsigned char *buffer_, uint64_t value)
Definition: wire.hpp:81
int push_raw_msg_to_session(msg_t *msg)
virtual int zap_msg_available()
Definition: mechanism.hpp:71
void reset_pollin(handle_t handle_)
Definition: io_object.cpp:79
int tcp_write(fd_t s_, const void *data_, size_t size_)
Definition: tcp.cpp:187
void set_flags(unsigned char flags_)
Definition: msg.cpp:384
void timer_event(int id_)
void event_disconnected(const std::string &addr_, int fd_)
#define alloc_assert(x)
Definition: err.hpp:159
unsigned char greeting_recv[v3_greeting_size]
#define errno_assert(x)
Definition: err.hpp:129
int push_msg_to_session(msg_t *msg)
int process_heartbeat_message(msg_t *msg_)
virtual void load_msg(msg_t *msg_)=0
unsigned char data[max_vsm_size]
Definition: msg.hpp:187
virtual int next_handshake_command(msg_t *msg_)=0
static char encoder[85+1]
Definition: zmq_utils.cpp:95
handle_t add_fd(fd_t fd_)
Definition: io_object.cpp:64
virtual int decode(msg_t *)
Definition: mechanism.hpp:68
virtual void resize_buffer(size_t)=0
zmq::session_base_t * session
uint16_t heartbeat_ttl
Definition: options.hpp:221
#define likely(x)
Definition: likely.hpp:37
virtual size_t encode(unsigned char **data_, size_t size)=0
int heartbeat_interval
Definition: options.hpp:223
unsigned char identity_size
Definition: options.hpp:73
int write_subscription_msg(msg_t *msg_)
int push_one_then_decode_and_push(msg_t *msg_)
void rm_fd(handle_t handle_)
Definition: io_object.cpp:69
static const size_t signature_size
blob_t get_user_id() const
Definition: mechanism.cpp:69
int pull_and_encode(msg_t *msg_)
unsigned char flags
Definition: msg.hpp:181
int tcp_read(fd_t s_, void *data_, size_t size_)
Definition: tcp.cpp:248
const metadata_t::dict_t & get_zap_properties()
Definition: mechanism.hpp:88
const metadata_t::dict_t & get_zmtp_properties()
Definition: mechanism.hpp:84
static uint8_t decoder[96]
Definition: zmq_utils.cpp:103
void set_metadata(metadata_t *metadata_)
Definition: msg.cpp:399