Line data Source code
1 : /*
2 : Copyright (c) 2007-2016 Contributors as noted in the AUTHORS file
3 :
4 : This file is part of libzmq, the ZeroMQ core engine in C++.
5 :
6 : libzmq is free software; you can redistribute it and/or modify it under
7 : the terms of the GNU Lesser General Public License (LGPL) as published
8 : by the Free Software Foundation; either version 3 of the License, or
9 : (at your option) any later version.
10 :
11 : As a special exception, the Contributors give you permission to link
12 : this library with independent modules to produce an executable,
13 : regardless of the license terms of these independent modules, and to
14 : copy and distribute the resulting executable under terms of your choice,
15 : provided that you also meet, for each linked independent module, the
16 : terms and conditions of the license of that module. An independent
17 : module is a module which is not derived from or based on this library.
18 : If you modify this library, you must extend this exception to your
19 : version of the library.
20 :
21 : libzmq is distributed in the hope that it will be useful, but WITHOUT
22 : ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
23 : FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public
24 : License for more details.
25 :
26 : You should have received a copy of the GNU Lesser General Public License
27 : along with this program. If not, see <http://www.gnu.org/licenses/>.
28 : */
29 :
30 : #ifndef __ZMQ_CTX_HPP_INCLUDED__
31 : #define __ZMQ_CTX_HPP_INCLUDED__
32 :
33 : #include <map>
34 : #include <vector>
35 : #include <string>
36 : #include <stdarg.h>
37 :
38 : #include "mailbox.hpp"
39 : #include "array.hpp"
40 : #include "config.hpp"
41 : #include "mutex.hpp"
42 : #include "stdint.hpp"
43 : #include "options.hpp"
44 : #include "atomic_counter.hpp"
45 : #include "thread.hpp"
46 :
47 : namespace zmq
48 : {
49 :
50 : class object_t;
51 : class io_thread_t;
52 : class socket_base_t;
53 : class reaper_t;
54 : class pipe_t;
55 :
56 : // Information associated with an inproc endpoint. Note that the endpoint's
57 : // options are registered as well so that the peer can access them without
58 : // any need for synchronisation, handshaking or similar.
59 5168 : struct endpoint_t
60 : {
61 : socket_base_t *socket; // Socket associated with the endpoint.
62 : options_t options; // The endpoint's options, held by value in this record.
63 : };
64 :
65 : // The context object encapsulates all the global state associated with
66 : // the library.
67 :
68 : class ctx_t
69 : {
70 : public:
71 :
72 : // Create the context object.
73 : ctx_t ();
74 :
75 : // Returns false if the object is not a context (see 'tag' below).
76 : bool check_tag ();
77 :
78 : // This function is called when the user invokes zmq_ctx_term. If there
79 : // are no more sockets open it will cause all the infrastructure to be
80 : // shut down. If there are still open sockets, the deallocation happens
81 : // after the last one is closed.
82 : int terminate ();
83 :
84 : // This function starts the termination process by unblocking any blocking
85 : // operations currently in progress and stopping any further socket
86 : // activity (except zmq_close).
87 : // This function is non-blocking.
88 : // terminate () must still be called afterwards.
89 : // Calling this function is optional; terminate () unblocks any current
90 : // operations as well.
91 : int shutdown();
92 :
93 : // Set and get context properties.
94 : int set (int option_, int optval_);
95 : int get (int option_);
96 :
97 : // Create and destroy a socket.
98 : zmq::socket_base_t *create_socket (int type_);
99 : void destroy_socket (zmq::socket_base_t *socket_);
100 :
101 : // Start a new thread with proper scheduling parameters.
102 : void start_thread (thread_t &thread_, thread_fn *tfn_, void *arg_) const;
103 :
104 : // Send a command to the destination thread identified by tid_.
105 : void send_command (uint32_t tid_, const command_t &command_);
106 :
107 : // Returns the I/O thread that is the least busy at the moment.
108 : // Affinity specifies which I/O threads are eligible (0 = all).
109 : // Returns NULL if no I/O thread is available.
110 : zmq::io_thread_t *choose_io_thread (uint64_t affinity_);
111 :
112 : // Returns the reaper thread object.
113 : zmq::object_t *get_reaper ();
114 :
115 : // Management of inproc endpoints.
116 : int register_endpoint (const char *addr_, const endpoint_t &endpoint_);
117 : int unregister_endpoint (const std::string &addr_, socket_base_t *socket_);
118 : void unregister_endpoints (zmq::socket_base_t *socket_);
119 : endpoint_t find_endpoint (const char *addr_);
120 : void pend_connection (const std::string &addr_,
121 : const endpoint_t &endpoint_, pipe_t **pipes_);
122 : void connect_pending (const char *addr_, zmq::socket_base_t *bind_socket_);
123 :
124 : #ifdef ZMQ_HAVE_VMCI
125 : // Return the family for the VMCI socket or -1 if it's not available.
126 : int get_vmci_socket_family ();
127 : #endif
128 :
129 : enum {
130 : term_tid = 0, // NOTE(review): presumably the thread ID of the zmq_ctx_term thread - confirm.
131 : reaper_tid = 1 // NOTE(review): presumably the thread ID of the reaper thread - confirm.
132 : };
133 :
134 : ~ctx_t ();
135 :
136 : private:
137 :
138 1860 : struct pending_connection_t // Value type stored in 'pending_connections' below.
139 : {
140 : endpoint_t endpoint;
141 : pipe_t* connect_pipe; // NOTE(review): presumably the connecting side's pipe - confirm.
142 : pipe_t* bind_pipe; // NOTE(review): presumably the binding side's pipe - confirm.
143 : };
144 :
145 : // Used to check whether the object is a context.
146 : uint32_t tag;
147 :
148 : // Sockets belonging to this context. We need the list so that
149 : // we can notify the sockets when zmq_ctx_term() is called.
150 : // The sockets will return ETERM then.
151 : typedef array_t <socket_base_t> sockets_t;
152 : sockets_t sockets;
153 :
154 : // List of unused thread slots.
155 : typedef std::vector <uint32_t> empty_slots_t;
156 : empty_slots_t empty_slots;
157 :
158 : // If true, zmq_init has been called but no socket has been created
159 : // yet. Launching of I/O threads is delayed.
160 : bool starting;
161 :
162 : // If true, zmq_ctx_term was already called.
163 : bool terminating;
164 :
165 : // Synchronisation of accesses to global slot-related data:
166 : // sockets, empty_slots, terminating. It also synchronises
167 : // access to zombie sockets as such (as opposed to slots) and provides
168 : // a memory barrier to ensure that all CPU cores see the same data.
169 : mutex_t slot_sync;
170 :
171 : // The reaper thread.
172 : zmq::reaper_t *reaper;
173 :
174 : // I/O threads.
175 : typedef std::vector <zmq::io_thread_t*> io_threads_t;
176 : io_threads_t io_threads;
177 :
178 : // Array of pointers to mailboxes for both application and I/O threads.
179 : uint32_t slot_count;
180 : i_mailbox **slots;
181 :
182 : // Mailbox for the zmq_ctx_term thread.
183 : mailbox_t term_mailbox;
184 :
185 : // List of inproc endpoints within this context.
186 : typedef std::map <std::string, endpoint_t> endpoints_t;
187 : endpoints_t endpoints;
188 :
189 : // List of inproc connection endpoints pending a bind.
190 : typedef std::multimap <std::string, pending_connection_t> pending_connections_t;
191 : pending_connections_t pending_connections;
192 :
193 : // Synchronisation of access to the list of inproc endpoints.
194 : mutex_t endpoints_sync;
195 :
196 : // Maximum socket ID. Declared static, so it is shared by all contexts.
197 : static atomic_counter_t max_socket_id;
198 :
199 : // Maximum number of sockets that can be opened at the same time.
200 : int max_sockets;
201 :
202 : // Maximum allowed message size.
203 : int max_msgsz;
204 :
205 : // Number of I/O threads to launch.
206 : int io_thread_count;
207 :
208 : // Does the context wait (possibly forever) on termination?
209 : bool blocky;
210 :
211 : // Is IPv6 enabled on this context?
212 : bool ipv6;
213 :
214 : // Thread scheduling parameters.
215 : int thread_priority;
216 : int thread_sched_policy;
217 :
218 : // Synchronisation of access to context options.
219 : mutex_t opt_sync;
220 :
221 : ctx_t (const ctx_t&); // Copy operations are private: outside code cannot copy a context.
222 : const ctx_t &operator = (const ctx_t&);
223 :
224 : #ifdef HAVE_FORK
225 : // The process that created this context. Used to detect forking.
226 : pid_t pid;
227 : #endif
228 : enum side { connect_side, bind_side }; // Selector passed as 'side_' to connect_inproc_sockets.
229 : void connect_inproc_sockets(zmq::socket_base_t *bind_socket_, options_t& bind_options, const pending_connection_t &pending_connection_, side side_); // NOTE(review): presumably completes a pending inproc connection against bind_socket_ - confirm in the implementation.
230 :
231 : #ifdef ZMQ_HAVE_VMCI
232 : int vmci_fd; // NOTE(review): presumably a descriptor used to query the VMCI family - confirm.
233 : int vmci_family; // NOTE(review): presumably the value returned by get_vmci_socket_family () - confirm.
234 : mutex_t vmci_sync; // NOTE(review): presumably guards vmci_fd and vmci_family - confirm.
235 : #endif
236 :
237 : mutex_t crypto_sync; // NOTE(review): purpose not evident from this header; presumably serialises crypto-related setup - confirm.
238 : };
239 :
240 : }
241 :
242 : #endif
|