libzmq  master
ZeroMQ C++ Core Engine (LIBZMQ)
session_base.hpp
Go to the documentation of this file.
1 /*
2  Copyright (c) 2007-2016 Contributors as noted in the AUTHORS file
3 
4  This file is part of libzmq, the ZeroMQ core engine in C++.
5 
6  libzmq is free software; you can redistribute it and/or modify it under
7  the terms of the GNU Lesser General Public License (LGPL) as published
8  by the Free Software Foundation; either version 3 of the License, or
9  (at your option) any later version.
10 
11  As a special exception, the Contributors give you permission to link
12  this library with independent modules to produce an executable,
13  regardless of the license terms of these independent modules, and to
14  copy and distribute the resulting executable under terms of your choice,
15  provided that you also meet, for each linked independent module, the
16  terms and conditions of the license of that module. An independent
17  module is a module which is not derived from or based on this library.
18  If you modify this library, you must extend this exception to your
19  version of the library.
20 
21  libzmq is distributed in the hope that it will be useful, but WITHOUT
22  ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
23  FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public
24  License for more details.
25 
26  You should have received a copy of the GNU Lesser General Public License
27  along with this program. If not, see <http://www.gnu.org/licenses/>.
28 */
29 
30 #ifndef __ZMQ_SESSION_BASE_HPP_INCLUDED__
31 #define __ZMQ_SESSION_BASE_HPP_INCLUDED__
32 
33 #include <string>
34 #include <stdarg.h>
35 
36 #include "own.hpp"
37 #include "io_object.hpp"
38 #include "pipe.hpp"
39 #include "socket_base.hpp"
40 #include "stream_engine.hpp"
41 
42 namespace zmq
43 {
44 
45  class pipe_t;
46  class io_thread_t;
47  class socket_base_t;
48  struct i_engine;
49  struct address_t;
50 
52  public own_t,
53  public io_object_t,
54  public i_pipe_events
55  {
56  public:
57 
58  // Create a session of the particular type.
59  static session_base_t *create (zmq::io_thread_t *io_thread_,
60  bool active_, zmq::socket_base_t *socket_,
61  const options_t &options_, address_t *addr_);
62 
63  // To be used once only, when creating the session.
64  void attach_pipe (zmq::pipe_t *pipe_);
65 
66  // Following functions are the interface exposed towards the engine.
67  virtual void reset ();
68  void flush ();
70 
71  // i_pipe_events interface implementation.
72  void read_activated (zmq::pipe_t *pipe_);
73  void write_activated (zmq::pipe_t *pipe_);
74  void hiccuped (zmq::pipe_t *pipe_);
75  void pipe_terminated (zmq::pipe_t *pipe_);
76 
77  // Delivers a message. Returns 0 if successful; -1 otherwise.
78  // The function takes ownership of the message.
79  virtual int push_msg (msg_t *msg_);
80 
81  int zap_connect ();
82  bool zap_enabled ();
83 
84  // Fetches a message. Returns 0 if successful; -1 otherwise.
85  // The caller is responsible for freeing the message when no
86  // longer used.
87  virtual int pull_msg (msg_t *msg_);
88 
89  // Receives message from ZAP socket.
90  // Returns 0 on success; -1 otherwise.
91  // The caller is responsible for freeing the message.
92  int read_zap_msg (msg_t *msg_);
93 
94  // Sends message to ZAP socket.
95  // Returns 0 on success; -1 otherwise.
96  // The function takes ownership of the message.
97  int write_zap_msg (msg_t *msg_);
98 
100 
101  protected:
102 
103  session_base_t (zmq::io_thread_t *io_thread_, bool active_,
104  zmq::socket_base_t *socket_, const options_t &options_,
105  address_t *addr_);
106  virtual ~session_base_t ();
107 
108  private:
109 
110  void start_connecting (bool wait_);
111 
112  void reconnect ();
113 
114  // Handlers for incoming commands.
115  void process_plug ();
116  void process_attach (zmq::i_engine *engine_);
117  void process_term (int linger_);
118 
119  // i_poll_events handlers.
120  void timer_event (int id_);
121 
122  // Remove any half processed messages. Flush unflushed messages.
123  // Call this function when the engine disconnects to get rid of leftovers.
124  void clean_pipes ();
125 
126  // If true, this session (re)connects to the peer. Otherwise, it's
127  // a transient session created by the listener.
128  const bool active;
129 
130  // Pipe connecting the session to its socket.
132 
133  // Pipe used to exchange messages with ZAP socket.
135 
136  // Set of pipes that we are disconnecting but whose disconnection has not yet completed.
137  std::set <pipe_t *> terminating_pipes;
138 
139  // This flag is true if the remainder of the message being processed
140  // is still in the in pipe.
142 
143  // True if termination has been suspended to push the pending
144  // messages to the network.
145  bool pending;
146 
147  // The protocol I/O engine connected to the session.
149 
150  // The socket the session belongs to.
152 
153  // I/O thread the session is living in. It will be used to plug in
154  // the engines into the same thread.
156 
157  // ID of the linger timer
158  enum {linger_timer_id = 0x20};
159 
160  // True if the linger timer is running.
162 
163  // Protocol and address to use when connecting.
165 
168  };
169 
170 }
171 
172 #endif
static session_base_t * create(zmq::io_thread_t *io_thread_, bool active_, zmq::socket_base_t *socket_, const options_t &options_, address_t *addr_)
void attach_pipe(zmq::pipe_t *pipe_)
virtual int push_msg(msg_t *msg_)
zmq::pipe_t * zap_pipe
void start_connecting(bool wait_)
void engine_error(zmq::stream_engine_t::error_reason_t reason)
void process_attach(zmq::i_engine *engine_)
int write_zap_msg(msg_t *msg_)
socket_base_t * get_socket()
int read_zap_msg(msg_t *msg_)
void write_activated(zmq::pipe_t *pipe_)
virtual int pull_msg(msg_t *msg_)
zmq::pipe_t * pipe
void timer_event(int id_)
session_base_t(zmq::io_thread_t *io_thread_, bool active_, zmq::socket_base_t *socket_, const options_t &options_, address_t *addr_)
zmq::socket_base_t * socket
void process_term(int linger_)
void pipe_terminated(zmq::pipe_t *pipe_)
void read_activated(zmq::pipe_t *pipe_)
std::set< pipe_t * > terminating_pipes
Definition: address.hpp:35
const session_base_t & operator=(const session_base_t &)
void hiccuped(zmq::pipe_t *pipe_)
zmq::i_engine * engine
virtual void reset()
zmq::io_thread_t * io_thread