libzmq
master
ZeroMQ C++ Core Engine (LIBZMQ)
zmq::stream_engine_t Class Reference
#include <stream_engine.hpp>
Public Types | |
enum | error_reason_t { protocol_error, connection_error, timeout_error } |
Public Member Functions | |
stream_engine_t (fd_t fd_, const options_t &options_, const std::string &endpoint) | |
~stream_engine_t () | |
void | in_event () |
void | out_event () |
void | plug (zmq::io_thread_t *io_thread_, zmq::session_base_t *session_) |
void | restart_input () |
void | restart_output () |
void | terminate () |
void | timer_event (int id_) |
void | zap_msg_available () |
Public Member Functions inherited from zmq::io_object_t | |
io_object_t (zmq::io_thread_t *io_thread_=NULL) | |
~io_object_t () | |
void | plug (zmq::io_thread_t *io_thread_) |
void | unplug () |
Public Member Functions inherited from zmq::i_poll_events | |
virtual | ~i_poll_events () |
Public Member Functions inherited from zmq::i_engine | |
virtual | ~i_engine () |
Private Types | |
enum | { handshake_timer_id = 0x40 } |
enum | { heartbeat_ivl_timer_id = 0x80, heartbeat_timeout_timer_id = 0x81, heartbeat_ttl_timer_id = 0x82 } |
typedef metadata_t::dict_t | properties_t |
Private Member Functions | |
stream_engine_t (const stream_engine_t &) | |
size_t | add_property (unsigned char *ptr, const char *name, const void *value, size_t value_len) |
int | decode_and_push (msg_t *msg_) |
void | error (error_reason_t reason) |
bool | handshake () |
int | identity_msg (msg_t *msg_) |
bool | init_properties (properties_t &properties) |
void | mechanism_ready () |
int | next_handshake_command (msg_t *msg) |
const stream_engine_t & | operator= (const stream_engine_t &) |
int | process_handshake_command (msg_t *msg) |
int | process_heartbeat_message (msg_t *msg_) |
int | process_identity_msg (msg_t *msg_) |
int | produce_ping_message (msg_t *msg_) |
int | produce_pong_message (msg_t *msg_) |
int | pull_and_encode (msg_t *msg_) |
int | pull_msg_from_session (msg_t *msg_) |
int | push_msg_to_session (msg_t *msg) |
int | push_one_then_decode_and_push (msg_t *msg_) |
int | push_raw_msg_to_session (msg_t *msg) |
int | receive_greeting () |
void | set_handshake_timer () |
void | unplug () |
int | write_credential (msg_t *msg_) |
int | write_subscription_msg (msg_t *msg_) |
Private Attributes | |
bool | as_server |
i_decoder * | decoder |
i_encoder * | encoder |
std::string | endpoint |
unsigned int | greeting_bytes_read |
unsigned char | greeting_recv [v3_greeting_size] |
unsigned char | greeting_send [v3_greeting_size] |
size_t | greeting_size |
handle_t | handle |
bool | handshaking |
bool | has_handshake_timer |
bool | has_heartbeat_timer |
bool | has_timeout_timer |
bool | has_ttl_timer |
int | heartbeat_timeout |
unsigned char * | inpos |
bool | input_stopped |
size_t | insize |
bool | io_error |
mechanism_t * | mechanism |
metadata_t * | metadata |
int(stream_engine_t::* | next_msg )(msg_t *msg_) |
options_t | options |
unsigned char * | outpos |
bool | output_stopped |
size_t | outsize |
std::string | peer_address |
bool | plugged |
int(stream_engine_t::* | process_msg )(msg_t *msg_) |
fd_t | s |
zmq::session_base_t * | session |
zmq::socket_base_t * | socket |
bool | subscription_required |
msg_t | tx_msg |
Static Private Attributes | |
static const size_t | signature_size = 10 |
static const size_t | v2_greeting_size = 12 |
static const size_t | v3_greeting_size = 64 |
Additional Inherited Members | |
Protected Types inherited from zmq::io_object_t | |
typedef poller_t::handle_t | handle_t |
Protected Member Functions inherited from zmq::io_object_t | |
handle_t | add_fd (fd_t fd_) |
void | add_timer (int timout_, int id_) |
void | cancel_timer (int id_) |
void | reset_pollin (handle_t handle_) |
void | reset_pollout (handle_t handle_) |
void | rm_fd (handle_t handle_) |
void | set_pollin (handle_t handle_) |
void | set_pollout (handle_t handle_) |
Detailed Description
Definition at line 62 of file stream_engine.hpp.
typedef metadata_t::dict_t zmq::stream_engine_t::properties_t | private |
Definition at line 127 of file stream_engine.hpp.
anonymous enum | private |
Enumerator | |
---|---|
handshake_timer_id |
Definition at line 208 of file stream_engine.hpp.
anonymous enum | private |
Enumerator | |
---|---|
heartbeat_ivl_timer_id | |
heartbeat_timeout_timer_id | |
heartbeat_ttl_timer_id |
Definition at line 214 of file stream_engine.hpp.
enum zmq::stream_engine_t::error_reason_t
Enumerator | |
---|---|
protocol_error | |
connection_error | |
timeout_error |
Definition at line 66 of file stream_engine.hpp.
zmq::stream_engine_t::stream_engine_t | ( | fd_t | fd_, |
const options_t & | options_, | ||
const std::string & | endpoint | ||
) |
Definition at line 61 of file stream_engine.cpp.
References errno_assert, zmq::get_peer_ip_address(), zmq::options_t::heartbeat_interval, heartbeat_timeout, zmq::options_t::heartbeat_timeout, zmq::msg_t::init(), options, peer_address, s, size, tx_msg, and zmq::unblock_socket().
zmq::stream_engine_t::~stream_engine_t | ( | ) |
Definition at line 145 of file stream_engine.cpp.
References zmq::msg_t::close(), decoder, zmq::metadata_t::drop_ref(), encoder, errno_assert, LIBZMQ_DELETE, mechanism, metadata, plugged, zmq::retired_fd, s, tx_msg, and zmq_assert.
|
private |
int zmq::stream_engine_t::decode_and_push (msg_t *msg_) | private |
Definition at line 899 of file stream_engine.cpp.
References zmq::io_object_t::cancel_timer(), zmq::msg_t::command, zmq::msg_t::data, zmq::mechanism_t::decode(), zmq::msg_t::flags, has_timeout_timer, has_ttl_timer, heartbeat_timeout_timer_id, heartbeat_ttl_timer_id, mechanism, metadata, process_heartbeat_message(), process_msg, zmq::session_base_t::push_msg(), push_one_then_decode_and_push(), session, zmq::msg_t::set_metadata(), and zmq_assert.
Referenced by push_one_then_decode_and_push(), and write_credential().
void zmq::stream_engine_t::error (error_reason_t reason) | private |
Definition at line 957 of file stream_engine.cpp.
References zmq::msg_t::close(), endpoint, zmq::session_base_t::engine_error(), zmq::socket_base_t::event_disconnected(), zmq::session_base_t::flush(), zmq::msg_t::init(), options, process_msg, zmq::options_t::raw_notify, zmq::options_t::raw_socket, s, session, socket, unplug(), and zmq_assert.
Referenced by handshake(), in_event(), restart_input(), timer_event(), and zap_msg_available().
bool zmq::stream_engine_t::handshake () | private |
Definition at line 485 of file stream_engine.cpp.
References alloc_assert, zmq::options_t::as_server, zmq::io_object_t::cancel_timer(), connection_error, zmq::msg_t::data, decoder, zmq::i_encoder::encode(), encoder, error(), greeting_bytes_read, greeting_recv, greeting_send, greeting_size, handle, handshake_timer_id, handshaking, has_handshake_timer, zmq::options_t::identity, zmq::options_t::identity_size, zmq::in_batch_size, zmq::msg_t::init_size(), inpos, insize, zmq::i_encoder::load_msg(), zmq::options_t::maxmsgsize, zmq::options_t::mechanism, mechanism, next_handshake_command(), next_msg, options, zmq::out_batch_size, outpos, outsize, peer_address, process_handshake_command(), process_identity_msg(), process_msg, protocol_error, pull_msg_from_session(), s, session, zmq::io_object_t::set_pollout(), signature_size, subscription_required, zmq::tcp_read(), tx_msg, zmq::options_t::type, v3_greeting_size, zmq::session_base_t::zap_enabled(), zmq_assert, ZMQ_CURVE, ZMQ_GSSAPI, ZMQ_NULL, ZMQ_PLAIN, ZMQ_PUB, ZMQ_XPUB, zmq::ZMTP_1_0, and zmq::ZMTP_2_0.
Referenced by in_event().
int zmq::stream_engine_t::identity_msg (msg_t *msg_) | private |
Definition at line 721 of file stream_engine.cpp.
References zmq::msg_t::data, errno_assert, zmq::options_t::identity, zmq::options_t::identity_size, zmq::msg_t::init_size(), next_msg, options, and pull_msg_from_session().
void zmq::stream_engine_t::in_event () | virtual |
Reimplemented from zmq::io_object_t.
Definition at line 284 of file stream_engine.cpp.
References connection_error, zmq::i_decoder::decode(), decoder, error(), zmq::session_base_t::flush(), zmq::i_decoder::get_buffer(), handle, handshake(), handshaking, inpos, input_stopped, insize, io_error, zmq::i_decoder::msg(), process_msg, protocol_error, zmq::io_object_t::reset_pollin(), zmq::i_decoder::resize_buffer(), zmq::io_object_t::rm_fd(), s, session, zmq::tcp_read(), unlikely, and zmq_assert.
Referenced by plug(), and restart_input().
bool zmq::stream_engine_t::init_properties (properties_t &properties) | private |
Definition at line 985 of file stream_engine.cpp.
References peer_address, and s.
Referenced by mechanism_ready(), and plug().
void zmq::stream_engine_t::mechanism_ready () | private |
Definition at line 808 of file stream_engine.cpp.
References zmq::io_object_t::add_timer(), errno_assert, zmq::session_base_t::flush(), zmq::mechanism_t::get_zap_properties(), zmq::mechanism_t::get_zmtp_properties(), has_heartbeat_timer, zmq::options_t::heartbeat_interval, heartbeat_ivl_timer_id, init_properties(), mechanism, metadata, next_msg, options, zmq::mechanism_t::peer_identity(), process_msg, pull_and_encode(), zmq::session_base_t::push_msg(), zmq::options_t::recv_identity, session, write_credential(), and zmq_assert.
Referenced by next_handshake_command(), and process_handshake_command().
int zmq::stream_engine_t::next_handshake_command (msg_t *msg) | private |
Definition at line 753 of file stream_engine.cpp.
References zmq::msg_t::command, EPROTO, zmq::mechanism_t::error, mechanism, mechanism_ready(), zmq::mechanism_t::next_handshake_command(), pull_and_encode(), zmq::mechanism_t::ready, zmq::msg_t::set_flags(), zmq::mechanism_t::status(), and zmq_assert.
Referenced by handshake().
const stream_engine_t & zmq::stream_engine_t::operator= (const stream_engine_t &) | private |
void zmq::stream_engine_t::out_event () | virtual |
Reimplemented from zmq::io_object_t.
Definition at line 359 of file stream_engine.cpp.
References zmq::i_encoder::encode(), encoder, handle, handshaking, io_error, zmq::i_encoder::load_msg(), next_msg, zmq::out_batch_size, outpos, output_stopped, outsize, zmq::io_object_t::reset_pollout(), s, zmq::tcp_write(), tx_msg, unlikely, and zmq_assert.
Referenced by process_heartbeat_message(), restart_output(), and timer_event().
void zmq::stream_engine_t::plug (zmq::io_thread_t *io_thread_, zmq::session_base_t *session_) | virtual |
Implements zmq::i_engine.
Definition at line 176 of file stream_engine.cpp.
References zmq::io_object_t::add_fd(), alloc_assert, zmq::msg_t::close(), decoder, encoder, zmq::session_base_t::flush(), greeting_send, handle, handshaking, zmq::options_t::identity_size, zmq::in_batch_size, in_event(), zmq::msg_t::init(), init_properties(), io_error, metadata, next_msg, options, zmq::out_batch_size, outpos, outsize, zmq::io_object_t::plug(), plugged, process_msg, pull_msg_from_session(), push_raw_msg_to_session(), zmq::put_uint64(), zmq::options_t::raw_notify, zmq::options_t::raw_socket, s, session, set_handshake_timer(), zmq::io_object_t::set_pollin(), zmq::io_object_t::set_pollout(), socket, and zmq_assert.
int zmq::stream_engine_t::process_handshake_command (msg_t *msg) | private |
Definition at line 774 of file stream_engine.cpp.
References EPROTO, zmq::mechanism_t::error, mechanism, mechanism_ready(), output_stopped, zmq::mechanism_t::process_handshake_command(), zmq::mechanism_t::ready, restart_output(), zmq::mechanism_t::status(), and zmq_assert.
Referenced by handshake().
int zmq::stream_engine_t::process_heartbeat_message (msg_t *msg_) | private |
Definition at line 1062 of file stream_engine.cpp.
References zmq::io_object_t::add_timer(), zmq::msg_t::data, has_ttl_timer, heartbeat_ttl_timer_id, next_msg, out_event(), and produce_pong_message().
Referenced by decode_and_push().
int zmq::stream_engine_t::process_identity_msg (msg_t *msg_) | private |
Definition at line 731 of file stream_engine.cpp.
References zmq::msg_t::close(), errno_assert, zmq::msg_t::identity, zmq::msg_t::init(), options, process_msg, zmq::session_base_t::push_msg(), push_msg_to_session(), zmq::options_t::recv_identity, session, zmq::msg_t::set_flags(), subscription_required, and write_subscription_msg().
Referenced by handshake().
int zmq::stream_engine_t::produce_ping_message (msg_t *msg_) | private |
Definition at line 1022 of file stream_engine.cpp.
References zmq::io_object_t::add_timer(), zmq::msg_t::command, zmq::msg_t::data, zmq::mechanism_t::encode(), errno_assert, has_timeout_timer, heartbeat_timeout, heartbeat_timeout_timer_id, zmq::options_t::heartbeat_ttl, zmq::msg_t::init_size(), mechanism, next_msg, options, pull_and_encode(), zmq::msg_t::set_flags(), and zmq_assert.
Referenced by timer_event().
int zmq::stream_engine_t::produce_pong_message (msg_t *msg_) | private |
Definition at line 1046 of file stream_engine.cpp.
References zmq::msg_t::command, zmq::msg_t::data, zmq::mechanism_t::encode(), errno_assert, zmq::msg_t::init_size(), mechanism, next_msg, pull_and_encode(), zmq::msg_t::set_flags(), and zmq_assert.
Referenced by process_heartbeat_message().
int zmq::stream_engine_t::pull_and_encode (msg_t *msg_) | private |
Definition at line 888 of file stream_engine.cpp.
References zmq::mechanism_t::encode(), mechanism, zmq::session_base_t::pull_msg(), session, and zmq_assert.
Referenced by mechanism_ready(), next_handshake_command(), produce_ping_message(), and produce_pong_message().
int zmq::stream_engine_t::pull_msg_from_session (msg_t *msg_) | private |
Definition at line 849 of file stream_engine.cpp.
References zmq::session_base_t::pull_msg(), and session.
Referenced by handshake(), identity_msg(), and plug().
int zmq::stream_engine_t::push_msg_to_session (msg_t *msg) | private |
Definition at line 854 of file stream_engine.cpp.
References zmq::session_base_t::push_msg(), and session.
Referenced by process_identity_msg(), push_raw_msg_to_session(), and write_subscription_msg().
int zmq::stream_engine_t::push_one_then_decode_and_push (msg_t *msg_) | private |
Definition at line 932 of file stream_engine.cpp.
References decode_and_push(), process_msg, zmq::session_base_t::push_msg(), and session.
Referenced by decode_and_push().
int zmq::stream_engine_t::push_raw_msg_to_session (msg_t *msg) | private |
Definition at line 859 of file stream_engine.cpp.
References metadata, zmq::msg_t::metadata, push_msg_to_session(), and zmq::msg_t::set_metadata().
Referenced by plug().
int zmq::stream_engine_t::receive_greeting () | private |
void zmq::stream_engine_t::restart_input () | virtual |
Implements zmq::i_engine.
Definition at line 439 of file stream_engine.cpp.
References connection_error, zmq::i_decoder::decode(), decoder, error(), zmq::session_base_t::flush(), handle, in_event(), inpos, input_stopped, insize, io_error, zmq::i_decoder::msg(), process_msg, protocol_error, session, zmq::io_object_t::set_pollin(), and zmq_assert.
Referenced by zap_msg_available().
void zmq::stream_engine_t::restart_output () | virtual |
Implements zmq::i_engine.
Definition at line 422 of file stream_engine.cpp.
References handle, io_error, likely, out_event(), output_stopped, zmq::io_object_t::set_pollout(), and unlikely.
Referenced by process_handshake_command(), and zap_msg_available().
void zmq::stream_engine_t::set_handshake_timer () | private |
Definition at line 975 of file stream_engine.cpp.
References zmq::io_object_t::add_timer(), zmq::options_t::handshake_ivl, handshake_timer_id, has_handshake_timer, options, zmq::options_t::raw_socket, and zmq_assert.
Referenced by plug().
void zmq::stream_engine_t::terminate () | virtual |
Implements zmq::i_engine.
Definition at line 278 of file stream_engine.cpp.
References unplug().
void zmq::stream_engine_t::timer_event (int id_) | virtual |
Reimplemented from zmq::io_object_t.
Definition at line 997 of file stream_engine.cpp.
References zmq::io_object_t::add_timer(), error(), handshake_timer_id, has_handshake_timer, has_timeout_timer, has_ttl_timer, zmq::options_t::heartbeat_interval, heartbeat_ivl_timer_id, heartbeat_timeout_timer_id, heartbeat_ttl_timer_id, next_msg, options, out_event(), produce_ping_message(), and timeout_error.
void zmq::stream_engine_t::unplug () | private |
Definition at line 243 of file stream_engine.cpp.
References zmq::io_object_t::cancel_timer(), handle, handshake_timer_id, has_handshake_timer, has_heartbeat_timer, has_timeout_timer, has_ttl_timer, heartbeat_ivl_timer_id, heartbeat_timeout_timer_id, heartbeat_ttl_timer_id, io_error, plugged, zmq::io_object_t::rm_fd(), session, zmq::io_object_t::unplug(), and zmq_assert.
Referenced by error(), and terminate().
int zmq::stream_engine_t::write_credential (msg_t *msg_) | private |
Definition at line 865 of file stream_engine.cpp.
References zmq::msg_t::close(), zmq::msg_t::credential, zmq::msg_t::data, decode_and_push(), errno_assert, zmq::mechanism_t::get_user_id(), zmq::msg_t::init_size(), mechanism, process_msg, zmq::session_base_t::push_msg(), session, zmq::msg_t::set_flags(), and zmq_assert.
Referenced by mechanism_ready().
int zmq::stream_engine_t::write_subscription_msg (msg_t *msg_) | private |
Definition at line 940 of file stream_engine.cpp.
References zmq::msg_t::data, errno_assert, zmq::msg_t::init_size(), process_msg, zmq::session_base_t::push_msg(), push_msg_to_session(), and session.
Referenced by process_identity_msg().
void zmq::stream_engine_t::zap_msg_available () | virtual |
Implements zmq::i_engine.
Definition at line 793 of file stream_engine.cpp.
References error(), input_stopped, mechanism, output_stopped, protocol_error, restart_input(), restart_output(), zmq::mechanism_t::zap_msg_available(), and zmq_assert.
bool zmq::stream_engine_t::as_server | private |
Definition at line 138 of file stream_engine.hpp.
i_decoder * zmq::stream_engine_t::decoder | private |
Definition at line 146 of file stream_engine.hpp.
Referenced by handshake(), in_event(), plug(), restart_input(), and ~stream_engine_t().
i_encoder * zmq::stream_engine_t::encoder | private |
Definition at line 150 of file stream_engine.hpp.
Referenced by handshake(), out_event(), plug(), and ~stream_engine_t().
std::string zmq::stream_engine_t::endpoint | private |
Definition at line 184 of file stream_engine.hpp.
Referenced by error().
unsigned int zmq::stream_engine_t::greeting_bytes_read | private |
Definition at line 176 of file stream_engine.hpp.
Referenced by handshake().
unsigned char zmq::stream_engine_t::greeting_recv [v3_greeting_size] | private |
Definition at line 172 of file stream_engine.hpp.
Referenced by handshake().
unsigned char zmq::stream_engine_t::greeting_send [v3_greeting_size] | private |
Definition at line 173 of file stream_engine.hpp.
Referenced by handshake(), and plug().
size_t zmq::stream_engine_t::greeting_size | private |
Definition at line 169 of file stream_engine.hpp.
Referenced by handshake().
handle_t zmq::stream_engine_t::handle | private |
Definition at line 142 of file stream_engine.hpp.
Referenced by handshake(), in_event(), out_event(), plug(), restart_input(), restart_output(), and unplug().
bool zmq::stream_engine_t::handshaking | private |
Definition at line 158 of file stream_engine.hpp.
Referenced by handshake(), in_event(), out_event(), and plug().
bool zmq::stream_engine_t::has_handshake_timer | private |
Definition at line 211 of file stream_engine.hpp.
Referenced by handshake(), set_handshake_timer(), timer_event(), and unplug().
bool zmq::stream_engine_t::has_heartbeat_timer | private |
Definition at line 221 of file stream_engine.hpp.
Referenced by mechanism_ready(), and unplug().
bool zmq::stream_engine_t::has_timeout_timer | private |
Definition at line 220 of file stream_engine.hpp.
Referenced by decode_and_push(), produce_ping_message(), timer_event(), and unplug().
bool zmq::stream_engine_t::has_ttl_timer | private |
Definition at line 219 of file stream_engine.hpp.
Referenced by decode_and_push(), process_heartbeat_message(), timer_event(), and unplug().
int zmq::stream_engine_t::heartbeat_timeout | private |
Definition at line 222 of file stream_engine.hpp.
Referenced by produce_ping_message(), and stream_engine_t().
unsigned char * zmq::stream_engine_t::inpos | private |
Definition at line 144 of file stream_engine.hpp.
Referenced by handshake(), in_event(), and restart_input().
bool zmq::stream_engine_t::input_stopped | private |
Definition at line 202 of file stream_engine.hpp.
Referenced by in_event(), restart_input(), and zap_msg_available().
size_t zmq::stream_engine_t::insize | private |
Definition at line 145 of file stream_engine.hpp.
Referenced by handshake(), in_event(), and restart_input().
bool zmq::stream_engine_t::io_error | private |
Definition at line 192 of file stream_engine.hpp.
Referenced by in_event(), out_event(), plug(), restart_input(), restart_output(), and unplug().
mechanism_t * zmq::stream_engine_t::mechanism | private |
Definition at line 199 of file stream_engine.hpp.
Referenced by decode_and_push(), handshake(), mechanism_ready(), next_handshake_command(), process_handshake_command(), produce_ping_message(), produce_pong_message(), pull_and_encode(), write_credential(), zap_msg_available(), and ~stream_engine_t().
metadata_t * zmq::stream_engine_t::metadata | private |
Definition at line 153 of file stream_engine.hpp.
Referenced by decode_and_push(), mechanism_ready(), plug(), push_raw_msg_to_session(), and ~stream_engine_t().
int(stream_engine_t::* zmq::stream_engine_t::next_msg)(msg_t *msg_) | private |
Definition at line 188 of file stream_engine.hpp.
Referenced by handshake(), identity_msg(), mechanism_ready(), out_event(), plug(), process_heartbeat_message(), produce_ping_message(), produce_pong_message(), and timer_event().
options_t zmq::stream_engine_t::options | private |
Definition at line 181 of file stream_engine.hpp.
Referenced by error(), handshake(), identity_msg(), mechanism_ready(), plug(), process_identity_msg(), produce_ping_message(), set_handshake_timer(), stream_engine_t(), and timer_event().
unsigned char * zmq::stream_engine_t::outpos | private |
Definition at line 148 of file stream_engine.hpp.
Referenced by handshake(), out_event(), and plug().
bool zmq::stream_engine_t::output_stopped | private |
Definition at line 205 of file stream_engine.hpp.
Referenced by out_event(), process_handshake_command(), restart_output(), and zap_msg_available().
size_t zmq::stream_engine_t::outsize | private |
Definition at line 149 of file stream_engine.hpp.
Referenced by handshake(), out_event(), and plug().
std::string zmq::stream_engine_t::peer_address | private |
Definition at line 227 of file stream_engine.hpp.
Referenced by handshake(), init_properties(), and stream_engine_t().
bool zmq::stream_engine_t::plugged | private |
Definition at line 186 of file stream_engine.hpp.
Referenced by plug(), unplug(), and ~stream_engine_t().
int(stream_engine_t::* zmq::stream_engine_t::process_msg)(msg_t *msg_) | private |
Definition at line 190 of file stream_engine.hpp.
Referenced by decode_and_push(), error(), handshake(), in_event(), mechanism_ready(), plug(), process_identity_msg(), push_one_then_decode_and_push(), restart_input(), write_credential(), and write_subscription_msg().
fd_t zmq::stream_engine_t::s | private |
Definition at line 135 of file stream_engine.hpp.
Referenced by error(), handshake(), in_event(), init_properties(), out_event(), plug(), stream_engine_t(), and ~stream_engine_t().
zmq::session_base_t * zmq::stream_engine_t::session | private |
Definition at line 179 of file stream_engine.hpp.
Referenced by decode_and_push(), error(), handshake(), in_event(), mechanism_ready(), plug(), process_identity_msg(), pull_and_encode(), pull_msg_from_session(), push_msg_to_session(), push_one_then_decode_and_push(), restart_input(), unplug(), write_credential(), and write_subscription_msg().
const size_t zmq::stream_engine_t::signature_size = 10 | static private |
Definition at line 160 of file stream_engine.hpp.
Referenced by handshake().
zmq::socket_base_t * zmq::stream_engine_t::socket | private |
Definition at line 225 of file stream_engine.hpp.
bool zmq::stream_engine_t::subscription_required | private |
Definition at line 197 of file stream_engine.hpp.
Referenced by handshake(), and process_identity_msg().
msg_t zmq::stream_engine_t::tx_msg | private |
Definition at line 140 of file stream_engine.hpp.
Referenced by handshake(), out_event(), stream_engine_t(), and ~stream_engine_t().
const size_t zmq::stream_engine_t::v2_greeting_size = 12 | static private |
Definition at line 163 of file stream_engine.hpp.
const size_t zmq::stream_engine_t::v3_greeting_size = 64 | static private |
Definition at line 166 of file stream_engine.hpp.
Referenced by handshake().