Line data Source code
1 : /*
2 : Copyright (c) 2007-2016 Contributors as noted in the AUTHORS file
3 :
4 : This file is part of libzmq, the ZeroMQ core engine in C++.
5 :
6 : libzmq is free software; you can redistribute it and/or modify it under
7 : the terms of the GNU Lesser General Public License (LGPL) as published
8 : by the Free Software Foundation; either version 3 of the License, or
9 : (at your option) any later version.
10 :
11 : As a special exception, the Contributors give you permission to link
12 : this library with independent modules to produce an executable,
13 : regardless of the license terms of these independent modules, and to
14 : copy and distribute the resulting executable under terms of your choice,
15 : provided that you also meet, for each linked independent module, the
16 : terms and conditions of the license of that module. An independent
17 : module is a module which is not derived from or based on this library.
18 : If you modify this library, you must extend this exception to your
19 : version of the library.
20 :
21 : libzmq is distributed in the hope that it will be useful, but WITHOUT
22 : ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
23 : FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public
24 : License for more details.
25 :
26 : You should have received a copy of the GNU Lesser General Public License
27 : along with this program. If not, see <http://www.gnu.org/licenses/>.
28 : */
29 :
30 : #include "precompiled.hpp"
31 : #include "macros.hpp"
32 : #include "stream.hpp"
33 : #include "pipe.hpp"
34 : #include "wire.hpp"
35 : #include "random.hpp"
36 : #include "likely.hpp"
37 : #include "err.hpp"
38 :
39 33 : zmq::stream_t::stream_t (class ctx_t *parent_, uint32_t tid_, int sid_) :
40 : socket_base_t (parent_, tid_, sid_),
41 : prefetched (false),
42 : identity_sent (false),
43 : current_out (NULL),
44 : more_out (false),
45 66 : next_rid (generate_random ())
46 : {
47 33 : options.type = ZMQ_STREAM;
48 33 : options.raw_socket = true;
49 :
50 33 : prefetched_id.init ();
51 33 : prefetched_msg.init ();
52 33 : }
53 :
54 132 : zmq::stream_t::~stream_t ()
55 : {
56 66 : zmq_assert (outpipes.empty ());
57 33 : prefetched_id.close ();
58 33 : prefetched_msg.close ();
59 66 : }
60 :
61 33 : void zmq::stream_t::xattach_pipe (pipe_t *pipe_, bool subscribe_to_all_)
62 : {
63 : LIBZMQ_UNUSED(subscribe_to_all_);
64 :
65 33 : zmq_assert (pipe_);
66 :
67 33 : identify_peer (pipe_);
68 33 : fq.attach (pipe_);
69 33 : }
70 :
71 33 : void zmq::stream_t::xpipe_terminated (pipe_t *pipe_)
72 : {
73 99 : outpipes_t::iterator it = outpipes.find (pipe_->get_identity ());
74 66 : zmq_assert (it != outpipes.end ());
75 33 : outpipes.erase (it);
76 33 : fq.pipe_terminated (pipe_);
77 33 : if (pipe_ == current_out)
78 0 : current_out = NULL;
79 33 : }
80 :
81 36 : void zmq::stream_t::xread_activated (pipe_t *pipe_)
82 : {
83 36 : fq.activated (pipe_);
84 36 : }
85 :
86 0 : void zmq::stream_t::xwrite_activated (pipe_t *pipe_)
87 : {
88 : outpipes_t::iterator it;
89 0 : for (it = outpipes.begin (); it != outpipes.end (); ++it)
90 0 : if (it->second.pipe == pipe_)
91 : break;
92 :
93 0 : zmq_assert (it != outpipes.end ());
94 0 : zmq_assert (!it->second.active);
95 0 : it->second.active = true;
96 0 : }
97 :
98 66 : int zmq::stream_t::xsend (msg_t *msg_)
99 : {
100 : // If this is the first part of the message it's the ID of the
101 : // peer to send the message to.
102 66 : if (!more_out) {
103 33 : zmq_assert (!current_out);
104 :
105 : // If we have malformed message (prefix with no subsequent message)
106 : // then just silently ignore it.
107 : // TODO: The connections should be killed instead.
108 33 : if (msg_->flags () & msg_t::more) {
109 :
110 : // Find the pipe associated with the identity stored in the prefix.
111 : // If there's no such pipe return an error
112 66 : blob_t identity ((unsigned char*) msg_->data (), msg_->size ());
113 66 : outpipes_t::iterator it = outpipes.find (identity);
114 :
115 66 : if (it != outpipes.end ()) {
116 33 : current_out = it->second.pipe;
117 33 : if (!current_out->check_write ()) {
118 0 : it->second.active = false;
119 0 : current_out = NULL;
120 0 : errno = EAGAIN;
121 0 : return -1;
122 : }
123 : }
124 : else {
125 0 : errno = EHOSTUNREACH;
126 0 : return -1;
127 : }
128 : }
129 :
130 : // Expect one more message frame.
131 33 : more_out = true;
132 :
133 33 : int rc = msg_->close ();
134 33 : errno_assert (rc == 0);
135 33 : rc = msg_->init ();
136 33 : errno_assert (rc == 0);
137 : return 0;
138 : }
139 :
140 : // Ignore the MORE flag
141 33 : msg_->reset_flags (msg_t::more);
142 :
143 : // This is the last part of the message.
144 33 : more_out = false;
145 :
146 : // Push the message into the pipe. If there's no out pipe, just drop it.
147 33 : if (current_out) {
148 :
149 : // Close the remote connection if user has asked to do so
150 : // by sending zero length message.
151 : // Pending messages in the pipe will be dropped (on receiving term- ack)
152 33 : if (msg_->size () == 0) {
153 9 : current_out->terminate (false);
154 9 : int rc = msg_->close ();
155 9 : errno_assert (rc == 0);
156 9 : rc = msg_->init ();
157 9 : errno_assert (rc == 0);
158 9 : current_out = NULL;
159 9 : return 0;
160 : }
161 24 : bool ok = current_out->write (msg_);
162 24 : if (likely (ok))
163 24 : current_out->flush ();
164 24 : current_out = NULL;
165 : }
166 : else {
167 0 : int rc = msg_->close ();
168 0 : errno_assert (rc == 0);
169 : }
170 :
171 : // Detach the message from the data buffer.
172 24 : int rc = msg_->init ();
173 24 : errno_assert (rc == 0);
174 :
175 : return 0;
176 : }
177 :
178 39 : int zmq::stream_t::xsetsockopt (int option_, const void *optval_,
179 : size_t optvallen_)
180 : {
181 39 : bool is_int = (optvallen_ == sizeof (int));
182 39 : int value = 0;
183 39 : if (is_int) memcpy(&value, optval_, sizeof (int));
184 :
185 39 : switch (option_) {
186 : case ZMQ_CONNECT_RID:
187 3 : if (optval_ && optvallen_) {
188 3 : connect_rid.assign ((char*) optval_, optvallen_);
189 : return 0;
190 : }
191 : break;
192 :
193 : case ZMQ_STREAM_NOTIFY:
194 18 : if (is_int && (value == 0 || value == 1)) {
195 18 : options.raw_notify = (value != 0);
196 18 : return 0;
197 : }
198 : break;
199 :
200 : default:
201 : break;
202 : }
203 18 : errno = EINVAL;
204 18 : return -1;
205 : }
206 :
207 206 : int zmq::stream_t::xrecv (msg_t *msg_)
208 : {
209 206 : if (prefetched) {
210 81 : if (!identity_sent) {
211 9 : int rc = msg_->move (prefetched_id);
212 9 : errno_assert (rc == 0);
213 9 : identity_sent = true;
214 : }
215 : else {
216 72 : int rc = msg_->move (prefetched_msg);
217 72 : errno_assert (rc == 0);
218 72 : prefetched = false;
219 : }
220 : return 0;
221 : }
222 :
223 125 : pipe_t *pipe = NULL;
224 125 : int rc = fq.recvpipe (&prefetched_msg, &pipe);
225 125 : if (rc != 0)
226 : return -1;
227 :
228 66 : zmq_assert (pipe != NULL);
229 66 : zmq_assert ((prefetched_msg.flags () & msg_t::more) == 0);
230 :
231 : // We have received a frame with TCP data.
232 : // Rather than sending this frame, we keep it in prefetched
233 : // buffer and send a frame with peer's ID.
234 66 : blob_t identity = pipe->get_identity ();
235 66 : rc = msg_->close();
236 66 : errno_assert (rc == 0);
237 66 : rc = msg_->init_size (identity.size ());
238 66 : errno_assert (rc == 0);
239 :
240 : // forward metadata (if any)
241 66 : metadata_t *metadata = prefetched_msg.metadata();
242 66 : if (metadata)
243 66 : msg_->set_metadata(metadata);
244 :
245 66 : memcpy (msg_->data (), identity.data (), identity.size ());
246 66 : msg_->set_flags (msg_t::more);
247 :
248 66 : prefetched = true;
249 66 : identity_sent = true;
250 :
251 66 : return 0;
252 : }
253 :
254 42 : bool zmq::stream_t::xhas_in ()
255 : {
256 : // We may already have a message pre-fetched.
257 42 : if (prefetched)
258 : return true;
259 :
260 : // Try to read the next message.
261 : // The message, if read, is kept in the pre-fetch buffer.
262 42 : pipe_t *pipe = NULL;
263 42 : int rc = fq.recvpipe (&prefetched_msg, &pipe);
264 42 : if (rc != 0)
265 : return false;
266 :
267 9 : zmq_assert (pipe != NULL);
268 9 : zmq_assert ((prefetched_msg.flags () & msg_t::more) == 0);
269 :
270 9 : blob_t identity = pipe->get_identity ();
271 9 : rc = prefetched_id.init_size (identity.size ());
272 9 : errno_assert (rc == 0);
273 :
274 : // forward metadata (if any)
275 9 : metadata_t *metadata = prefetched_msg.metadata();
276 9 : if (metadata)
277 9 : prefetched_id.set_metadata(metadata);
278 :
279 9 : memcpy (prefetched_id.data (), identity.data (), identity.size ());
280 9 : prefetched_id.set_flags (msg_t::more);
281 :
282 9 : prefetched = true;
283 9 : identity_sent = false;
284 :
285 9 : return true;
286 : }
287 :
288 42 : bool zmq::stream_t::xhas_out ()
289 : {
290 : // In theory, STREAM socket is always ready for writing. Whether actual
291 : // attempt to write succeeds depends on which pipe the message is going
292 : // to be routed to.
293 42 : return true;
294 : }
295 :
296 33 : void zmq::stream_t::identify_peer (pipe_t *pipe_)
297 : {
298 : // Always assign identity for raw-socket
299 : unsigned char buffer [5];
300 33 : buffer [0] = 0;
301 : blob_t identity;
302 66 : if (connect_rid.length ()) {
303 6 : identity = blob_t ((unsigned char*) connect_rid.c_str(),
304 : connect_rid.length ());
305 3 : connect_rid.clear ();
306 6 : outpipes_t::iterator it = outpipes.find (identity);
307 6 : zmq_assert (it == outpipes.end ());
308 : }
309 : else {
310 30 : put_uint32 (buffer + 1, next_rid++);
311 30 : identity = blob_t (buffer, sizeof buffer);
312 30 : memcpy (options.identity, identity.data (), identity.size ());
313 30 : options.identity_size = (unsigned char) identity.size ();
314 : }
315 33 : pipe_->set_identity (identity);
316 : // Add the record into output pipes lookup table
317 33 : outpipe_t outpipe = {pipe_, true};
318 : const bool ok = outpipes.insert (
319 99 : outpipes_t::value_type (identity, outpipe)).second;
320 33 : zmq_assert (ok);
321 33 : }
|