libzmq  master
ZeroMQ C++ Core Engine (LIBZMQ)
object.hpp
Go to the documentation of this file.
1 /*
2  Copyright (c) 2007-2016 Contributors as noted in the AUTHORS file
3 
4  This file is part of libzmq, the ZeroMQ core engine in C++.
5 
6  libzmq is free software; you can redistribute it and/or modify it under
7  the terms of the GNU Lesser General Public License (LGPL) as published
8  by the Free Software Foundation; either version 3 of the License, or
9  (at your option) any later version.
10 
11  As a special exception, the Contributors give you permission to link
12  this library with independent modules to produce an executable,
13  regardless of the license terms of these independent modules, and to
14  copy and distribute the resulting executable under terms of your choice,
15  provided that you also meet, for each linked independent module, the
16  terms and conditions of the license of that module. An independent
17  module is a module which is not derived from or based on this library.
18  If you modify this library, you must extend this exception to your
19  version of the library.
20 
21  libzmq is distributed in the hope that it will be useful, but WITHOUT
22  ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
23  FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public
24  License for more details.
25 
26  You should have received a copy of the GNU Lesser General Public License
27  along with this program. If not, see <http://www.gnu.org/licenses/>.
28 */
29 
30 #ifndef __ZMQ_OBJECT_HPP_INCLUDED__
31 #define __ZMQ_OBJECT_HPP_INCLUDED__
32 
33 #include <string>
34 #include "stdint.hpp"
35 
namespace zmq
{

    //  Forward declarations. Only pointers and references to these types
    //  (plus one by-value return in a declaration) are needed here, so the
    //  full definitions are not pulled into this header.
    struct i_engine;
    struct endpoint_t;
    struct pending_connection_t;
    struct command_t;
    class ctx_t;
    class pipe_t;
    class socket_base_t;
    class session_base_t;
    class io_thread_t;
    class own_t;

    //  Base class for all objects that participate in inter-thread
    //  communication.
    //
    //  The class exposes three groups of members:
    //    - send_* functions, used to send commands to other objects;
    //    - virtual process_* handlers, called when a command arrives from
    //      another thread and overridable by derived objects;
    //    - helpers giving access to the inproc endpoint repository, the
    //      log and the I/O threads.
    //  All member functions are defined out of line (object.cpp).

    class object_t
    {
    public:

        //  Constructs an object from a context and the ID of the thread
        //  it belongs to.
        object_t (zmq::ctx_t *ctx_, uint32_t tid_);

        //  Constructs an object from a parent object.
        //  NOTE(review): presumably the context and thread ID are taken
        //  from parent_ - confirm against object.cpp.
        object_t (object_t *parent_);

        virtual ~object_t ();

        //  Accessors for the thread ID and the context.
        uint32_t get_tid ();
        void set_tid(uint32_t id);
        ctx_t *get_ctx ();

        //  Entry point for a command addressed to this object.
        //  NOTE(review): presumably dispatches to the matching process_*
        //  handler below - confirm against object.cpp.
        void process_command (zmq::command_t &cmd_);

        //  Publicly available senders. inc_seqnum_ defaults to true for
        //  send_bind.
        void send_inproc_connected (zmq::socket_base_t *socket_);
        void send_bind (zmq::own_t *destination_, zmq::pipe_t *pipe_, bool inc_seqnum_ = true);

    protected:

        //  Using following function, socket is able to access global
        //  repository of inproc endpoints.
        int register_endpoint (const char *addr_,
            const zmq::endpoint_t &endpoint_);
        int unregister_endpoint (
            const std::string &addr_, socket_base_t *socket_);
        void unregister_endpoints (zmq::socket_base_t *socket_);
        zmq::endpoint_t find_endpoint (const char *addr_);
        void pend_connection (const std::string &addr_,
            const endpoint_t &endpoint, pipe_t **pipes_);
        void connect_pending (const char *addr_, zmq::socket_base_t *bind_socket_);

        void destroy_socket (zmq::socket_base_t *socket_);

        //  Logs a message. Takes a format string followed by a variable
        //  argument list.
        void log (const char *format_, ...);

        //  Chooses least loaded I/O thread.
        zmq::io_thread_t *choose_io_thread (uint64_t affinity_);

        //  Derived object can use these functions to send commands
        //  to other objects.
        void send_stop ();
        void send_plug (zmq::own_t *destination_,
            bool inc_seqnum_ = true);
        void send_own (zmq::own_t *destination_,
            zmq::own_t *object_);
        void send_attach (zmq::session_base_t *destination_,
            zmq::i_engine *engine_, bool inc_seqnum_ = true);
        void send_activate_read (zmq::pipe_t *destination_);
        void send_activate_write (zmq::pipe_t *destination_,
            uint64_t msgs_read_);
        void send_hiccup (zmq::pipe_t *destination_, void *pipe_);
        void send_pipe_term (zmq::pipe_t *destination_);
        void send_pipe_term_ack (zmq::pipe_t *destination_);
        void send_term_req (zmq::own_t *destination_,
            zmq::own_t *object_);
        void send_term (zmq::own_t *destination_, int linger_);
        void send_term_ack (zmq::own_t *destination_);
        void send_reap (zmq::socket_base_t *socket_);
        void send_reaped ();
        void send_done ();

        //  These handlers can be overridden by the derived objects. They are
        //  called when command arrives from another thread.
        virtual void process_stop ();
        virtual void process_plug ();
        virtual void process_own (zmq::own_t *object_);
        virtual void process_attach (zmq::i_engine *engine_);
        virtual void process_bind (zmq::pipe_t *pipe_);
        virtual void process_activate_read ();
        virtual void process_activate_write (uint64_t msgs_read_);
        virtual void process_hiccup (void *pipe_);
        virtual void process_pipe_term ();
        virtual void process_pipe_term_ack ();
        virtual void process_term_req (zmq::own_t *object_);
        virtual void process_term (int linger_);
        virtual void process_term_ack ();
        virtual void process_reap (zmq::socket_base_t *socket_);
        virtual void process_reaped ();

        //  Special handler called after a command that requires a seqnum
        //  was processed. The implementation should catch up with its counter
        //  of processed commands here.
        virtual void process_seqnum ();

    private:

        //  Context provides access to the global state.
        zmq::ctx_t *ctx;

        //  Thread ID of the thread the object belongs to.
        uint32_t tid;

        void send_command (command_t &cmd_);

        //  Copy construction and copy assignment are declared private, so
        //  code outside the class cannot copy an object_t.
        object_t (const object_t&);
        const object_t &operator = (const object_t&);
    };

}
151 
152 #endif
void pend_connection(const std::string &addr_, const endpoint_t &endpoint, pipe_t **pipes_)
Definition: object.cpp:173
void process_command(zmq::command_t &cmd_)
Definition: object.cpp:73
void send_attach(zmq::session_base_t *destination_, zmq::i_engine *engine_, bool inc_seqnum_=true)
Definition: object.cpp:225
uint32_t tid
Definition: object.hpp:142
virtual void process_activate_write(uint64_t msgs_read_)
Definition: object.cpp:384
zmq::io_thread_t * choose_io_thread(uint64_t affinity_)
Definition: object.cpp:189
void send_command(command_t &cmd_)
Definition: object.cpp:434
virtual void process_term_ack()
Definition: object.cpp:414
void send_own(zmq::own_t *destination_, zmq::own_t *object_)
Definition: object.cpp:215
zmq::ctx_t * ctx
Definition: object.hpp:139
virtual void process_reap(zmq::socket_base_t *socket_)
Definition: object.cpp:419
virtual void process_activate_read()
Definition: object.cpp:379
void set_tid(uint32_t id)
Definition: object.cpp:63
void send_reaped()
Definition: object.cpp:330
virtual void process_reaped()
Definition: object.cpp:424
void send_done()
Definition: object.cpp:346
virtual void process_term_req(zmq::own_t *object_)
Definition: object.cpp:404
int unregister_endpoint(const std::string &addr_, socket_base_t *socket_)
Definition: object.cpp:157
virtual void process_bind(zmq::pipe_t *pipe_)
Definition: object.cpp:374
virtual void process_own(zmq::own_t *object_)
Definition: object.cpp:364
int register_endpoint(const char *addr_, const zmq::endpoint_t &endpoint_)
Definition: object.cpp:151
void destroy_socket(zmq::socket_base_t *socket_)
Definition: object.cpp:184
void send_pipe_term_ack(zmq::pipe_t *destination_)
Definition: object.cpp:286
void send_plug(zmq::own_t *destination_, bool inc_seqnum_=true)
Definition: object.cpp:204
virtual void process_pipe_term_ack()
Definition: object.cpp:399
virtual void process_pipe_term()
Definition: object.cpp:394
void unregister_endpoints(zmq::socket_base_t *socket_)
Definition: object.cpp:163
void connect_pending(const char *addr_, zmq::socket_base_t *bind_socket_)
Definition: object.cpp:179
virtual ~object_t()
Definition: object.cpp:54
const object_t & operator=(const object_t &)
void send_term_req(zmq::own_t *destination_, zmq::own_t *object_)
Definition: object.cpp:294
void send_term_ack(zmq::own_t *destination_)
Definition: object.cpp:313
void send_bind(zmq::own_t *destination_, zmq::pipe_t *pipe_, bool inc_seqnum_=true)
Definition: object.cpp:238
void send_term(zmq::own_t *destination_, int linger_)
Definition: object.cpp:304
void send_activate_write(zmq::pipe_t *destination_, uint64_t msgs_read_)
Definition: object.cpp:259
virtual void process_hiccup(void *pipe_)
Definition: object.cpp:389
virtual void process_seqnum()
Definition: object.cpp:429
void send_inproc_connected(zmq::socket_base_t *socket_)
Definition: object.cpp:338
virtual void process_stop()
Definition: object.cpp:354
virtual void process_term(int linger_)
Definition: object.cpp:409
void send_stop()
Definition: object.cpp:194
ctx_t * get_ctx()
Definition: object.cpp:68
void log(const char *format_,...)
void send_activate_read(zmq::pipe_t *destination_)
Definition: object.cpp:251
virtual void process_plug()
Definition: object.cpp:359
void send_reap(zmq::socket_base_t *socket_)
Definition: object.cpp:321
void send_pipe_term(zmq::pipe_t *destination_)
Definition: object.cpp:278
Definition: address.hpp:35
void send_hiccup(zmq::pipe_t *destination_, void *pipe_)
Definition: object.cpp:269
zmq::endpoint_t find_endpoint(const char *addr_)
Definition: object.cpp:168
object_t(zmq::ctx_t *ctx_, uint32_t tid_)
Definition: object.cpp:42
uint32_t get_tid()
Definition: object.cpp:58
virtual void process_attach(zmq::i_engine *engine_)
Definition: object.cpp:369