libzmq  master
ZeroMQ C++ Core Engine (LIBZMQ)
stream.cpp
Go to the documentation of this file.
1 /*
2  Copyright (c) 2007-2016 Contributors as noted in the AUTHORS file
3 
4  This file is part of libzmq, the ZeroMQ core engine in C++.
5 
6  libzmq is free software; you can redistribute it and/or modify it under
7  the terms of the GNU Lesser General Public License (LGPL) as published
8  by the Free Software Foundation; either version 3 of the License, or
9  (at your option) any later version.
10 
11  As a special exception, the Contributors give you permission to link
12  this library with independent modules to produce an executable,
13  regardless of the license terms of these independent modules, and to
14  copy and distribute the resulting executable under terms of your choice,
15  provided that you also meet, for each linked independent module, the
16  terms and conditions of the license of that module. An independent
17  module is a module which is not derived from or based on this library.
18  If you modify this library, you must extend this exception to your
19  version of the library.
20 
21  libzmq is distributed in the hope that it will be useful, but WITHOUT
22  ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
23  FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public
24  License for more details.
25 
26  You should have received a copy of the GNU Lesser General Public License
27  along with this program. If not, see <http://www.gnu.org/licenses/>.
28 */
29 
30 #include "precompiled.hpp"
31 #include "macros.hpp"
32 #include "stream.hpp"
33 #include "pipe.hpp"
34 #include "wire.hpp"
35 #include "random.hpp"
36 #include "likely.hpp"
37 #include "err.hpp"
38 
39 zmq::stream_t::stream_t (class ctx_t *parent_, uint32_t tid_, int sid_) :
40  socket_base_t (parent_, tid_, sid_),
41  prefetched (false),
42  identity_sent (false),
43  current_out (NULL),
44  more_out (false),
45  next_rid (generate_random ())
46 {
48  options.raw_socket = true;
49 
52 }
53 
55 {
56  zmq_assert (outpipes.empty ());
59 }
60 
61 void zmq::stream_t::xattach_pipe (pipe_t *pipe_, bool subscribe_to_all_)
62 {
63  LIBZMQ_UNUSED(subscribe_to_all_);
64 
65  zmq_assert (pipe_);
66 
67  identify_peer (pipe_);
68  fq.attach (pipe_);
69 }
70 
72 {
73  outpipes_t::iterator it = outpipes.find (pipe_->get_identity ());
74  zmq_assert (it != outpipes.end ());
75  outpipes.erase (it);
76  fq.pipe_terminated (pipe_);
77  if (pipe_ == current_out)
78  current_out = NULL;
79 }
80 
82 {
83  fq.activated (pipe_);
84 }
85 
87 {
88  outpipes_t::iterator it;
89  for (it = outpipes.begin (); it != outpipes.end (); ++it)
90  if (it->second.pipe == pipe_)
91  break;
92 
93  zmq_assert (it != outpipes.end ());
94  zmq_assert (!it->second.active);
95  it->second.active = true;
96 }
97 
99 {
100  // If this is the first part of the message it's the ID of the
101  // peer to send the message to.
102  if (!more_out) {
104 
105  // If we have malformed message (prefix with no subsequent message)
106  // then just silently ignore it.
107  // TODO: The connections should be killed instead.
108  if (msg_->flags () & msg_t::more) {
109 
110  // Find the pipe associated with the identity stored in the prefix.
111  // If there's no such pipe return an error
112  blob_t identity ((unsigned char*) msg_->data (), msg_->size ());
113  outpipes_t::iterator it = outpipes.find (identity);
114 
115  if (it != outpipes.end ()) {
116  current_out = it->second.pipe;
117  if (!current_out->check_write ()) {
118  it->second.active = false;
119  current_out = NULL;
120  errno = EAGAIN;
121  return -1;
122  }
123  }
124  else {
125  errno = EHOSTUNREACH;
126  return -1;
127  }
128  }
129 
130  // Expect one more message frame.
131  more_out = true;
132 
133  int rc = msg_->close ();
134  errno_assert (rc == 0);
135  rc = msg_->init ();
136  errno_assert (rc == 0);
137  return 0;
138  }
139 
140  // Ignore the MORE flag
141  msg_->reset_flags (msg_t::more);
142 
143  // This is the last part of the message.
144  more_out = false;
145 
146  // Push the message into the pipe. If there's no out pipe, just drop it.
147  if (current_out) {
148 
149  // Close the remote connection if user has asked to do so
150  // by sending zero length message.
151  // Pending messages in the pipe will be dropped (on receiving term- ack)
152  if (msg_->size () == 0) {
153  current_out->terminate (false);
154  int rc = msg_->close ();
155  errno_assert (rc == 0);
156  rc = msg_->init ();
157  errno_assert (rc == 0);
158  current_out = NULL;
159  return 0;
160  }
161  bool ok = current_out->write (msg_);
162  if (likely (ok))
163  current_out->flush ();
164  current_out = NULL;
165  }
166  else {
167  int rc = msg_->close ();
168  errno_assert (rc == 0);
169  }
170 
171  // Detach the message from the data buffer.
172  int rc = msg_->init ();
173  errno_assert (rc == 0);
174 
175  return 0;
176 }
177 
178 int zmq::stream_t::xsetsockopt (int option_, const void *optval_,
179  size_t optvallen_)
180 {
181  bool is_int = (optvallen_ == sizeof (int));
182  int value = 0;
183  if (is_int) memcpy(&value, optval_, sizeof (int));
184 
185  switch (option_) {
186  case ZMQ_CONNECT_RID:
187  if (optval_ && optvallen_) {
188  connect_rid.assign ((char*) optval_, optvallen_);
189  return 0;
190  }
191  break;
192 
193  case ZMQ_STREAM_NOTIFY:
194  if (is_int && (value == 0 || value == 1)) {
195  options.raw_notify = (value != 0);
196  return 0;
197  }
198  break;
199 
200  default:
201  break;
202  }
203  errno = EINVAL;
204  return -1;
205 }
206 
208 {
209  if (prefetched) {
210  if (!identity_sent) {
211  int rc = msg_->move (prefetched_id);
212  errno_assert (rc == 0);
213  identity_sent = true;
214  }
215  else {
216  int rc = msg_->move (prefetched_msg);
217  errno_assert (rc == 0);
218  prefetched = false;
219  }
220  return 0;
221  }
222 
223  pipe_t *pipe = NULL;
224  int rc = fq.recvpipe (&prefetched_msg, &pipe);
225  if (rc != 0)
226  return -1;
227 
228  zmq_assert (pipe != NULL);
230 
231  // We have received a frame with TCP data.
232  // Rather than sending this frame, we keep it in prefetched
233  // buffer and send a frame with peer's ID.
234  blob_t identity = pipe->get_identity ();
235  rc = msg_->close();
236  errno_assert (rc == 0);
237  rc = msg_->init_size (identity.size ());
238  errno_assert (rc == 0);
239 
240  // forward metadata (if any)
241  metadata_t *metadata = prefetched_msg.metadata();
242  if (metadata)
243  msg_->set_metadata(metadata);
244 
245  memcpy (msg_->data (), identity.data (), identity.size ());
246  msg_->set_flags (msg_t::more);
247 
248  prefetched = true;
249  identity_sent = true;
250 
251  return 0;
252 }
253 
255 {
256  // We may already have a message pre-fetched.
257  if (prefetched)
258  return true;
259 
260  // Try to read the next message.
261  // The message, if read, is kept in the pre-fetch buffer.
262  pipe_t *pipe = NULL;
263  int rc = fq.recvpipe (&prefetched_msg, &pipe);
264  if (rc != 0)
265  return false;
266 
267  zmq_assert (pipe != NULL);
269 
270  blob_t identity = pipe->get_identity ();
271  rc = prefetched_id.init_size (identity.size ());
272  errno_assert (rc == 0);
273 
274  // forward metadata (if any)
275  metadata_t *metadata = prefetched_msg.metadata();
276  if (metadata)
277  prefetched_id.set_metadata(metadata);
278 
279  memcpy (prefetched_id.data (), identity.data (), identity.size ());
281 
282  prefetched = true;
283  identity_sent = false;
284 
285  return true;
286 }
287 
289 {
290  // In theory, STREAM socket is always ready for writing. Whether actual
291  // attempt to write succeeds depends on which pipe the message is going
292  // to be routed to.
293  return true;
294 }
295 
297 {
298  // Always assign identity for raw-socket
299  unsigned char buffer [5];
300  buffer [0] = 0;
301  blob_t identity;
302  if (connect_rid.length ()) {
303  identity = blob_t ((unsigned char*) connect_rid.c_str(),
304  connect_rid.length ());
305  connect_rid.clear ();
306  outpipes_t::iterator it = outpipes.find (identity);
307  zmq_assert (it == outpipes.end ());
308  }
309  else {
310  put_uint32 (buffer + 1, next_rid++);
311  identity = blob_t (buffer, sizeof buffer);
312  memcpy (options.identity, identity.data (), identity.size ());
313  options.identity_size = (unsigned char) identity.size ();
314  }
315  pipe_->set_identity (identity);
316  // Add the record into output pipes lookup table
317  outpipe_t outpipe = {pipe_, true};
318  const bool ok = outpipes.insert (
319  outpipes_t::value_type (identity, outpipe)).second;
320  zmq_assert (ok);
321 }
bool check_write()
Definition: pipe.cpp:206
outpipes_t outpipes
Definition: stream.hpp:89
int close()
Definition: msg.cpp:217
#define ZMQ_STREAM
Definition: zmq.h:257
bool write(msg_t *msg_)
Definition: pipe.cpp:221
bool xhas_out()
Definition: stream.cpp:288
#define zmq_assert(x)
Definition: err.hpp:119
int move(msg_t &src_)
Definition: msg.cpp:274
void identify_peer(pipe_t *pipe_)
Definition: stream.cpp:296
int recvpipe(msg_t *msg_, pipe_t **pipe_)
Definition: fq.cpp:88
#define ZMQ_STREAM_NOTIFY
Definition: zmq.h:324
void pipe_terminated(pipe_t *pipe_)
Definition: fq.cpp:56
metadata_t * metadata
Definition: msg.hpp:175
void put_uint32(unsigned char *buffer_, uint32_t value)
Definition: wire.hpp:64
stream_t(zmq::ctx_t *parent_, uint32_t tid_, int sid)
Definition: stream.cpp:39
unsigned char size
Definition: msg.hpp:188
int init_size(size_t size_)
Definition: msg.cpp:93
void reset_flags(unsigned char flags_)
Definition: msg.cpp:389
void xpipe_terminated(zmq::pipe_t *pipe_)
Definition: stream.cpp:71
int xsetsockopt(int option_, const void *optval_, size_t optvallen_)
Definition: stream.cpp:178
msg_t prefetched_msg
Definition: stream.hpp:79
#define LIBZMQ_UNUSED(object)
Definition: macros.hpp:6
int init()
Definition: msg.cpp:82
uint32_t generate_random()
Definition: random.cpp:54
#define EHOSTUNREACH
Definition: zmq.h:160
void attach(pipe_t *pipe_)
Definition: fq.cpp:49
unsigned char identity[256]
Definition: options.hpp:74
std::basic_string< unsigned char > blob_t
Definition: blob.hpp:134
void flush()
Definition: pipe.cpp:248
int xrecv(zmq::msg_t *msg_)
Definition: stream.cpp:207
int xsend(zmq::msg_t *msg_)
Definition: stream.cpp:98
std::string connect_rid
void xwrite_activated(zmq::pipe_t *pipe_)
Definition: stream.cpp:86
zmq::pipe_t * current_out
Definition: stream.hpp:92
void xattach_pipe(zmq::pipe_t *pipe_, bool subscribe_to_all_)
Definition: stream.cpp:61
void set_flags(unsigned char flags_)
Definition: msg.cpp:384
void terminate(bool delay_)
Definition: pipe.cpp:385
void activated(pipe_t *pipe_)
Definition: fq.cpp:76
#define errno_assert(x)
Definition: err.hpp:129
bool prefetched
Definition: stream.hpp:69
bool identity_sent
Definition: stream.hpp:73
msg_t prefetched_id
Definition: stream.hpp:76
unsigned char data[max_vsm_size]
Definition: msg.hpp:187
void set_identity(const blob_t &identity_)
Definition: pipe.cpp:128
options_t options
Definition: own.hpp:109
bool more_out
Definition: stream.hpp:95
#define likely(x)
Definition: likely.hpp:37
unsigned char identity_size
Definition: options.hpp:73
bool xhas_in()
Definition: stream.cpp:254
unsigned char flags
Definition: msg.hpp:181
#define ZMQ_CONNECT_RID
Definition: zmq.h:311
uint32_t next_rid
Definition: stream.hpp:99
void xread_activated(zmq::pipe_t *pipe_)
Definition: stream.cpp:81
blob_t get_identity()
Definition: pipe.cpp:133
void set_metadata(metadata_t *metadata_)
Definition: msg.cpp:399