libzmq  master
ZeroMQ C++ Core Engine (LIBZMQ)
socket_base.hpp
Go to the documentation of this file.
1 /*
2  Copyright (c) 2007-2016 Contributors as noted in the AUTHORS file
3 
4  This file is part of libzmq, the ZeroMQ core engine in C++.
5 
6  libzmq is free software; you can redistribute it and/or modify it under
7  the terms of the GNU Lesser General Public License (LGPL) as published
8  by the Free Software Foundation; either version 3 of the License, or
9  (at your option) any later version.
10 
11  As a special exception, the Contributors give you permission to link
12  this library with independent modules to produce an executable,
13  regardless of the license terms of these independent modules, and to
14  copy and distribute the resulting executable under terms of your choice,
15  provided that you also meet, for each linked independent module, the
16  terms and conditions of the license of that module. An independent
17  module is a module which is not derived from or based on this library.
18  If you modify this library, you must extend this exception to your
19  version of the library.
20 
21  libzmq is distributed in the hope that it will be useful, but WITHOUT
22  ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
23  FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public
24  License for more details.
25 
26  You should have received a copy of the GNU Lesser General Public License
27  along with this program. If not, see <http://www.gnu.org/licenses/>.
28 */
29 
30 #ifndef __ZMQ_SOCKET_BASE_HPP_INCLUDED__
31 #define __ZMQ_SOCKET_BASE_HPP_INCLUDED__
32 
33 #include <string>
34 #include <map>
35 #include <stdarg.h>
36 
37 #include "own.hpp"
38 #include "array.hpp"
39 #include "blob.hpp"
40 #include "stdint.hpp"
41 #include "poller.hpp"
42 #include "atomic_counter.hpp"
43 #include "i_poll_events.hpp"
44 #include "i_mailbox.hpp"
45 #include "stdint.hpp"
46 #include "clock.hpp"
47 #include "pipe.hpp"
48 
49 extern "C"
50 {
    // Declaration only, given C linkage by the enclosing extern "C" block;
    // no definition appears in this header.
    // NOTE(review): the (data, hint) parameter pair looks like a buffer
    // deallocation callback signature - confirm against the definition.
51  void zmq_free_event (void *data, void *hint);
52 }
53 
54 namespace zmq
55 {
56 
57  class ctx_t;
58  class msg_t;
59  class pipe_t;
60 
// Base class for the individual socket types. It declares the operations
// used by the API layer (setsockopt, bind, connect, send, recv, close, ...)
// and leaves the socket-type-specific behaviour to the virtual x* hooks
// declared in the protected section.
// NOTE(review): in this listing several declarations are absent (the embedded
// source line numbers skip, e.g. 78 -> 80); each gap is marked below.
61  class socket_base_t :
62  public own_t,
63  public array_item_t <>,
64  public i_poll_events,
65  public i_pipe_events
66  {
    // reaper_t is granted access to the non-public members of this class.
67  friend class reaper_t;
68 
69  public:
70 
71  // Returns false if object is not a socket.
72  bool check_tag ();
73 
74  // Create a socket of a specified type.
75  static socket_base_t *create (int type_, zmq::ctx_t *parent_,
76  uint32_t tid_, int sid_);
77 
78  // Returns the mailbox associated with this socket.
    // NOTE(review): the declaration on source line 79 is absent from this
    // listing; the member index at the end of the page lists
    // `i_mailbox * get_mailbox()`.
80 
81  // Interrupt blocking call if the socket is stuck in one.
82  // This function can be called from a different thread!
83  void stop ();
84 
85  // Interface for communication with the API layer.
86  int setsockopt (int option_, const void *optval_, size_t optvallen_);
87  int getsockopt (int option_, void *optval_, size_t *optvallen_);
88  int bind (const char *addr_);
89  int connect (const char *addr_);
90  int term_endpoint (const char *addr_);
91  int send (zmq::msg_t *msg_, int flags_);
92  int recv (zmq::msg_t *msg_, int flags_);
93  int add_signaler (signaler_t *s);
94  int remove_signaler (signaler_t *s);
95  int close ();
96 
97  // These functions are used by the polling mechanism to determine
98  // which events are to be reported from this socket.
99  bool has_in ();
100  bool has_out ();
101 
102  // Joining and leaving groups.
103  int join (const char *group);
104  int leave (const char *group);
105 
106  // Using this function the reaper thread asks the socket to register
107  // with its poller.
108  void start_reaping (poller_t *poller_);
109 
110  // i_poll_events implementation. This interface is used when socket
111  // is handled by the poller in the reaper thread.
112  void in_event ();
113  void out_event ();
114  void timer_event (int id_);
115 
116  // i_pipe_events interface implementation.
117  void read_activated (pipe_t *pipe_);
118  void write_activated (pipe_t *pipe_);
119  void hiccuped (pipe_t *pipe_);
120  void pipe_terminated (pipe_t *pipe_);
     // NOTE(review): lock ()/unlock () carry no comment of their own and may
     // not belong to i_pipe_events; presumably they acquire/release the
     // mutex used in thread safe mode - confirm against the implementation.
121  void lock();
122  void unlock();
123 
     // Socket monitoring entry point.
     // NOTE(review): endpoint_ is presumably the address of the monitor
     // socket and events_ the bitmask of events to report - confirm.
124  int monitor (const char *endpoint_, int events_);
125 
     // Event notification functions, one per kind of socket event. Each
     // takes the address concerned plus a file descriptor, an error code
     // or a retry interval, as named by its second parameter.
126  void event_connected (const std::string &addr_, int fd_);
127  void event_connect_delayed (const std::string &addr_, int err_);
128  void event_connect_retried (const std::string &addr_, int interval_);
129  void event_listening (const std::string &addr_, int fd_);
130  void event_bind_failed (const std::string &addr_, int err_);
131  void event_accepted (const std::string &addr_, int fd_);
132  void event_accept_failed (const std::string &addr_, int err_);
133  void event_closed (const std::string &addr_, int fd_);
134  void event_close_failed (const std::string &addr_, int fd_);
135  void event_disconnected (const std::string &addr_, int fd_);
136 
137  protected:
138 
     // Constructor and destructor are protected, so code outside the class
     // hierarchy cannot construct or delete a socket directly; create ()
     // above is the public way to obtain one. thread_safe_ defaults to false.
139  socket_base_t (zmq::ctx_t *parent_, uint32_t tid_, int sid_, bool thread_safe_ = false);
140  virtual ~socket_base_t ();
141 
142  // Concrete algorithms for the x- methods are to be defined by
143  // individual socket types.
144  virtual void xattach_pipe (zmq::pipe_t *pipe_,
145  bool subscribe_to_all_ = false) = 0;
146 
147  // The default implementation assumes there are no specific socket
148  // options for the particular socket type. If not so, override this
149  // method.
150  virtual int xsetsockopt (int option_, const void *optval_,
151  size_t optvallen_);
152 
153  // The default implementation assumes that send is not supported.
154  virtual bool xhas_out ();
155  virtual int xsend (zmq::msg_t *msg_);
156 
157  // The default implementation assumes that recv is not supported.
158  virtual bool xhas_in ();
159  virtual int xrecv (zmq::msg_t *msg_);
160 
161  // Returns the credential for the peer from which we have received
162  // the last message. If no message has been received yet,
163  // the function returns empty credential.
164  virtual blob_t get_credential () const;
165 
166  // i_pipe_events will be forwarded to these functions.
     // xpipe_terminated is pure virtual, so every concrete socket type
     // has to provide it.
167  virtual void xread_activated (pipe_t *pipe_);
168  virtual void xwrite_activated (pipe_t *pipe_);
169  virtual void xhiccuped (pipe_t *pipe_);
170  virtual void xpipe_terminated (pipe_t *pipe_) = 0;
171 
172  // The default implementation assumes that join and leave are not supported.
173  virtual int xjoin (const char *group_);
174  virtual int xleave (const char *group_);
175 
176  // Delay actual destruction of the socket.
177  void process_destroy ();
178 
179  // Socket event data dispatch
180  void monitor_event (int event_, int value_, const std::string& addr_);
181 
182  // Monitor socket cleanup
183  void stop_monitor (bool send_monitor_stopped_event_ = true);
184 
185  // Next assigned name on a zmq_connect() call used by ROUTER and STREAM socket types
186  std::string connect_rid;
187 
188  private:
189  // Creates new endpoint ID and adds the endpoint to the map.
190  void add_endpoint (const char *addr_, own_t *endpoint_, pipe_t *pipe);
191 
192  // Map of open endpoints.
     // A multimap keyed by string, so one key may map to several
     // (own_t *, pipe_t *) pairs.
193  typedef std::pair <own_t *, pipe_t*> endpoint_pipe_t;
194  typedef std::multimap <std::string, endpoint_pipe_t> endpoints_t;
195  endpoints_t endpoints;
196 
197  // Map of open inproc endpoints.
198  typedef std::multimap <std::string, pipe_t *> inprocs_t;
199  inprocs_t inprocs;
200 
201  // To be called after processing commands or invoking any command
202  // handlers explicitly. If required, it will deallocate the socket.
203  void check_destroy ();
204 
205  // Moves the flags from the message to local variables,
206  // to be later retrieved by getsockopt.
207  void extract_flags (msg_t *msg_);
208 
209  // Used to check whether the object is a socket.
210  uint32_t tag;
211 
212  // If true, associated context was already terminated.
     // NOTE(review): the declaration on source line 213 is absent from this
     // listing.
214 
215  // If true, object should have been already destroyed. However,
216  // destruction is delayed while we unwind the stack to the point
217  // where it doesn't intersect the object being destroyed.
218  bool destroyed;
219 
220  // Parse URI string.
     // protocol_ and address_ are non-const references, i.e. outputs.
221  int parse_uri (const char *uri_, std::string &protocol_,
222  std::string &address_);
223 
224  // Check whether transport protocol, as specified in connect or
225  // bind, is available and compatible with the socket type.
226  int check_protocol (const std::string &protocol_);
227 
228  // Register the pipe with this socket.
229  void attach_pipe (zmq::pipe_t *pipe_, bool subscribe_to_all_ = false);
230 
231  // Processes commands sent to this socket (if any). If timeout is -1,
232  // returns only after at least one command was processed.
233  // If throttle argument is true, commands are processed at most once
234  // in a predefined time period.
235  int process_commands (int timeout_, bool throttle_);
236 
237  // Handlers for incoming commands.
238  void process_stop ();
239  void process_bind (zmq::pipe_t *pipe_);
240  void process_term (int linger_);
241 
     // NOTE(review): undocumented in the original; the name suggests that a
     // changed option is propagated to the attached pipes - confirm.
242  void update_pipe_options(int option_);
243 
244  // Socket's mailbox object.
     // NOTE(review): the declaration on source line 245 is absent from this
     // listing; the member index at the end of the page lists
     // `i_mailbox * mailbox`.
246 
247  // List of attached pipes.
     // NOTE(review): the declaration on source line 248 is absent from this
     // listing; the member index at the end of the page lists
     // `array_t< pipe_t, 3 > pipes_t`, the type used on the next line.
249  pipes_t pipes;
250 
251  // Reaper's poller and handle of this socket within it.
252  poller_t *poller;
253  poller_t::handle_t handle;
254 
255  // Timestamp of when commands were processed the last time.
256  uint64_t last_tsc;
257 
258  // Number of messages received since last command processing.
259  int ticks;
260 
261  // True if the last message received had MORE flag set.
262  bool rcvmore;
263 
264  // Improves efficiency of time measurement.
     // NOTE(review): the declaration on source line 265 is absent from this
     // listing.
266 
267  // Monitor socket.
     // NOTE(review): the declaration on source line 268 is absent from this
     // listing.
269 
270  // Bitmask of events being monitored
     // NOTE(review): the declaration on source line 271 is absent from this
     // listing.
272 
273  // Last socket endpoint resolved URI
274  std::string last_endpoint;
275 
276  // Indicate if the socket is thread safe
     // NOTE(review): the declaration on source line 277 is absent from this
     // listing.
278 
279  // Signaler to be used in the reaping stage
     // NOTE(review): the declaration on source line 280 is absent from this
     // listing; the member index at the end of the page lists
     // `signaler_t * reaper_signaler`.
281 
282  // Mutex to synchronize access to the socket in thread safe mode
     // NOTE(review): the declaration on source line 283 is absent from this
     // listing.
284 
     // Copy construction and copy assignment are declared private, so code
     // outside the class cannot copy a socket.
     // NOTE(review): presumably left undefined on purpose (pre-C++11
     // non-copyable idiom) - confirm.
285  socket_base_t (const socket_base_t&);
286  const socket_base_t &operator = (const socket_base_t&);
287  };
288 
289 }
290 
291 #endif
void pipe_terminated(pipe_t *pipe_)
int process_commands(int timeout_, bool throttle_)
virtual void xpipe_terminated(pipe_t *pipe_)=0
void event_accept_failed(const std::string &addr_, int err_)
virtual void xhiccuped(pipe_t *pipe_)
virtual void xattach_pipe(zmq::pipe_t *pipe_, bool subscribe_to_all_=false)=0
void monitor_event(int event_, int value_, const std::string &addr_)
int join(const char *group)
void write_activated(pipe_t *pipe_)
std::multimap< std::string, endpoint_pipe_t > endpoints_t
void start_reaping(poller_t *poller_)
int add_signaler(signaler_t *s)
virtual blob_t get_credential() const
int setsockopt(int option_, const void *optval_, size_t optvallen_)
void attach_pipe(zmq::pipe_t *pipe_, bool subscribe_to_all_=false)
i_mailbox * mailbox
void event_listening(const std::string &addr_, int fd_)
int bind(const char *addr_)
int term_endpoint(const char *addr_)
void event_connected(const std::string &addr_, int fd_)
const socket_base_t & operator=(const socket_base_t &)
virtual void xwrite_activated(pipe_t *pipe_)
signaler_t * reaper_signaler
virtual void xread_activated(pipe_t *pipe_)
void event_close_failed(const std::string &addr_, int fd_)
int check_protocol(const std::string &protocol_)
int getsockopt(int option_, void *optval_, size_t *optvallen_)
void event_connect_delayed(const std::string &addr_, int err_)
endpoints_t endpoints
int send(zmq::msg_t *msg_, int flags_)
void stop_monitor(bool send_monitor_stopped_event_=true)
void update_pipe_options(int option_)
void zmq_free_event(void *data, void *hint)
std::basic_string< unsigned char > blob_t
Definition: blob.hpp:134
int monitor(const char *endpoint_, int events_)
std::string connect_rid
virtual int xleave(const char *group_)
void timer_event(int id_)
static socket_base_t * create(int type_, zmq::ctx_t *parent_, uint32_t tid_, int sid_)
void event_closed(const std::string &addr_, int fd_)
virtual int xsend(zmq::msg_t *msg_)
socket_base_t(zmq::ctx_t *parent_, uint32_t tid_, int sid_, bool thread_safe_=false)
void event_connect_retried(const std::string &addr_, int interval_)
void hiccuped(pipe_t *pipe_)
void extract_flags(msg_t *msg_)
void event_disconnected(const std::string &addr_, int fd_)
std::multimap< std::string, pipe_t * > inprocs_t
void event_bind_failed(const std::string &addr_, int err_)
void process_term(int linger_)
int remove_signaler(signaler_t *s)
std::pair< own_t *, pipe_t * > endpoint_pipe_t
void add_endpoint(const char *addr_, own_t *endpoint_, pipe_t *pipe)
int leave(const char *group)
virtual int xrecv(zmq::msg_t *msg_)
i_mailbox * get_mailbox()
std::string last_endpoint
void process_bind(zmq::pipe_t *pipe_)
poller_t::handle_t handle
int parse_uri(const char *uri_, std::string &protocol_, std::string &address_)
void event_accepted(const std::string &addr_, int fd_)
array_t< pipe_t, 3 > pipes_t
virtual ~socket_base_t()
Definition: address.hpp:35
void read_activated(pipe_t *pipe_)
int recv(zmq::msg_t *msg_, int flags_)
virtual int xjoin(const char *group_)
virtual bool xhas_in()
virtual bool xhas_out()
int connect(const char *addr_)
virtual int xsetsockopt(int option_, const void *optval_, size_t optvallen_)