Line data Source code
1 : /*
2 : Copyright (c) 2007-2016 Contributors as noted in the AUTHORS file
3 :
4 : This file is part of libzmq, the ZeroMQ core engine in C++.
5 :
6 : libzmq is free software; you can redistribute it and/or modify it under
7 : the terms of the GNU Lesser General Public License (LGPL) as published
8 : by the Free Software Foundation; either version 3 of the License, or
9 : (at your option) any later version.
10 :
11 : As a special exception, the Contributors give you permission to link
12 : this library with independent modules to produce an executable,
13 : regardless of the license terms of these independent modules, and to
14 : copy and distribute the resulting executable under terms of your choice,
15 : provided that you also meet, for each linked independent module, the
16 : terms and conditions of the license of that module. An independent
17 : module is a module which is not derived from or based on this library.
18 : If you modify this library, you must extend this exception to your
19 : version of the library.
20 :
21 : libzmq is distributed in the hope that it will be useful, but WITHOUT
22 : ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
23 : FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public
24 : License for more details.
25 :
26 : You should have received a copy of the GNU Lesser General Public License
27 : along with this program. If not, see <http://www.gnu.org/licenses/>.
28 : */
29 :
30 : #include "precompiled.hpp"
31 : #include <stdlib.h>
32 : #include <string.h>
33 : #include <cmath>
34 :
35 : #include "platform.hpp"
36 : #ifdef ZMQ_HAVE_WINDOWS
37 : #include "windows.hpp"
38 : #endif
39 :
40 : #include "v2_protocol.hpp"
41 : #include "v2_decoder.hpp"
42 : #include "likely.hpp"
43 : #include "wire.hpp"
44 : #include "err.hpp"
45 :
46 :
47 :
48 4323 : zmq::v2_decoder_t::v2_decoder_t (size_t bufsize_, int64_t maxmsgsize_) :
49 : shared_message_memory_allocator( bufsize_),
50 : decoder_base_t <v2_decoder_t, shared_message_memory_allocator> (this),
51 : msg_flags (0),
52 4323 : maxmsgsize (maxmsgsize_)
53 : {
54 4323 : int rc = in_progress.init ();
55 4323 : errno_assert (rc == 0);
56 :
57 : // At the beginning, read one byte and go to flags_ready state.
58 4323 : next_step (tmpbuf, 1, &v2_decoder_t::flags_ready);
59 4323 : }
60 :
61 17292 : zmq::v2_decoder_t::~v2_decoder_t ()
62 : {
63 4323 : int rc = in_progress.close ();
64 4323 : errno_assert (rc == 0);
65 8646 : }
66 :
67 639189 : int zmq::v2_decoder_t::flags_ready (unsigned char const*)
68 : {
69 639189 : msg_flags = 0;
70 639189 : if (tmpbuf [0] & v2_protocol_t::more_flag)
71 546 : msg_flags |= msg_t::more;
72 639189 : if (tmpbuf [0] & v2_protocol_t::command_flag)
73 4119 : msg_flags |= msg_t::command;
74 :
75 : // The payload length is either one or eight bytes,
76 : // depending on whether the 'large' bit is set.
77 639189 : if (tmpbuf [0] & v2_protocol_t::large_flag)
78 39 : next_step (tmpbuf, 8, &v2_decoder_t::eight_byte_size_ready);
79 : else
80 639150 : next_step (tmpbuf, 1, &v2_decoder_t::one_byte_size_ready);
81 :
82 639189 : return 0;
83 : }
84 :
85 639150 : int zmq::v2_decoder_t::one_byte_size_ready (unsigned char const* read_from)
86 : {
87 639150 : return size_ready(tmpbuf[0], read_from);
88 : }
89 :
90 39 : int zmq::v2_decoder_t::eight_byte_size_ready (unsigned char const* read_from) {
91 : // The payload size is encoded as 64-bit unsigned integer.
92 : // The most significant byte comes first.
93 39 : const uint64_t msg_size = get_uint64(tmpbuf);
94 :
95 39 : return size_ready(msg_size, read_from);
96 : }
97 :
98 639189 : int zmq::v2_decoder_t::size_ready(uint64_t msg_size, unsigned char const* read_pos) {
99 : // Message size must not exceed the maximum allowed size.
100 639189 : if (maxmsgsize >= 0)
101 0 : if (unlikely (msg_size > static_cast <uint64_t> (maxmsgsize))) {
102 0 : errno = EMSGSIZE;
103 0 : return -1;
104 : }
105 :
106 : // Message size must fit into size_t data type.
107 : if (unlikely (msg_size != static_cast <size_t> (msg_size))) {
108 : errno = EMSGSIZE;
109 : return -1;
110 : }
111 :
112 639189 : int rc = in_progress.close();
113 : assert(rc == 0);
114 :
115 : // the current message can exceed the current buffer. We have to copy the buffer
116 : // data into a new message and complete it in the next receive.
117 :
118 1917413 : if (unlikely ((unsigned char*)read_pos + msg_size > (data() + size())))
119 : {
120 : // a new message has started, but the size would exceed the pre-allocated arena
121 : // this happens every time when a message does not fit completely into the buffer
122 77 : rc = in_progress.init_size (static_cast <size_t> (msg_size));
123 : }
124 : else
125 : {
126 : // construct message using n bytes from the buffer as storage
127 : // increase buffer ref count
128 : // if the message will be a large message, pass a valid refcnt memory location as well
129 : rc = in_progress.init ((unsigned char *) read_pos, static_cast <size_t> (msg_size),
130 1278224 : shared_message_memory_allocator::call_dec_ref, buffer(),
131 1917336 : provide_content ());
132 :
133 : // For small messages, data has been copied and refcount does not have to be increased
134 639112 : if (in_progress.is_zcmsg())
135 : {
136 1200 : advance_content();
137 1200 : inc_ref();
138 : }
139 : }
140 :
141 639189 : if (unlikely (rc)) {
142 0 : errno_assert (errno == ENOMEM);
143 0 : rc = in_progress.init ();
144 0 : errno_assert (rc == 0);
145 0 : errno = ENOMEM;
146 0 : return -1;
147 : }
148 :
149 639189 : in_progress.set_flags (msg_flags);
150 : // this sets read_pos to
151 : // the message data address if the data needs to be copied
152 : // for small message / messages exceeding the current buffer
153 : // or
154 : // to the current start address in the buffer because the message
155 : // was constructed to use n bytes from the address passed as argument
156 : next_step (in_progress.data (), in_progress.size (),
157 639189 : &v2_decoder_t::message_ready);
158 :
159 639189 : return 0;
160 : }
161 :
162 639189 : int zmq::v2_decoder_t::message_ready (unsigned char const*)
163 : {
164 : // Message is completely read. Signal this to the caller
165 : // and prepare to decode next message.
166 639189 : next_step (tmpbuf, 1, &v2_decoder_t::flags_ready);
167 639189 : return 1;
168 : }
|