Line data Source code
1 : /*
2 : Copyright (c) 2007-2016 Contributors as noted in the AUTHORS file
3 :
4 : This file is part of libzmq, the ZeroMQ core engine in C++.
5 :
6 : libzmq is free software; you can redistribute it and/or modify it under
7 : the terms of the GNU Lesser General Public License (LGPL) as published
8 : by the Free Software Foundation; either version 3 of the License, or
9 : (at your option) any later version.
10 :
11 : As a special exception, the Contributors give you permission to link
12 : this library with independent modules to produce an executable,
13 : regardless of the license terms of these independent modules, and to
14 : copy and distribute the resulting executable under terms of your choice,
15 : provided that you also meet, for each linked independent module, the
16 : terms and conditions of the license of that module. An independent
17 : module is a module which is not derived from or based on this library.
18 : If you modify this library, you must extend this exception to your
19 : version of the library.
20 :
21 : libzmq is distributed in the hope that it will be useful, but WITHOUT
22 : ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
23 : FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public
24 : License for more details.
25 :
26 : You should have received a copy of the GNU Lesser General Public License
27 : along with this program. If not, see <http://www.gnu.org/licenses/>.
28 : */
29 :
30 : #include "precompiled.hpp"
31 : #include "req.hpp"
32 : #include "err.hpp"
33 : #include "msg.hpp"
34 : #include "wire.hpp"
35 : #include "random.hpp"
36 : #include "likely.hpp"
37 :
38 : extern "C"
39 : {
40 27 : static void free_id (void *data, void *hint)
41 : {
42 27 : free (data);
43 27 : }
44 : }
45 :
46 108 : zmq::req_t::req_t (class ctx_t *parent_, uint32_t tid_, int sid_) :
47 : dealer_t (parent_, tid_, sid_),
48 : receiving_reply (false),
49 : message_begins (true),
50 : reply_pipe (NULL),
51 : request_id_frames_enabled (false),
52 108 : request_id (generate_random ()),
53 216 : strict (true)
54 : {
55 108 : options.type = ZMQ_REQ;
56 108 : }
57 :
58 108 : zmq::req_t::~req_t ()
59 : {
60 108 : }
61 :
62 207 : int zmq::req_t::xsend (msg_t *msg_)
63 : {
64 : // If we've sent a request and we still haven't got the reply,
65 : // we can't send another request unless the strict option is disabled.
66 207 : if (receiving_reply) {
67 9 : if (strict) {
68 0 : errno = EFSM;
69 0 : return -1;
70 : }
71 :
72 9 : receiving_reply = false;
73 9 : message_begins = true;
74 : }
75 :
76 : // First part of the request is the request identity.
77 207 : if (message_begins) {
78 150 : reply_pipe = NULL;
79 :
80 150 : if (request_id_frames_enabled) {
81 27 : request_id++;
82 :
83 : // Copy request id before sending (see issue #1695 for details).
84 27 : uint32_t *request_id_copy = (uint32_t *) malloc (sizeof (uint32_t));
85 27 : *request_id_copy = request_id;
86 :
87 : msg_t id;
88 : int rc = id.init_data (request_id_copy, sizeof (uint32_t),
89 27 : free_id, NULL);
90 27 : errno_assert (rc == 0);
91 27 : id.set_flags (msg_t::more);
92 :
93 27 : rc = dealer_t::sendpipe (&id, &reply_pipe);
94 27 : if (rc != 0)
95 0 : return -1;
96 : }
97 :
98 : msg_t bottom;
99 150 : int rc = bottom.init ();
100 150 : errno_assert (rc == 0);
101 150 : bottom.set_flags (msg_t::more);
102 :
103 150 : rc = dealer_t::sendpipe (&bottom, &reply_pipe);
104 150 : if (rc != 0)
105 : return -1;
106 129 : zmq_assert (reply_pipe);
107 :
108 129 : message_begins = false;
109 :
110 : // Eat all currently available messages before the request is fully
111 : // sent. This is done to avoid:
112 : // REQ sends request to A, A replies, B replies too.
113 : // A's reply was first and matches, that is used.
114 : // An hour later REQ sends a request to B. B's old reply is used.
115 : msg_t drop;
116 : while (true) {
117 138 : rc = drop.init ();
118 138 : errno_assert (rc == 0);
119 138 : rc = dealer_t::xrecv (&drop);
120 138 : if (rc != 0)
121 : break;
122 9 : drop.close ();
123 : }
124 : }
125 :
126 186 : bool more = msg_->flags () & msg_t::more ? true : false;
127 :
128 186 : int rc = dealer_t::xsend (msg_);
129 186 : if (rc != 0)
130 : return rc;
131 :
132 : // If the request was fully sent, flip the FSM into reply-receiving state.
133 186 : if (!more) {
134 129 : receiving_reply = true;
135 129 : message_begins = true;
136 : }
137 :
138 : return 0;
139 : }
140 :
141 351 : int zmq::req_t::xrecv (msg_t *msg_)
142 : {
143 : // If request wasn't send, we can't wait for reply.
144 351 : if (!receiving_reply) {
145 0 : errno = EFSM;
146 0 : return -1;
147 : }
148 :
149 : // Skip messages until one with the right first frames is found.
150 471 : while (message_begins) {
151 : // If enabled, the first frame must have the correct request_id.
152 312 : if (request_id_frames_enabled) {
153 51 : int rc = recv_reply_pipe (msg_);
154 51 : if (rc != 0)
155 : return rc;
156 :
157 21 : if (unlikely (!(msg_->flags () & msg_t::more) ||
158 : msg_->size () != sizeof (request_id) ||
159 : *static_cast<uint32_t *> (msg_->data ()) != request_id)) {
160 : // Skip the remaining frames and try the next message
161 18 : while (msg_->flags () & msg_t::more) {
162 12 : rc = recv_reply_pipe (msg_);
163 12 : errno_assert (rc == 0);
164 : }
165 : continue;
166 : }
167 : }
168 :
169 : // The next frame must be 0.
170 : // TODO: Failing this check should also close the connection with the peer!
171 276 : int rc = recv_reply_pipe (msg_);
172 276 : if (rc != 0)
173 : return rc;
174 :
175 114 : if (unlikely (!(msg_->flags () & msg_t::more) || msg_->size () != 0)) {
176 : // Skip the remaining frames and try the next message
177 0 : while (msg_->flags () & msg_t::more) {
178 0 : rc = recv_reply_pipe (msg_);
179 0 : errno_assert (rc == 0);
180 : }
181 : continue;
182 : }
183 :
184 114 : message_begins = false;
185 : }
186 :
187 159 : int rc = recv_reply_pipe (msg_);
188 159 : if (rc != 0)
189 : return rc;
190 :
191 : // If the reply is fully received, flip the FSM into request-sending state.
192 159 : if (!(msg_->flags () & msg_t::more)) {
193 114 : receiving_reply = false;
194 114 : message_begins = true;
195 : }
196 :
197 : return 0;
198 : }
199 :
200 0 : bool zmq::req_t::xhas_in ()
201 : {
202 : // TODO: Duplicates should be removed here.
203 :
204 0 : if (!receiving_reply)
205 : return false;
206 :
207 0 : return dealer_t::xhas_in ();
208 : }
209 :
210 0 : bool zmq::req_t::xhas_out ()
211 : {
212 0 : if (receiving_reply)
213 : return false;
214 :
215 0 : return dealer_t::xhas_out ();
216 : }
217 :
218 123 : int zmq::req_t::xsetsockopt (int option_, const void *optval_, size_t optvallen_)
219 : {
220 123 : bool is_int = (optvallen_ == sizeof (int));
221 123 : int value = 0;
222 123 : if (is_int)
223 : memcpy (&value, optval_, sizeof (int));
224 :
225 123 : switch (option_) {
226 : case ZMQ_REQ_CORRELATE:
227 9 : if (is_int && value >= 0) {
228 9 : request_id_frames_enabled = (value != 0);
229 9 : return 0;
230 : }
231 : break;
232 :
233 : case ZMQ_REQ_RELAXED:
234 6 : if (is_int && value >= 0) {
235 6 : strict = (value == 0);
236 6 : return 0;
237 : }
238 : break;
239 :
240 : default:
241 : break;
242 : }
243 :
244 108 : return dealer_t::xsetsockopt (option_, optval_, optvallen_);
245 : }
246 :
247 138 : void zmq::req_t::xpipe_terminated (pipe_t *pipe_)
248 : {
249 138 : if (reply_pipe == pipe_)
250 78 : reply_pipe = NULL;
251 138 : dealer_t::xpipe_terminated (pipe_);
252 138 : }
253 :
254 498 : int zmq::req_t::recv_reply_pipe (msg_t *msg_)
255 : {
256 : while (true) {
257 507 : pipe_t *pipe = NULL;
258 507 : int rc = dealer_t::recvpipe (msg_, &pipe);
259 507 : if (rc != 0)
260 498 : return rc;
261 315 : if (!reply_pipe || pipe == reply_pipe)
262 : return 0;
263 9 : }
264 : }
265 :
266 99 : zmq::req_session_t::req_session_t (io_thread_t *io_thread_, bool connect_,
267 : socket_base_t *socket_, const options_t &options_,
268 : address_t *addr_) :
269 : session_base_t (io_thread_, connect_, socket_, options_, addr_),
270 99 : state (bottom)
271 : {
272 99 : }
273 :
274 99 : zmq::req_session_t::~req_session_t ()
275 : {
276 99 : }
277 :
278 261 : int zmq::req_session_t::push_msg (msg_t *msg_)
279 : {
280 261 : switch (state) {
281 : case bottom:
282 96 : if (msg_->flags () == msg_t::more) {
283 : // In case option ZMQ_CORRELATE is on, allow request_id to be
284 : // transfered as first frame (would be too cumbersome to check
285 : // whether the option is actually on or not).
286 96 : if (msg_->size () == sizeof (uint32_t)) {
287 27 : state = request_id;
288 27 : return session_base_t::push_msg (msg_);
289 : }
290 69 : else if (msg_->size () == 0) {
291 69 : state = body;
292 69 : return session_base_t::push_msg (msg_);
293 : }
294 : }
295 : break;
296 : case request_id:
297 27 : if (msg_->flags () == msg_t::more && msg_->size () == 0) {
298 27 : state = body;
299 27 : return session_base_t::push_msg (msg_);
300 : }
301 : break;
302 : case body:
303 138 : if (msg_->flags () == msg_t::more)
304 42 : return session_base_t::push_msg (msg_);
305 96 : if (msg_->flags () == 0) {
306 96 : state = bottom;
307 96 : return session_base_t::push_msg (msg_);
308 : }
309 : break;
310 : }
311 0 : errno = EFAULT;
312 0 : return -1;
313 : }
314 :
315 0 : void zmq::req_session_t::reset ()
316 : {
317 0 : session_base_t::reset ();
318 0 : state = bottom;
319 0 : }
|