libzmq  master
ZeroMQ C++ Core Engine (LIBZMQ)
zmq::stream_engine_t Class Reference

#include <stream_engine.hpp>

Inheritance diagram for zmq::stream_engine_t:
Collaboration diagram for zmq::stream_engine_t:

Public Types

enum  error_reason_t { protocol_error, connection_error, timeout_error }
 

Public Member Functions

 stream_engine_t (fd_t fd_, const options_t &options_, const std::string &endpoint)
 
 ~stream_engine_t ()
 
void in_event ()
 
void out_event ()
 
void plug (zmq::io_thread_t *io_thread_, zmq::session_base_t *session_)
 
void restart_input ()
 
void restart_output ()
 
void terminate ()
 
void timer_event (int id_)
 
void zap_msg_available ()
 
- Public Member Functions inherited from zmq::io_object_t
 io_object_t (zmq::io_thread_t *io_thread_=NULL)
 
 ~io_object_t ()
 
void plug (zmq::io_thread_t *io_thread_)
 
void unplug ()
 
- Public Member Functions inherited from zmq::i_poll_events
virtual ~i_poll_events ()
 
- Public Member Functions inherited from zmq::i_engine
virtual ~i_engine ()
 

Private Types

enum  { handshake_timer_id = 0x40 }
 
enum  { heartbeat_ivl_timer_id = 0x80, heartbeat_timeout_timer_id = 0x81, heartbeat_ttl_timer_id = 0x82 }
 
typedef metadata_t::dict_t properties_t
 

Private Member Functions

 stream_engine_t (const stream_engine_t &)
 
size_t add_property (unsigned char *ptr, const char *name, const void *value, size_t value_len)
 
int decode_and_push (msg_t *msg_)
 
void error (error_reason_t reason)
 
bool handshake ()
 
int identity_msg (msg_t *msg_)
 
bool init_properties (properties_t &properties)
 
void mechanism_ready ()
 
int next_handshake_command (msg_t *msg)
 
const stream_engine_toperator= (const stream_engine_t &)
 
int process_handshake_command (msg_t *msg)
 
int process_heartbeat_message (msg_t *msg_)
 
int process_identity_msg (msg_t *msg_)
 
int produce_ping_message (msg_t *msg_)
 
int produce_pong_message (msg_t *msg_)
 
int pull_and_encode (msg_t *msg_)
 
int pull_msg_from_session (msg_t *msg_)
 
int push_msg_to_session (msg_t *msg)
 
int push_one_then_decode_and_push (msg_t *msg_)
 
int push_raw_msg_to_session (msg_t *msg)
 
int receive_greeting ()
 
void set_handshake_timer ()
 
void unplug ()
 
int write_credential (msg_t *msg_)
 
int write_subscription_msg (msg_t *msg_)
 

Private Attributes

bool as_server
 
i_decoderdecoder
 
i_encoderencoder
 
std::string endpoint
 
unsigned int greeting_bytes_read
 
unsigned char greeting_recv [v3_greeting_size]
 
unsigned char greeting_send [v3_greeting_size]
 
size_t greeting_size
 
handle_t handle
 
bool handshaking
 
bool has_handshake_timer
 
bool has_heartbeat_timer
 
bool has_timeout_timer
 
bool has_ttl_timer
 
int heartbeat_timeout
 
unsigned char * inpos
 
bool input_stopped
 
size_t insize
 
bool io_error
 
mechanism_tmechanism
 
metadata_tmetadata
 
int(stream_engine_t::* next_msg )(msg_t *msg_)
 
options_t options
 
unsigned char * outpos
 
bool output_stopped
 
size_t outsize
 
std::string peer_address
 
bool plugged
 
int(stream_engine_t::* process_msg )(msg_t *msg_)
 
fd_t s
 
zmq::session_base_tsession
 
zmq::socket_base_tsocket
 
bool subscription_required
 
msg_t tx_msg
 

Static Private Attributes

static const size_t signature_size = 10
 
static const size_t v2_greeting_size = 12
 
static const size_t v3_greeting_size = 64
 

Additional Inherited Members

- Protected Types inherited from zmq::io_object_t
typedef poller_t::handle_t handle_t
 
- Protected Member Functions inherited from zmq::io_object_t
handle_t add_fd (fd_t fd_)
 
void add_timer (int timout_, int id_)
 
void cancel_timer (int id_)
 
void reset_pollin (handle_t handle_)
 
void reset_pollout (handle_t handle_)
 
void rm_fd (handle_t handle_)
 
void set_pollin (handle_t handle_)
 
void set_pollout (handle_t handle_)
 

Detailed Description

Definition at line 62 of file stream_engine.hpp.

Member Typedef Documentation

Definition at line 127 of file stream_engine.hpp.

Member Enumeration Documentation

anonymous enum
private
Enumerator
handshake_timer_id 

Definition at line 208 of file stream_engine.hpp.

anonymous enum
private
Enumerator
heartbeat_ivl_timer_id 
heartbeat_timeout_timer_id 
heartbeat_ttl_timer_id 

Definition at line 214 of file stream_engine.hpp.

Enumerator
protocol_error 
connection_error 
timeout_error 

Definition at line 66 of file stream_engine.hpp.

Constructor & Destructor Documentation

zmq::stream_engine_t::stream_engine_t ( fd_t  fd_,
const options_t options_,
const std::string &  endpoint 
)
zmq::stream_engine_t::~stream_engine_t ( )

Definition at line 145 of file stream_engine.cpp.

References zmq::msg_t::close(), decoder, zmq::metadata_t::drop_ref(), encoder, errno_assert, LIBZMQ_DELETE, mechanism, metadata, plugged, zmq::retired_fd, s, tx_msg, and zmq_assert.

Here is the call graph for this function:

zmq::stream_engine_t::stream_engine_t ( const stream_engine_t )
private

Member Function Documentation

size_t zmq::stream_engine_t::add_property ( unsigned char *  ptr,
const char *  name,
const void *  value,
size_t  value_len 
)
private
void zmq::stream_engine_t::error ( error_reason_t  reason)
private
int zmq::stream_engine_t::identity_msg ( msg_t msg_)
private
void zmq::stream_engine_t::in_event ( )
virtual
bool zmq::stream_engine_t::init_properties ( properties_t properties)
private

Definition at line 985 of file stream_engine.cpp.

References peer_address, and s.

Referenced by mechanism_ready(), and plug().

Here is the caller graph for this function:

int zmq::stream_engine_t::next_handshake_command ( msg_t msg)
private

Definition at line 753 of file stream_engine.cpp.

References zmq::msg_t::command, EPROTO, zmq::mechanism_t::error, mechanism, mechanism_ready(), zmq::mechanism_t::next_handshake_command(), pull_and_encode(), zmq::mechanism_t::ready, zmq::msg_t::set_flags(), zmq::mechanism_t::status(), and zmq_assert.

Referenced by handshake().

Here is the call graph for this function:

Here is the caller graph for this function:

const stream_engine_t& zmq::stream_engine_t::operator= ( const stream_engine_t )
private
void zmq::stream_engine_t::out_event ( )
virtual

Reimplemented from zmq::io_object_t.

Definition at line 359 of file stream_engine.cpp.

References zmq::i_encoder::encode(), encoder, handle, handshaking, io_error, zmq::i_encoder::load_msg(), next_msg, zmq::out_batch_size, outpos, output_stopped, outsize, zmq::io_object_t::reset_pollout(), s, zmq::tcp_write(), tx_msg, unlikely, and zmq_assert.

Referenced by process_heartbeat_message(), restart_output(), and timer_event().

Here is the call graph for this function:

Here is the caller graph for this function:

int zmq::stream_engine_t::process_handshake_command ( msg_t msg)
private

Definition at line 774 of file stream_engine.cpp.

References EPROTO, zmq::mechanism_t::error, mechanism, mechanism_ready(), output_stopped, zmq::mechanism_t::process_handshake_command(), zmq::mechanism_t::ready, restart_output(), zmq::mechanism_t::status(), and zmq_assert.

Referenced by handshake().

Here is the call graph for this function:

Here is the caller graph for this function:

int zmq::stream_engine_t::process_heartbeat_message ( msg_t msg_)
private

Definition at line 1062 of file stream_engine.cpp.

References zmq::io_object_t::add_timer(), zmq::msg_t::data, has_ttl_timer, heartbeat_ttl_timer_id, next_msg, out_event(), and produce_pong_message().

Referenced by decode_and_push().

Here is the call graph for this function:

Here is the caller graph for this function:

int zmq::stream_engine_t::process_identity_msg ( msg_t msg_)
private
int zmq::stream_engine_t::produce_ping_message ( msg_t msg_)
private
int zmq::stream_engine_t::produce_pong_message ( msg_t msg_)
private

Definition at line 1046 of file stream_engine.cpp.

References zmq::msg_t::command, zmq::msg_t::data, zmq::mechanism_t::encode(), errno_assert, zmq::msg_t::init_size(), mechanism, next_msg, pull_and_encode(), zmq::msg_t::set_flags(), and zmq_assert.

Referenced by process_heartbeat_message().

Here is the call graph for this function:

Here is the caller graph for this function:

int zmq::stream_engine_t::pull_and_encode ( msg_t msg_)
private

Definition at line 888 of file stream_engine.cpp.

References zmq::mechanism_t::encode(), mechanism, zmq::session_base_t::pull_msg(), session, and zmq_assert.

Referenced by mechanism_ready(), next_handshake_command(), produce_ping_message(), and produce_pong_message().

Here is the call graph for this function:

Here is the caller graph for this function:

int zmq::stream_engine_t::pull_msg_from_session ( msg_t msg_)
private

Definition at line 849 of file stream_engine.cpp.

References zmq::session_base_t::pull_msg(), and session.

Referenced by handshake(), identity_msg(), and plug().

Here is the call graph for this function:

Here is the caller graph for this function:

int zmq::stream_engine_t::push_msg_to_session ( msg_t msg)
private

Definition at line 854 of file stream_engine.cpp.

References zmq::session_base_t::push_msg(), and session.

Referenced by process_identity_msg(), push_raw_msg_to_session(), and write_subscription_msg().

Here is the call graph for this function:

Here is the caller graph for this function:

int zmq::stream_engine_t::push_one_then_decode_and_push ( msg_t msg_)
private

Definition at line 932 of file stream_engine.cpp.

References decode_and_push(), process_msg, zmq::session_base_t::push_msg(), and session.

Referenced by decode_and_push().

Here is the call graph for this function:

Here is the caller graph for this function:

int zmq::stream_engine_t::push_raw_msg_to_session ( msg_t msg)
private

Definition at line 859 of file stream_engine.cpp.

References metadata, zmq::msg_t::metadata, push_msg_to_session(), and zmq::msg_t::set_metadata().

Referenced by plug().

Here is the call graph for this function:

Here is the caller graph for this function:

int zmq::stream_engine_t::receive_greeting ( )
private
void zmq::stream_engine_t::restart_input ( )
virtual

Implements zmq::i_engine.

Definition at line 439 of file stream_engine.cpp.

References connection_error, zmq::i_decoder::decode(), decoder, error(), zmq::session_base_t::flush(), handle, in_event(), inpos, input_stopped, insize, io_error, zmq::i_decoder::msg(), process_msg, protocol_error, session, zmq::io_object_t::set_pollin(), and zmq_assert.

Referenced by zap_msg_available().

Here is the call graph for this function:

Here is the caller graph for this function:

void zmq::stream_engine_t::restart_output ( )
virtual

Implements zmq::i_engine.

Definition at line 422 of file stream_engine.cpp.

References handle, io_error, likely, out_event(), output_stopped, zmq::io_object_t::set_pollout(), and unlikely.

Referenced by process_handshake_command(), and zap_msg_available().

Here is the call graph for this function:

Here is the caller graph for this function:

void zmq::stream_engine_t::set_handshake_timer ( )
private

Definition at line 975 of file stream_engine.cpp.

References zmq::io_object_t::add_timer(), zmq::options_t::handshake_ivl, handshake_timer_id, has_handshake_timer, options, zmq::options_t::raw_socket, and zmq_assert.

Referenced by plug().

Here is the call graph for this function:

Here is the caller graph for this function:

void zmq::stream_engine_t::terminate ( )
virtual

Implements zmq::i_engine.

Definition at line 278 of file stream_engine.cpp.

References unplug().

Here is the call graph for this function:

void zmq::stream_engine_t::unplug ( )
private
int zmq::stream_engine_t::write_credential ( msg_t msg_)
private
int zmq::stream_engine_t::write_subscription_msg ( msg_t msg_)
private

Definition at line 940 of file stream_engine.cpp.

References zmq::msg_t::data, errno_assert, zmq::msg_t::init_size(), process_msg, zmq::session_base_t::push_msg(), push_msg_to_session(), and session.

Referenced by process_identity_msg().

Here is the call graph for this function:

Here is the caller graph for this function:

void zmq::stream_engine_t::zap_msg_available ( )
virtual

Implements zmq::i_engine.

Definition at line 793 of file stream_engine.cpp.

References error(), input_stopped, mechanism, output_stopped, protocol_error, restart_input(), restart_output(), zmq::mechanism_t::zap_msg_available(), and zmq_assert.

Here is the call graph for this function:

Member Data Documentation

bool zmq::stream_engine_t::as_server
private

Definition at line 138 of file stream_engine.hpp.

i_decoder* zmq::stream_engine_t::decoder
private

Definition at line 146 of file stream_engine.hpp.

Referenced by handshake(), in_event(), plug(), restart_input(), and ~stream_engine_t().

i_encoder* zmq::stream_engine_t::encoder
private

Definition at line 150 of file stream_engine.hpp.

Referenced by handshake(), out_event(), plug(), and ~stream_engine_t().

std::string zmq::stream_engine_t::endpoint
private

Definition at line 184 of file stream_engine.hpp.

Referenced by error().

unsigned int zmq::stream_engine_t::greeting_bytes_read
private

Definition at line 176 of file stream_engine.hpp.

Referenced by handshake().

unsigned char zmq::stream_engine_t::greeting_recv[v3_greeting_size]
private

Definition at line 172 of file stream_engine.hpp.

Referenced by handshake().

unsigned char zmq::stream_engine_t::greeting_send[v3_greeting_size]
private

Definition at line 173 of file stream_engine.hpp.

Referenced by handshake(), and plug().

size_t zmq::stream_engine_t::greeting_size
private

Definition at line 169 of file stream_engine.hpp.

Referenced by handshake().

handle_t zmq::stream_engine_t::handle
private
bool zmq::stream_engine_t::handshaking
private

Definition at line 158 of file stream_engine.hpp.

Referenced by handshake(), in_event(), out_event(), and plug().

bool zmq::stream_engine_t::has_handshake_timer
private

Definition at line 211 of file stream_engine.hpp.

Referenced by handshake(), set_handshake_timer(), timer_event(), and unplug().

bool zmq::stream_engine_t::has_heartbeat_timer
private

Definition at line 221 of file stream_engine.hpp.

Referenced by mechanism_ready(), and unplug().

bool zmq::stream_engine_t::has_timeout_timer
private

Definition at line 220 of file stream_engine.hpp.

Referenced by decode_and_push(), produce_ping_message(), timer_event(), and unplug().

bool zmq::stream_engine_t::has_ttl_timer
private

Definition at line 219 of file stream_engine.hpp.

Referenced by decode_and_push(), process_heartbeat_message(), timer_event(), and unplug().

int zmq::stream_engine_t::heartbeat_timeout
private

Definition at line 222 of file stream_engine.hpp.

Referenced by produce_ping_message(), and stream_engine_t().

unsigned char* zmq::stream_engine_t::inpos
private

Definition at line 144 of file stream_engine.hpp.

Referenced by handshake(), in_event(), and restart_input().

bool zmq::stream_engine_t::input_stopped
private

Definition at line 202 of file stream_engine.hpp.

Referenced by in_event(), restart_input(), and zap_msg_available().

size_t zmq::stream_engine_t::insize
private

Definition at line 145 of file stream_engine.hpp.

Referenced by handshake(), in_event(), and restart_input().

bool zmq::stream_engine_t::io_error
private

Definition at line 192 of file stream_engine.hpp.

Referenced by in_event(), out_event(), plug(), restart_input(), restart_output(), and unplug().

metadata_t* zmq::stream_engine_t::metadata
private
int(stream_engine_t::* zmq::stream_engine_t::next_msg) (msg_t *msg_)
private
unsigned char* zmq::stream_engine_t::outpos
private

Definition at line 148 of file stream_engine.hpp.

Referenced by handshake(), out_event(), and plug().

bool zmq::stream_engine_t::output_stopped
private
size_t zmq::stream_engine_t::outsize
private

Definition at line 149 of file stream_engine.hpp.

Referenced by handshake(), out_event(), and plug().

std::string zmq::stream_engine_t::peer_address
private

Definition at line 227 of file stream_engine.hpp.

Referenced by handshake(), init_properties(), and stream_engine_t().

bool zmq::stream_engine_t::plugged
private

Definition at line 186 of file stream_engine.hpp.

Referenced by plug(), unplug(), and ~stream_engine_t().

int(stream_engine_t::* zmq::stream_engine_t::process_msg) (msg_t *msg_)
private
fd_t zmq::stream_engine_t::s
private
const size_t zmq::stream_engine_t::signature_size = 10
staticprivate

Definition at line 160 of file stream_engine.hpp.

Referenced by handshake().

zmq::socket_base_t* zmq::stream_engine_t::socket
private

Definition at line 225 of file stream_engine.hpp.

Referenced by error(), and plug().

bool zmq::stream_engine_t::subscription_required
private

Definition at line 197 of file stream_engine.hpp.

Referenced by handshake(), and process_identity_msg().

msg_t zmq::stream_engine_t::tx_msg
private

Definition at line 140 of file stream_engine.hpp.

Referenced by handshake(), out_event(), stream_engine_t(), and ~stream_engine_t().

const size_t zmq::stream_engine_t::v2_greeting_size = 12
staticprivate

Definition at line 163 of file stream_engine.hpp.

const size_t zmq::stream_engine_t::v3_greeting_size = 64
staticprivate

Definition at line 166 of file stream_engine.hpp.

Referenced by handshake().