Line data Source code
1 : /*
2 : Copyright (c) 2007-2016 Contributors as noted in the AUTHORS file
3 :
4 : This file is part of libzmq, the ZeroMQ core engine in C++.
5 :
6 : libzmq is free software; you can redistribute it and/or modify it under
7 : the terms of the GNU Lesser General Public License (LGPL) as published
8 : by the Free Software Foundation; either version 3 of the License, or
9 : (at your option) any later version.
10 :
11 : As a special exception, the Contributors give you permission to link
12 : this library with independent modules to produce an executable,
13 : regardless of the license terms of these independent modules, and to
14 : copy and distribute the resulting executable under terms of your choice,
15 : provided that you also meet, for each linked independent module, the
16 : terms and conditions of the license of that module. An independent
17 : module is a module which is not derived from or based on this library.
18 : If you modify this library, you must extend this exception to your
19 : version of the library.
20 :
21 : libzmq is distributed in the hope that it will be useful, but WITHOUT
22 : ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
23 : FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public
24 : License for more details.
25 :
26 : You should have received a copy of the GNU Lesser General Public License
27 : along with this program. If not, see <http://www.gnu.org/licenses/>.
28 : */
29 :
30 : #include "precompiled.hpp"
31 : #include <string.h>
32 : #include <stdarg.h>
33 :
34 : #include "object.hpp"
35 : #include "ctx.hpp"
36 : #include "err.hpp"
37 : #include "pipe.hpp"
38 : #include "io_thread.hpp"
39 : #include "session_base.hpp"
40 : #include "socket_base.hpp"
41 :
42 12223 : zmq::object_t::object_t (ctx_t *ctx_, uint32_t tid_) :
43 : ctx (ctx_),
44 12223 : tid (tid_)
45 : {
46 12223 : }
47 :
48 27240 : zmq::object_t::object_t (object_t *parent_) :
49 : ctx (parent_->ctx),
50 27240 : tid (parent_->tid)
51 : {
52 27240 : }
53 :
54 39376 : zmq::object_t::~object_t ()
55 : {
56 39376 : }
57 :
58 11407 : uint32_t zmq::object_t::get_tid ()
59 : {
60 137184 : return tid;
61 : }
62 :
63 276 : void zmq::object_t::set_tid(uint32_t id)
64 : {
65 276 : tid = id;
66 276 : }
67 :
68 3777 : zmq::ctx_t *zmq::object_t::get_ctx ()
69 : {
70 3777 : return ctx;
71 : }
72 :
73 129337 : void zmq::object_t::process_command (command_t &cmd_)
74 : {
75 129337 : switch (cmd_.type) {
76 :
77 : case command_t::activate_read:
78 11130 : process_activate_read ();
79 11130 : break;
80 :
81 : case command_t::activate_write:
82 4452 : process_activate_write (cmd_.args.activate_write.msgs_read);
83 4452 : break;
84 :
85 : case command_t::stop:
86 3289 : process_stop ();
87 3289 : break;
88 :
89 : case command_t::plug:
90 11543 : process_plug ();
91 11544 : process_seqnum ();
92 11544 : break;
93 :
94 : case command_t::own:
95 11546 : process_own (cmd_.args.own.object);
96 11545 : process_seqnum ();
97 11545 : break;
98 :
99 : case command_t::attach:
100 6768 : process_attach (cmd_.args.attach.engine);
101 6768 : process_seqnum ();
102 6768 : break;
103 :
104 : case command_t::bind:
105 4112 : process_bind (cmd_.args.bind.pipe);
106 4112 : process_seqnum ();
107 4112 : break;
108 :
109 : case command_t::hiccup:
110 4 : process_hiccup (cmd_.args.hiccup.pipe);
111 4 : break;
112 :
113 : case command_t::pipe_term:
114 9734 : process_pipe_term ();
115 9747 : break;
116 :
117 : case command_t::pipe_term_ack:
118 15685 : process_pipe_term_ack ();
119 15694 : break;
120 :
121 : case command_t::term_req:
122 5470 : process_term_req (cmd_.args.term_req.object);
123 5471 : break;
124 :
125 : case command_t::term:
126 11542 : process_term (cmd_.args.term.linger);
127 11545 : break;
128 :
129 : case command_t::term_ack:
130 11544 : process_term_ack ();
131 11543 : break;
132 :
133 : case command_t::reap:
134 11131 : process_reap (cmd_.args.reap.socket);
135 11131 : break;
136 :
137 : case command_t::reaped:
138 11131 : process_reaped ();
139 11131 : break;
140 :
141 : case command_t::inproc_connected:
142 256 : process_seqnum ();
143 256 : break;
144 :
145 : case command_t::done:
146 : default:
147 0 : zmq_assert (false);
148 : }
149 129362 : }
150 :
151 358 : int zmq::object_t::register_endpoint (const char *addr_,
152 : const endpoint_t &endpoint_)
153 : {
154 358 : return ctx->register_endpoint (addr_, endpoint_);
155 : }
156 :
157 9 : int zmq::object_t::unregister_endpoint (
158 : const std::string &addr_, socket_base_t *socket_)
159 : {
160 9 : return ctx->unregister_endpoint (addr_, socket_);
161 : }
162 :
163 11131 : void zmq::object_t::unregister_endpoints (socket_base_t *socket_)
164 : {
165 11131 : return ctx->unregister_endpoints (socket_);
166 : }
167 :
168 591 : zmq::endpoint_t zmq::object_t::find_endpoint (const char *addr_)
169 : {
170 591 : return ctx->find_endpoint (addr_);
171 : }
172 :
173 276 : void zmq::object_t::pend_connection (const std::string &addr_,
174 : const endpoint_t &endpoint_, pipe_t **pipes_)
175 : {
176 276 : ctx->pend_connection (addr_, endpoint_, pipes_);
177 276 : }
178 :
179 358 : void zmq::object_t::connect_pending (const char *addr_, zmq::socket_base_t *bind_socket_)
180 : {
181 358 : return ctx->connect_pending(addr_, bind_socket_);
182 : }
183 :
184 11131 : void zmq::object_t::destroy_socket (socket_base_t *socket_)
185 : {
186 11131 : ctx->destroy_socket (socket_);
187 11131 : }
188 :
189 11559 : zmq::io_thread_t *zmq::object_t::choose_io_thread (uint64_t affinity_)
190 : {
191 11559 : return ctx->choose_io_thread (affinity_);
192 : }
193 :
194 9273 : void zmq::object_t::send_stop ()
195 : {
196 : // 'stop' command goes always from administrative thread to
197 : // the current object.
198 : command_t cmd;
199 9273 : cmd.destination = this;
200 9273 : cmd.type = command_t::stop;
201 9273 : ctx->send_command (tid, cmd);
202 9273 : }
203 :
204 11545 : void zmq::object_t::send_plug (own_t *destination_, bool inc_seqnum_)
205 : {
206 11545 : if (inc_seqnum_)
207 11545 : destination_->inc_seqnum ();
208 :
209 : command_t cmd;
210 11546 : cmd.destination = destination_;
211 11546 : cmd.type = command_t::plug;
212 : send_command (cmd);
213 11546 : }
214 :
215 11546 : void zmq::object_t::send_own (own_t *destination_, own_t *object_)
216 : {
217 11546 : destination_->inc_seqnum ();
218 : command_t cmd;
219 11546 : cmd.destination = destination_;
220 11546 : cmd.type = command_t::own;
221 11546 : cmd.args.own.object = object_;
222 : send_command (cmd);
223 11546 : }
224 :
225 6767 : void zmq::object_t::send_attach (session_base_t *destination_,
226 : i_engine *engine_, bool inc_seqnum_)
227 : {
228 6767 : if (inc_seqnum_)
229 3253 : destination_->inc_seqnum ();
230 :
231 : command_t cmd;
232 6767 : cmd.destination = destination_;
233 6767 : cmd.type = command_t::attach;
234 6767 : cmd.args.attach.engine = engine_;
235 : send_command (cmd);
236 6768 : }
237 :
238 3856 : void zmq::object_t::send_bind (own_t *destination_, pipe_t *pipe_,
239 : bool inc_seqnum_)
240 : {
241 3856 : if (inc_seqnum_)
242 3524 : destination_->inc_seqnum ();
243 :
244 : command_t cmd;
245 3856 : cmd.destination = destination_;
246 3856 : cmd.type = command_t::bind;
247 3856 : cmd.args.bind.pipe = pipe_;
248 : send_command (cmd);
249 3856 : }
250 :
251 11130 : void zmq::object_t::send_activate_read (pipe_t *destination_)
252 : {
253 : command_t cmd;
254 11130 : cmd.destination = destination_;
255 11130 : cmd.type = command_t::activate_read;
256 : send_command (cmd);
257 11129 : }
258 :
259 4452 : void zmq::object_t::send_activate_write (pipe_t *destination_,
260 : uint64_t msgs_read_)
261 : {
262 : command_t cmd;
263 4452 : cmd.destination = destination_;
264 4452 : cmd.type = command_t::activate_write;
265 4452 : cmd.args.activate_write.msgs_read = msgs_read_;
266 : send_command (cmd);
267 4452 : }
268 :
269 4 : void zmq::object_t::send_hiccup (pipe_t *destination_, void *pipe_)
270 : {
271 : command_t cmd;
272 4 : cmd.destination = destination_;
273 4 : cmd.type = command_t::hiccup;
274 4 : cmd.args.hiccup.pipe = pipe_;
275 : send_command (cmd);
276 4 : }
277 :
278 9748 : void zmq::object_t::send_pipe_term (pipe_t *destination_)
279 : {
280 : command_t cmd;
281 9748 : cmd.destination = destination_;
282 9748 : cmd.type = command_t::pipe_term;
283 : send_command (cmd);
284 9748 : }
285 :
286 15663 : void zmq::object_t::send_pipe_term_ack (pipe_t *destination_)
287 : {
288 : command_t cmd;
289 15663 : cmd.destination = destination_;
290 15663 : cmd.type = command_t::pipe_term_ack;
291 : send_command (cmd);
292 15696 : }
293 :
294 5470 : void zmq::object_t::send_term_req (own_t *destination_,
295 : own_t *object_)
296 : {
297 : command_t cmd;
298 5470 : cmd.destination = destination_;
299 5470 : cmd.type = command_t::term_req;
300 5470 : cmd.args.term_req.object = object_;
301 : send_command (cmd);
302 5471 : }
303 :
304 11545 : void zmq::object_t::send_term (own_t *destination_, int linger_)
305 : {
306 : command_t cmd;
307 11545 : cmd.destination = destination_;
308 11545 : cmd.type = command_t::term;
309 11545 : cmd.args.term.linger = linger_;
310 : send_command (cmd);
311 11546 : }
312 :
313 11532 : void zmq::object_t::send_term_ack (own_t *destination_)
314 : {
315 : command_t cmd;
316 11532 : cmd.destination = destination_;
317 11532 : cmd.type = command_t::term_ack;
318 : send_command (cmd);
319 11545 : }
320 :
321 11131 : void zmq::object_t::send_reap (class socket_base_t *socket_)
322 : {
323 : command_t cmd;
324 11131 : cmd.destination = ctx->get_reaper ();
325 11131 : cmd.type = command_t::reap;
326 11131 : cmd.args.reap.socket = socket_;
327 : send_command (cmd);
328 11131 : }
329 :
330 11131 : void zmq::object_t::send_reaped ()
331 : {
332 : command_t cmd;
333 11131 : cmd.destination = ctx->get_reaper ();
334 11131 : cmd.type = command_t::reaped;
335 : send_command (cmd);
336 11131 : }
337 :
338 256 : void zmq::object_t::send_inproc_connected (zmq::socket_base_t *socket_)
339 : {
340 : command_t cmd;
341 256 : cmd.destination = socket_;
342 256 : cmd.type = command_t::inproc_connected;
343 : send_command (cmd);
344 256 : }
345 :
346 423 : void zmq::object_t::send_done ()
347 : {
348 : command_t cmd;
349 423 : cmd.destination = NULL;
350 423 : cmd.type = command_t::done;
351 423 : ctx->send_command (ctx_t::term_tid, cmd);
352 423 : }
353 :
354 0 : void zmq::object_t::process_stop ()
355 : {
356 0 : zmq_assert (false);
357 0 : }
358 :
359 0 : void zmq::object_t::process_plug ()
360 : {
361 0 : zmq_assert (false);
362 0 : }
363 :
364 0 : void zmq::object_t::process_own (own_t *)
365 : {
366 0 : zmq_assert (false);
367 0 : }
368 :
369 0 : void zmq::object_t::process_attach (i_engine *)
370 : {
371 0 : zmq_assert (false);
372 0 : }
373 :
374 0 : void zmq::object_t::process_bind (pipe_t *)
375 : {
376 0 : zmq_assert (false);
377 0 : }
378 :
379 0 : void zmq::object_t::process_activate_read ()
380 : {
381 0 : zmq_assert (false);
382 0 : }
383 :
384 0 : void zmq::object_t::process_activate_write (uint64_t)
385 : {
386 0 : zmq_assert (false);
387 0 : }
388 :
389 0 : void zmq::object_t::process_hiccup (void *)
390 : {
391 0 : zmq_assert (false);
392 0 : }
393 :
394 0 : void zmq::object_t::process_pipe_term ()
395 : {
396 0 : zmq_assert (false);
397 0 : }
398 :
399 0 : void zmq::object_t::process_pipe_term_ack ()
400 : {
401 0 : zmq_assert (false);
402 0 : }
403 :
404 0 : void zmq::object_t::process_term_req (own_t *)
405 : {
406 0 : zmq_assert (false);
407 0 : }
408 :
409 0 : void zmq::object_t::process_term (int)
410 : {
411 0 : zmq_assert (false);
412 0 : }
413 :
414 0 : void zmq::object_t::process_term_ack ()
415 : {
416 0 : zmq_assert (false);
417 0 : }
418 :
419 0 : void zmq::object_t::process_reap (class socket_base_t *)
420 : {
421 0 : zmq_assert (false);
422 0 : }
423 :
424 0 : void zmq::object_t::process_reaped ()
425 : {
426 0 : zmq_assert (false);
427 0 : }
428 :
429 0 : void zmq::object_t::process_seqnum ()
430 : {
431 0 : zmq_assert (false);
432 0 : }
433 :
434 0 : void zmq::object_t::send_command (command_t &cmd_)
435 : {
436 251554 : ctx->send_command (cmd_.destination->get_tid (), cmd_);
437 0 : }
438 :
|