libzmq  master
ZeroMQ C++ Core Engine (LIBZMQ)
stream_engine.hpp
Go to the documentation of this file.
1 /*
2  Copyright (c) 2007-2016 Contributors as noted in the AUTHORS file
3 
4  This file is part of libzmq, the ZeroMQ core engine in C++.
5 
6  libzmq is free software; you can redistribute it and/or modify it under
7  the terms of the GNU Lesser General Public License (LGPL) as published
8  by the Free Software Foundation; either version 3 of the License, or
9  (at your option) any later version.
10 
11  As a special exception, the Contributors give you permission to link
12  this library with independent modules to produce an executable,
13  regardless of the license terms of these independent modules, and to
14  copy and distribute the resulting executable under terms of your choice,
15  provided that you also meet, for each linked independent module, the
16  terms and conditions of the license of that module. An independent
17  module is a module which is not derived from or based on this library.
18  If you modify this library, you must extend this exception to your
19  version of the library.
20 
21  libzmq is distributed in the hope that it will be useful, but WITHOUT
22  ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
23  FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public
24  License for more details.
25 
26  You should have received a copy of the GNU Lesser General Public License
27  along with this program. If not, see <http://www.gnu.org/licenses/>.
28 */
29 
30 #ifndef __ZMQ_STREAM_ENGINE_HPP_INCLUDED__
31 #define __ZMQ_STREAM_ENGINE_HPP_INCLUDED__
32 
33 #include <stddef.h>
34 
35 #include "fd.hpp"
36 #include "i_engine.hpp"
37 #include "io_object.hpp"
38 #include "i_encoder.hpp"
39 #include "i_decoder.hpp"
40 #include "options.hpp"
41 #include "socket_base.hpp"
42 #include "../include/zmq.h"
43 #include "metadata.hpp"
44 
45 namespace zmq
46 {
47  // Protocol revisions
48  enum
49  {
50  ZMTP_1_0 = 0,
52  };
53 
54  class io_thread_t;
55  class msg_t;
56  class session_base_t;
57  class mechanism_t;
58 
59  // This engine handles any socket with SOCK_STREAM semantics,
60  // e.g. a TCP socket or a UNIX domain socket.
61 
62  class stream_engine_t : public io_object_t, public i_engine
63  {
64  public:
65 
70  };
71 
72  stream_engine_t (fd_t fd_, const options_t &options_,
73  const std::string &endpoint);
75 
76  // i_engine interface implementation.
77  void plug (zmq::io_thread_t *io_thread_,
78  zmq::session_base_t *session_);
79  void terminate ();
80  void restart_input ();
81  void restart_output ();
82  void zap_msg_available ();
83 
84  // i_poll_events interface implementation.
85  void in_event ();
86  void out_event ();
87  void timer_event (int id_);
88 
89  private:
90  // Unplug the engine from the session.
91  void unplug ();
92 
93  // Function to handle network disconnections.
94  void error (error_reason_t reason);
95 
96  // Receives the greeting message from the peer.
97  int receive_greeting ();
98 
99  // Detects the protocol used by the peer.
100  bool handshake ();
101 
102  int identity_msg (msg_t *msg_);
103  int process_identity_msg (msg_t *msg_);
104 
105  int next_handshake_command (msg_t *msg);
106  int process_handshake_command (msg_t *msg);
107 
108  int pull_msg_from_session (msg_t *msg_);
109  int push_msg_to_session (msg_t *msg);
110 
111  int push_raw_msg_to_session (msg_t *msg);
112 
113  int write_credential (msg_t *msg_);
114  int pull_and_encode (msg_t *msg_);
115  int decode_and_push (msg_t *msg_);
117 
118  void mechanism_ready ();
119 
120  int write_subscription_msg (msg_t *msg_);
121 
122  size_t add_property (unsigned char *ptr,
123  const char *name, const void *value, size_t value_len);
124 
125  void set_handshake_timer();
126 
128  bool init_properties (properties_t & properties);
129 
130  int produce_ping_message(msg_t * msg_);
131  int process_heartbeat_message(msg_t * msg_);
132  int produce_pong_message(msg_t * msg_);
133 
134  // Underlying socket.
136 
137  // True iff this is server's engine.
138  bool as_server;
139 
141 
143 
144  unsigned char *inpos;
145  size_t insize;
147 
148  unsigned char *outpos;
149  size_t outsize;
151 
152  // Metadata to be attached to received messages. May be NULL.
154 
155  // When true, we are still trying to determine whether
156  // the peer is using a versioned protocol, and if so, which
157  // version. When false, normal message flow has started.
159 
160  static const size_t signature_size = 10;
161 
162  // Size of ZMTP/1.0 and ZMTP/2.0 greeting message
163  static const size_t v2_greeting_size = 12;
164 
165  // Size of ZMTP/3.0 greeting message
166  static const size_t v3_greeting_size = 64;
167 
168  // Expected greeting size.
170 
171  // Greeting received from, and sent to, the peer.
174 
175  // Size of greeting received so far
176  unsigned int greeting_bytes_read;
177 
178  // The session this engine is attached to.
180 
182 
183  // String representation of endpoint
184  std::string endpoint;
185 
186  bool plugged;
187 
189 
191 
192  bool io_error;
193 
194  // Indicates whether the engine is to inject a phantom
195  // subscription message into the incoming stream.
196  // Needed to support old peers.
198 
200 
201  // True iff the engine couldn't consume the last decoded message.
203 
204  // True iff the engine doesn't have any message to encode.
206 
207  // ID of the handshake timer
208  enum {handshake_timer_id = 0x40};
209 
210  // True if linger timer is running.
212 
213  // Heartbeat stuff
214  enum {
218  };
223 
224  // Socket
226 
227  std::string peer_address;
228 
231  };
232 
233 }
234 
235 #endif
static const size_t v3_greeting_size
int process_handshake_command(msg_t *msg)
unsigned char greeting_send[v3_greeting_size]
int produce_ping_message(msg_t *msg_)
mechanism_t * mechanism
int process_identity_msg(msg_t *msg_)
std::map< std::string, std::string > dict_t
Definition: metadata.hpp:43
int pull_msg_from_session(msg_t *msg_)
unsigned char * outpos
int decode_and_push(msg_t *msg_)
int produce_pong_message(msg_t *msg_)
zmq::socket_base_t * socket
stream_engine_t(fd_t fd_, const options_t &options_, const std::string &endpoint)
void plug(zmq::io_thread_t *io_thread_, zmq::session_base_t *session_)
unsigned char * inpos
size_t add_property(unsigned char *ptr, const char *name, const void *value, size_t value_len)
int(stream_engine_t::* process_msg)(msg_t *msg_)
int(stream_engine_t::* next_msg)(msg_t *msg_)
void error(error_reason_t reason)
int write_credential(msg_t *msg_)
unsigned int greeting_bytes_read
const stream_engine_t & operator=(const stream_engine_t &)
metadata_t::dict_t properties_t
int next_handshake_command(msg_t *msg)
bool init_properties(properties_t &properties)
int identity_msg(msg_t *msg_)
int push_raw_msg_to_session(msg_t *msg)
void timer_event(int id_)
static const size_t v2_greeting_size
unsigned char greeting_recv[v3_greeting_size]
int push_msg_to_session(msg_t *msg)
int process_heartbeat_message(msg_t *msg_)
zmq::session_base_t * session
Definition: address.hpp:35
int write_subscription_msg(msg_t *msg_)
int push_one_then_decode_and_push(msg_t *msg_)
static const size_t signature_size
int pull_and_encode(msg_t *msg_)
poller_t::handle_t handle_t
Definition: io_object.hpp:62