Line data Source code
1 : /*
2 : Copyright (c) 2007-2016 Contributors as noted in the AUTHORS file
3 :
4 : This file is part of libzmq, the ZeroMQ core engine in C++.
5 :
6 : libzmq is free software; you can redistribute it and/or modify it under
7 : the terms of the GNU Lesser General Public License (LGPL) as published
8 : by the Free Software Foundation; either version 3 of the License, or
9 : (at your option) any later version.
10 :
11 : As a special exception, the Contributors give you permission to link
12 : this library with independent modules to produce an executable,
13 : regardless of the license terms of these independent modules, and to
14 : copy and distribute the resulting executable under terms of your choice,
15 : provided that you also meet, for each linked independent module, the
16 : terms and conditions of the license of that module. An independent
17 : module is a module which is not derived from or based on this library.
18 : If you modify this library, you must extend this exception to your
19 : version of the library.
20 :
21 : libzmq is distributed in the hope that it will be useful, but WITHOUT
22 : ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
23 : FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public
24 : License for more details.
25 :
26 : You should have received a copy of the GNU Lesser General Public License
27 : along with this program. If not, see <http://www.gnu.org/licenses/>.
28 : */
29 :
30 : #include "precompiled.hpp"
31 : #include "macros.hpp"
32 : #include "platform.hpp"
33 :
34 : #include <string.h>
35 : #include <new>
36 : #include <sstream>
37 :
38 : #include "stream_engine.hpp"
39 : #include "io_thread.hpp"
40 : #include "session_base.hpp"
41 : #include "v1_encoder.hpp"
42 : #include "v1_decoder.hpp"
43 : #include "v2_encoder.hpp"
44 : #include "v2_decoder.hpp"
45 : #include "null_mechanism.hpp"
46 : #include "plain_client.hpp"
47 : #include "plain_server.hpp"
48 : #include "gssapi_client.hpp"
49 : #include "gssapi_server.hpp"
50 : #include "curve_client.hpp"
51 : #include "curve_server.hpp"
52 : #include "raw_decoder.hpp"
53 : #include "raw_encoder.hpp"
54 : #include "config.hpp"
55 : #include "err.hpp"
56 : #include "ip.hpp"
57 : #include "tcp.hpp"
58 : #include "likely.hpp"
59 : #include "wire.hpp"
60 :
61 6762 : zmq::stream_engine_t::stream_engine_t (fd_t fd_, const options_t &options_,
62 : const std::string &endpoint_) :
63 : s (fd_),
64 : as_server(false),
65 : handle(NULL),
66 : inpos (NULL),
67 : insize (0),
68 : decoder (NULL),
69 : outpos (NULL),
70 : outsize (0),
71 : encoder (NULL),
72 : metadata (NULL),
73 : handshaking (true),
74 : greeting_size (v2_greeting_size),
75 : greeting_bytes_read (0),
76 : session (NULL),
77 : options (options_),
78 : endpoint (endpoint_),
79 : plugged (false),
80 : next_msg (&stream_engine_t::identity_msg),
81 : process_msg (&stream_engine_t::process_identity_msg),
82 : io_error (false),
83 : subscription_required (false),
84 : mechanism (NULL),
85 : input_stopped (false),
86 : output_stopped (false),
87 : has_handshake_timer (false),
88 : has_ttl_timer (false),
89 : has_timeout_timer (false),
90 : has_heartbeat_timer (false),
91 : heartbeat_timeout (0),
92 13524 : socket (NULL)
93 : {
94 6762 : int rc = tx_msg.init ();
95 6761 : errno_assert (rc == 0);
96 :
97 : // Put the socket into non-blocking mode.
98 6761 : unblock_socket (s);
99 :
100 6761 : int family = get_peer_ip_address (s, peer_address);
101 6761 : if (family == 0)
102 0 : peer_address.clear();
103 : #if defined ZMQ_HAVE_SO_PEERCRED
104 : else
105 6761 : if (family == PF_UNIX) {
106 : struct ucred cred;
107 117 : socklen_t size = sizeof (cred);
108 117 : if (!getsockopt (s, SOL_SOCKET, SO_PEERCRED, &cred, &size)) {
109 117 : std::ostringstream buf;
110 351 : buf << ":" << cred.uid << ":" << cred.gid << ":" << cred.pid;
111 234 : peer_address += buf.str ();
112 : }
113 : }
114 : #elif defined ZMQ_HAVE_LOCAL_PEERCRED
115 : else
116 : if (family == PF_UNIX) {
117 : struct xucred cred;
118 : socklen_t size = sizeof (cred);
119 : if (!getsockopt (s, 0, LOCAL_PEERCRED, &cred, &size)
120 : && cred.cr_version == XUCRED_VERSION) {
121 : std::ostringstream buf;
122 : buf << ":" << cred.cr_uid << ":";
123 : if (cred.cr_ngroups > 0)
124 : buf << cred.cr_groups[0];
125 : buf << ":";
126 : peer_address += buf.str ();
127 : }
128 : }
129 : #endif
130 :
131 : #ifdef SO_NOSIGPIPE
132 : // Make sure that SIGPIPE signal is not generated when writing to a
133 : // connection that was already closed by the peer.
134 : int set = 1;
135 : rc = setsockopt (s, SOL_SOCKET, SO_NOSIGPIPE, &set, sizeof (int));
136 : errno_assert (rc == 0);
137 : #endif
138 6761 : if(options.heartbeat_interval > 0) {
139 15 : heartbeat_timeout = options.heartbeat_timeout;
140 15 : if(heartbeat_timeout == -1)
141 15 : heartbeat_timeout = options.heartbeat_interval;
142 : }
143 6761 : }
144 :
145 40570 : zmq::stream_engine_t::~stream_engine_t ()
146 : {
147 6762 : zmq_assert (!plugged);
148 :
149 6762 : if (s != retired_fd) {
150 : #ifdef ZMQ_HAVE_WINDOWS
151 : int rc = closesocket (s);
152 : wsa_assert (rc != SOCKET_ERROR);
153 : #else
154 6762 : int rc = close (s);
155 6762 : errno_assert (rc == 0);
156 : #endif
157 6762 : s = retired_fd;
158 : }
159 :
160 6762 : int rc = tx_msg.close ();
161 6762 : errno_assert (rc == 0);
162 :
163 : // Drop reference to metadata and destroy it if we are
164 : // the only user.
165 6762 : if (metadata != NULL) {
166 3801 : if (metadata->drop_ref ()) {
167 7478 : LIBZMQ_DELETE(metadata);
168 : }
169 : }
170 :
171 6762 : LIBZMQ_DELETE(encoder);
172 6762 : LIBZMQ_DELETE(decoder);
173 6762 : LIBZMQ_DELETE(mechanism);
174 13524 : }
175 :
176 6762 : void zmq::stream_engine_t::plug (io_thread_t *io_thread_,
177 : session_base_t *session_)
178 : {
179 6762 : zmq_assert (!plugged);
180 6762 : plugged = true;
181 :
182 : // Connect to session object.
183 6762 : zmq_assert (!session);
184 6762 : zmq_assert (session_);
185 6762 : session = session_;
186 6762 : socket = session-> get_socket ();
187 :
188 : // Connect to I/O threads poller object.
189 6762 : io_object_t::plug (io_thread_);
190 6762 : handle = add_fd (s);
191 6762 : io_error = false;
192 :
193 6762 : if (options.raw_socket) {
194 : // no handshaking for raw sock, instantiate raw encoder and decoders
195 33 : encoder = new (std::nothrow) raw_encoder_t (out_batch_size);
196 33 : alloc_assert (encoder);
197 :
198 33 : decoder = new (std::nothrow) raw_decoder_t (in_batch_size);
199 33 : alloc_assert (decoder);
200 :
201 : // disable handshaking for raw socket
202 33 : handshaking = false;
203 :
204 33 : next_msg = &stream_engine_t::pull_msg_from_session;
205 33 : process_msg = &stream_engine_t::push_raw_msg_to_session;
206 :
207 : properties_t properties;
208 33 : if (init_properties(properties)) {
209 : // Compile metadata.
210 33 : zmq_assert (metadata == NULL);
211 33 : metadata = new (std::nothrow) metadata_t (properties);
212 : }
213 :
214 33 : if (options.raw_notify) {
215 : // For raw sockets, send an initial 0-length message to the
216 : // application so that it knows a peer has connected.
217 : msg_t connector;
218 30 : connector.init();
219 30 : push_raw_msg_to_session (&connector);
220 30 : connector.close();
221 30 : session->flush ();
222 : }
223 : }
224 : else {
225 : // start optional timer, to prevent handshake hanging on no input
226 6729 : set_handshake_timer ();
227 :
228 : // Send the 'length' and 'flags' fields of the identity message.
229 : // The 'length' field is encoded in the long format.
230 6728 : outpos = greeting_send;
231 6728 : outpos [outsize++] = 0xff;
232 6728 : put_uint64 (&outpos [outsize], options.identity_size + 1);
233 6728 : outsize += 8;
234 6728 : outpos [outsize++] = 0x7f;
235 : }
236 :
237 6761 : set_pollin (handle);
238 6762 : set_pollout (handle);
239 : // Flush all the data that may have been already received downstream.
240 6762 : in_event ();
241 6762 : }
242 :
243 6762 : void zmq::stream_engine_t::unplug ()
244 : {
245 6762 : zmq_assert (plugged);
246 6761 : plugged = false;
247 :
248 : // Cancel all timers.
249 6761 : if (has_handshake_timer) {
250 2406 : cancel_timer (handshake_timer_id);
251 2406 : has_handshake_timer = false;
252 : }
253 :
254 6761 : if (has_ttl_timer) {
255 0 : cancel_timer (heartbeat_ttl_timer_id);
256 0 : has_ttl_timer = false;
257 : }
258 :
259 6761 : if (has_timeout_timer) {
260 0 : cancel_timer (heartbeat_timeout_timer_id);
261 0 : has_timeout_timer = false;
262 : }
263 :
264 6761 : if (has_heartbeat_timer) {
265 15 : cancel_timer (heartbeat_ivl_timer_id);
266 15 : has_heartbeat_timer = false;
267 : }
268 : // Cancel all fd subscriptions.
269 6761 : if (!io_error)
270 6761 : rm_fd (handle);
271 :
272 : // Disconnect from I/O threads poller object.
273 6762 : io_object_t::unplug ();
274 :
275 6762 : session = NULL;
276 6762 : }
277 :
278 4361 : void zmq::stream_engine_t::terminate ()
279 : {
280 4361 : unplug ();
281 4361 : delete this;
282 4361 : }
283 :
284 19704 : void zmq::stream_engine_t::in_event ()
285 : {
286 19704 : zmq_assert (!io_error);
287 :
288 : // If still handshaking, receive and process the greeting message.
289 19704 : if (unlikely (handshaking))
290 14912 : if (!handshake ())
291 14258 : return;
292 :
293 9109 : zmq_assert (decoder);
294 :
295 : // If there has been an I/O error, stop polling.
296 9109 : if (input_stopped) {
297 0 : rm_fd (handle);
298 0 : io_error = true;
299 0 : return;
300 : }
301 :
302 : // If there's no data to process in the buffer...
303 9109 : if (!insize) {
304 :
305 : // Retrieve the buffer and read as much data as possible.
306 : // Note that buffer can be arbitrarily large. However, we assume
307 : // the underlying TCP layer has fixed buffer size and thus the
308 : // number of bytes read will be always limited.
309 9109 : size_t bufsize = 0;
310 9109 : decoder->get_buffer (&inpos, &bufsize);
311 :
312 9109 : const int rc = tcp_read (s, inpos, bufsize);
313 :
314 9109 : if (rc == 0) {
315 942 : error (connection_error);
316 3618 : return;
317 : }
318 8167 : if (rc == -1) {
319 2676 : if (errno != EAGAIN)
320 47 : error (connection_error);
321 : return;
322 : }
323 :
324 : // Adjust input size
325 5491 : insize = static_cast <size_t> (rc);
326 : // Adjust buffer size to received bytes
327 5491 : decoder->resize_buffer(insize);
328 : }
329 :
330 5490 : int rc = 0;
331 5490 : size_t processed = 0;
332 :
333 107459 : while (insize > 0) {
334 96768 : rc = decoder->decode (inpos, insize, processed);
335 96769 : zmq_assert (processed <= insize);
336 96769 : inpos += processed;
337 96769 : insize -= processed;
338 96769 : if (rc == 0 || rc == -1)
339 : break;
340 96763 : rc = (this->*process_msg) (decoder->msg ());
341 96763 : if (rc == -1)
342 : break;
343 : }
344 :
345 : // Tear down the connection if we have failed to decode input data
346 : // or the session has rejected the message.
347 5491 : if (rc == -1) {
348 284 : if (errno != EAGAIN) {
349 45 : error (protocol_error);
350 : return;
351 : }
352 239 : input_stopped = true;
353 239 : reset_pollin (handle);
354 : }
355 :
356 5446 : session->flush ();
357 : }
358 :
359 30876 : void zmq::stream_engine_t::out_event ()
360 : {
361 30876 : zmq_assert (!io_error);
362 :
363 : // If write buffer is empty, try to read new data from the encoder.
364 30878 : if (!outsize) {
365 :
366 : // Even when we stop polling as soon as there is no
367 : // data to send, the poller may invoke out_event one
368 : // more time due to 'speculative write' optimisation.
369 20035 : if (unlikely (encoder == NULL)) {
370 0 : zmq_assert (handshaking);
371 : return;
372 : }
373 :
374 20035 : outpos = NULL;
375 20035 : outsize = encoder->encode (&outpos, 0);
376 :
377 679449 : while (outsize < (size_t) out_batch_size) {
378 659323 : if ((this->*next_msg) (&tx_msg) == -1)
379 : break;
380 639379 : encoder->load_msg (&tx_msg);
381 639379 : unsigned char *bufptr = outpos + outsize;
382 639379 : size_t n = encoder->encode (&bufptr, out_batch_size - outsize);
383 639379 : zmq_assert (n > 0);
384 639379 : if (outpos == NULL)
385 11008 : outpos = bufptr;
386 639379 : outsize += n;
387 : }
388 :
389 : // If there is no data to send, stop polling for output.
390 20037 : if (outsize == 0) {
391 8948 : output_stopped = true;
392 8948 : reset_pollout (handle);
393 8948 : return;
394 : }
395 : }
396 :
397 : // If there are any data to write in write buffer, write as much as
398 : // possible to the socket. Note that amount of data to write can be
399 : // arbitrarily large. However, we assume that underlying TCP layer has
400 : // limited transmission buffer and thus the actual number of bytes
401 : // written should be reasonably modest.
402 21932 : const int nbytes = tcp_write (s, outpos, outsize);
403 :
404 : // IO error has occurred. We stop waiting for output events.
405 : // The engine is not terminated until we detect input error;
406 : // this is necessary to prevent losing incoming messages.
407 21928 : if (nbytes == -1) {
408 12 : reset_pollout (handle);
409 12 : return;
410 : }
411 :
412 21916 : outpos += nbytes;
413 21916 : outsize -= nbytes;
414 :
415 : // If we are still handshaking and there are no data
416 : // to send, stop polling for output.
417 21916 : if (unlikely (handshaking))
418 8986 : if (outsize == 0)
419 8986 : reset_pollout (handle);
420 : }
421 :
422 10065 : void zmq::stream_engine_t::restart_output ()
423 : {
424 10065 : if (unlikely (io_error))
425 10065 : return;
426 :
427 10065 : if (likely (output_stopped)) {
428 4840 : set_pollout (handle);
429 4840 : output_stopped = false;
430 : }
431 :
432 : // Speculative write: The assumption is that at the moment new message
433 : // was sent by the user the socket is probably available for writing.
434 : // Thus we try to write the data to socket avoiding polling for POLLOUT.
435 : // Consequently, the latency should be better in request/reply scenarios.
436 10065 : out_event ();
437 : }
438 :
439 1191 : void zmq::stream_engine_t::restart_input ()
440 : {
441 1191 : zmq_assert (input_stopped);
442 1191 : zmq_assert (session != NULL);
443 1191 : zmq_assert (decoder != NULL);
444 :
445 1191 : int rc = (this->*process_msg) (decoder->msg ());
446 1191 : if (rc == -1) {
447 0 : if (errno == EAGAIN)
448 0 : session->flush ();
449 : else
450 0 : error (protocol_error);
451 1191 : return;
452 : }
453 :
454 542715 : while (insize > 0) {
455 542629 : size_t processed = 0;
456 542629 : rc = decoder->decode (inpos, insize, processed);
457 542629 : zmq_assert (processed <= insize);
458 542629 : inpos += processed;
459 542629 : insize -= processed;
460 542629 : if (rc == 0 || rc == -1)
461 : break;
462 542489 : rc = (this->*process_msg) (decoder->msg ());
463 542489 : if (rc == -1)
464 : break;
465 : }
466 :
467 1191 : if (rc == -1 && errno == EAGAIN)
468 965 : session->flush ();
469 : else
470 226 : if (io_error)
471 0 : error (connection_error);
472 : else
473 226 : if (rc == -1)
474 0 : error (protocol_error);
475 : else {
476 226 : input_stopped = false;
477 226 : set_pollin (handle);
478 226 : session->flush ();
479 :
480 : // Speculative read.
481 226 : in_event ();
482 : }
483 : }
484 :
485 14912 : bool zmq::stream_engine_t::handshake ()
486 : {
487 14912 : zmq_assert (handshaking);
488 14909 : zmq_assert (greeting_bytes_read < greeting_size);
489 : // Receive the greeting.
490 26996 : while (greeting_bytes_read < greeting_size) {
491 22674 : const int n = tcp_read (s, greeting_recv + greeting_bytes_read,
492 45348 : greeting_size - greeting_bytes_read);
493 22676 : if (n == 0) {
494 898 : error (connection_error);
495 898 : return false;
496 : }
497 21778 : if (n == -1) {
498 9681 : if (errno != EAGAIN)
499 442 : error (connection_error);
500 : return false;
501 : }
502 :
503 12097 : greeting_bytes_read += n;
504 :
505 : // We have received at least one byte from the peer.
506 : // If the first byte is not 0xff, we know that the
507 : // peer is using unversioned protocol.
508 12097 : if (greeting_recv [0] != 0xff)
509 : break;
510 :
511 12088 : if (greeting_bytes_read < signature_size)
512 : continue;
513 :
514 : // Inspect the right-most bit of the 10th byte (which coincides
515 : // with the 'flags' field if a regular message was sent).
516 : // Zero indicates this is a header of identity message
517 : // (i.e. the peer is using the unversioned protocol).
518 12088 : if (!(greeting_recv [9] & 0x01))
519 : break;
520 :
521 : // The peer is using versioned protocol.
522 : // Send the major version number.
523 12089 : if (outpos + outsize == greeting_send + signature_size) {
524 5235 : if (outsize == 0)
525 2656 : set_pollout (handle);
526 5233 : outpos [outsize++] = 3; // Major version number
527 : }
528 :
529 12087 : if (greeting_bytes_read > signature_size) {
530 8815 : if (outpos + outsize == greeting_send + signature_size + 1) {
531 4492 : if (outsize == 0)
532 2530 : set_pollout (handle);
533 :
534 : // Use ZMTP/2.0 to talk to older peers.
535 4492 : if (greeting_recv [10] == ZMTP_1_0
536 : || greeting_recv [10] == ZMTP_2_0)
537 0 : outpos [outsize++] = options.type;
538 : else {
539 4492 : outpos [outsize++] = 0; // Minor version number
540 4492 : memset (outpos + outsize, 0, 20);
541 :
542 4492 : zmq_assert (options.mechanism == ZMQ_NULL
543 : || options.mechanism == ZMQ_PLAIN
544 : || options.mechanism == ZMQ_CURVE
545 : || options.mechanism == ZMQ_GSSAPI);
546 :
547 4492 : if (options.mechanism == ZMQ_NULL)
548 4381 : memcpy (outpos + outsize, "NULL", 4);
549 : else
550 111 : if (options.mechanism == ZMQ_PLAIN)
551 18 : memcpy (outpos + outsize, "PLAIN", 5);
552 : else
553 93 : if (options.mechanism == ZMQ_GSSAPI)
554 0 : memcpy (outpos + outsize, "GSSAPI", 6);
555 : else
556 93 : if (options.mechanism == ZMQ_CURVE)
557 93 : memcpy (outpos + outsize, "CURVE", 5);
558 4492 : outsize += 20;
559 4492 : memset (outpos + outsize, 0, 32);
560 4492 : outsize += 32;
561 4492 : greeting_size = v3_greeting_size;
562 : }
563 : }
564 : }
565 : }
566 :
567 : // Position of the revision field in the greeting.
568 4330 : const size_t revision_pos = 10;
569 :
570 : // Is the peer using ZMTP/1.0 with no revision number?
571 : // If so, we send and receive rest of identity message
572 4330 : if (greeting_recv [0] != 0xff || !(greeting_recv [9] & 0x01)) {
573 7 : if (session->zap_enabled ()) {
574 : // reject ZMTP 1.0 connections if ZAP is enabled
575 9 : error (protocol_error);
576 9 : return false;
577 : }
578 :
579 0 : encoder = new (std::nothrow) v1_encoder_t (out_batch_size);
580 0 : alloc_assert (encoder);
581 :
582 0 : decoder = new (std::nothrow) v1_decoder_t (in_batch_size, options.maxmsgsize);
583 0 : alloc_assert (decoder);
584 :
585 : // We have already sent the message header.
586 : // Since there is no way to tell the encoder to
587 : // skip the message header, we simply throw that
588 : // header data away.
589 0 : const size_t header_size = options.identity_size + 1 >= 255 ? 10 : 2;
590 0 : unsigned char tmp [10], *bufferp = tmp;
591 :
592 : // Prepare the identity message and load it into encoder.
593 : // Then consume bytes we have already sent to the peer.
594 0 : const int rc = tx_msg.init_size (options.identity_size);
595 0 : zmq_assert (rc == 0);
596 0 : memcpy (tx_msg.data (), options.identity, options.identity_size);
597 0 : encoder->load_msg (&tx_msg);
598 0 : size_t buffer_size = encoder->encode (&bufferp, header_size);
599 0 : zmq_assert (buffer_size == header_size);
600 :
601 : // Make sure the decoder sees the data we have already received.
602 0 : inpos = greeting_recv;
603 0 : insize = greeting_bytes_read;
604 :
605 : // To allow for interoperability with peers that do not forward
606 : // their subscriptions, we inject a phantom subscription message
607 : // message into the incoming message stream.
608 0 : if (options.type == ZMQ_PUB || options.type == ZMQ_XPUB)
609 0 : subscription_required = true;
610 :
611 : // We are sending our identity now and the next message
612 : // will come from the socket.
613 0 : next_msg = &stream_engine_t::pull_msg_from_session;
614 :
615 : // We are expecting identity message.
616 0 : process_msg = &stream_engine_t::process_identity_msg;
617 : }
618 : else
619 4323 : if (greeting_recv [revision_pos] == ZMTP_1_0) {
620 0 : if (session->zap_enabled ()) {
621 : // reject ZMTP 1.0 connections if ZAP is enabled
622 0 : error (protocol_error);
623 0 : return false;
624 : }
625 :
626 0 : encoder = new (std::nothrow) v1_encoder_t (out_batch_size);
627 0 : alloc_assert (encoder);
628 :
629 : decoder = new (std::nothrow) v1_decoder_t (
630 0 : in_batch_size, options.maxmsgsize);
631 0 : alloc_assert (decoder);
632 : }
633 : else
634 4323 : if (greeting_recv [revision_pos] == ZMTP_2_0) {
635 0 : if (session->zap_enabled ()) {
636 : // reject ZMTP 2.0 connections if ZAP is enabled
637 0 : error (protocol_error);
638 0 : return false;
639 : }
640 :
641 0 : encoder = new (std::nothrow) v2_encoder_t (out_batch_size);
642 0 : alloc_assert (encoder);
643 :
644 : decoder = new (std::nothrow) v2_decoder_t (
645 0 : in_batch_size, options.maxmsgsize);
646 0 : alloc_assert (decoder);
647 : }
648 : else {
649 4323 : encoder = new (std::nothrow) v2_encoder_t (out_batch_size);
650 4323 : alloc_assert (encoder);
651 :
652 : decoder = new (std::nothrow) v2_decoder_t (
653 4323 : in_batch_size, options.maxmsgsize);
654 4323 : alloc_assert (decoder);
655 :
656 4323 : if (options.mechanism == ZMQ_NULL
657 4212 : && memcmp (greeting_recv + 12, "NULL\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0", 20) == 0) {
658 : mechanism = new (std::nothrow)
659 4209 : null_mechanism_t (session, peer_address, options);
660 4209 : alloc_assert (mechanism);
661 : }
662 : else
663 114 : if (options.mechanism == ZMQ_PLAIN
664 18 : && memcmp (greeting_recv + 12, "PLAIN\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0", 20) == 0) {
665 18 : if (options.as_server)
666 : mechanism = new (std::nothrow)
667 12 : plain_server_t (session, peer_address, options);
668 : else
669 : mechanism = new (std::nothrow)
670 6 : plain_client_t (options);
671 18 : alloc_assert (mechanism);
672 : }
673 : #ifdef ZMQ_HAVE_CURVE
674 : else
675 96 : if (options.mechanism == ZMQ_CURVE
676 93 : && memcmp (greeting_recv + 12, "CURVE\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0", 20) == 0) {
677 90 : if (options.as_server)
678 : mechanism = new (std::nothrow)
679 45 : curve_server_t (session, peer_address, options);
680 : else
681 45 : mechanism = new (std::nothrow) curve_client_t (options);
682 90 : alloc_assert (mechanism);
683 : }
684 : #endif
685 : #ifdef HAVE_LIBGSSAPI_KRB5
686 : else
687 : if (options.mechanism == ZMQ_GSSAPI
688 : && memcmp (greeting_recv + 12, "GSSAPI\0\0\0\0\0\0\0\0\0\0\0\0\0\0", 20) == 0) {
689 : if (options.as_server)
690 : mechanism = new (std::nothrow)
691 : gssapi_server_t (session, peer_address, options);
692 : else
693 : mechanism = new (std::nothrow) gssapi_client_t (options);
694 : alloc_assert (mechanism);
695 : }
696 : #endif
697 : else {
698 6 : error (protocol_error);
699 6 : return false;
700 : }
701 4317 : next_msg = &stream_engine_t::next_handshake_command;
702 4317 : process_msg = &stream_engine_t::process_handshake_command;
703 : }
704 :
705 : // Start polling for output if necessary.
706 4317 : if (outsize == 0)
707 2417 : set_pollout (handle);
708 :
709 : // Handshaking was successful.
710 : // Switch into the normal message flow.
711 4317 : handshaking = false;
712 :
713 4317 : if (has_handshake_timer) {
714 4317 : cancel_timer (handshake_timer_id);
715 4317 : has_handshake_timer = false;
716 : }
717 :
718 : return true;
719 : }
720 :
721 0 : int zmq::stream_engine_t::identity_msg (msg_t *msg_)
722 : {
723 0 : int rc = msg_->init_size (options.identity_size);
724 0 : errno_assert (rc == 0);
725 0 : if (options.identity_size > 0)
726 0 : memcpy (msg_->data (), options.identity, options.identity_size);
727 0 : next_msg = &stream_engine_t::pull_msg_from_session;
728 0 : return 0;
729 : }
730 :
731 0 : int zmq::stream_engine_t::process_identity_msg (msg_t *msg_)
732 : {
733 0 : if (options.recv_identity) {
734 0 : msg_->set_flags (msg_t::identity);
735 0 : int rc = session->push_msg (msg_);
736 0 : errno_assert (rc == 0);
737 : }
738 : else {
739 0 : int rc = msg_->close ();
740 0 : errno_assert (rc == 0);
741 0 : rc = msg_->init ();
742 0 : errno_assert (rc == 0);
743 : }
744 :
745 0 : if (subscription_required)
746 0 : process_msg = &stream_engine_t::write_subscription_msg;
747 : else
748 0 : process_msg = &stream_engine_t::push_msg_to_session;
749 :
750 0 : return 0;
751 : }
752 :
753 10869 : int zmq::stream_engine_t::next_handshake_command (msg_t *msg_)
754 : {
755 10869 : zmq_assert (mechanism != NULL);
756 :
757 10869 : if (mechanism->status () == mechanism_t::ready) {
758 1575 : mechanism_ready ();
759 1575 : return pull_and_encode (msg_);
760 : }
761 : else
762 9295 : if (mechanism->status () == mechanism_t::error) {
763 18 : errno = EPROTO;
764 18 : return -1;
765 : }
766 : else {
767 9277 : const int rc = mechanism->next_handshake_command (msg_);
768 9277 : if (rc == 0)
769 4159 : msg_->set_flags (msg_t::command);
770 9277 : return rc;
771 : }
772 : }
773 :
774 4002 : int zmq::stream_engine_t::process_handshake_command (msg_t *msg_)
775 : {
776 4002 : zmq_assert (mechanism != NULL);
777 4002 : const int rc = mechanism->process_handshake_command (msg_);
778 4002 : if (rc == 0) {
779 3963 : if (mechanism->status () == mechanism_t::ready)
780 2193 : mechanism_ready ();
781 : else
782 1770 : if (mechanism->status () == mechanism_t::error) {
783 6 : errno = EPROTO;
784 6 : return -1;
785 : }
786 3957 : if (output_stopped)
787 2177 : restart_output ();
788 : }
789 :
790 3996 : return rc;
791 : }
792 :
793 19 : void zmq::stream_engine_t::zap_msg_available ()
794 : {
795 19 : zmq_assert (mechanism != NULL);
796 :
797 19 : const int rc = mechanism->zap_msg_available ();
798 19 : if (rc == -1) {
799 0 : error (protocol_error);
800 19 : return;
801 : }
802 19 : if (input_stopped)
803 0 : restart_input ();
804 19 : if (output_stopped)
805 19 : restart_output ();
806 : }
807 :
808 3768 : void zmq::stream_engine_t::mechanism_ready ()
809 : {
810 3768 : if (options.heartbeat_interval > 0) {
811 15 : add_timer(options.heartbeat_interval, heartbeat_ivl_timer_id);
812 15 : has_heartbeat_timer = true;
813 : }
814 :
815 3768 : if (options.recv_identity) {
816 : msg_t identity;
817 492 : mechanism->peer_identity (&identity);
818 492 : const int rc = session->push_msg (&identity);
819 492 : if (rc == -1 && errno == EAGAIN) {
820 : // If the write is failing at this stage with
821 : // an EAGAIN the pipe must be being shut down,
822 : // so we can just bail out of the identity set.
823 3768 : return;
824 : }
825 492 : errno_assert (rc == 0);
826 492 : session->flush ();
827 : }
828 :
829 3768 : next_msg = &stream_engine_t::pull_and_encode;
830 3768 : process_msg = &stream_engine_t::write_credential;
831 :
832 : // Compile metadata.
833 : properties_t properties;
834 3768 : init_properties(properties);
835 :
836 : // Add ZAP properties.
837 3768 : const properties_t& zap_properties = mechanism->get_zap_properties ();
838 : properties.insert(zap_properties.begin (), zap_properties.end ());
839 :
840 : // Add ZMTP properties.
841 3768 : const properties_t& zmtp_properties = mechanism->get_zmtp_properties ();
842 : properties.insert(zmtp_properties.begin (), zmtp_properties.end ());
843 :
844 3768 : zmq_assert (metadata == NULL);
845 3768 : if (!properties.empty ())
846 3768 : metadata = new (std::nothrow) metadata_t (properties);
847 : }
848 :
849 117 : int zmq::stream_engine_t::pull_msg_from_session (msg_t *msg_)
850 : {
851 117 : return session->pull_msg (msg_);
852 : }
853 :
854 0 : int zmq::stream_engine_t::push_msg_to_session (msg_t *msg_)
855 : {
856 106 : return session->push_msg (msg_);
857 : }
858 :
859 106 : int zmq::stream_engine_t::push_raw_msg_to_session (msg_t *msg_) {
860 106 : if (metadata && metadata != msg_->metadata())
861 106 : msg_->set_metadata(metadata);
862 106 : return push_msg_to_session(msg_);
863 : }
864 :
865 892 : int zmq::stream_engine_t::write_credential (msg_t *msg_)
866 : {
867 892 : zmq_assert (mechanism != NULL);
868 892 : zmq_assert (session != NULL);
869 :
870 892 : const blob_t credential = mechanism->get_user_id ();
871 892 : if (credential.size () > 0) {
872 : msg_t msg;
873 12 : int rc = msg.init_size (credential.size ());
874 12 : zmq_assert (rc == 0);
875 12 : memcpy (msg.data (), credential.data (), credential.size ());
876 12 : msg.set_flags (msg_t::credential);
877 12 : rc = session->push_msg (&msg);
878 12 : if (rc == -1) {
879 0 : rc = msg.close ();
880 0 : errno_assert (rc == 0);
881 0 : return -1;
882 : }
883 : }
884 892 : process_msg = &stream_engine_t::decode_and_push;
885 892 : return decode_and_push (msg_);
886 : }
887 :
888 649700 : int zmq::stream_engine_t::pull_and_encode (msg_t *msg_)
889 : {
890 649700 : zmq_assert (mechanism != NULL);
891 :
892 649700 : if (session->pull_msg (msg_) == -1)
893 : return -1;
894 634983 : if (mechanism->encode (msg_) == -1)
895 : return -1;
896 634983 : return 0;
897 : }
898 :
899 635187 : int zmq::stream_engine_t::decode_and_push (msg_t *msg_)
900 : {
901 635187 : zmq_assert (mechanism != NULL);
902 :
903 635187 : if (mechanism->decode (msg_) == -1)
904 : return -1;
905 :
906 635187 : if(has_timeout_timer) {
907 105 : has_timeout_timer = false;
908 105 : cancel_timer(heartbeat_timeout_timer_id);
909 : }
910 :
911 635187 : if(has_ttl_timer) {
912 0 : has_ttl_timer = false;
913 0 : cancel_timer(heartbeat_ttl_timer_id);
914 : }
915 :
916 635187 : if(msg_->flags() & msg_t::command) {
917 219 : uint8_t cmd_id = *((uint8_t*)msg_->data());
918 219 : if(cmd_id == 4)
919 216 : process_heartbeat_message(msg_);
920 : }
921 :
922 635187 : if (metadata)
923 635187 : msg_->set_metadata (metadata);
924 635187 : if (session->push_msg (msg_) == -1) {
925 1202 : if (errno == EAGAIN)
926 1202 : process_msg = &stream_engine_t::push_one_then_decode_and_push;
927 : return -1;
928 : }
929 : return 0;
930 : }
931 :
932 1191 : int zmq::stream_engine_t::push_one_then_decode_and_push (msg_t *msg_)
933 : {
934 1191 : const int rc = session->push_msg (msg_);
935 1191 : if (rc == 0)
936 1191 : process_msg = &stream_engine_t::decode_and_push;
937 1191 : return rc;
938 : }
939 :
940 0 : int zmq::stream_engine_t::write_subscription_msg (msg_t *msg_)
941 : {
942 : msg_t subscription;
943 :
944 : // Inject the subscription message, so that also
945 : // ZMQ 2.x peers receive published messages.
946 0 : int rc = subscription.init_size (1);
947 0 : errno_assert (rc == 0);
948 0 : *(unsigned char*) subscription.data () = 1;
949 0 : rc = session->push_msg (&subscription);
950 0 : if (rc == -1)
951 : return -1;
952 :
953 0 : process_msg = &stream_engine_t::push_msg_to_session;
954 : return push_msg_to_session (msg_);
955 : }
956 :
957 2401 : void zmq::stream_engine_t::error (error_reason_t reason)
958 : {
959 2401 : if (options.raw_socket && options.raw_notify) {
960 : // For raw sockets, send a final 0-length message to the application
961 : // so that it knows the peer has been disconnected.
962 : msg_t terminator;
963 13 : terminator.init();
964 13 : (this->*process_msg) (&terminator);
965 13 : terminator.close();
966 : }
967 2401 : zmq_assert (session);
968 2401 : socket->event_disconnected (endpoint, (int) s);
969 2401 : session->flush ();
970 2401 : session->engine_error (reason);
971 2401 : unplug ();
972 2401 : delete this;
973 2401 : }
974 :
975 6729 : void zmq::stream_engine_t::set_handshake_timer ()
976 : {
977 6729 : zmq_assert (!has_handshake_timer);
978 :
979 6729 : if (!options.raw_socket && options.handshake_ivl > 0) {
980 6729 : add_timer (options.handshake_ivl, handshake_timer_id);
981 6728 : has_handshake_timer = true;
982 : }
983 6728 : }
984 :
985 3801 : bool zmq::stream_engine_t::init_properties (properties_t & properties) {
986 7602 : if (peer_address.empty()) return false;
987 7602 : properties.insert (std::make_pair("Peer-Address", peer_address));
988 :
989 : // Private property to support deprecated SRCFD
990 3801 : std::ostringstream stream;
991 3801 : stream << (int)s;
992 : std::string fd_string = stream.str();
993 3801 : properties.insert(std::make_pair("__fd", fd_string));
994 7602 : return true;
995 : }
996 :
997 120 : void zmq::stream_engine_t::timer_event (int id_)
998 : {
999 120 : if(id_ == handshake_timer_id) {
1000 6 : has_handshake_timer = false;
1001 : // handshake timer expired before handshake completed, so engine fail
1002 6 : error (timeout_error);
1003 : }
1004 114 : else if(id_ == heartbeat_ivl_timer_id) {
1005 108 : next_msg = &stream_engine_t::produce_ping_message;
1006 108 : out_event();
1007 108 : add_timer(options.heartbeat_interval, heartbeat_ivl_timer_id);
1008 : }
1009 6 : else if(id_ == heartbeat_ttl_timer_id) {
1010 3 : has_ttl_timer = false;
1011 3 : error(timeout_error);
1012 : }
1013 3 : else if(id_ == heartbeat_timeout_timer_id) {
1014 3 : has_timeout_timer = false;
1015 3 : error(timeout_error);
1016 : }
1017 : else
1018 : // There are no other valid timer ids!
1019 : assert(false);
1020 120 : }
1021 :
1022 108 : int zmq::stream_engine_t::produce_ping_message(msg_t * msg_)
1023 : {
1024 108 : int rc = 0;
1025 108 : zmq_assert (mechanism != NULL);
1026 :
1027 : // 16-bit TTL + \4PING == 7
1028 108 : rc = msg_->init_size(7);
1029 108 : errno_assert(rc == 0);
1030 108 : msg_->set_flags(msg_t::command);
1031 : // Copy in the command message
1032 108 : memcpy(msg_->data(), "\4PING", 5);
1033 :
1034 108 : uint16_t ttl_val = htons(options.heartbeat_ttl);
1035 108 : memcpy(((uint8_t*)msg_->data()) + 5, &ttl_val, sizeof(ttl_val));
1036 :
1037 108 : rc = mechanism->encode (msg_);
1038 108 : next_msg = &stream_engine_t::pull_and_encode;
1039 108 : if(!has_timeout_timer && heartbeat_timeout > 0) {
1040 108 : add_timer(heartbeat_timeout, heartbeat_timeout_timer_id);
1041 108 : has_timeout_timer = true;
1042 : }
1043 108 : return rc;
1044 : }
1045 :
1046 105 : int zmq::stream_engine_t::produce_pong_message(msg_t * msg_)
1047 : {
1048 105 : int rc = 0;
1049 105 : zmq_assert (mechanism != NULL);
1050 :
1051 105 : rc = msg_->init_size(5);
1052 105 : errno_assert(rc == 0);
1053 105 : msg_->set_flags(msg_t::command);
1054 :
1055 105 : memcpy(msg_->data(), "\4PONG", 5);
1056 :
1057 105 : rc = mechanism->encode (msg_);
1058 105 : next_msg = &stream_engine_t::pull_and_encode;
1059 105 : return rc;
1060 : }
1061 :
1062 216 : int zmq::stream_engine_t::process_heartbeat_message(msg_t * msg_)
1063 : {
1064 216 : if(memcmp(msg_->data(), "\4PING", 5) == 0) {
1065 : uint16_t remote_heartbeat_ttl;
1066 : // Get the remote heartbeat TTL to setup the timer
1067 105 : memcpy(&remote_heartbeat_ttl, (uint8_t*)msg_->data() + 5, 2);
1068 105 : remote_heartbeat_ttl = ntohs(remote_heartbeat_ttl);
1069 : // The remote heartbeat is in 10ths of a second
1070 : // so we multiply it by 100 to get the timer interval in ms.
1071 105 : remote_heartbeat_ttl *= 100;
1072 :
1073 105 : if(!has_ttl_timer && remote_heartbeat_ttl > 0) {
1074 3 : add_timer(remote_heartbeat_ttl, heartbeat_ttl_timer_id);
1075 3 : has_ttl_timer = true;
1076 : }
1077 :
1078 105 : next_msg = &stream_engine_t::produce_pong_message;
1079 105 : out_event();
1080 : }
1081 :
1082 216 : return 0;
1083 : }
|