Line data Source code
1 : /*
2 : Copyright (c) 2007-2016 Contributors as noted in the AUTHORS file
3 :
4 : This file is part of libzmq, the ZeroMQ core engine in C++.
5 :
6 : libzmq is free software; you can redistribute it and/or modify it under
7 : the terms of the GNU Lesser General Public License (LGPL) as published
8 : by the Free Software Foundation; either version 3 of the License, or
9 : (at your option) any later version.
10 :
11 : As a special exception, the Contributors give you permission to link
12 : this library with independent modules to produce an executable,
13 : regardless of the license terms of these independent modules, and to
14 : copy and distribute the resulting executable under terms of your choice,
15 : provided that you also meet, for each linked independent module, the
16 : terms and conditions of the license of that module. An independent
17 : module is a module which is not derived from or based on this library.
18 : If you modify this library, you must extend this exception to your
19 : version of the library.
20 :
21 : libzmq is distributed in the hope that it will be useful, but WITHOUT
22 : ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
23 : FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public
24 : License for more details.
25 :
26 : You should have received a copy of the GNU Lesser General Public License
27 : along with this program. If not, see <http://www.gnu.org/licenses/>.
28 : */
29 :
30 : #include "precompiled.hpp"
31 : #include "macros.hpp"
32 : #include "reaper.hpp"
33 : #include "socket_base.hpp"
34 : #include "err.hpp"
35 :
36 423 : zmq::reaper_t::reaper_t (class ctx_t *ctx_, uint32_t tid_) :
37 : object_t (ctx_, tid_),
38 : mailbox_handle(NULL),
39 : sockets (0),
40 846 : terminating (false)
41 : {
42 423 : poller = new (std::nothrow) poller_t (*ctx_);
43 423 : alloc_assert (poller);
44 :
45 423 : if (mailbox.get_fd () != retired_fd) {
46 423 : mailbox_handle = poller->add_fd (mailbox.get_fd (), this);
47 423 : poller->set_pollin (mailbox_handle);
48 : }
49 :
50 : #ifdef HAVE_FORK
51 423 : pid = getpid();
52 : #endif
53 423 : }
54 :
55 1692 : zmq::reaper_t::~reaper_t ()
56 : {
57 423 : LIBZMQ_DELETE(poller);
58 846 : }
59 :
60 423 : zmq::mailbox_t *zmq::reaper_t::get_mailbox ()
61 : {
62 423 : return &mailbox;
63 : }
64 :
65 423 : void zmq::reaper_t::start ()
66 : {
67 : // Start the thread.
68 423 : poller->start ();
69 423 : }
70 :
71 423 : void zmq::reaper_t::stop ()
72 : {
73 423 : send_stop ();
74 423 : }
75 :
76 3564 : void zmq::reaper_t::in_event ()
77 : {
78 : while (true) {
79 : #ifdef HAVE_FORK
80 26249 : if (unlikely(pid != getpid()))
81 : {
82 : //printf("zmq::reaper_t::in_event return in child process %d\n", (int)getpid());
83 3564 : return;
84 : }
85 : #endif
86 :
87 : // Get the next command. If there is none, exit.
88 : command_t cmd;
89 26249 : int rc = mailbox.recv (&cmd, 0);
90 26249 : if (rc != 0 && errno == EINTR)
91 0 : continue;
92 26249 : if (rc != 0 && errno == EAGAIN)
93 : break;
94 22685 : errno_assert (rc == 0);
95 :
96 : // Process the command.
97 22685 : cmd.destination->process_command (cmd);
98 : }
99 : }
100 :
101 0 : void zmq::reaper_t::out_event ()
102 : {
103 0 : zmq_assert (false);
104 0 : }
105 :
106 0 : void zmq::reaper_t::timer_event (int)
107 : {
108 0 : zmq_assert (false);
109 0 : }
110 :
111 423 : void zmq::reaper_t::process_stop ()
112 : {
113 423 : terminating = true;
114 :
115 : // If there are no sockets being reaped finish immediately.
116 423 : if (!sockets) {
117 16 : send_done ();
118 16 : poller->rm_fd (mailbox_handle);
119 16 : poller->stop ();
120 : }
121 423 : }
122 :
123 11131 : void zmq::reaper_t::process_reap (socket_base_t *socket_)
124 : {
125 : // Add the socket to the poller.
126 11131 : socket_->start_reaping (poller);
127 :
128 11131 : ++sockets;
129 11131 : }
130 :
131 11131 : void zmq::reaper_t::process_reaped ()
132 : {
133 11131 : --sockets;
134 :
135 : // If reaped was already asked to terminate and there are no more sockets,
136 : // finish immediately.
137 11131 : if (!sockets && terminating) {
138 407 : send_done ();
139 407 : poller->rm_fd (mailbox_handle);
140 407 : poller->stop ();
141 : }
142 11131 : }
|