40 #if defined ZMQ_POLL_BASED_ON_POLL 50 #include "../include/zmq.h" 60 int rc = ctrl.
init ();
63 rc = ctrl.
copy (msg_);
82 int rc = from_->
recv (&msg_, 0);
92 rc =
capture(capture_, msg_, more);
112 int rc = msg.
init ();
126 int qt_poll_items = (control_ ? 3 : 2);
139 while (state != terminated) {
141 rc =
zmq_poll (&items [0], qt_poll_items, -1);
148 if (frontend_ != backend_) {
149 rc =
zmq_poll (&itemsout [0], 2, 0);
156 if (control_ && items [2].revents &
ZMQ_POLLIN) {
157 rc = control_->
recv (&msg, 0);
161 moresz =
sizeof more;
171 if (msg.
size () == 5 && memcmp (msg.
data (),
"PAUSE", 5) == 0)
174 if (msg.
size () == 6 && memcmp (msg.
data (),
"RESUME", 6) == 0)
177 if (msg.
size () == 9 && memcmp (msg.
data (),
"TERMINATE", 9) == 0)
181 puts (
"E: invalid command sent to proxy");
187 && items [0].revents & ZMQ_POLLIN
188 && (frontend_ == backend_ || itemsout [1].revents &
ZMQ_POLLOUT)) {
189 rc =
forward(frontend_, backend_, capture_,msg);
195 && frontend_ != backend_
196 && items [1].revents & ZMQ_POLLIN
197 && itemsout [0].revents & ZMQ_POLLOUT) {
198 rc =
forward(backend_, frontend_, capture_,msg);
int capture(class zmq::socket_base_t *capture_, zmq::msg_t &msg_, int more_=0)
int forward(class zmq::socket_base_t *from_, class zmq::socket_base_t *to_, class zmq::socket_base_t *capture_, zmq::msg_t &msg_)
int getsockopt(int option_, void *optval_, size_t *optvallen_)
int send(zmq::msg_t *msg_, int flags_)
int proxy(class socket_base_t *frontend_, class socket_base_t *backend_, class socket_base_t *capture_, class socket_base_t *control_=NULL)
ZMQ_EXPORT int zmq_poll(zmq_pollitem_t *items, int nitems, long timeout)
unsigned char data[max_vsm_size]
int recv(zmq::msg_t *msg_, int flags_)