32 #include "platform.hpp" 62 const std::string &endpoint_) :
74 greeting_size (v2_greeting_size),
75 greeting_bytes_read (0),
83 subscription_required (false),
85 input_stopped (false),
86 output_stopped (false),
87 has_handshake_timer (false),
88 has_ttl_timer (false),
89 has_timeout_timer (false),
90 has_heartbeat_timer (false),
91 heartbeat_timeout (0),
103 #if defined ZMQ_HAVE_SO_PEERCRED 105 if (family == PF_UNIX) {
107 socklen_t
size =
sizeof (cred);
108 if (!getsockopt (
s, SOL_SOCKET, SO_PEERCRED, &cred, &size)) {
109 std::ostringstream buf;
110 buf <<
":" << cred.uid <<
":" << cred.gid <<
":" << cred.pid;
114 #elif defined ZMQ_HAVE_LOCAL_PEERCRED 116 if (family == PF_UNIX) {
118 socklen_t
size =
sizeof (cred);
119 if (!getsockopt (
s, 0, LOCAL_PEERCRED, &cred, &size)
120 && cred.cr_version == XUCRED_VERSION) {
121 std::ostringstream buf;
122 buf <<
":" << cred.cr_uid <<
":";
123 if (cred.cr_ngroups > 0)
124 buf << cred.cr_groups[0];
135 rc = setsockopt (
s, SOL_SOCKET, SO_NOSIGPIPE, &
set,
sizeof (
int));
150 #ifdef ZMQ_HAVE_WINDOWS 151 int rc = closesocket (
s);
152 wsa_assert (rc != SOCKET_ERROR);
234 outpos [outsize++] = 0x7f;
325 insize = static_cast <
size_t> (rc);
331 size_t processed = 0;
338 if (rc == 0 || rc == -1)
348 if (errno != EAGAIN) {
382 size_t n =
encoder->
encode (&bufptr, out_batch_size - outsize);
455 size_t processed = 0;
460 if (rc == 0 || rc == -1)
467 if (rc == -1 && errno == EAGAIN)
568 const size_t revision_pos = 10;
590 unsigned char tmp [10], *bufferp = tmp;
598 size_t buffer_size =
encoder->
encode (&bufferp, header_size);
657 && memcmp (
greeting_recv + 12,
"NULL\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0", 20) == 0) {
664 && memcmp (
greeting_recv + 12,
"PLAIN\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0", 20) == 0) {
673 #ifdef ZMQ_HAVE_CURVE 676 && memcmp (
greeting_recv + 12,
"CURVE\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0", 20) == 0) {
685 #ifdef HAVE_LIBGSSAPI_KRB5 688 && memcmp (
greeting_recv + 12,
"GSSAPI\0\0\0\0\0\0\0\0\0\0\0\0\0\0", 20) == 0) {
739 int rc = msg_->
close ();
819 if (rc == -1 && errno == EAGAIN) {
838 properties.insert(zap_properties.begin (), zap_properties.end ());
842 properties.insert(zmtp_properties.begin (), zmtp_properties.end ());
845 if (!properties.empty ())
871 if (credential.size () > 0) {
873 int rc = msg.
init_size (credential.size ());
875 memcpy (msg.
data (), credential.data (), credential.size ());
917 uint8_t cmd_id = *((uint8_t*)msg_->
data());
948 *(
unsigned char*) subscription.
data () = 1;
987 properties.insert (std::make_pair(
"Peer-Address",
peer_address));
990 std::ostringstream stream;
992 std::string fd_string = stream.str();
993 properties.insert(std::make_pair(
"__fd", fd_string));
1032 memcpy(msg_->
data(),
"\4PING", 5);
1035 memcpy(((uint8_t*)msg_->
data()) + 5, &ttl_val,
sizeof(ttl_val));
1055 memcpy(msg_->
data(),
"\4PONG", 5);
1064 if(memcmp(msg_->
data(),
"\4PING", 5) == 0) {
1065 uint16_t remote_heartbeat_ttl;
1067 memcpy(&remote_heartbeat_ttl, (uint8_t*)msg_->
data() + 5, 2);
1068 remote_heartbeat_ttl = ntohs(remote_heartbeat_ttl);
1071 remote_heartbeat_ttl *= 100;
#define LIBZMQ_DELETE(p_object)
static const size_t v3_greeting_size
int process_handshake_command(msg_t *msg)
unsigned char greeting_send[v3_greeting_size]
void set_pollout(handle_t handle_)
int produce_ping_message(msg_t *msg_)
int process_identity_msg(msg_t *msg_)
void plug(zmq::io_thread_t *io_thread_)
void unblock_socket(fd_t s_)
virtual int push_msg(msg_t *msg_)
void cancel_timer(int id_)
int pull_msg_from_session(msg_t *msg_)
int decode_and_push(msg_t *msg_)
int produce_pong_message(msg_t *msg_)
void engine_error(zmq::stream_engine_t::error_reason_t reason)
virtual int process_handshake_command(msg_t *msg_)=0
zmq::socket_base_t * socket
stream_engine_t(fd_t fd_, const options_t &options_, const std::string &endpoint)
void plug(zmq::io_thread_t *io_thread_, zmq::session_base_t *session_)
int init_size(size_t size_)
virtual status_t status() const =0
int(stream_engine_t::* process_msg)(msg_t *msg_)
void set_pollin(handle_t handle_)
int(stream_engine_t::* next_msg)(msg_t *msg_)
void error(error_reason_t reason)
int write_credential(msg_t *msg_)
unsigned int greeting_bytes_read
int get_peer_ip_address(fd_t sockfd_, std::string &ip_addr_)
virtual void get_buffer(unsigned char **data_, size_t *size_)=0
virtual int encode(msg_t *)
unsigned char identity[256]
std::basic_string< unsigned char > blob_t
virtual int pull_msg(msg_t *msg_)
metadata_t::dict_t properties_t
void reset_pollout(handle_t handle_)
virtual int decode(const unsigned char *data_, size_t size_, size_t &processed)=0
void add_timer(int timout_, int id_)
int next_handshake_command(msg_t *msg)
bool init_properties(properties_t &properties)
void peer_identity(msg_t *msg_)
void set_handshake_timer()
int identity_msg(msg_t *msg_)
void put_uint64(unsigned char *buffer_, uint64_t value)
int push_raw_msg_to_session(msg_t *msg)
virtual int zap_msg_available()
void reset_pollin(handle_t handle_)
int tcp_write(fd_t s_, const void *data_, size_t size_)
void set_flags(unsigned char flags_)
void timer_event(int id_)
void event_disconnected(const std::string &addr_, int fd_)
unsigned char greeting_recv[v3_greeting_size]
int push_msg_to_session(msg_t *msg)
int process_heartbeat_message(msg_t *msg_)
virtual void load_msg(msg_t *msg_)=0
unsigned char data[max_vsm_size]
virtual int next_handshake_command(msg_t *msg_)=0
static char encoder[85+1]
handle_t add_fd(fd_t fd_)
virtual int decode(msg_t *)
virtual void resize_buffer(size_t)=0
zmq::session_base_t * session
bool subscription_required
virtual size_t encode(unsigned char **data_, size_t size)=0
unsigned char identity_size
int write_subscription_msg(msg_t *msg_)
int push_one_then_decode_and_push(msg_t *msg_)
void rm_fd(handle_t handle_)
static const size_t signature_size
blob_t get_user_id() const
int pull_and_encode(msg_t *msg_)
int tcp_read(fd_t s_, void *data_, size_t size_)
const metadata_t::dict_t & get_zap_properties()
const metadata_t::dict_t & get_zmtp_properties()
static uint8_t decoder[96]
void set_metadata(metadata_t *metadata_)