58 switch (options_.
type) {
61 socket_, options_, addr_);
65 socket_, options_, addr_);
69 socket_, options_, addr_);
87 socket_, options_, addr_);
100 own_t (io_thread_, options_),
159 int rc = msg_->
init ();
196 const int rc = msg_->
init ();
223 int rc = msg.
init ();
325 if (peer.
socket == NULL) {
339 pipe_t *new_pipes [2] = {NULL, NULL};
340 int hwms [2] = {0, 0};
341 bool conflates [2] = {
false,
false};
342 int rc =
pipepair (parents, new_pipes, hwms, conflates);
381 pipe_t *pipes [2] = {NULL, NULL};
392 bool conflates [2] = {conflate, conflate};
393 int rc =
pipepair (parents, pipes, hwms, conflates);
537 address_t *proxy_address =
new (std::nothrow)
542 io_thread,
this,
options,
addr, proxy_address, wait_);
555 #if !defined ZMQ_HAVE_WINDOWS && !defined ZMQ_HAVE_OPENVMS 564 #if defined ZMQ_HAVE_TIPC 566 tipc_connecter_t *connecter =
new (std::nothrow) tipc_connecter_t (
592 int rc = engine->
init (
addr, send, recv);
600 #ifdef ZMQ_HAVE_OPENPGM 609 bool const udp_encapsulation =
addr->
protocol ==
"epgm";
617 pgm_sender_t *pgm_sender =
new (std::nothrow) pgm_sender_t (
621 int rc = pgm_sender->init (udp_encapsulation,
addr->
address.c_str ());
629 pgm_receiver_t *pgm_receiver =
new (std::nothrow) pgm_receiver_t (
633 int rc = pgm_receiver->init (udp_encapsulation,
addr->
address.c_str ());
651 norm_engine_t* norm_sender =
new (std::nothrow) norm_engine_t(io_thread,
options);
654 int rc = norm_sender->init (
addr->
address.c_str (),
true,
false);
662 norm_engine_t* norm_receiver =
new (std::nothrow) norm_engine_t (io_thread,
options);
665 int rc = norm_receiver->init (
addr->
address.c_str (),
false,
true);
672 #endif // ZMQ_HAVE_NORM 674 #if defined ZMQ_HAVE_VMCI 676 vmci_connecter_t *connecter =
new (std::nothrow) vmci_connecter_t (
virtual void terminate()=0
static session_base_t * create(zmq::io_thread_t *io_thread_, bool active_, zmq::socket_base_t *socket_, const options_t &options_, address_t *addr_)
#define LIBZMQ_DELETE(p_object)
void send_attach(zmq::session_base_t *destination_, zmq::i_engine *engine_, bool inc_seqnum_=true)
virtual void plug(zmq::io_thread_t *io_thread_, class session_base_t *session_)=0
void process_term(int linger_)
void attach_pipe(zmq::pipe_t *pipe_)
int pipepair(zmq::object_t *parents_[2], zmq::pipe_t *pipes_[2], int hwms_[2], bool conflate_[2])
virtual ~session_base_t()
std::string socks_proxy_address
const std::string address
virtual int push_msg(msg_t *msg_)
void cancel_timer(int id_)
zmq::io_thread_t * choose_io_thread(uint64_t affinity_)
int init(address_t *address_, bool send_, bool recv_)
void start_connecting(bool wait_)
void engine_error(zmq::stream_engine_t::error_reason_t reason)
void process_attach(zmq::i_engine *engine_)
int write_zap_msg(msg_t *msg_)
socket_base_t * get_socket()
virtual void restart_input()=0
const std::string protocol
void send_bind(zmq::own_t *destination_, zmq::pipe_t *pipe_, bool inc_seqnum_=true)
int read_zap_msg(msg_t *msg_)
void write_activated(zmq::pipe_t *pipe_)
virtual int pull_msg(msg_t *msg_)
virtual void zap_msg_available()=0
void timer_event(int id_)
void add_timer(int timout_, int id_)
session_base_t(zmq::io_thread_t *io_thread_, bool active_, zmq::socket_base_t *socket_, const options_t &options_, address_t *addr_)
void set_event_sink(i_pipe_events *sink_)
zmq::socket_base_t * socket
void process_term(int linger_)
void terminate(bool delay_)
void launch_child(own_t *object_)
void pipe_terminated(zmq::pipe_t *pipe_)
void read_activated(zmq::pipe_t *pipe_)
std::set< pipe_t * > terminating_pipes
virtual void restart_output()=0
zmq::endpoint_t find_endpoint(const char *addr_)
void hiccuped(zmq::pipe_t *pipe_)
zmq::io_thread_t * io_thread