libzmq  master
ZeroMQ C++ Core Engine (LIBZMQ)
req.cpp
Go to the documentation of this file.
1 /*
2  Copyright (c) 2007-2016 Contributors as noted in the AUTHORS file
3 
4  This file is part of libzmq, the ZeroMQ core engine in C++.
5 
6  libzmq is free software; you can redistribute it and/or modify it under
7  the terms of the GNU Lesser General Public License (LGPL) as published
8  by the Free Software Foundation; either version 3 of the License, or
9  (at your option) any later version.
10 
11  As a special exception, the Contributors give you permission to link
12  this library with independent modules to produce an executable,
13  regardless of the license terms of these independent modules, and to
14  copy and distribute the resulting executable under terms of your choice,
15  provided that you also meet, for each linked independent module, the
16  terms and conditions of the license of that module. An independent
17  module is a module which is not derived from or based on this library.
18  If you modify this library, you must extend this exception to your
19  version of the library.
20 
21  libzmq is distributed in the hope that it will be useful, but WITHOUT
22  ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
23  FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public
24  License for more details.
25 
26  You should have received a copy of the GNU Lesser General Public License
27  along with this program. If not, see <http://www.gnu.org/licenses/>.
28 */
29 
30 #include "precompiled.hpp"
31 #include "macros.hpp"
32 #include "req.hpp"
33 #include "err.hpp"
34 #include "msg.hpp"
35 #include "wire.hpp"
36 #include "random.hpp"
37 #include "likely.hpp"
38 
39 extern "C"
40 {
41  static void free_id (void *data, void *hint)
42  {
43  LIBZMQ_UNUSED (hint);
44  free (data);
45  }
46 }
47 
48 zmq::req_t::req_t (class ctx_t *parent_, uint32_t tid_, int sid_) :
49  dealer_t (parent_, tid_, sid_),
50  receiving_reply (false),
51  message_begins (true),
52  reply_pipe (NULL),
53  request_id_frames_enabled (false),
54  request_id (generate_random ()),
55  strict (true)
56 {
58 }
59 
61 {
62 }
63 
65 {
66  // If we've sent a request and we still haven't got the reply,
67  // we can't send another request unless the strict option is disabled.
68  if (receiving_reply) {
69  if (strict) {
70  errno = EFSM;
71  return -1;
72  }
73 
74  receiving_reply = false;
75  message_begins = true;
76  }
77 
78  // First part of the request is the request identity.
79  if (message_begins) {
80  reply_pipe = NULL;
81 
83  request_id++;
84 
85  // Copy request id before sending (see issue #1695 for details).
86  uint32_t *request_id_copy = (uint32_t *) malloc (sizeof (uint32_t));
87  *request_id_copy = request_id;
88 
89  msg_t id;
90  int rc = id.init_data (request_id_copy, sizeof (uint32_t),
91  free_id, NULL);
92  errno_assert (rc == 0);
93  id.set_flags (msg_t::more);
94 
95  rc = dealer_t::sendpipe (&id, &reply_pipe);
96  if (rc != 0)
97  return -1;
98  }
99 
100  msg_t bottom;
101  int rc = bottom.init ();
102  errno_assert (rc == 0);
103  bottom.set_flags (msg_t::more);
104 
105  rc = dealer_t::sendpipe (&bottom, &reply_pipe);
106  if (rc != 0)
107  return -1;
109 
110  message_begins = false;
111 
112  // Eat all currently available messages before the request is fully
113  // sent. This is done to avoid:
114  // REQ sends request to A, A replies, B replies too.
115  // A's reply was first and matches, that is used.
116  // An hour later REQ sends a request to B. B's old reply is used.
117  msg_t drop;
118  while (true) {
119  rc = drop.init ();
120  errno_assert (rc == 0);
121  rc = dealer_t::xrecv (&drop);
122  if (rc != 0)
123  break;
124  drop.close ();
125  }
126  }
127 
128  bool more = msg_->flags () & msg_t::more ? true : false;
129 
130  int rc = dealer_t::xsend (msg_);
131  if (rc != 0)
132  return rc;
133 
134  // If the request was fully sent, flip the FSM into reply-receiving state.
135  if (!more) {
136  receiving_reply = true;
137  message_begins = true;
138  }
139 
140  return 0;
141 }
142 
144 {
145  // If request wasn't send, we can't wait for reply.
146  if (!receiving_reply) {
147  errno = EFSM;
148  return -1;
149  }
150 
151  // Skip messages until one with the right first frames is found.
152  while (message_begins) {
153  // If enabled, the first frame must have the correct request_id.
155  int rc = recv_reply_pipe (msg_);
156  if (rc != 0)
157  return rc;
158 
159  if (unlikely (!(msg_->flags () & msg_t::more) ||
160  msg_->size () != sizeof (request_id) ||
161  *static_cast<uint32_t *> (msg_->data ()) != request_id)) {
162  // Skip the remaining frames and try the next message
163  while (msg_->flags () & msg_t::more) {
164  rc = recv_reply_pipe (msg_);
165  errno_assert (rc == 0);
166  }
167  continue;
168  }
169  }
170 
171  // The next frame must be 0.
172  // TODO: Failing this check should also close the connection with the peer!
173  int rc = recv_reply_pipe (msg_);
174  if (rc != 0)
175  return rc;
176 
177  if (unlikely (!(msg_->flags () & msg_t::more) || msg_->size () != 0)) {
178  // Skip the remaining frames and try the next message
179  while (msg_->flags () & msg_t::more) {
180  rc = recv_reply_pipe (msg_);
181  errno_assert (rc == 0);
182  }
183  continue;
184  }
185 
186  message_begins = false;
187  }
188 
189  int rc = recv_reply_pipe (msg_);
190  if (rc != 0)
191  return rc;
192 
193  // If the reply is fully received, flip the FSM into request-sending state.
194  if (!(msg_->flags () & msg_t::more)) {
195  receiving_reply = false;
196  message_begins = true;
197  }
198 
199  return 0;
200 }
201 
203 {
204  // TODO: Duplicates should be removed here.
205 
206  if (!receiving_reply)
207  return false;
208 
209  return dealer_t::xhas_in ();
210 }
211 
213 {
214  if (receiving_reply)
215  return false;
216 
217  return dealer_t::xhas_out ();
218 }
219 
220 int zmq::req_t::xsetsockopt (int option_, const void *optval_, size_t optvallen_)
221 {
222  bool is_int = (optvallen_ == sizeof (int));
223  int value = 0;
224  if (is_int)
225  memcpy (&value, optval_, sizeof (int));
226 
227  switch (option_) {
228  case ZMQ_REQ_CORRELATE:
229  if (is_int && value >= 0) {
230  request_id_frames_enabled = (value != 0);
231  return 0;
232  }
233  break;
234 
235  case ZMQ_REQ_RELAXED:
236  if (is_int && value >= 0) {
237  strict = (value == 0);
238  return 0;
239  }
240  break;
241 
242  default:
243  break;
244  }
245 
246  return dealer_t::xsetsockopt (option_, optval_, optvallen_);
247 }
248 
250 {
251  if (reply_pipe == pipe_)
252  reply_pipe = NULL;
254 }
255 
257 {
258  while (true) {
259  pipe_t *pipe = NULL;
260  int rc = dealer_t::recvpipe (msg_, &pipe);
261  if (rc != 0)
262  return rc;
263  if (!reply_pipe || pipe == reply_pipe)
264  return 0;
265  }
266 }
267 
268 zmq::req_session_t::req_session_t (io_thread_t *io_thread_, bool connect_,
269  socket_base_t *socket_, const options_t &options_,
270  address_t *addr_) :
271  session_base_t (io_thread_, connect_, socket_, options_, addr_),
272  state (bottom)
273 {
274 }
275 
277 {
278 }
279 
281 {
282  switch (state) {
283  case bottom:
284  if (msg_->flags () == msg_t::more) {
285  // In case option ZMQ_CORRELATE is on, allow request_id to be
286  // transfered as first frame (would be too cumbersome to check
287  // whether the option is actually on or not).
288  if (msg_->size () == sizeof (uint32_t)) {
289  state = request_id;
290  return session_base_t::push_msg (msg_);
291  }
292  else if (msg_->size () == 0) {
293  state = body;
294  return session_base_t::push_msg (msg_);
295  }
296  }
297  break;
298  case request_id:
299  if (msg_->flags () == msg_t::more && msg_->size () == 0) {
300  state = body;
301  return session_base_t::push_msg (msg_);
302  }
303  break;
304  case body:
305  if (msg_->flags () == msg_t::more)
306  return session_base_t::push_msg (msg_);
307  if (msg_->flags () == 0) {
308  state = bottom;
309  return session_base_t::push_msg (msg_);
310  }
311  break;
312  }
313  errno = EFAULT;
314  return -1;
315 }
316 
318 {
320  state = bottom;
321 }
int xsend(zmq::msg_t *msg_)
Definition: req.cpp:64
#define EFSM
Definition: zmq.h:167
req_t(zmq::ctx_t *parent_, uint32_t tid_, int sid_)
Definition: req.cpp:48
int close()
Definition: msg.cpp:217
bool xhas_in()
Definition: req.cpp:202
int sendpipe(zmq::msg_t *msg_, zmq::pipe_t **pipe_)
Definition: dealer.cpp:135
#define zmq_assert(x)
Definition: err.hpp:119
virtual int push_msg(msg_t *msg_)
int xsetsockopt(int option_, const void *optval_, size_t optvallen_)
Definition: req.cpp:220
int xsetsockopt(int option_, const void *optval_, size_t optvallen_)
Definition: dealer.cpp:70
int recvpipe(zmq::msg_t *msg_, zmq::pipe_t **pipe_)
Definition: dealer.cpp:140
req_session_t(zmq::io_thread_t *io_thread_, bool connect_, zmq::socket_base_t *socket_, const options_t &options_, address_t *addr_)
Definition: req.cpp:268
bool xhas_out()
Definition: req.cpp:212
void xpipe_terminated(zmq::pipe_t *pipe_)
Definition: req.cpp:249
bool message_begins
Definition: req.hpp:73
unsigned char size
Definition: msg.hpp:188
int push_msg(msg_t *msg_)
Definition: req.cpp:280
bool xhas_in()
Definition: dealer.cpp:103
void reset()
Definition: req.cpp:317
enum zmq::req_session_t::@51 state
#define ZMQ_REQ
Definition: zmq.h:249
#define LIBZMQ_UNUSED(object)
Definition: macros.hpp:6
static void free_id(void *data, void *hint)
Definition: req.cpp:41
#define unlikely(x)
Definition: likely.hpp:38
int init()
Definition: msg.cpp:82
uint32_t generate_random()
Definition: random.cpp:54
uint32_t request_id
Definition: req.hpp:83
void xpipe_terminated(zmq::pipe_t *pipe_)
Definition: dealer.cpp:129
bool receiving_reply
Definition: req.hpp:69
bool xhas_out()
Definition: dealer.cpp:108
int xrecv(zmq::msg_t *msg_)
Definition: dealer.cpp:98
void set_flags(unsigned char flags_)
Definition: msg.cpp:384
#define errno_assert(x)
Definition: err.hpp:129
~req_t()
Definition: req.cpp:60
bool request_id_frames_enabled
Definition: req.hpp:79
int xrecv(zmq::msg_t *msg_)
Definition: req.cpp:143
unsigned char data[max_vsm_size]
Definition: msg.hpp:187
int recv_reply_pipe(zmq::msg_t *msg_)
Definition: req.cpp:256
int xsend(zmq::msg_t *msg_)
Definition: dealer.cpp:93
zmq::pipe_t * reply_pipe
Definition: req.hpp:76
#define ZMQ_REQ_RELAXED
Definition: zmq.h:306
options_t options
Definition: own.hpp:109
#define ZMQ_REQ_CORRELATE
Definition: zmq.h:305
unsigned char flags
Definition: msg.hpp:181
int init_data(void *data_, size_t size_, msg_free_fn *ffn_, void *hint_)
Definition: msg.cpp:148
virtual void reset()
bool strict
Definition: req.hpp:88