LCOV - code coverage report
Current view: top level - home/h/core/forks/m4-libzmq/src - stream_engine.cpp (source / functions) Hit Total Coverage
Test: zeromq-4.2.0 Code Coverage Lines: 432 515 83.9 %
Date: 2016-05-09 Functions: 28 32 87.5 %
Legend: Lines: hit not hit

          Line data    Source code
       1             : /*
       2             :     Copyright (c) 2007-2016 Contributors as noted in the AUTHORS file
       3             : 
       4             :     This file is part of libzmq, the ZeroMQ core engine in C++.
       5             : 
       6             :     libzmq is free software; you can redistribute it and/or modify it under
       7             :     the terms of the GNU Lesser General Public License (LGPL) as published
       8             :     by the Free Software Foundation; either version 3 of the License, or
       9             :     (at your option) any later version.
      10             : 
      11             :     As a special exception, the Contributors give you permission to link
      12             :     this library with independent modules to produce an executable,
      13             :     regardless of the license terms of these independent modules, and to
      14             :     copy and distribute the resulting executable under terms of your choice,
      15             :     provided that you also meet, for each linked independent module, the
      16             :     terms and conditions of the license of that module. An independent
      17             :     module is a module which is not derived from or based on this library.
      18             :     If you modify this library, you must extend this exception to your
      19             :     version of the library.
      20             : 
      21             :     libzmq is distributed in the hope that it will be useful, but WITHOUT
      22             :     ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
      23             :     FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public
      24             :     License for more details.
      25             : 
      26             :     You should have received a copy of the GNU Lesser General Public License
      27             :     along with this program.  If not, see <http://www.gnu.org/licenses/>.
      28             : */
      29             : 
      30             : #include "precompiled.hpp"
      31             : #include "macros.hpp"
      32             : #include "platform.hpp"
      33             : 
      34             : #include <string.h>
      35             : #include <new>
      36             : #include <sstream>
      37             : 
      38             : #include "stream_engine.hpp"
      39             : #include "io_thread.hpp"
      40             : #include "session_base.hpp"
      41             : #include "v1_encoder.hpp"
      42             : #include "v1_decoder.hpp"
      43             : #include "v2_encoder.hpp"
      44             : #include "v2_decoder.hpp"
      45             : #include "null_mechanism.hpp"
      46             : #include "plain_client.hpp"
      47             : #include "plain_server.hpp"
      48             : #include "gssapi_client.hpp"
      49             : #include "gssapi_server.hpp"
      50             : #include "curve_client.hpp"
      51             : #include "curve_server.hpp"
      52             : #include "raw_decoder.hpp"
      53             : #include "raw_encoder.hpp"
      54             : #include "config.hpp"
      55             : #include "err.hpp"
      56             : #include "ip.hpp"
      57             : #include "tcp.hpp"
      58             : #include "likely.hpp"
      59             : #include "wire.hpp"
      60             : 
      61        6762 : zmq::stream_engine_t::stream_engine_t (fd_t fd_, const options_t &options_,
      62             :                                        const std::string &endpoint_) :
      63             :     s (fd_),
      64             :     as_server(false),
      65             :     handle(NULL),
      66             :     inpos (NULL),
      67             :     insize (0),
      68             :     decoder (NULL),
      69             :     outpos (NULL),
      70             :     outsize (0),
      71             :     encoder (NULL),
      72             :     metadata (NULL),
      73             :     handshaking (true),
      74             :     greeting_size (v2_greeting_size),
      75             :     greeting_bytes_read (0),
      76             :     session (NULL),
      77             :     options (options_),
      78             :     endpoint (endpoint_),
      79             :     plugged (false),
      80             :     next_msg (&stream_engine_t::identity_msg),
      81             :     process_msg (&stream_engine_t::process_identity_msg),
      82             :     io_error (false),
      83             :     subscription_required (false),
      84             :     mechanism (NULL),
      85             :     input_stopped (false),
      86             :     output_stopped (false),
      87             :     has_handshake_timer (false),
      88             :     has_ttl_timer (false),
      89             :     has_timeout_timer (false),
      90             :     has_heartbeat_timer (false),
      91             :     heartbeat_timeout (0),
      92       13524 :     socket (NULL)
      93             : {
      94        6762 :     int rc = tx_msg.init ();
      95        6761 :     errno_assert (rc == 0);
      96             : 
      97             :     //  Put the socket into non-blocking mode.
      98        6761 :     unblock_socket (s);
      99             : 
     100        6761 :     int family = get_peer_ip_address (s, peer_address);
     101        6761 :     if (family == 0)
     102           0 :         peer_address.clear();
     103             : #if defined ZMQ_HAVE_SO_PEERCRED
     104             :     else
     105        6761 :     if (family == PF_UNIX) {
     106             :         struct ucred cred;
     107         117 :         socklen_t size = sizeof (cred);
     108         117 :         if (!getsockopt (s, SOL_SOCKET, SO_PEERCRED, &cred, &size)) {
     109         117 :             std::ostringstream buf;
     110         351 :             buf << ":" << cred.uid << ":" << cred.gid << ":" << cred.pid;
     111         234 :             peer_address += buf.str ();
     112             :         }
     113             :     }
     114             : #elif defined ZMQ_HAVE_LOCAL_PEERCRED
     115             :     else
     116             :     if (family == PF_UNIX) {
     117             :         struct xucred cred;
     118             :         socklen_t size = sizeof (cred);
     119             :         if (!getsockopt (s, 0, LOCAL_PEERCRED, &cred, &size)
     120             :                 && cred.cr_version == XUCRED_VERSION) {
     121             :             std::ostringstream buf;
     122             :             buf << ":" << cred.cr_uid << ":";
     123             :             if (cred.cr_ngroups > 0)
     124             :                 buf << cred.cr_groups[0];
     125             :             buf << ":";
     126             :             peer_address += buf.str ();
     127             :         }
     128             :     }
     129             : #endif
     130             : 
     131             : #ifdef SO_NOSIGPIPE
     132             :     //  Make sure that SIGPIPE signal is not generated when writing to a
     133             :     //  connection that was already closed by the peer.
     134             :     int set = 1;
     135             :     rc = setsockopt (s, SOL_SOCKET, SO_NOSIGPIPE, &set, sizeof (int));
     136             :     errno_assert (rc == 0);
     137             : #endif
     138        6761 :     if(options.heartbeat_interval > 0) {
     139          15 :         heartbeat_timeout = options.heartbeat_timeout;
     140          15 :         if(heartbeat_timeout == -1)
     141          15 :             heartbeat_timeout = options.heartbeat_interval;
     142             :     }
     143        6761 : }
     144             : 
     145       40570 : zmq::stream_engine_t::~stream_engine_t ()
     146             : {
     147        6762 :     zmq_assert (!plugged);
     148             : 
     149        6762 :     if (s != retired_fd) {
     150             : #ifdef ZMQ_HAVE_WINDOWS
     151             :         int rc = closesocket (s);
     152             :         wsa_assert (rc != SOCKET_ERROR);
     153             : #else
     154        6762 :         int rc = close (s);
     155        6762 :         errno_assert (rc == 0);
     156             : #endif
     157        6762 :         s = retired_fd;
     158             :     }
     159             : 
     160        6762 :     int rc = tx_msg.close ();
     161        6762 :     errno_assert (rc == 0);
     162             : 
     163             :     //  Drop reference to metadata and destroy it if we are
     164             :     //  the only user.
     165        6762 :     if (metadata != NULL) {
     166        3801 :         if (metadata->drop_ref ()) {
     167        7478 :             LIBZMQ_DELETE(metadata);
     168             :         }
     169             :     }
     170             : 
     171        6762 :     LIBZMQ_DELETE(encoder);
     172        6762 :     LIBZMQ_DELETE(decoder);
     173        6762 :     LIBZMQ_DELETE(mechanism);
     174       13524 : }
     175             : 
     176        6762 : void zmq::stream_engine_t::plug (io_thread_t *io_thread_,
     177             :     session_base_t *session_)
     178             : {
     179        6762 :     zmq_assert (!plugged);
     180        6762 :     plugged = true;
     181             : 
     182             :     //  Connect to session object.
     183        6762 :     zmq_assert (!session);
     184        6762 :     zmq_assert (session_);
     185        6762 :     session = session_;
     186        6762 :     socket = session-> get_socket ();
     187             : 
     188             :     //  Connect to I/O threads poller object.
     189        6762 :     io_object_t::plug (io_thread_);
     190        6762 :     handle = add_fd (s);
     191        6762 :     io_error = false;
     192             : 
     193        6762 :     if (options.raw_socket) {
     194             :         // no handshaking for raw sock, instantiate raw encoder and decoders
     195          33 :         encoder = new (std::nothrow) raw_encoder_t (out_batch_size);
     196          33 :         alloc_assert (encoder);
     197             : 
     198          33 :         decoder = new (std::nothrow) raw_decoder_t (in_batch_size);
     199          33 :         alloc_assert (decoder);
     200             : 
     201             :         // disable handshaking for raw socket
     202          33 :         handshaking = false;
     203             : 
     204          33 :         next_msg = &stream_engine_t::pull_msg_from_session;
     205          33 :         process_msg = &stream_engine_t::push_raw_msg_to_session;
     206             : 
     207             :         properties_t properties;
     208          33 :         if (init_properties(properties)) {
     209             :             //  Compile metadata.
     210          33 :             zmq_assert (metadata == NULL);
     211          33 :             metadata = new (std::nothrow) metadata_t (properties);
     212             :         }
     213             : 
     214          33 :         if (options.raw_notify) {
     215             :             //  For raw sockets, send an initial 0-length message to the
     216             :             // application so that it knows a peer has connected.
     217             :             msg_t connector;
     218          30 :             connector.init();
     219          30 :             push_raw_msg_to_session (&connector);
     220          30 :             connector.close();
     221          30 :             session->flush ();
     222             :         }
     223             :     }
     224             :     else {
     225             :         // start optional timer, to prevent handshake hanging on no input
     226        6729 :         set_handshake_timer ();
     227             : 
     228             :         //  Send the 'length' and 'flags' fields of the identity message.
     229             :         //  The 'length' field is encoded in the long format.
     230        6728 :         outpos = greeting_send;
     231        6728 :         outpos [outsize++] = 0xff;
     232        6728 :         put_uint64 (&outpos [outsize], options.identity_size + 1);
     233        6728 :         outsize += 8;
     234        6728 :         outpos [outsize++] = 0x7f;
     235             :     }
     236             : 
     237        6761 :     set_pollin (handle);
     238        6762 :     set_pollout (handle);
     239             :     //  Flush all the data that may have been already received downstream.
     240        6762 :     in_event ();
     241        6762 : }
     242             : 
     243        6762 : void zmq::stream_engine_t::unplug ()
     244             : {
     245        6762 :     zmq_assert (plugged);
     246        6761 :     plugged = false;
     247             : 
     248             :     //  Cancel all timers.
     249        6761 :     if (has_handshake_timer) {
     250        2406 :         cancel_timer (handshake_timer_id);
     251        2406 :         has_handshake_timer = false;
     252             :     }
     253             : 
     254        6761 :     if (has_ttl_timer) {
     255           0 :         cancel_timer (heartbeat_ttl_timer_id);
     256           0 :         has_ttl_timer = false;
     257             :     }
     258             : 
     259        6761 :     if (has_timeout_timer) {
     260           0 :         cancel_timer (heartbeat_timeout_timer_id);
     261           0 :         has_timeout_timer = false;
     262             :     }
     263             : 
     264        6761 :     if (has_heartbeat_timer) {
     265          15 :         cancel_timer (heartbeat_ivl_timer_id);
     266          15 :         has_heartbeat_timer = false;
     267             :     }
     268             :     //  Cancel all fd subscriptions.
     269        6761 :     if (!io_error)
     270        6761 :         rm_fd (handle);
     271             : 
     272             :     //  Disconnect from I/O threads poller object.
     273        6762 :     io_object_t::unplug ();
     274             : 
     275        6762 :     session = NULL;
     276        6762 : }
     277             : 
     278        4361 : void zmq::stream_engine_t::terminate ()
     279             : {
     280        4361 :     unplug ();
     281        4361 :     delete this;
     282        4361 : }
     283             : 
     284       19704 : void zmq::stream_engine_t::in_event ()
     285             : {
     286       19704 :     zmq_assert (!io_error);
     287             : 
     288             :     //  If still handshaking, receive and process the greeting message.
     289       19704 :     if (unlikely (handshaking))
     290       14912 :         if (!handshake ())
     291       14258 :             return;
     292             : 
     293        9109 :     zmq_assert (decoder);
     294             : 
     295             :     //  If there has been an I/O error, stop polling.
     296        9109 :     if (input_stopped) {
     297           0 :         rm_fd (handle);
     298           0 :         io_error = true;
     299           0 :         return;
     300             :     }
     301             : 
     302             :     //  If there's no data to process in the buffer...
     303        9109 :     if (!insize) {
     304             : 
     305             :         //  Retrieve the buffer and read as much data as possible.
     306             :         //  Note that buffer can be arbitrarily large. However, we assume
     307             :         //  the underlying TCP layer has fixed buffer size and thus the
     308             :         //  number of bytes read will be always limited.
     309        9109 :         size_t bufsize = 0;
     310        9109 :         decoder->get_buffer (&inpos, &bufsize);
     311             : 
     312        9109 :         const int rc = tcp_read (s, inpos, bufsize);
     313             : 
     314        9109 :         if (rc == 0) {
     315         942 :             error (connection_error);
     316        3618 :             return;
     317             :         }
     318        8167 :         if (rc == -1) {
     319        2676 :             if (errno != EAGAIN)
     320          47 :                 error (connection_error);
     321             :             return;
     322             :         }
     323             : 
     324             :         //  Adjust input size
     325        5491 :         insize = static_cast <size_t> (rc);
     326             :         // Adjust buffer size to received bytes
     327        5491 :         decoder->resize_buffer(insize);
     328             :     }
     329             : 
     330        5490 :     int rc = 0;
     331        5490 :     size_t processed = 0;
     332             : 
     333      107459 :     while (insize > 0) {
     334       96768 :         rc = decoder->decode (inpos, insize, processed);
     335       96769 :         zmq_assert (processed <= insize);
     336       96769 :         inpos += processed;
     337       96769 :         insize -= processed;
     338       96769 :         if (rc == 0 || rc == -1)
     339             :             break;
     340       96763 :         rc = (this->*process_msg) (decoder->msg ());
     341       96763 :         if (rc == -1)
     342             :             break;
     343             :     }
     344             : 
     345             :     //  Tear down the connection if we have failed to decode input data
     346             :     //  or the session has rejected the message.
     347        5491 :     if (rc == -1) {
     348         284 :         if (errno != EAGAIN) {
     349          45 :             error (protocol_error);
     350             :             return;
     351             :         }
     352         239 :         input_stopped = true;
     353         239 :         reset_pollin (handle);
     354             :     }
     355             : 
     356        5446 :     session->flush ();
     357             : }
     358             : 
     359       30876 : void zmq::stream_engine_t::out_event ()
     360             : {
     361       30876 :     zmq_assert (!io_error);
     362             : 
     363             :     //  If write buffer is empty, try to read new data from the encoder.
     364       30878 :     if (!outsize) {
     365             : 
     366             :         //  Even when we stop polling as soon as there is no
     367             :         //  data to send, the poller may invoke out_event one
     368             :         //  more time due to 'speculative write' optimisation.
     369       20035 :         if (unlikely (encoder == NULL)) {
     370           0 :             zmq_assert (handshaking);
     371             :             return;
     372             :         }
     373             : 
     374       20035 :         outpos = NULL;
     375       20035 :         outsize = encoder->encode (&outpos, 0);
     376             : 
     377      679449 :         while (outsize < (size_t) out_batch_size) {
     378      659323 :             if ((this->*next_msg) (&tx_msg) == -1)
     379             :                 break;
     380      639379 :             encoder->load_msg (&tx_msg);
     381      639379 :             unsigned char *bufptr = outpos + outsize;
     382      639379 :             size_t n = encoder->encode (&bufptr, out_batch_size - outsize);
     383      639379 :             zmq_assert (n > 0);
     384      639379 :             if (outpos == NULL)
     385       11008 :                 outpos = bufptr;
     386      639379 :             outsize += n;
     387             :         }
     388             : 
     389             :         //  If there is no data to send, stop polling for output.
     390       20037 :         if (outsize == 0) {
     391        8948 :             output_stopped = true;
     392        8948 :             reset_pollout (handle);
     393        8948 :             return;
     394             :         }
     395             :     }
     396             : 
     397             :     //  If there are any data to write in write buffer, write as much as
     398             :     //  possible to the socket. Note that amount of data to write can be
     399             :     //  arbitrarily large. However, we assume that underlying TCP layer has
     400             :     //  limited transmission buffer and thus the actual number of bytes
     401             :     //  written should be reasonably modest.
     402       21932 :     const int nbytes = tcp_write (s, outpos, outsize);
     403             : 
     404             :     //  IO error has occurred. We stop waiting for output events.
     405             :     //  The engine is not terminated until we detect input error;
     406             :     //  this is necessary to prevent losing incoming messages.
     407       21928 :     if (nbytes == -1) {
     408          12 :         reset_pollout (handle);
     409          12 :         return;
     410             :     }
     411             : 
     412       21916 :     outpos += nbytes;
     413       21916 :     outsize -= nbytes;
     414             : 
     415             :     //  If we are still handshaking and there are no data
     416             :     //  to send, stop polling for output.
     417       21916 :     if (unlikely (handshaking))
     418        8986 :         if (outsize == 0)
     419        8986 :             reset_pollout (handle);
     420             : }
     421             : 
     422       10065 : void zmq::stream_engine_t::restart_output ()
     423             : {
     424       10065 :     if (unlikely (io_error))
     425       10065 :         return;
     426             : 
     427       10065 :     if (likely (output_stopped)) {
     428        4840 :         set_pollout (handle);
     429        4840 :         output_stopped = false;
     430             :     }
     431             : 
     432             :     //  Speculative write: The assumption is that at the moment new message
     433             :     //  was sent by the user the socket is probably available for writing.
     434             :     //  Thus we try to write the data to socket avoiding polling for POLLOUT.
     435             :     //  Consequently, the latency should be better in request/reply scenarios.
     436       10065 :     out_event ();
     437             : }
     438             : 
     439        1191 : void zmq::stream_engine_t::restart_input ()
     440             : {
     441        1191 :     zmq_assert (input_stopped);
     442        1191 :     zmq_assert (session != NULL);
     443        1191 :     zmq_assert (decoder != NULL);
     444             : 
     445        1191 :     int rc = (this->*process_msg) (decoder->msg ());
     446        1191 :     if (rc == -1) {
     447           0 :         if (errno == EAGAIN)
     448           0 :             session->flush ();
     449             :         else
     450           0 :             error (protocol_error);
     451        1191 :         return;
     452             :     }
     453             : 
     454      542715 :     while (insize > 0) {
     455      542629 :         size_t processed = 0;
     456      542629 :         rc = decoder->decode (inpos, insize, processed);
     457      542629 :         zmq_assert (processed <= insize);
     458      542629 :         inpos += processed;
     459      542629 :         insize -= processed;
     460      542629 :         if (rc == 0 || rc == -1)
     461             :             break;
     462      542489 :         rc = (this->*process_msg) (decoder->msg ());
     463      542489 :         if (rc == -1)
     464             :             break;
     465             :     }
     466             : 
     467        1191 :     if (rc == -1 && errno == EAGAIN)
     468         965 :         session->flush ();
     469             :     else
     470         226 :     if (io_error)
     471           0 :         error (connection_error);
     472             :     else
     473         226 :     if (rc == -1)
     474           0 :         error (protocol_error);
     475             :     else {
     476         226 :         input_stopped = false;
     477         226 :         set_pollin (handle);
     478         226 :         session->flush ();
     479             : 
     480             :         //  Speculative read.
     481         226 :         in_event ();
     482             :     }
     483             : }
     484             : 
     485       14912 : bool zmq::stream_engine_t::handshake ()
     486             : {
     487       14912 :     zmq_assert (handshaking);
     488       14909 :     zmq_assert (greeting_bytes_read < greeting_size);
     489             :     //  Receive the greeting.
     490       26996 :     while (greeting_bytes_read < greeting_size) {
     491       22674 :         const int n = tcp_read (s, greeting_recv + greeting_bytes_read,
     492       45348 :                                 greeting_size - greeting_bytes_read);
     493       22676 :         if (n == 0) {
     494         898 :             error (connection_error);
     495         898 :             return false;
     496             :         }
     497       21778 :         if (n == -1) {
     498        9681 :             if (errno != EAGAIN)
     499         442 :                 error (connection_error);
     500             :             return false;
     501             :         }
     502             : 
     503       12097 :         greeting_bytes_read += n;
     504             : 
     505             :         //  We have received at least one byte from the peer.
     506             :         //  If the first byte is not 0xff, we know that the
     507             :         //  peer is using unversioned protocol.
     508       12097 :         if (greeting_recv [0] != 0xff)
     509             :             break;
     510             : 
     511       12088 :         if (greeting_bytes_read < signature_size)
     512             :             continue;
     513             : 
     514             :         //  Inspect the right-most bit of the 10th byte (which coincides
     515             :         //  with the 'flags' field if a regular message was sent).
     516             :         //  Zero indicates this is a header of identity message
     517             :         //  (i.e. the peer is using the unversioned protocol).
     518       12088 :         if (!(greeting_recv [9] & 0x01))
     519             :             break;
     520             : 
     521             :         //  The peer is using versioned protocol.
     522             :         //  Send the major version number.
     523       12089 :         if (outpos + outsize == greeting_send + signature_size) {
     524        5235 :             if (outsize == 0)
     525        2656 :                 set_pollout (handle);
     526        5233 :             outpos [outsize++] = 3;     //  Major version number
     527             :         }
     528             : 
     529       12087 :         if (greeting_bytes_read > signature_size) {
     530        8815 :             if (outpos + outsize == greeting_send + signature_size + 1) {
     531        4492 :                 if (outsize == 0)
     532        2530 :                     set_pollout (handle);
     533             : 
     534             :                 //  Use ZMTP/2.0 to talk to older peers.
     535        4492 :                 if (greeting_recv [10] == ZMTP_1_0
     536             :                 ||  greeting_recv [10] == ZMTP_2_0)
     537           0 :                     outpos [outsize++] = options.type;
     538             :                 else {
     539        4492 :                     outpos [outsize++] = 0; //  Minor version number
     540        4492 :                     memset (outpos + outsize, 0, 20);
     541             : 
     542        4492 :                     zmq_assert (options.mechanism == ZMQ_NULL
     543             :                             ||  options.mechanism == ZMQ_PLAIN
     544             :                             ||  options.mechanism == ZMQ_CURVE
     545             :                             ||  options.mechanism == ZMQ_GSSAPI);
     546             : 
     547        4492 :                     if (options.mechanism == ZMQ_NULL)
     548        4381 :                         memcpy (outpos + outsize, "NULL", 4);
     549             :                     else
     550         111 :                     if (options.mechanism == ZMQ_PLAIN)
     551          18 :                         memcpy (outpos + outsize, "PLAIN", 5);
     552             :                     else
     553          93 :                     if (options.mechanism == ZMQ_GSSAPI)
     554           0 :                         memcpy (outpos + outsize, "GSSAPI", 6);
     555             :                     else
     556          93 :                     if (options.mechanism == ZMQ_CURVE)
     557          93 :                         memcpy (outpos + outsize, "CURVE", 5);
     558        4492 :                     outsize += 20;
     559        4492 :                     memset (outpos + outsize, 0, 32);
     560        4492 :                     outsize += 32;
     561        4492 :                     greeting_size = v3_greeting_size;
     562             :                 }
     563             :             }
     564             :         }
     565             :     }
     566             : 
     567             :     //  Position of the revision field in the greeting.
     568        4330 :     const size_t revision_pos = 10;
     569             : 
     570             :     //  Is the peer using ZMTP/1.0 with no revision number?
     571             :     //  If so, we send and receive rest of identity message
     572        4330 :     if (greeting_recv [0] != 0xff || !(greeting_recv [9] & 0x01)) {
     573           7 :         if (session->zap_enabled ()) {
     574             :            // reject ZMTP 1.0 connections if ZAP is enabled
     575           9 :            error (protocol_error);
     576           9 :            return false;
     577             :         }
     578             : 
     579           0 :         encoder = new (std::nothrow) v1_encoder_t (out_batch_size);
     580           0 :         alloc_assert (encoder);
     581             : 
     582           0 :         decoder = new (std::nothrow) v1_decoder_t (in_batch_size, options.maxmsgsize);
     583           0 :         alloc_assert (decoder);
     584             : 
     585             :         //  We have already sent the message header.
     586             :         //  Since there is no way to tell the encoder to
     587             :         //  skip the message header, we simply throw that
     588             :         //  header data away.
     589           0 :         const size_t header_size = options.identity_size + 1 >= 255 ? 10 : 2;
     590           0 :         unsigned char tmp [10], *bufferp = tmp;
     591             : 
     592             :         //  Prepare the identity message and load it into encoder.
     593             :         //  Then consume bytes we have already sent to the peer.
     594           0 :         const int rc = tx_msg.init_size (options.identity_size);
     595           0 :         zmq_assert (rc == 0);
     596           0 :         memcpy (tx_msg.data (), options.identity, options.identity_size);
     597           0 :         encoder->load_msg (&tx_msg);
     598           0 :         size_t buffer_size = encoder->encode (&bufferp, header_size);
     599           0 :         zmq_assert (buffer_size == header_size);
     600             : 
     601             :         //  Make sure the decoder sees the data we have already received.
     602           0 :         inpos = greeting_recv;
     603           0 :         insize = greeting_bytes_read;
     604             : 
     605             :         //  To allow for interoperability with peers that do not forward
     606             :         //  their subscriptions, we inject a phantom subscription message
     607             :         //  message into the incoming message stream.
     608           0 :         if (options.type == ZMQ_PUB || options.type == ZMQ_XPUB)
     609           0 :             subscription_required = true;
     610             : 
     611             :         //  We are sending our identity now and the next message
     612             :         //  will come from the socket.
     613           0 :         next_msg = &stream_engine_t::pull_msg_from_session;
     614             : 
     615             :         //  We are expecting identity message.
     616           0 :         process_msg = &stream_engine_t::process_identity_msg;
     617             :     }
     618             :     else
     619        4323 :     if (greeting_recv [revision_pos] == ZMTP_1_0) {
     620           0 :         if (session->zap_enabled ()) {
     621             :            // reject ZMTP 1.0 connections if ZAP is enabled
     622           0 :            error (protocol_error);
     623           0 :            return false;
     624             :         }
     625             : 
     626           0 :         encoder = new (std::nothrow) v1_encoder_t (out_batch_size);
     627           0 :         alloc_assert (encoder);
     628             : 
     629             :         decoder = new (std::nothrow) v1_decoder_t (
     630           0 :             in_batch_size, options.maxmsgsize);
     631           0 :         alloc_assert (decoder);
     632             :     }
     633             :     else
     634        4323 :     if (greeting_recv [revision_pos] == ZMTP_2_0) {
     635           0 :         if (session->zap_enabled ()) {
     636             :            // reject ZMTP 2.0 connections if ZAP is enabled
     637           0 :            error (protocol_error);
     638           0 :            return false;
     639             :         }
     640             : 
     641           0 :         encoder = new (std::nothrow) v2_encoder_t (out_batch_size);
     642           0 :         alloc_assert (encoder);
     643             : 
     644             :         decoder = new (std::nothrow) v2_decoder_t (
     645           0 :             in_batch_size, options.maxmsgsize);
     646           0 :         alloc_assert (decoder);
     647             :     }
     648             :     else {
     649        4323 :         encoder = new (std::nothrow) v2_encoder_t (out_batch_size);
     650        4323 :         alloc_assert (encoder);
     651             : 
     652             :         decoder = new (std::nothrow) v2_decoder_t (
     653        4323 :                 in_batch_size, options.maxmsgsize);
     654        4323 :         alloc_assert (decoder);
     655             : 
     656        4323 :         if (options.mechanism == ZMQ_NULL
     657        4212 :         &&  memcmp (greeting_recv + 12, "NULL\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0", 20) == 0) {
     658             :             mechanism = new (std::nothrow)
     659        4209 :                 null_mechanism_t (session, peer_address, options);
     660        4209 :             alloc_assert (mechanism);
     661             :         }
     662             :         else
     663         114 :         if (options.mechanism == ZMQ_PLAIN
     664          18 :         &&  memcmp (greeting_recv + 12, "PLAIN\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0", 20) == 0) {
     665          18 :             if (options.as_server)
     666             :                 mechanism = new (std::nothrow)
     667          12 :                     plain_server_t (session, peer_address, options);
     668             :             else
     669             :                 mechanism = new (std::nothrow)
     670           6 :                     plain_client_t (options);
     671          18 :             alloc_assert (mechanism);
     672             :         }
     673             : #ifdef ZMQ_HAVE_CURVE
     674             :         else
     675          96 :         if (options.mechanism == ZMQ_CURVE
     676          93 :         &&  memcmp (greeting_recv + 12, "CURVE\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0", 20) == 0) {
     677          90 :             if (options.as_server)
     678             :                 mechanism = new (std::nothrow)
     679          45 :                     curve_server_t (session, peer_address, options);
     680             :             else
     681          45 :                 mechanism = new (std::nothrow) curve_client_t (options);
     682          90 :             alloc_assert (mechanism);
     683             :         }
     684             : #endif
     685             : #ifdef HAVE_LIBGSSAPI_KRB5
     686             :         else
     687             :         if (options.mechanism == ZMQ_GSSAPI
     688             :         &&  memcmp (greeting_recv + 12, "GSSAPI\0\0\0\0\0\0\0\0\0\0\0\0\0\0", 20) == 0) {
     689             :             if (options.as_server)
     690             :                 mechanism = new (std::nothrow)
     691             :                     gssapi_server_t (session, peer_address, options);
     692             :             else
     693             :                 mechanism = new (std::nothrow) gssapi_client_t (options);
     694             :             alloc_assert (mechanism);
     695             :         }
     696             : #endif
     697             :         else {
     698           6 :             error (protocol_error);
     699           6 :             return false;
     700             :         }
     701        4317 :         next_msg = &stream_engine_t::next_handshake_command;
     702        4317 :         process_msg = &stream_engine_t::process_handshake_command;
     703             :     }
     704             : 
     705             :     // Start polling for output if necessary.
     706        4317 :     if (outsize == 0)
     707        2417 :         set_pollout (handle);
     708             : 
     709             :     //  Handshaking was successful.
     710             :     //  Switch into the normal message flow.
     711        4317 :     handshaking = false;
     712             : 
     713        4317 :     if (has_handshake_timer) {
     714        4317 :         cancel_timer (handshake_timer_id);
     715        4317 :         has_handshake_timer = false;
     716             :     }
     717             : 
     718             :     return true;
     719             : }
     720             : 
     721           0 : int zmq::stream_engine_t::identity_msg (msg_t *msg_)
     722             : {
     723           0 :     int rc = msg_->init_size (options.identity_size);
     724           0 :     errno_assert (rc == 0);
     725           0 :     if (options.identity_size > 0)
     726           0 :         memcpy (msg_->data (), options.identity, options.identity_size);
     727           0 :     next_msg = &stream_engine_t::pull_msg_from_session;
     728           0 :     return 0;
     729             : }
     730             : 
     731           0 : int zmq::stream_engine_t::process_identity_msg (msg_t *msg_)
     732             : {
     733           0 :     if (options.recv_identity) {
     734           0 :         msg_->set_flags (msg_t::identity);
     735           0 :         int rc = session->push_msg (msg_);
     736           0 :         errno_assert (rc == 0);
     737             :     }
     738             :     else {
     739           0 :         int rc = msg_->close ();
     740           0 :         errno_assert (rc == 0);
     741           0 :         rc = msg_->init ();
     742           0 :         errno_assert (rc == 0);
     743             :     }
     744             : 
     745           0 :     if (subscription_required)
     746           0 :         process_msg = &stream_engine_t::write_subscription_msg;
     747             :     else
     748           0 :         process_msg = &stream_engine_t::push_msg_to_session;
     749             : 
     750           0 :     return 0;
     751             : }
     752             : 
     753       10869 : int zmq::stream_engine_t::next_handshake_command (msg_t *msg_)
     754             : {
     755       10869 :     zmq_assert (mechanism != NULL);
     756             : 
     757       10869 :     if (mechanism->status () == mechanism_t::ready) {
     758        1575 :         mechanism_ready ();
     759        1575 :         return pull_and_encode (msg_);
     760             :     }
     761             :     else
     762        9295 :     if (mechanism->status () == mechanism_t::error) {
     763          18 :         errno = EPROTO;
     764          18 :         return -1;
     765             :     }
     766             :     else {
     767        9277 :         const int rc = mechanism->next_handshake_command (msg_);
     768        9277 :         if (rc == 0)
     769        4159 :             msg_->set_flags (msg_t::command);
     770        9277 :         return rc;
     771             :     }
     772             : }
     773             : 
     774        4002 : int zmq::stream_engine_t::process_handshake_command (msg_t *msg_)
     775             : {
     776        4002 :     zmq_assert (mechanism != NULL);
     777        4002 :     const int rc = mechanism->process_handshake_command (msg_);
     778        4002 :     if (rc == 0) {
     779        3963 :         if (mechanism->status () == mechanism_t::ready)
     780        2193 :             mechanism_ready ();
     781             :         else
     782        1770 :         if (mechanism->status () == mechanism_t::error) {
     783           6 :             errno = EPROTO;
     784           6 :             return -1;
     785             :         }
     786        3957 :         if (output_stopped)
     787        2177 :             restart_output ();
     788             :     }
     789             : 
     790        3996 :     return rc;
     791             : }
     792             : 
     793          19 : void zmq::stream_engine_t::zap_msg_available ()
     794             : {
     795          19 :     zmq_assert (mechanism != NULL);
     796             : 
     797          19 :     const int rc = mechanism->zap_msg_available ();
     798          19 :     if (rc == -1) {
     799           0 :         error (protocol_error);
     800          19 :         return;
     801             :     }
     802          19 :     if (input_stopped)
     803           0 :         restart_input ();
     804          19 :     if (output_stopped)
     805          19 :         restart_output ();
     806             : }
     807             : 
     808        3768 : void zmq::stream_engine_t::mechanism_ready ()
     809             : {
     810        3768 :     if (options.heartbeat_interval > 0) {
     811          15 :         add_timer(options.heartbeat_interval, heartbeat_ivl_timer_id);
     812          15 :         has_heartbeat_timer = true;
     813             :     }
     814             : 
     815        3768 :     if (options.recv_identity) {
     816             :         msg_t identity;
     817         492 :         mechanism->peer_identity (&identity);
     818         492 :         const int rc = session->push_msg (&identity);
     819         492 :         if (rc == -1 && errno == EAGAIN) {
     820             :             // If the write is failing at this stage with
     821             :             // an EAGAIN the pipe must be being shut down,
     822             :             // so we can just bail out of the identity set.
     823        3768 :             return;
     824             :         }
     825         492 :         errno_assert (rc == 0);
     826         492 :         session->flush ();
     827             :     }
     828             : 
     829        3768 :     next_msg = &stream_engine_t::pull_and_encode;
     830        3768 :     process_msg = &stream_engine_t::write_credential;
     831             : 
     832             :     //  Compile metadata.
     833             :     properties_t properties;
     834        3768 :     init_properties(properties);
     835             : 
     836             :     //  Add ZAP properties.
     837        3768 :     const properties_t& zap_properties = mechanism->get_zap_properties ();
     838             :     properties.insert(zap_properties.begin (), zap_properties.end ());
     839             : 
     840             :     //  Add ZMTP properties.
     841        3768 :     const properties_t& zmtp_properties = mechanism->get_zmtp_properties ();
     842             :     properties.insert(zmtp_properties.begin (), zmtp_properties.end ());
     843             : 
     844        3768 :     zmq_assert (metadata == NULL);
     845        3768 :     if (!properties.empty ())
     846        3768 :         metadata = new (std::nothrow) metadata_t (properties);
     847             : }
     848             : 
     849         117 : int zmq::stream_engine_t::pull_msg_from_session (msg_t *msg_)
     850             : {
     851         117 :     return session->pull_msg (msg_);
     852             : }
     853             : 
     854           0 : int zmq::stream_engine_t::push_msg_to_session (msg_t *msg_)
     855             : {
     856         106 :     return session->push_msg (msg_);
     857             : }
     858             : 
     859         106 : int zmq::stream_engine_t::push_raw_msg_to_session (msg_t *msg_) {
     860         106 :     if (metadata && metadata != msg_->metadata())
     861         106 :         msg_->set_metadata(metadata);
     862         106 :     return push_msg_to_session(msg_);
     863             : }
     864             : 
     865         892 : int zmq::stream_engine_t::write_credential (msg_t *msg_)
     866             : {
     867         892 :     zmq_assert (mechanism != NULL);
     868         892 :     zmq_assert (session != NULL);
     869             : 
     870         892 :     const blob_t credential = mechanism->get_user_id ();
     871         892 :     if (credential.size () > 0) {
     872             :         msg_t msg;
     873          12 :         int rc = msg.init_size (credential.size ());
     874          12 :         zmq_assert (rc == 0);
     875          12 :         memcpy (msg.data (), credential.data (), credential.size ());
     876          12 :         msg.set_flags (msg_t::credential);
     877          12 :         rc = session->push_msg (&msg);
     878          12 :         if (rc == -1) {
     879           0 :             rc = msg.close ();
     880           0 :             errno_assert (rc == 0);
     881           0 :             return -1;
     882             :         }
     883             :     }
     884         892 :     process_msg = &stream_engine_t::decode_and_push;
     885         892 :     return decode_and_push (msg_);
     886             : }
     887             : 
     888      649700 : int zmq::stream_engine_t::pull_and_encode (msg_t *msg_)
     889             : {
     890      649700 :     zmq_assert (mechanism != NULL);
     891             : 
     892      649700 :     if (session->pull_msg (msg_) == -1)
     893             :         return -1;
     894      634983 :     if (mechanism->encode (msg_) == -1)
     895             :         return -1;
     896      634983 :     return 0;
     897             : }
     898             : 
     899      635187 : int zmq::stream_engine_t::decode_and_push (msg_t *msg_)
     900             : {
     901      635187 :     zmq_assert (mechanism != NULL);
     902             : 
     903      635187 :     if (mechanism->decode (msg_) == -1)
     904             :         return -1;
     905             : 
     906      635187 :     if(has_timeout_timer) {
     907         105 :         has_timeout_timer = false;
     908         105 :         cancel_timer(heartbeat_timeout_timer_id);
     909             :     }
     910             : 
     911      635187 :     if(has_ttl_timer) {
     912           0 :         has_ttl_timer = false;
     913           0 :         cancel_timer(heartbeat_ttl_timer_id);
     914             :     }
     915             : 
     916      635187 :     if(msg_->flags() & msg_t::command) {
     917         219 :         uint8_t cmd_id = *((uint8_t*)msg_->data());
     918         219 :         if(cmd_id == 4)
     919         216 :             process_heartbeat_message(msg_);
     920             :     }
     921             : 
     922      635187 :     if (metadata)
     923      635187 :         msg_->set_metadata (metadata);
     924      635187 :     if (session->push_msg (msg_) == -1) {
     925        1202 :         if (errno == EAGAIN)
     926        1202 :             process_msg = &stream_engine_t::push_one_then_decode_and_push;
     927             :         return -1;
     928             :     }
     929             :     return 0;
     930             : }
     931             : 
     932        1191 : int zmq::stream_engine_t::push_one_then_decode_and_push (msg_t *msg_)
     933             : {
     934        1191 :     const int rc = session->push_msg (msg_);
     935        1191 :     if (rc == 0)
     936        1191 :         process_msg = &stream_engine_t::decode_and_push;
     937        1191 :     return rc;
     938             : }
     939             : 
     940           0 : int zmq::stream_engine_t::write_subscription_msg (msg_t *msg_)
     941             : {
     942             :     msg_t subscription;
     943             : 
     944             :     //  Inject the subscription message, so that also
     945             :     //  ZMQ 2.x peers receive published messages.
     946           0 :     int rc = subscription.init_size (1);
     947           0 :     errno_assert (rc == 0);
     948           0 :     *(unsigned char*) subscription.data () = 1;
     949           0 :     rc = session->push_msg (&subscription);
     950           0 :     if (rc == -1)
     951             :        return -1;
     952             : 
     953           0 :     process_msg = &stream_engine_t::push_msg_to_session;
     954             :     return push_msg_to_session (msg_);
     955             : }
     956             : 
     957        2401 : void zmq::stream_engine_t::error (error_reason_t reason)
     958             : {
     959        2401 :     if (options.raw_socket && options.raw_notify) {
     960             :         //  For raw sockets, send a final 0-length message to the application
     961             :         //  so that it knows the peer has been disconnected.
     962             :         msg_t terminator;
     963          13 :         terminator.init();
     964          13 :         (this->*process_msg) (&terminator);
     965          13 :         terminator.close();
     966             :     }
     967        2401 :     zmq_assert (session);
     968        2401 :     socket->event_disconnected (endpoint, (int) s);
     969        2401 :     session->flush ();
     970        2401 :     session->engine_error (reason);
     971        2401 :     unplug ();
     972        2401 :     delete this;
     973        2401 : }
     974             : 
     975        6729 : void zmq::stream_engine_t::set_handshake_timer ()
     976             : {
     977        6729 :     zmq_assert (!has_handshake_timer);
     978             : 
     979        6729 :     if (!options.raw_socket && options.handshake_ivl > 0) {
     980        6729 :         add_timer (options.handshake_ivl, handshake_timer_id);
     981        6728 :         has_handshake_timer = true;
     982             :     }
     983        6728 : }
     984             : 
     985        3801 : bool zmq::stream_engine_t::init_properties (properties_t & properties) {
     986        7602 :     if (peer_address.empty()) return false;
     987        7602 :     properties.insert (std::make_pair("Peer-Address", peer_address));
     988             : 
     989             :     //  Private property to support deprecated SRCFD
     990        3801 :     std::ostringstream stream;
     991        3801 :     stream << (int)s;
     992             :     std::string fd_string = stream.str();
     993        3801 :     properties.insert(std::make_pair("__fd", fd_string));
     994        7602 :     return true;
     995             : }
     996             : 
     997         120 : void zmq::stream_engine_t::timer_event (int id_)
     998             : {
     999         120 :     if(id_ == handshake_timer_id) {
    1000           6 :         has_handshake_timer = false;
    1001             :         //  handshake timer expired before handshake completed, so engine fail
    1002           6 :         error (timeout_error);
    1003             :     }
    1004         114 :     else if(id_ == heartbeat_ivl_timer_id) {
    1005         108 :         next_msg = &stream_engine_t::produce_ping_message;
    1006         108 :         out_event();
    1007         108 :         add_timer(options.heartbeat_interval, heartbeat_ivl_timer_id);
    1008             :     }
    1009           6 :     else if(id_ == heartbeat_ttl_timer_id) {
    1010           3 :         has_ttl_timer = false;
    1011           3 :         error(timeout_error);
    1012             :     }
    1013           3 :     else if(id_ == heartbeat_timeout_timer_id) {
    1014           3 :         has_timeout_timer = false;
    1015           3 :         error(timeout_error);
    1016             :     }
    1017             :     else
    1018             :         // There are no other valid timer ids!
    1019             :         assert(false);
    1020         120 : }
    1021             : 
    1022         108 : int zmq::stream_engine_t::produce_ping_message(msg_t * msg_)
    1023             : {
    1024         108 :     int rc = 0;
    1025         108 :     zmq_assert (mechanism != NULL);
    1026             : 
    1027             :     // 16-bit TTL + \4PING == 7
    1028         108 :     rc = msg_->init_size(7);
    1029         108 :     errno_assert(rc == 0);
    1030         108 :     msg_->set_flags(msg_t::command);
    1031             :     // Copy in the command message
    1032         108 :     memcpy(msg_->data(), "\4PING", 5);
    1033             : 
    1034         108 :     uint16_t ttl_val = htons(options.heartbeat_ttl);
    1035         108 :     memcpy(((uint8_t*)msg_->data()) + 5, &ttl_val, sizeof(ttl_val));
    1036             : 
    1037         108 :     rc = mechanism->encode (msg_);
    1038         108 :     next_msg = &stream_engine_t::pull_and_encode;
    1039         108 :     if(!has_timeout_timer && heartbeat_timeout > 0) {
    1040         108 :         add_timer(heartbeat_timeout, heartbeat_timeout_timer_id);
    1041         108 :         has_timeout_timer = true;
    1042             :     }
    1043         108 :     return rc;
    1044             : }
    1045             : 
    1046         105 : int zmq::stream_engine_t::produce_pong_message(msg_t * msg_)
    1047             : {
    1048         105 :     int rc = 0;
    1049         105 :     zmq_assert (mechanism != NULL);
    1050             : 
    1051         105 :     rc = msg_->init_size(5);
    1052         105 :     errno_assert(rc == 0);
    1053         105 :     msg_->set_flags(msg_t::command);
    1054             : 
    1055         105 :     memcpy(msg_->data(), "\4PONG", 5);
    1056             : 
    1057         105 :     rc = mechanism->encode (msg_);
    1058         105 :     next_msg = &stream_engine_t::pull_and_encode;
    1059         105 :     return rc;
    1060             : }
    1061             : 
    1062         216 : int zmq::stream_engine_t::process_heartbeat_message(msg_t * msg_)
    1063             : {
    1064         216 :     if(memcmp(msg_->data(), "\4PING", 5) == 0) {
    1065             :         uint16_t remote_heartbeat_ttl;
    1066             :         // Get the remote heartbeat TTL to setup the timer
    1067         105 :         memcpy(&remote_heartbeat_ttl, (uint8_t*)msg_->data() + 5, 2);
    1068         105 :         remote_heartbeat_ttl = ntohs(remote_heartbeat_ttl);
    1069             :         // The remote heartbeat is in 10ths of a second
    1070             :         // so we multiply it by 100 to get the timer interval in ms.
    1071         105 :         remote_heartbeat_ttl *= 100;
    1072             : 
    1073         105 :         if(!has_ttl_timer && remote_heartbeat_ttl > 0) {
    1074           3 :             add_timer(remote_heartbeat_ttl, heartbeat_ttl_timer_id);
    1075           3 :             has_ttl_timer = true;
    1076             :         }
    1077             : 
    1078         105 :         next_msg = &stream_engine_t::produce_pong_message;
    1079         105 :         out_event();
    1080             :     }
    1081             : 
    1082         216 :     return 0;
    1083             : }

Generated by: LCOV version 1.10