42 int hwms_ [2],
bool conflate_ [2])
52 upipe1 =
new (std::nothrow) upipe_conflate_t ();
54 upipe1 =
new (std::nothrow) upipe_normal_t ();
59 upipe2 =
new (std::nothrow) upipe_conflate_t ();
61 upipe2 =
new (std::nothrow) upipe_normal_t ();
64 pipes_ [0] =
new (std::nothrow)
pipe_t (parents_ [0], upipe1, upipe2,
65 hwms_ [1], hwms_ [0], conflate_ [0]);
67 pipes_ [1] =
new (std::nothrow)
pipe_t (parents_ [1], upipe2, upipe1,
68 hwms_ [0], hwms_ [1], conflate_ [1]);
78 int inhwm_,
int outhwm_,
bool conflate_) :
85 lwm (compute_lwm (inhwm_)),
184 const unsigned char *data = static_cast <
const unsigned char *> (msg_->
data ());
186 const int rc = msg_->
close ();
229 if (!more && !is_identity)
242 int rc = msg.
close ();
287 int rc = msg.
close ();
369 int rc = msg.
close ();
467 int result = (hwm_ + 1) / 2;
515 if (inhwm_ <= 0 || inhwmboost <= 0)
518 if (outhwm_ <= 0 || outhwmboost <= 0)
virtual bool unwrite(T *value_)=0
#define LIBZMQ_DELETE(p_object)
virtual bool read(T *value_)=0
virtual void hiccuped(zmq::pipe_t *pipe_)=0
virtual bool check_read()=0
int pipepair(zmq::object_t *parents_[2], zmq::pipe_t *pipes_[2], int hwms_[2], bool conflate_[2])
void set_hwms(int inhwm_, int outhwm_)
blob_t get_credential() const
void process_activate_read()
void send_pipe_term_ack(zmq::pipe_t *destination_)
void set_routing_id(uint32_t routing_id_)
uint32_t get_routing_id()
pipe_t(object_t *parent_, upipe_t *inpipe_, upipe_t *outpipe_, int inhwm_, int outhwm_, bool conflate_)
void set_hwms_boost(int inhwmboost_, int outhwmboost_)
void process_hiccup(void *pipe_)
void process_activate_write(uint64_t msgs_read_)
virtual void write_activated(zmq::pipe_t *pipe_)=0
std::basic_string< unsigned char > blob_t
void send_activate_write(zmq::pipe_t *destination_, uint64_t msgs_read_)
virtual void pipe_terminated(zmq::pipe_t *pipe_)=0
static int compute_lwm(int hwm_)
void set_peer(pipe_t *pipe_)
void set_event_sink(i_pipe_events *sink_)
virtual void read_activated(zmq::pipe_t *pipe_)=0
static bool is_delimiter(const msg_t &msg_)
void terminate(bool delay_)
virtual bool probe(bool(*fn)(const T &))=0
virtual void write(const T &value_, bool incomplete_)=0
void send_activate_read(zmq::pipe_t *destination_)
unsigned char data[max_vsm_size]
void set_identity(const blob_t &identity_)
bool is_delimiter() const
void send_pipe_term(zmq::pipe_t *destination_)
enum zmq::pipe_t::@49 state
void send_hiccup(zmq::pipe_t *destination_, void *pipe_)
void process_pipe_term_ack()
bool is_credential() const