libzmq  master
ZeroMQ C++ Core Engine (LIBZMQ)
decoder.hpp
Go to the documentation of this file.
1 /*
2  Copyright (c) 2007-2016 Contributors as noted in the AUTHORS file
3 
4  This file is part of libzmq, the ZeroMQ core engine in C++.
5 
6  libzmq is free software; you can redistribute it and/or modify it under
7  the terms of the GNU Lesser General Public License (LGPL) as published
8  by the Free Software Foundation; either version 3 of the License, or
9  (at your option) any later version.
10 
11  As a special exception, the Contributors give you permission to link
12  this library with independent modules to produce an executable,
13  regardless of the license terms of these independent modules, and to
14  copy and distribute the resulting executable under terms of your choice,
15  provided that you also meet, for each linked independent module, the
16  terms and conditions of the license of that module. An independent
17  module is a module which is not derived from or based on this library.
18  If you modify this library, you must extend this exception to your
19  version of the library.
20 
21  libzmq is distributed in the hope that it will be useful, but WITHOUT
22  ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
23  FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public
24  License for more details.
25 
26  You should have received a copy of the GNU Lesser General Public License
27  along with this program. If not, see <http://www.gnu.org/licenses/>.
28 */
29 
30 #ifndef __ZMQ_DECODER_HPP_INCLUDED__
31 #define __ZMQ_DECODER_HPP_INCLUDED__
32 
33 #include <algorithm>
34 #include <cstddef>
35 #include <cstring>
36 
37 #include "decoder_allocators.hpp"
38 #include "err.hpp"
39 #include "i_decoder.hpp"
40 #include "msg.hpp"
41 #include "stdint.hpp"
42 
43 namespace zmq
44 {
45  // Helper base class for decoders that know the amount of data to read
46  // in advance at any moment. Knowing the amount in advance is a property
47  // of the protocol used. 0MQ framing protocol is based size-prefixed
48  // paradigm, which qualifies it to be parsed by this class.
49  // On the other hand, XML-based transports (like XMPP or SOAP) don't allow
50  // for knowing the size of data to read in advance and should use different
51  // decoding algorithms.
52  //
53  // This class implements the state machine that parses the incoming buffer.
54  // Derived class should implement individual state machine actions.
55  //
56  // Buffer management is done by an allocator policy.
57  template <typename T, typename A = c_single_allocator>
58  class decoder_base_t : public i_decoder
59  {
60  public:
61 
62  explicit decoder_base_t (A *allocator_) :
63  next (NULL),
64  read_pos (NULL),
65  to_read (0),
66  allocator(allocator_)
67  {
68  buf = allocator->allocate ();
69  }
70 
71  // The destructor doesn't have to be virtual. It is made virtual
72  // just to keep ICC and code checking tools from complaining.
73  virtual ~decoder_base_t ()
74  {
75  allocator->deallocate ();
76  }
77 
78  // Returns a buffer to be filled with binary data.
79  void get_buffer (unsigned char **data_, std::size_t *size_)
80  {
81  buf = allocator->allocate ();
82 
83  // If we are expected to read large message, we'll opt for zero-
84  // copy, i.e. we'll ask caller to fill the data directly to the
85  // message. Note that subsequent read(s) are non-blocking, thus
86  // each single read reads at most SO_RCVBUF bytes at once not
87  // depending on how large is the chunk returned from here.
88  // As a consequence, large messages being received won't block
89  // other engines running in the same I/O thread for excessive
90  // amounts of time.
91  if (to_read >= allocator->size ()) {
92  *data_ = read_pos;
93  *size_ = to_read;
94  return;
95  }
96 
97  *data_ = buf;
98  *size_ = allocator->size ();
99  }
100 
101  // Processes the data in the buffer previously allocated using
102  // get_buffer function. size_ argument specifies number of bytes
103  // actually filled into the buffer. Function returns 1 when the
104  // whole message was decoded or 0 when more data is required.
105  // On error, -1 is returned and errno set accordingly.
106  // Number of bytes processed is returned in bytes_used_.
107  int decode (const unsigned char *data_, std::size_t size_,
108  std::size_t &bytes_used_)
109  {
110  bytes_used_ = 0;
111 
112  // In case of zero-copy simply adjust the pointers, no copying
113  // is required. Also, run the state machine in case all the data
114  // were processed.
115  if (data_ == read_pos) {
116  zmq_assert (size_ <= to_read);
117  read_pos += size_;
118  to_read -= size_;
119  bytes_used_ = size_;
120 
121  while (!to_read) {
122  const int rc =
123  (static_cast <T *> (this)->*next) (data_ + bytes_used_);
124  if (rc != 0)
125  return rc;
126  }
127  return 0;
128  }
129 
130  while (bytes_used_ < size_) {
131  // Copy the data from buffer to the message.
132  const size_t to_copy = std::min (to_read, size_ - bytes_used_);
133  // Only copy when destination address is different from the
134  // current address in the buffer.
135  if (read_pos != data_ + bytes_used_) {
136  memcpy (read_pos, data_ + bytes_used_, to_copy);
137  }
138 
139  read_pos += to_copy;
140  to_read -= to_copy;
141  bytes_used_ += to_copy;
142  // Try to get more space in the message to fill in.
143  // If none is available, return.
144  while (to_read == 0) {
145  // pass current address in the buffer
146  const int rc =
147  (static_cast <T *> (this)->*next) (data_ + bytes_used_);
148  if (rc != 0)
149  return rc;
150  }
151  }
152 
153  return 0;
154  }
155 
156  virtual void resize_buffer (std::size_t new_size)
157  {
158  allocator->resize (new_size);
159  }
160 
161  protected:
162 
163  // Prototype of state machine action. Action should return false if
164  // it is unable to push the data to the system.
165  typedef int (T:: *step_t) (unsigned char const *);
166 
167  // This function should be called from derived class to read data
168  // from the buffer and schedule next state machine action.
169  void next_step (void *read_pos_, std::size_t to_read_, step_t next_)
170  {
171  read_pos = static_cast <unsigned char*> (read_pos_);
172  to_read = to_read_;
173  next = next_;
174  }
175 
176  private:
177 
178  // Next step. If set to NULL, it means that associated data stream
179  // is dead. Note that there can be still data in the process in such
180  // case.
182 
183  // Where to store the read data.
184  unsigned char *read_pos;
185 
186  // How much data to read before taking next step.
187  std::size_t to_read;
188 
189  // The duffer for data to decode.
191  unsigned char *buf;
192 
193  decoder_base_t (const decoder_base_t &);
194  const decoder_base_t &operator = (const decoder_base_t &);
195  };
196 }
197 
198 #endif
void get_buffer(unsigned char **data_, std::size_t *size_)
Definition: decoder.hpp:79
decoder_base_t(A *allocator_)
Definition: decoder.hpp:62
#define zmq_assert(x)
Definition: err.hpp:119
virtual void resize_buffer(std::size_t new_size)
Definition: decoder.hpp:156
int decode(const unsigned char *data_, std::size_t size_, std::size_t &bytes_used_)
Definition: decoder.hpp:107
virtual ~decoder_base_t()
Definition: decoder.hpp:73
unsigned char * read_pos
Definition: decoder.hpp:184
Definition: address.hpp:35
std::size_t to_read
Definition: decoder.hpp:187
unsigned char * buf
Definition: decoder.hpp:191
int(T::* step_t)(unsigned char const *)
Definition: decoder.hpp:165
const decoder_base_t & operator=(const decoder_base_t &)
void next_step(void *read_pos_, std::size_t to_read_, step_t next_)
Definition: decoder.hpp:169