libzmq  master
ZeroMQ C++ Core Engine (LIBZMQ)
encoder.hpp
Go to the documentation of this file.
1 /*
2  Copyright (c) 2007-2016 Contributors as noted in the AUTHORS file
3 
4  This file is part of libzmq, the ZeroMQ core engine in C++.
5 
6  libzmq is free software; you can redistribute it and/or modify it under
7  the terms of the GNU Lesser General Public License (LGPL) as published
8  by the Free Software Foundation; either version 3 of the License, or
9  (at your option) any later version.
10 
11  As a special exception, the Contributors give you permission to link
12  this library with independent modules to produce an executable,
13  regardless of the license terms of these independent modules, and to
14  copy and distribute the resulting executable under terms of your choice,
15  provided that you also meet, for each linked independent module, the
16  terms and conditions of the license of that module. An independent
17  module is a module which is not derived from or based on this library.
18  If you modify this library, you must extend this exception to your
19  version of the library.
20 
21  libzmq is distributed in the hope that it will be useful, but WITHOUT
22  ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
23  FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public
24  License for more details.
25 
26  You should have received a copy of the GNU Lesser General Public License
27  along with this program. If not, see <http://www.gnu.org/licenses/>.
28 */
29 
30 #ifndef __ZMQ_ENCODER_HPP_INCLUDED__
31 #define __ZMQ_ENCODER_HPP_INCLUDED__
32 
33 #if defined(_MSC_VER)
34 #ifndef NOMINMAX
35 #define NOMINMAX
36 #endif
37 #endif
38 
39 #include <stddef.h>
40 #include <string.h>
41 #include <stdlib.h>
42 #include <algorithm>
43 
44 #include "err.hpp"
45 #include "msg.hpp"
46 #include "i_encoder.hpp"
47 
48 namespace zmq
49 {
50 
51  // Helper base class for encoders. It implements the state machine that
52  // fills the outgoing buffer. Derived classes should implement individual
53  // state machine actions.
54 
55  template <typename T> class encoder_base_t : public i_encoder
56  {
57  public:
58 
59  inline encoder_base_t (size_t bufsize_) :
60  write_pos(0),
61  to_write(0),
62  next(NULL),
63  new_msg_flag(false),
64  bufsize (bufsize_),
65  in_progress (NULL)
66  {
67  buf = (unsigned char*) malloc (bufsize_);
68  alloc_assert (buf);
69  }
70 
71  // The destructor doesn't have to be virtual. It is made virtual
72  // just to keep ICC and code checking tools from complaining.
73  inline virtual ~encoder_base_t ()
74  {
75  free (buf);
76  }
77 
78  // The function returns a batch of binary data. The data
79  // are filled to a supplied buffer. If no buffer is supplied (data_
80  // points to NULL) decoder object will provide buffer of its own.
81  inline size_t encode (unsigned char **data_, size_t size_)
82  {
83  unsigned char *buffer = !*data_ ? buf : *data_;
84  size_t buffersize = !*data_ ? bufsize : size_;
85 
86  if (in_progress == NULL)
87  return 0;
88 
89  size_t pos = 0;
90  while (pos < buffersize) {
91 
92  // If there are no more data to return, run the state machine.
93  // If there are still no data, return what we already have
94  // in the buffer.
95  if (!to_write) {
96  if (new_msg_flag) {
97  int rc = in_progress->close ();
98  errno_assert (rc == 0);
99  rc = in_progress->init ();
100  errno_assert (rc == 0);
101  in_progress = NULL;
102  break;
103  }
104  (static_cast <T*> (this)->*next) ();
105  }
106 
107  // If there are no data in the buffer yet and we are able to
108  // fill whole buffer in a single go, let's use zero-copy.
109  // There's no disadvantage to it as we cannot stuck multiple
110  // messages into the buffer anyway. Note that subsequent
111  // write(s) are non-blocking, thus each single write writes
112  // at most SO_SNDBUF bytes at once not depending on how large
113  // is the chunk returned from here.
114  // As a consequence, large messages being sent won't block
115  // other engines running in the same I/O thread for excessive
116  // amounts of time.
117  if (!pos && !*data_ && to_write >= buffersize) {
118  *data_ = write_pos;
119  pos = to_write;
120  write_pos = NULL;
121  to_write = 0;
122  return pos;
123  }
124 
125  // Copy data to the buffer. If the buffer is full, return.
126  size_t to_copy = std::min (to_write, buffersize - pos);
127  memcpy (buffer + pos, write_pos, to_copy);
128  pos += to_copy;
129  write_pos += to_copy;
130  to_write -= to_copy;
131  }
132 
133  *data_ = buffer;
134  return pos;
135  }
136 
137  void load_msg (msg_t *msg_)
138  {
139  zmq_assert (in_progress == NULL);
140  in_progress = msg_;
141  (static_cast <T*> (this)->*next) ();
142  }
143 
144  protected:
145 
146  // Prototype of state machine action.
147  typedef void (T::*step_t) ();
148 
149  // This function should be called from derived class to write the data
150  // to the buffer and schedule next state machine action.
151  inline void next_step (void *write_pos_, size_t to_write_,
152  step_t next_, bool new_msg_flag_)
153  {
154  write_pos = (unsigned char*) write_pos_;
155  to_write = to_write_;
156  next = next_;
157  new_msg_flag = new_msg_flag_;
158  }
159 
160  private:
161 
162  // Where to get the data to write from.
163  unsigned char *write_pos;
164 
165  // How much data to write before next step should be executed.
166  size_t to_write;
167 
168  // Next step. If set to NULL, it means that associated data stream
169  // is dead.
171 
173 
174  // The buffer for encoded data.
175  size_t bufsize;
176  unsigned char *buf;
177 
179  void operator = (const encoder_base_t&);
180 
181  protected:
182 
184 
185  };
186 }
187 
188 #endif
189 
virtual ~encoder_base_t()
Definition: encoder.hpp:73
int close()
Definition: msg.cpp:217
encoder_base_t(size_t bufsize_)
Definition: encoder.hpp:59
#define zmq_assert(x)
Definition: err.hpp:119
void next_step(void *write_pos_, size_t to_write_, step_t next_, bool new_msg_flag_)
Definition: encoder.hpp:151
size_t encode(unsigned char **data_, size_t size_)
Definition: encoder.hpp:81
unsigned char * buf
Definition: encoder.hpp:176
void(T::* step_t)()
Definition: encoder.hpp:147
int init()
Definition: msg.cpp:82
void operator=(const encoder_base_t &)
void load_msg(msg_t *msg_)
Definition: encoder.hpp:137
#define alloc_assert(x)
Definition: err.hpp:159
#define errno_assert(x)
Definition: err.hpp:129
unsigned char * write_pos
Definition: encoder.hpp:163
Definition: address.hpp:35