libzmq  master
ZeroMQ C++ Core Engine (LIBZMQ)
object.cpp
Go to the documentation of this file.
1 /*
2  Copyright (c) 2007-2016 Contributors as noted in the AUTHORS file
3 
4  This file is part of libzmq, the ZeroMQ core engine in C++.
5 
6  libzmq is free software; you can redistribute it and/or modify it under
7  the terms of the GNU Lesser General Public License (LGPL) as published
8  by the Free Software Foundation; either version 3 of the License, or
9  (at your option) any later version.
10 
11  As a special exception, the Contributors give you permission to link
12  this library with independent modules to produce an executable,
13  regardless of the license terms of these independent modules, and to
14  copy and distribute the resulting executable under terms of your choice,
15  provided that you also meet, for each linked independent module, the
16  terms and conditions of the license of that module. An independent
17  module is a module which is not derived from or based on this library.
18  If you modify this library, you must extend this exception to your
19  version of the library.
20 
21  libzmq is distributed in the hope that it will be useful, but WITHOUT
22  ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
23  FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public
24  License for more details.
25 
26  You should have received a copy of the GNU Lesser General Public License
27  along with this program. If not, see <http://www.gnu.org/licenses/>.
28 */
29 
30 #include "precompiled.hpp"
31 #include <string.h>
32 #include <stdarg.h>
33 
34 #include "object.hpp"
35 #include "ctx.hpp"
36 #include "err.hpp"
37 #include "pipe.hpp"
38 #include "io_thread.hpp"
39 #include "session_base.hpp"
40 #include "socket_base.hpp"
41 
42 zmq::object_t::object_t (ctx_t *ctx_, uint32_t tid_) :
43  ctx (ctx_),
44  tid (tid_)
45 {
46 }
47 
49  ctx (parent_->ctx),
50  tid (parent_->tid)
51 {
52 }
53 
55 {
56 }
57 
59 {
60  return tid;
61 }
62 
63 void zmq::object_t::set_tid(uint32_t id)
64 {
65  tid = id;
66 }
67 
69 {
70  return ctx;
71 }
72 
74 {
75  switch (cmd_.type) {
76 
79  break;
80 
83  break;
84 
85  case command_t::stop:
86  process_stop ();
87  break;
88 
89  case command_t::plug:
90  process_plug ();
91  process_seqnum ();
92  break;
93 
94  case command_t::own:
95  process_own (cmd_.args.own.object);
96  process_seqnum ();
97  break;
98 
99  case command_t::attach:
101  process_seqnum ();
102  break;
103 
104  case command_t::bind:
105  process_bind (cmd_.args.bind.pipe);
106  process_seqnum ();
107  break;
108 
109  case command_t::hiccup:
111  break;
112 
115  break;
116 
119  break;
120 
121  case command_t::term_req:
123  break;
124 
125  case command_t::term:
126  process_term (cmd_.args.term.linger);
127  break;
128 
129  case command_t::term_ack:
130  process_term_ack ();
131  break;
132 
133  case command_t::reap:
134  process_reap (cmd_.args.reap.socket);
135  break;
136 
137  case command_t::reaped:
138  process_reaped ();
139  break;
140 
142  process_seqnum ();
143  break;
144 
145  case command_t::done:
146  default:
147  zmq_assert (false);
148  }
149 }
150 
151 int zmq::object_t::register_endpoint (const char *addr_,
152  const endpoint_t &endpoint_)
153 {
154  return ctx->register_endpoint (addr_, endpoint_);
155 }
156 
158  const std::string &addr_, socket_base_t *socket_)
159 {
160  return ctx->unregister_endpoint (addr_, socket_);
161 }
162 
164 {
165  return ctx->unregister_endpoints (socket_);
166 }
167 
169 {
170  return ctx->find_endpoint (addr_);
171 }
172 
173 void zmq::object_t::pend_connection (const std::string &addr_,
174  const endpoint_t &endpoint_, pipe_t **pipes_)
175 {
176  ctx->pend_connection (addr_, endpoint_, pipes_);
177 }
178 
179 void zmq::object_t::connect_pending (const char *addr_, zmq::socket_base_t *bind_socket_)
180 {
181  return ctx->connect_pending(addr_, bind_socket_);
182 }
183 
185 {
186  ctx->destroy_socket (socket_);
187 }
188 
190 {
191  return ctx->choose_io_thread (affinity_);
192 }
193 
195 {
196  // 'stop' command goes always from administrative thread to
197  // the current object.
198  command_t cmd;
199  cmd.destination = this;
200  cmd.type = command_t::stop;
201  ctx->send_command (tid, cmd);
202 }
203 
204 void zmq::object_t::send_plug (own_t *destination_, bool inc_seqnum_)
205 {
206  if (inc_seqnum_)
207  destination_->inc_seqnum ();
208 
209  command_t cmd;
210  cmd.destination = destination_;
211  cmd.type = command_t::plug;
212  send_command (cmd);
213 }
214 
215 void zmq::object_t::send_own (own_t *destination_, own_t *object_)
216 {
217  destination_->inc_seqnum ();
218  command_t cmd;
219  cmd.destination = destination_;
220  cmd.type = command_t::own;
221  cmd.args.own.object = object_;
222  send_command (cmd);
223 }
224 
226  i_engine *engine_, bool inc_seqnum_)
227 {
228  if (inc_seqnum_)
229  destination_->inc_seqnum ();
230 
231  command_t cmd;
232  cmd.destination = destination_;
233  cmd.type = command_t::attach;
234  cmd.args.attach.engine = engine_;
235  send_command (cmd);
236 }
237 
238 void zmq::object_t::send_bind (own_t *destination_, pipe_t *pipe_,
239  bool inc_seqnum_)
240 {
241  if (inc_seqnum_)
242  destination_->inc_seqnum ();
243 
244  command_t cmd;
245  cmd.destination = destination_;
246  cmd.type = command_t::bind;
247  cmd.args.bind.pipe = pipe_;
248  send_command (cmd);
249 }
250 
252 {
253  command_t cmd;
254  cmd.destination = destination_;
256  send_command (cmd);
257 }
258 
260  uint64_t msgs_read_)
261 {
262  command_t cmd;
263  cmd.destination = destination_;
265  cmd.args.activate_write.msgs_read = msgs_read_;
266  send_command (cmd);
267 }
268 
269 void zmq::object_t::send_hiccup (pipe_t *destination_, void *pipe_)
270 {
271  command_t cmd;
272  cmd.destination = destination_;
273  cmd.type = command_t::hiccup;
274  cmd.args.hiccup.pipe = pipe_;
275  send_command (cmd);
276 }
277 
279 {
280  command_t cmd;
281  cmd.destination = destination_;
283  send_command (cmd);
284 }
285 
287 {
288  command_t cmd;
289  cmd.destination = destination_;
291  send_command (cmd);
292 }
293 
295  own_t *object_)
296 {
297  command_t cmd;
298  cmd.destination = destination_;
300  cmd.args.term_req.object = object_;
301  send_command (cmd);
302 }
303 
304 void zmq::object_t::send_term (own_t *destination_, int linger_)
305 {
306  command_t cmd;
307  cmd.destination = destination_;
308  cmd.type = command_t::term;
309  cmd.args.term.linger = linger_;
310  send_command (cmd);
311 }
312 
314 {
315  command_t cmd;
316  cmd.destination = destination_;
318  send_command (cmd);
319 }
320 
322 {
323  command_t cmd;
324  cmd.destination = ctx->get_reaper ();
325  cmd.type = command_t::reap;
326  cmd.args.reap.socket = socket_;
327  send_command (cmd);
328 }
329 
331 {
332  command_t cmd;
333  cmd.destination = ctx->get_reaper ();
334  cmd.type = command_t::reaped;
335  send_command (cmd);
336 }
337 
339 {
340  command_t cmd;
341  cmd.destination = socket_;
343  send_command (cmd);
344 }
345 
347 {
348  command_t cmd;
349  cmd.destination = NULL;
350  cmd.type = command_t::done;
352 }
353 
355 {
356  zmq_assert (false);
357 }
358 
360 {
361  zmq_assert (false);
362 }
363 
365 {
366  zmq_assert (false);
367 }
368 
370 {
371  zmq_assert (false);
372 }
373 
375 {
376  zmq_assert (false);
377 }
378 
380 {
381  zmq_assert (false);
382 }
383 
385 {
386  zmq_assert (false);
387 }
388 
390 {
391  zmq_assert (false);
392 }
393 
395 {
396  zmq_assert (false);
397 }
398 
400 {
401  zmq_assert (false);
402 }
403 
405 {
406  zmq_assert (false);
407 }
408 
410 {
411  zmq_assert (false);
412 }
413 
415 {
416  zmq_assert (false);
417 }
418 
420 {
421  zmq_assert (false);
422 }
423 
425 {
426  zmq_assert (false);
427 }
428 
430 {
431  zmq_assert (false);
432 }
433 
435 {
436  ctx->send_command (cmd_.destination->get_tid (), cmd_);
437 }
438 
endpoint_t find_endpoint(const char *addr_)
Definition: ctx.cpp:508
int unregister_endpoint(const std::string &addr_, socket_base_t *socket_)
Definition: ctx.cpp:470
void pend_connection(const std::string &addr_, const endpoint_t &endpoint, pipe_t **pipes_)
Definition: object.cpp:173
void destroy_socket(zmq::socket_base_t *socket_)
Definition: ctx.cpp:397
struct zmq::command_t::args_t::@12 term
void process_command(zmq::command_t &cmd_)
Definition: object.cpp:73
void send_attach(zmq::session_base_t *destination_, zmq::i_engine *engine_, bool inc_seqnum_=true)
Definition: object.cpp:225
uint32_t tid
Definition: object.hpp:142
struct zmq::command_t::args_t::@14 reap
struct zmq::command_t::args_t::@3 own
#define zmq_assert(x)
Definition: err.hpp:119
virtual void process_activate_write(uint64_t msgs_read_)
Definition: object.cpp:384
zmq::io_thread_t * choose_io_thread(uint64_t affinity_)
Definition: object.cpp:189
void send_command(command_t &cmd_)
Definition: object.cpp:434
virtual void process_term_ack()
Definition: object.cpp:414
void send_own(zmq::own_t *destination_, zmq::own_t *object_)
Definition: object.cpp:215
zmq::ctx_t * ctx
Definition: object.hpp:139
void pend_connection(const std::string &addr_, const endpoint_t &endpoint_, pipe_t **pipes_)
Definition: ctx.cpp:531
virtual void process_reap(zmq::socket_base_t *socket_)
Definition: object.cpp:419
virtual void process_activate_read()
Definition: object.cpp:379
void set_tid(uint32_t id)
Definition: object.cpp:63
void send_reaped()
Definition: object.cpp:330
virtual void process_reaped()
Definition: object.cpp:424
void send_done()
Definition: object.cpp:346
virtual void process_term_req(zmq::own_t *object_)
Definition: object.cpp:404
int unregister_endpoint(const std::string &addr_, socket_base_t *socket_)
Definition: object.cpp:157
virtual void process_bind(zmq::pipe_t *pipe_)
Definition: object.cpp:374
virtual void process_own(zmq::own_t *object_)
Definition: object.cpp:364
int register_endpoint(const char *addr_, const zmq::endpoint_t &endpoint_)
Definition: object.cpp:151
void destroy_socket(zmq::socket_base_t *socket_)
Definition: object.cpp:184
void send_pipe_term_ack(zmq::pipe_t *destination_)
Definition: object.cpp:286
void send_plug(zmq::own_t *destination_, bool inc_seqnum_=true)
Definition: object.cpp:204
void inc_seqnum()
Definition: own.cpp:66
union zmq::command_t::args_t args
virtual void process_pipe_term_ack()
Definition: object.cpp:399
virtual void process_pipe_term()
Definition: object.cpp:394
void unregister_endpoints(zmq::socket_base_t *socket_)
Definition: object.cpp:163
void connect_pending(const char *addr_, zmq::socket_base_t *bind_socket_)
Definition: object.cpp:179
zmq::socket_base_t * socket
Definition: command.hpp:151
struct zmq::command_t::args_t::@5 bind
virtual ~object_t()
Definition: object.cpp:54
struct zmq::command_t::args_t::@7 activate_write
void send_term_req(zmq::own_t *destination_, zmq::own_t *object_)
Definition: object.cpp:294
struct zmq::command_t::args_t::@8 hiccup
void send_term_ack(zmq::own_t *destination_)
Definition: object.cpp:313
void send_bind(zmq::own_t *destination_, zmq::pipe_t *pipe_, bool inc_seqnum_=true)
Definition: object.cpp:238
void send_term(zmq::own_t *destination_, int linger_)
Definition: object.cpp:304
void send_activate_write(zmq::pipe_t *destination_, uint64_t msgs_read_)
Definition: object.cpp:259
zmq::object_t * destination
Definition: command.hpp:53
virtual void process_hiccup(void *pipe_)
Definition: object.cpp:389
virtual void process_seqnum()
Definition: object.cpp:429
struct zmq::command_t::args_t::@11 term_req
void send_inproc_connected(zmq::socket_base_t *socket_)
Definition: object.cpp:338
void send_command(uint32_t tid_, const command_t &command_)
Definition: ctx.cpp:428
virtual void process_stop()
Definition: object.cpp:354
zmq::own_t * object
Definition: command.hpp:90
zmq::io_thread_t * choose_io_thread(uint64_t affinity_)
Definition: ctx.cpp:433
enum zmq::command_t::type_t type
virtual void process_term(int linger_)
Definition: object.cpp:409
void send_stop()
Definition: object.cpp:194
zmq::pipe_t * pipe
Definition: command.hpp:102
struct i_engine * engine
Definition: command.hpp:96
ctx_t * get_ctx()
Definition: object.cpp:68
void send_activate_read(zmq::pipe_t *destination_)
Definition: object.cpp:251
void connect_pending(const char *addr_, zmq::socket_base_t *bind_socket_)
Definition: ctx.cpp:552
virtual void process_plug()
Definition: object.cpp:359
void send_reap(zmq::socket_base_t *socket_)
Definition: object.cpp:321
int register_endpoint(const char *addr_, const endpoint_t &endpoint_)
Definition: ctx.cpp:453
void send_pipe_term(zmq::pipe_t *destination_)
Definition: object.cpp:278
zmq::object_t * get_reaper()
Definition: ctx.cpp:417
struct zmq::command_t::args_t::@4 attach
void send_hiccup(zmq::pipe_t *destination_, void *pipe_)
Definition: object.cpp:269
zmq::endpoint_t find_endpoint(const char *addr_)
Definition: object.cpp:168
object_t(zmq::ctx_t *ctx_, uint32_t tid_)
Definition: object.cpp:42
uint32_t get_tid()
Definition: object.cpp:58
void unregister_endpoints(zmq::socket_base_t *socket_)
Definition: ctx.cpp:490
virtual void process_attach(zmq::i_engine *engine_)
Definition: object.cpp:369