Line data Source code
1 : /*
2 : Copyright (c) 2007-2016 Contributors as noted in the AUTHORS file
3 :
4 : This file is part of libzmq, the ZeroMQ core engine in C++.
5 :
6 : libzmq is free software; you can redistribute it and/or modify it under
7 : the terms of the GNU Lesser General Public License (LGPL) as published
8 : by the Free Software Foundation; either version 3 of the License, or
9 : (at your option) any later version.
10 :
11 : As a special exception, the Contributors give you permission to link
12 : this library with independent modules to produce an executable,
13 : regardless of the license terms of these independent modules, and to
14 : copy and distribute the resulting executable under terms of your choice,
15 : provided that you also meet, for each linked independent module, the
16 : terms and conditions of the license of that module. An independent
17 : module is a module which is not derived from or based on this library.
18 : If you modify this library, you must extend this exception to your
19 : version of the library.
20 :
21 : libzmq is distributed in the hope that it will be useful, but WITHOUT
22 : ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
23 : FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public
24 : License for more details.
25 :
26 : You should have received a copy of the GNU Lesser General Public License
27 : along with this program. If not, see <http://www.gnu.org/licenses/>.
28 : */
29 :
30 : #ifndef __ZMQ_DECODER_HPP_INCLUDED__
31 : #define __ZMQ_DECODER_HPP_INCLUDED__
32 :
33 : #include <algorithm>
34 : #include <cstddef>
35 : #include <cstring>
36 :
37 : #include "decoder_allocators.hpp"
38 : #include "err.hpp"
39 : #include "i_decoder.hpp"
40 : #include "msg.hpp"
41 : #include "stdint.hpp"
42 :
43 : namespace zmq
44 : {
45 : // Helper base class for decoders that know the amount of data to read
46 : // in advance at any moment. Knowing the amount in advance is a property
47 : // of the protocol used. 0MQ framing protocol is based size-prefixed
48 : // paradigm, which qualifies it to be parsed by this class.
49 : // On the other hand, XML-based transports (like XMPP or SOAP) don't allow
50 : // for knowing the size of data to read in advance and should use different
51 : // decoding algorithms.
52 : //
53 : // This class implements the state machine that parses the incoming buffer.
54 : // Derived class should implement individual state machine actions.
55 : //
56 : // Buffer management is done by an allocator policy.
57 : template <typename T, typename A = c_single_allocator>
58 : class decoder_base_t : public i_decoder
59 : {
60 : public:
61 :
62 4323 : explicit decoder_base_t (A *allocator_) :
63 : next (NULL),
64 : read_pos (NULL),
65 : to_read (0),
66 8646 : allocator(allocator_)
67 : {
68 4323 : buf = allocator->allocate ();
69 4323 : }
70 :
71 : // The destructor doesn't have to be virtual. It is made virtual
72 : // just to keep ICC and code checking tools from complaining.
73 0 : virtual ~decoder_base_t ()
74 : {
75 4323 : allocator->deallocate ();
76 8646 : }
77 :
78 : // Returns a buffer to be filled with binary data.
79 9002 : void get_buffer (unsigned char **data_, std::size_t *size_)
80 : {
81 9002 : buf = allocator->allocate ();
82 :
83 : // If we are expected to read large message, we'll opt for zero-
84 : // copy, i.e. we'll ask caller to fill the data directly to the
85 : // message. Note that subsequent read(s) are non-blocking, thus
86 : // each single read reads at most SO_RCVBUF bytes at once not
87 : // depending on how large is the chunk returned from here.
88 : // As a consequence, large messages being received won't block
89 : // other engines running in the same I/O thread for excessive
90 : // amounts of time.
91 9002 : if (to_read >= allocator->size ()) {
92 6 : *data_ = read_pos;
93 6 : *size_ = to_read;
94 9008 : return;
95 : }
96 :
97 8996 : *data_ = buf;
98 8996 : *size_ = allocator->size ();
99 : }
100 :
101 : // Processes the data in the buffer previously allocated using
102 : // get_buffer function. size_ argument specifies number of bytes
103 : // actually filled into the buffer. Function returns 1 when the
104 : // whole message was decoded or 0 when more data is required.
105 : // On error, -1 is returned and errno set accordingly.
106 : // Number of bytes processed is returned in bytes_used_.
107 639335 : int decode (const unsigned char *data_, std::size_t size_,
108 : std::size_t &bytes_used_)
109 : {
110 639335 : bytes_used_ = 0;
111 :
112 : // In case of zero-copy simply adjust the pointers, no copying
113 : // is required. Also, run the state machine in case all the data
114 : // were processed.
115 639335 : if (data_ == read_pos) {
116 6 : zmq_assert (size_ <= to_read);
117 6 : read_pos += size_;
118 6 : to_read -= size_;
119 6 : bytes_used_ = size_;
120 :
121 12 : while (!to_read) {
122 : const int rc =
123 6 : (static_cast <T *> (this)->*next) (data_ + bytes_used_);
124 6 : if (rc != 0)
125 : return rc;
126 : }
127 : return 0;
128 : }
129 :
130 1884175 : while (bytes_used_ < size_) {
131 : // Copy the data from buffer to the message.
132 3768058 : const size_t to_copy = std::min (to_read, size_ - bytes_used_);
133 : // Only copy when destination address is different from the
134 : // current address in the buffer.
135 1884029 : if (read_pos != data_ + bytes_used_) {
136 1882832 : memcpy (read_pos, data_ + bytes_used_, to_copy);
137 : }
138 :
139 1884029 : read_pos += to_copy;
140 1884029 : to_read -= to_copy;
141 1884029 : bytes_used_ += to_copy;
142 : // Try to get more space in the message to fill in.
143 : // If none is available, return.
144 5046435 : while (to_read == 0) {
145 : // pass current address in the buffer
146 : const int rc =
147 1917560 : (static_cast <T *> (this)->*next) (data_ + bytes_used_);
148 1917559 : if (rc != 0)
149 : return rc;
150 : }
151 : }
152 :
153 : return 0;
154 : }
155 :
156 5427 : virtual void resize_buffer (std::size_t new_size)
157 : {
158 5427 : allocator->resize (new_size);
159 5427 : }
160 :
161 : protected:
162 :
163 : // Prototype of state machine action. Action should return false if
164 : // it is unable to push the data to the system.
165 : typedef int (T:: *step_t) (unsigned char const *);
166 :
167 : // This function should be called from derived class to read data
168 : // from the buffer and schedule next state machine action.
169 : void next_step (void *read_pos_, std::size_t to_read_, step_t next_)
170 : {
171 1921890 : read_pos = static_cast <unsigned char*> (read_pos_);
172 1921890 : to_read = to_read_;
173 1921890 : next = next_;
174 : }
175 :
176 : private:
177 :
178 : // Next step. If set to NULL, it means that associated data stream
179 : // is dead. Note that there can be still data in the process in such
180 : // case.
181 : step_t next;
182 :
183 : // Where to store the read data.
184 : unsigned char *read_pos;
185 :
186 : // How much data to read before taking next step.
187 : std::size_t to_read;
188 :
189 : // The duffer for data to decode.
190 : A *allocator;
191 : unsigned char *buf;
192 :
193 : decoder_base_t (const decoder_base_t &);
194 : const decoder_base_t &operator = (const decoder_base_t &);
195 : };
196 : }
197 :
198 : #endif
|