libzmq  master
ZeroMQ C++ Core Engine (LIBZMQ)
session_base.cpp
Go to the documentation of this file.
1 /*
2  Copyright (c) 2007-2016 Contributors as noted in the AUTHORS file
3 
4  This file is part of libzmq, the ZeroMQ core engine in C++.
5 
6  libzmq is free software; you can redistribute it and/or modify it under
7  the terms of the GNU Lesser General Public License (LGPL) as published
8  by the Free Software Foundation; either version 3 of the License, or
9  (at your option) any later version.
10 
11  As a special exception, the Contributors give you permission to link
12  this library with independent modules to produce an executable,
13  regardless of the license terms of these independent modules, and to
14  copy and distribute the resulting executable under terms of your choice,
15  provided that you also meet, for each linked independent module, the
16  terms and conditions of the license of that module. An independent
17  module is a module which is not derived from or based on this library.
18  If you modify this library, you must extend this exception to your
19  version of the library.
20 
21  libzmq is distributed in the hope that it will be useful, but WITHOUT
22  ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
23  FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public
24  License for more details.
25 
26  You should have received a copy of the GNU Lesser General Public License
27  along with this program. If not, see <http://www.gnu.org/licenses/>.
28 */
29 
30 #include "precompiled.hpp"
31 #include "macros.hpp"
32 #include "session_base.hpp"
33 #include "i_engine.hpp"
34 #include "err.hpp"
35 #include "pipe.hpp"
36 #include "likely.hpp"
37 #include "tcp_connecter.hpp"
38 #include "ipc_connecter.hpp"
39 #include "tipc_connecter.hpp"
40 #include "socks_connecter.hpp"
41 #include "vmci_connecter.hpp"
42 #include "pgm_sender.hpp"
43 #include "pgm_receiver.hpp"
44 #include "address.hpp"
45 #include "norm_engine.hpp"
46 #include "udp_engine.hpp"
47 
48 #include "ctx.hpp"
49 #include "req.hpp"
50 #include "radio.hpp"
51 #include "dish.hpp"
52 
54  bool active_, class socket_base_t *socket_, const options_t &options_,
55  address_t *addr_)
56 {
57  session_base_t *s = NULL;
58  switch (options_.type) {
59  case ZMQ_REQ:
60  s = new (std::nothrow) req_session_t (io_thread_, active_,
61  socket_, options_, addr_);
62  break;
63  case ZMQ_RADIO:
64  s = new (std::nothrow) radio_session_t (io_thread_, active_,
65  socket_, options_, addr_);
66  break;
67  case ZMQ_DISH:
68  s = new (std::nothrow) dish_session_t (io_thread_, active_,
69  socket_, options_, addr_);
70  break;
71  case ZMQ_DEALER:
72  case ZMQ_REP:
73  case ZMQ_ROUTER:
74  case ZMQ_PUB:
75  case ZMQ_XPUB:
76  case ZMQ_SUB:
77  case ZMQ_XSUB:
78  case ZMQ_PUSH:
79  case ZMQ_PULL:
80  case ZMQ_PAIR:
81  case ZMQ_STREAM:
82  case ZMQ_SERVER:
83  case ZMQ_CLIENT:
84  case ZMQ_GATHER:
85  case ZMQ_SCATTER:
86  s = new (std::nothrow) session_base_t (io_thread_, active_,
87  socket_, options_, addr_);
88  break;
89  default:
90  errno = EINVAL;
91  return NULL;
92  }
93  alloc_assert (s);
94  return s;
95 }
96 
98  bool active_, class socket_base_t *socket_, const options_t &options_,
99  address_t *addr_) :
100  own_t (io_thread_, options_),
101  io_object_t (io_thread_),
102  active (active_),
103  pipe (NULL),
104  zap_pipe (NULL),
105  incomplete_in (false),
106  pending (false),
107  engine (NULL),
108  socket (socket_),
109  io_thread (io_thread_),
110  has_linger_timer (false),
111  addr (addr_)
112 {
113 }
114 
116 {
117  zmq_assert (!pipe);
118  zmq_assert (!zap_pipe);
119 
120  // If there's still a pending linger timer, remove it.
121  if (has_linger_timer) {
123  has_linger_timer = false;
124  }
125 
126  // Close the engine.
127  if (engine)
128  engine->terminate ();
129 
131 }
132 
134 {
136  zmq_assert (!pipe);
137  zmq_assert (pipe_);
138  pipe = pipe_;
139  pipe->set_event_sink (this);
140 }
141 
143 {
144  if (!pipe || !pipe->read (msg_)) {
145  errno = EAGAIN;
146  return -1;
147  }
148 
149  incomplete_in = msg_->flags () & msg_t::more ? true : false;
150 
151  return 0;
152 }
153 
155 {
156  if(msg_->flags() & msg_t::command)
157  return 0;
158  if (pipe && pipe->write (msg_)) {
159  int rc = msg_->init ();
160  errno_assert (rc == 0);
161  return 0;
162  }
163 
164  errno = EAGAIN;
165  return -1;
166 }
167 
169 {
170  if (zap_pipe == NULL) {
171  errno = ENOTCONN;
172  return -1;
173  }
174 
175  if (!zap_pipe->read (msg_)) {
176  errno = EAGAIN;
177  return -1;
178  }
179 
180  return 0;
181 }
182 
184 {
185  if (zap_pipe == NULL) {
186  errno = ENOTCONN;
187  return -1;
188  }
189 
190  const bool ok = zap_pipe->write (msg_);
191  zmq_assert (ok);
192 
193  if ((msg_->flags () & msg_t::more) == 0)
194  zap_pipe->flush ();
195 
196  const int rc = msg_->init ();
197  errno_assert (rc == 0);
198  return 0;
199 }
200 
202 {
203 }
204 
206 {
207  if (pipe)
208  pipe->flush ();
209 }
210 
212 {
213  zmq_assert (pipe != NULL);
214 
215  // Get rid of half-processed messages in the out pipe. Flush any
216  // unflushed messages upstream.
217  pipe->rollback ();
218  pipe->flush ();
219 
220  // Remove any half-read message from the in pipe.
221  while (incomplete_in) {
222  msg_t msg;
223  int rc = msg.init ();
224  errno_assert (rc == 0);
225  rc = pull_msg (&msg);
226  errno_assert (rc == 0);
227  rc = msg.close ();
228  errno_assert (rc == 0);
229  }
230 }
231 
233 {
234  // Drop the reference to the deallocated pipe if required.
235  zmq_assert (pipe_ == pipe
236  || pipe_ == zap_pipe
237  || terminating_pipes.count (pipe_) == 1);
238 
239  if (pipe_ == pipe) {
240  // If this is our current pipe, remove it
241  pipe = NULL;
242  if (has_linger_timer) {
244  has_linger_timer = false;
245  }
246  }
247  else
248  if (pipe_ == zap_pipe)
249  zap_pipe = NULL;
250  else
251  // Remove the pipe from the detached pipes set
252  terminating_pipes.erase (pipe_);
253 
254  if (!is_terminating () && options.raw_socket) {
255  if (engine) {
256  engine->terminate ();
257  engine = NULL;
258  }
259  terminate ();
260  }
261 
262  // If we are waiting for pending messages to be sent, at this point
263  // we are sure that there will be no more messages and we can proceed
264  // with termination safely.
265  if (pending && !pipe && !zap_pipe && terminating_pipes.empty ()) {
266  pending = false;
268  }
269 }
270 
272 {
273  // Skip activating if we're detaching this pipe
274  if (unlikely (pipe_ != pipe && pipe_ != zap_pipe)) {
275  zmq_assert (terminating_pipes.count (pipe_) == 1);
276  return;
277  }
278 
279  if (unlikely (engine == NULL)) {
280  pipe->check_read ();
281  return;
282  }
283 
284  if (likely (pipe_ == pipe))
286  else
288 }
289 
291 {
292  // Skip activating if we're detaching this pipe
293  if (pipe != pipe_) {
294  zmq_assert (terminating_pipes.count (pipe_) == 1);
295  return;
296  }
297 
298  if (engine)
299  engine->restart_input ();
300 }
301 
303 {
304  // Hiccups are always sent from session to socket, not the other
305  // way round.
306  zmq_assert (false);
307 }
308 
310 {
311  return socket;
312 }
313 
315 {
316  if (active)
317  start_connecting (false);
318 }
319 
321 {
322  zmq_assert (zap_pipe == NULL);
323 
324  endpoint_t peer = find_endpoint ("inproc://zeromq.zap.01");
325  if (peer.socket == NULL) {
326  errno = ECONNREFUSED;
327  return -1;
328  }
329  if (peer.options.type != ZMQ_REP
330  && peer.options.type != ZMQ_ROUTER
331  && peer.options.type != ZMQ_SERVER) {
332  errno = ECONNREFUSED;
333  return -1;
334  }
335 
336  // Create a bi-directional pipe that will connect
337  // session with zap socket.
338  object_t *parents [2] = {this, peer.socket};
339  pipe_t *new_pipes [2] = {NULL, NULL};
340  int hwms [2] = {0, 0};
341  bool conflates [2] = {false, false};
342  int rc = pipepair (parents, new_pipes, hwms, conflates);
343  errno_assert (rc == 0);
344 
345  // Attach local end of the pipe to this socket object.
346  zap_pipe = new_pipes [0];
347  zap_pipe->set_nodelay ();
348  zap_pipe->set_event_sink (this);
349 
350  send_bind (peer.socket, new_pipes [1], false);
351 
352  // Send empty identity if required by the peer.
353  if (peer.options.recv_identity) {
354  msg_t id;
355  rc = id.init ();
356  errno_assert (rc == 0);
357  id.set_flags (msg_t::identity);
358  bool ok = zap_pipe->write (&id);
359  zmq_assert (ok);
360  zap_pipe->flush ();
361  }
362 
363  return 0;
364 }
365 
367 {
368  return (
370  (options.mechanism == ZMQ_NULL && options.zap_domain.length() > 0)
371  );
372 }
373 
375 {
376  zmq_assert (engine_ != NULL);
377 
378  // Create the pipe if it does not exist yet.
379  if (!pipe && !is_terminating ()) {
380  object_t *parents [2] = {this, socket};
381  pipe_t *pipes [2] = {NULL, NULL};
382 
383  bool conflate = options.conflate &&
384  (options.type == ZMQ_DEALER ||
385  options.type == ZMQ_PULL ||
386  options.type == ZMQ_PUSH ||
387  options.type == ZMQ_PUB ||
388  options.type == ZMQ_SUB);
389 
390  int hwms [2] = {conflate? -1 : options.rcvhwm,
391  conflate? -1 : options.sndhwm};
392  bool conflates [2] = {conflate, conflate};
393  int rc = pipepair (parents, pipes, hwms, conflates);
394  errno_assert (rc == 0);
395 
396  // Plug the local end of the pipe.
397  pipes [0]->set_event_sink (this);
398 
399  // Remember the local end of the pipe.
400  zmq_assert (!pipe);
401  pipe = pipes [0];
402 
403  // Ask socket to plug into the remote end of the pipe.
404  send_bind (socket, pipes [1]);
405  }
406 
407  // Plug in the engine.
408  zmq_assert (!engine);
409  engine = engine_;
410  engine->plug (io_thread, this);
411 }
412 
415 {
416  // Engine is dead. Let's forget about it.
417  engine = NULL;
418 
419  // Remove any half-done messages from the pipes.
420  if (pipe)
421  clean_pipes ();
422 
424  || reason == stream_engine_t::timeout_error
425  || reason == stream_engine_t::protocol_error);
426 
427  switch (reason) {
430  if (active)
431  reconnect ();
432  else
433  terminate ();
434  break;
436  terminate ();
437  break;
438  }
439 
440  // Just in case there's only a delimiter in the pipe.
441  if (pipe)
442  pipe->check_read ();
443 
444  if (zap_pipe)
445  zap_pipe->check_read ();
446 }
447 
449 {
450  zmq_assert (!pending);
451 
452  // If the termination of the pipe happens before the term command is
453  // delivered there's nothing much to do. We can proceed with the
454  // standard termination immediately.
455  if (!pipe && !zap_pipe && terminating_pipes.empty ()) {
457  return;
458  }
459 
460  pending = true;
461 
462  if (pipe != NULL) {
463  // If there's finite linger value, delay the termination.
464  // If linger is infinite (negative) we don't even have to set
465  // the timer.
466  if (linger_ > 0) {
468  add_timer (linger_, linger_timer_id);
469  has_linger_timer = true;
470  }
471 
472  // Start pipe termination process. Delay the termination till all messages
473  // are processed in case the linger time is non-zero.
474  pipe->terminate (linger_ != 0);
475 
476  // TODO: Should this go into pipe_t::terminate ?
477  // In case there's no engine and there's only delimiter in the
478  // pipe it wouldn't be ever read. Thus we check for it explicitly.
479  if (!engine)
480  pipe->check_read ();
481  }
482 
483  if (zap_pipe != NULL)
484  zap_pipe->terminate (false);
485 }
486 
488 {
489  // Linger period expired. We can proceed with termination even though
490  // there are still pending messages to be sent.
491  zmq_assert (id_ == linger_timer_id);
492  has_linger_timer = false;
493 
494  // Ask pipe to terminate even though there may be pending messages in it.
495  zmq_assert (pipe);
496  pipe->terminate (false);
497 }
498 
500 {
501  // For delayed connect situations, terminate the pipe
502  // and reestablish later on
503  if (pipe && options.immediate == 1
504  && addr->protocol != "pgm" && addr->protocol != "epgm"
505  && addr->protocol != "norm" && addr->protocol != "udp") {
506  pipe->hiccup ();
507  pipe->terminate (false);
508  terminating_pipes.insert (pipe);
509  pipe = NULL;
510  }
511 
512  reset ();
513 
514  // Reconnect.
515  if (options.reconnect_ivl != -1)
516  start_connecting (true);
517 
518  // For subscriber sockets we hiccup the inbound pipe, which will cause
519  // the socket object to resend all the subscriptions.
520  if (pipe && (options.type == ZMQ_SUB || options.type == ZMQ_XSUB))
521  pipe->hiccup ();
522 }
523 
525 {
526  zmq_assert (active);
527 
528  // Choose I/O thread to run connecter in. Given that we are already
529  // running in an I/O thread, there must be at least one available.
531  zmq_assert (io_thread);
532 
533  // Create the connecter object.
534 
535  if (addr->protocol == "tcp") {
536  if (!options.socks_proxy_address.empty()) {
537  address_t *proxy_address = new (std::nothrow)
538  address_t ("tcp", options.socks_proxy_address, this->get_ctx ());
539  alloc_assert (proxy_address);
540  socks_connecter_t *connecter =
541  new (std::nothrow) socks_connecter_t (
542  io_thread, this, options, addr, proxy_address, wait_);
543  alloc_assert (connecter);
544  launch_child (connecter);
545  }
546  else {
547  tcp_connecter_t *connecter = new (std::nothrow)
548  tcp_connecter_t (io_thread, this, options, addr, wait_);
549  alloc_assert (connecter);
550  launch_child (connecter);
551  }
552  return;
553  }
554 
555 #if !defined ZMQ_HAVE_WINDOWS && !defined ZMQ_HAVE_OPENVMS
556  if (addr->protocol == "ipc") {
557  ipc_connecter_t *connecter = new (std::nothrow) ipc_connecter_t (
558  io_thread, this, options, addr, wait_);
559  alloc_assert (connecter);
560  launch_child (connecter);
561  return;
562  }
563 #endif
564 #if defined ZMQ_HAVE_TIPC
565  if (addr->protocol == "tipc") {
566  tipc_connecter_t *connecter = new (std::nothrow) tipc_connecter_t (
567  io_thread, this, options, addr, wait_);
568  alloc_assert (connecter);
569  launch_child (connecter);
570  return;
571  }
572 #endif
573 
574  if (addr->protocol == "udp") {
576 
577  udp_engine_t* engine = new (std::nothrow) udp_engine_t ();
578  alloc_assert (engine);
579 
580  bool recv = false;
581  bool send = false;
582 
583  if (options.type == ZMQ_RADIO) {
584  send = true;
585  recv = false;
586  }
587  else if (options.type == ZMQ_DISH) {
588  send = false;
589  recv = true;
590  }
591 
592  int rc = engine->init (addr, send, recv);
593  errno_assert (rc == 0);
594 
595  send_attach (this, engine);
596 
597  return;
598  }
599 
600 #ifdef ZMQ_HAVE_OPENPGM
601 
602  // Both PGM and EPGM transports are using the same infrastructure.
603  if (addr->protocol == "pgm" || addr->protocol == "epgm") {
604 
606  || options.type == ZMQ_SUB || options.type == ZMQ_XSUB);
607 
608  // For EPGM transport with UDP encapsulation of PGM is used.
609  bool const udp_encapsulation = addr->protocol == "epgm";
610 
611  // At this point we'll create message pipes to the session straight
612  // away. There's no point in delaying it as no concept of 'connect'
613  // exists with PGM anyway.
614  if (options.type == ZMQ_PUB || options.type == ZMQ_XPUB) {
615 
616  // PGM sender.
617  pgm_sender_t *pgm_sender = new (std::nothrow) pgm_sender_t (
618  io_thread, options);
619  alloc_assert (pgm_sender);
620 
621  int rc = pgm_sender->init (udp_encapsulation, addr->address.c_str ());
622  errno_assert (rc == 0);
623 
624  send_attach (this, pgm_sender);
625  }
626  else {
627 
628  // PGM receiver.
629  pgm_receiver_t *pgm_receiver = new (std::nothrow) pgm_receiver_t (
630  io_thread, options);
631  alloc_assert (pgm_receiver);
632 
633  int rc = pgm_receiver->init (udp_encapsulation, addr->address.c_str ());
634  errno_assert (rc == 0);
635 
636  send_attach (this, pgm_receiver);
637  }
638 
639  return;
640  }
641 #endif
642 
643 #ifdef ZMQ_HAVE_NORM
644  if (addr->protocol == "norm") {
645  // At this point we'll create message pipes to the session straight
646  // away. There's no point in delaying it as no concept of 'connect'
647  // exists with NORM anyway.
648  if (options.type == ZMQ_PUB || options.type == ZMQ_XPUB) {
649 
650  // NORM sender.
651  norm_engine_t* norm_sender = new (std::nothrow) norm_engine_t(io_thread, options);
652  alloc_assert (norm_sender);
653 
654  int rc = norm_sender->init (addr->address.c_str (), true, false);
655  errno_assert (rc == 0);
656 
657  send_attach (this, norm_sender);
658  }
659  else { // ZMQ_SUB or ZMQ_XSUB
660 
661  // NORM receiver.
662  norm_engine_t* norm_receiver = new (std::nothrow) norm_engine_t (io_thread, options);
663  alloc_assert (norm_receiver);
664 
665  int rc = norm_receiver->init (addr->address.c_str (), false, true);
666  errno_assert (rc == 0);
667 
668  send_attach (this, norm_receiver);
669  }
670  return;
671  }
672 #endif // ZMQ_HAVE_NORM
673 
674 #if defined ZMQ_HAVE_VMCI
675  if (addr->protocol == "vmci") {
676  vmci_connecter_t *connecter = new (std::nothrow) vmci_connecter_t (
677  io_thread, this, options, addr, wait_);
678  alloc_assert (connecter);
679  launch_child (connecter);
680  return;
681  }
682 #endif
683 
684  zmq_assert (false);
685 }
virtual void terminate()=0
static session_base_t * create(zmq::io_thread_t *io_thread_, bool active_, zmq::socket_base_t *socket_, const options_t &options_, address_t *addr_)
bool read(msg_t *msg_)
Definition: pipe.cpp:169
#define LIBZMQ_DELETE(p_object)
Definition: macros.hpp:7
void send_attach(zmq::session_base_t *destination_, zmq::i_engine *engine_, bool inc_seqnum_=true)
Definition: object.cpp:225
int close()
Definition: msg.cpp:217
virtual void plug(zmq::io_thread_t *io_thread_, class session_base_t *session_)=0
void process_term(int linger_)
Definition: own.cpp:158
#define ZMQ_STREAM
Definition: zmq.h:257
bool write(msg_t *msg_)
Definition: pipe.cpp:221
void attach_pipe(zmq::pipe_t *pipe_)
#define ZMQ_DISH
int pipepair(zmq::object_t *parents_[2], zmq::pipe_t *pipes_[2], int hwms_[2], bool conflate_[2])
Definition: pipe.cpp:41
std::string socks_proxy_address
Definition: options.hpp:153
#define ZMQ_DEALER
Definition: zmq.h:251
const std::string address
Definition: address.hpp:55
#define ZMQ_RADIO
#define zmq_assert(x)
Definition: err.hpp:119
virtual int push_msg(msg_t *msg_)
#define ZMQ_SUB
Definition: zmq.h:248
#define ZMQ_XPUB
Definition: zmq.h:255
void cancel_timer(int id_)
Definition: io_object.cpp:99
zmq::pipe_t * zap_pipe
bool recv_identity
Definition: options.hpp:146
#define ZMQ_REP
Definition: zmq.h:250
#define ENOTCONN
Definition: zmq.h:154
void set_nodelay()
Definition: pipe.cpp:380
#define ZMQ_XSUB
Definition: zmq.h:256
zmq::io_thread_t * choose_io_thread(uint64_t affinity_)
Definition: object.cpp:189
options_t options
Definition: ctx.hpp:62
int init(address_t *address_, bool send_, bool recv_)
Definition: udp_engine.cpp:76
#define ZMQ_SCATTER
void start_connecting(bool wait_)
void engine_error(zmq::stream_engine_t::error_reason_t reason)
#define ZMQ_PUB
Definition: zmq.h:247
void process_attach(zmq::i_engine *engine_)
bool is_terminating()
Definition: own.cpp:153
uint64_t affinity
Definition: options.hpp:70
void terminate()
Definition: own.cpp:135
#define ZMQ_ROUTER
Definition: zmq.h:252
#define ZMQ_PUSH
Definition: zmq.h:254
int write_zap_msg(msg_t *msg_)
void rollback()
Definition: pipe.cpp:235
socket_base_t * get_socket()
#define ZMQ_REQ
Definition: zmq.h:249
virtual void restart_input()=0
#define unlikely(x)
Definition: likely.hpp:38
int init()
Definition: msg.cpp:82
#define ZMQ_NULL
Definition: zmq.h:349
const std::string protocol
Definition: address.hpp:54
void send_bind(zmq::own_t *destination_, zmq::pipe_t *pipe_, bool inc_seqnum_=true)
Definition: object.cpp:238
int read_zap_msg(msg_t *msg_)
void write_activated(zmq::pipe_t *pipe_)
virtual int pull_msg(msg_t *msg_)
void flush()
Definition: pipe.cpp:248
zmq::pipe_t * pipe
virtual void zap_msg_available()=0
#define ECONNREFUSED
Definition: zmq.h:130
void timer_event(int id_)
void add_timer(int timout_, int id_)
Definition: io_object.cpp:94
#define ZMQ_GATHER
session_base_t(zmq::io_thread_t *io_thread_, bool active_, zmq::socket_base_t *socket_, const options_t &options_, address_t *addr_)
void set_event_sink(i_pipe_events *sink_)
Definition: pipe.cpp:111
#define ZMQ_SERVER
zmq::socket_base_t * socket
void process_term(int linger_)
void terminate(bool delay_)
Definition: pipe.cpp:385
void launch_child(own_t *object_)
Definition: own.cpp:81
void pipe_terminated(zmq::pipe_t *pipe_)
#define alloc_assert(x)
Definition: err.hpp:159
#define errno_assert(x)
Definition: err.hpp:129
void hiccup()
Definition: pipe.cpp:486
#define ZMQ_PAIR
Definition: zmq.h:246
socket_base_t * socket
Definition: ctx.hpp:61
void read_activated(zmq::pipe_t *pipe_)
std::set< pipe_t * > terminating_pipes
bool check_read()
Definition: pipe.cpp:143
std::string zap_domain
Definition: options.hpp:186
options_t options
Definition: own.hpp:109
#define likely(x)
Definition: likely.hpp:37
virtual void restart_output()=0
zmq::endpoint_t find_endpoint(const char *addr_)
Definition: object.cpp:168
unsigned char flags
Definition: msg.hpp:181
void hiccuped(zmq::pipe_t *pipe_)
zmq::i_engine * engine
#define ZMQ_PULL
Definition: zmq.h:253
virtual void reset()
#define ZMQ_CLIENT
zmq::io_thread_t * io_thread