Line data Source code
1 : /*
2 : Copyright (c) 2007-2016 Contributors as noted in the AUTHORS file
3 :
4 : This file is part of libzmq, the ZeroMQ core engine in C++.
5 :
6 : libzmq is free software; you can redistribute it and/or modify it under
7 : the terms of the GNU Lesser General Public License (LGPL) as published
8 : by the Free Software Foundation; either version 3 of the License, or
9 : (at your option) any later version.
10 :
11 : As a special exception, the Contributors give you permission to link
12 : this library with independent modules to produce an executable,
13 : regardless of the license terms of these independent modules, and to
14 : copy and distribute the resulting executable under terms of your choice,
15 : provided that you also meet, for each linked independent module, the
16 : terms and conditions of the license of that module. An independent
17 : module is a module which is not derived from or based on this library.
18 : If you modify this library, you must extend this exception to your
19 : version of the library.
20 :
21 : libzmq is distributed in the hope that it will be useful, but WITHOUT
22 : ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
23 : FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public
24 : License for more details.
25 :
26 : You should have received a copy of the GNU Lesser General Public License
27 : along with this program. If not, see <http://www.gnu.org/licenses/>.
28 : */
29 :
30 : #include "precompiled.hpp"
31 : #include "own.hpp"
32 : #include "err.hpp"
33 : #include "io_thread.hpp"
34 :
35 11197 : zmq::own_t::own_t (class ctx_t *parent_, uint32_t tid_) :
36 : object_t (parent_, tid_),
37 : terminating (false),
38 : sent_seqnum (0),
39 : processed_seqnum (0),
40 : owner (NULL),
41 33591 : term_acks (0)
42 : {
43 11197 : }
44 :
45 11546 : zmq::own_t::own_t (io_thread_t *io_thread_, const options_t &options_) :
46 : object_t (io_thread_),
47 : options (options_),
48 : terminating (false),
49 : sent_seqnum (0),
50 : processed_seqnum (0),
51 : owner (NULL),
52 34634 : term_acks (0)
53 : {
54 11544 : }
55 :
56 45464 : zmq::own_t::~own_t ()
57 : {
58 22726 : }
59 :
60 11545 : void zmq::own_t::set_owner (own_t *owner_)
61 : {
62 11545 : zmq_assert (!owner);
63 11545 : owner = owner_;
64 11545 : }
65 :
66 34216 : void zmq::own_t::inc_seqnum ()
67 : {
68 : // This function may be called from a different thread!
69 34216 : sent_seqnum.add (1);
70 34216 : }
71 :
72 34213 : void zmq::own_t::process_seqnum ()
73 : {
74 : // Catch up with counter of processed commands.
75 34213 : processed_seqnum++;
76 :
77 : // We may have catched up and still have pending terms acks.
78 34213 : check_term_acks ();
79 34213 : }
80 :
81 11545 : void zmq::own_t::launch_child (own_t *object_)
82 : {
83 : // Specify the owner of the object.
84 11545 : object_->set_owner (this);
85 :
86 : // Plug the object into the I/O thread.
87 11545 : send_plug (object_);
88 :
89 : // Take ownership of the object.
90 11546 : send_own (this, object_);
91 11546 : }
92 :
93 69 : void zmq::own_t::term_child (own_t *object_)
94 : {
95 69 : process_term_req (object_);
96 69 : }
97 :
98 5539 : void zmq::own_t::process_term_req (own_t *object_)
99 : {
100 : // When shutting down we can ignore termination requests from owned
101 : // objects. The termination request was already sent to the object.
102 5539 : if (terminating)
103 : return;
104 :
105 : // If I/O object is well and alive let's ask it to terminate.
106 14820 : owned_t::iterator it = std::find (owned.begin (), owned.end (), object_);
107 :
108 : // If not found, we assume that termination request was already sent to
109 : // the object so we can safely ignore the request.
110 9880 : if (it == owned.end ())
111 : return;
112 :
113 4937 : owned.erase (it);
114 : register_term_acks (1);
115 :
116 : // Note that this object is the root of the (partial shutdown) thus, its
117 : // value of linger is used, rather than the value stored by the children.
118 4937 : send_term (object_, options.linger);
119 : }
120 :
121 11546 : void zmq::own_t::process_own (own_t *object_)
122 : {
123 : // If the object is already being shut down, new owned objects are
124 : // immediately asked to terminate. Note that linger is set to zero.
125 11546 : if (terminating) {
126 : register_term_acks (1);
127 3159 : send_term (object_, 0);
128 14705 : return;
129 : }
130 :
131 : // Store the reference to the owned object.
132 8387 : owned.insert (object_);
133 : }
134 :
135 16599 : void zmq::own_t::terminate ()
136 : {
137 : // If termination is already underway, there's no point
138 : // in starting it anew.
139 16599 : if (terminating)
140 : return;
141 :
142 : // As for the root of the ownership tree, there's no one to terminate it,
143 : // so it has to terminate itself.
144 16601 : if (!owner) {
145 11131 : process_term (options.linger);
146 11131 : return;
147 : }
148 :
149 : // If I am an owned object, I'll ask my owner to terminate me.
150 5470 : send_term_req (owner, this);
151 : }
152 :
153 31503 : bool zmq::own_t::is_terminating ()
154 : {
155 31503 : return terminating;
156 : }
157 :
158 22656 : void zmq::own_t::process_term (int linger_)
159 : {
160 : // Double termination should never happen.
161 22656 : zmq_assert (!terminating);
162 :
163 : // Send termination request to all owned objects.
164 97530 : for (owned_t::iterator it = owned.begin (); it != owned.end (); ++it)
165 3450 : send_term (*it, linger_);
166 45316 : register_term_acks ((int) owned.size ());
167 22658 : owned.clear ();
168 :
169 : // Start termination process and check whether by chance we cannot
170 : // terminate immediately.
171 22660 : terminating = true;
172 22660 : check_term_acks ();
173 22676 : }
174 :
175 13954 : void zmq::own_t::register_term_acks (int count_)
176 : {
177 44708 : term_acks += count_;
178 13954 : }
179 :
180 19749 : void zmq::own_t::unregister_term_ack ()
181 : {
182 19749 : zmq_assert (term_acks > 0);
183 19749 : term_acks--;
184 :
185 : // This may be a last ack we are waiting for before termination...
186 19749 : check_term_acks ();
187 19744 : }
188 :
189 11544 : void zmq::own_t::process_term_ack ()
190 : {
191 11544 : unregister_term_ack ();
192 11542 : }
193 :
194 76507 : void zmq::own_t::check_term_acks ()
195 : {
196 161947 : if (terminating && processed_seqnum == sent_seqnum.get () &&
197 39532 : term_acks == 0) {
198 :
199 : // Sanity check. There should be no active children at this point.
200 45318 : zmq_assert (owned.empty ());
201 :
202 : // The root object has nobody to confirm the termination to.
203 : // Other nodes will confirm the termination to the owner.
204 22663 : if (owner)
205 11532 : send_term_ack (owner);
206 :
207 : // Deallocate the resources.
208 22676 : process_destroy ();
209 : }
210 76521 : }
211 :
212 22677 : void zmq::own_t::process_destroy ()
213 : {
214 22677 : delete this;
215 22673 : }
216 :
|