libzmq  master
ZeroMQ C++ Core Engine (LIBZMQ)
pgm_receiver.cpp
Go to the documentation of this file.
1 /*
2  Copyright (c) 2007-2016 Contributors as noted in the AUTHORS file
3 
4  This file is part of libzmq, the ZeroMQ core engine in C++.
5 
6  libzmq is free software; you can redistribute it and/or modify it under
7  the terms of the GNU Lesser General Public License (LGPL) as published
8  by the Free Software Foundation; either version 3 of the License, or
9  (at your option) any later version.
10 
11  As a special exception, the Contributors give you permission to link
12  this library with independent modules to produce an executable,
13  regardless of the license terms of these independent modules, and to
14  copy and distribute the resulting executable under terms of your choice,
15  provided that you also meet, for each linked independent module, the
16  terms and conditions of the license of that module. An independent
17  module is a module which is not derived from or based on this library.
18  If you modify this library, you must extend this exception to your
19  version of the library.
20 
21  libzmq is distributed in the hope that it will be useful, but WITHOUT
22  ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
23  FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public
24  License for more details.
25 
26  You should have received a copy of the GNU Lesser General Public License
27  along with this program. If not, see <http://www.gnu.org/licenses/>.
28 */
29 
30 #include "precompiled.hpp"
31 #include "macros.hpp"
32 #include "platform.hpp"
33 
34 #if defined ZMQ_HAVE_OPENPGM
35 
36 #include <new>
37 
38 #ifdef ZMQ_HAVE_WINDOWS
39 #include "windows.hpp"
40 #endif
41 
42 #include "pgm_receiver.hpp"
43 #include "session_base.hpp"
44 #include "v1_decoder.hpp"
45 #include "stdint.hpp"
46 #include "wire.hpp"
47 #include "err.hpp"
48 
49 zmq::pgm_receiver_t::pgm_receiver_t (class io_thread_t *parent_,
50  const options_t &options_) :
51  io_object_t (parent_),
52  has_rx_timer (false),
53  pgm_socket (true, options_),
54  options (options_),
55  session (NULL),
56  active_tsi (NULL),
57  insize (0)
58 {
59 }
60 
61 zmq::pgm_receiver_t::~pgm_receiver_t ()
62 {
63  // Destructor should not be called before unplug.
64  zmq_assert (peers.empty ());
65 }
66 
67 int zmq::pgm_receiver_t::init (bool udp_encapsulation_, const char *network_)
68 {
69  return pgm_socket.init (udp_encapsulation_, network_);
70 }
71 
72 void zmq::pgm_receiver_t::plug (io_thread_t *io_thread_,
73  session_base_t *session_)
74 {
75  // Retrieve PGM fds and start polling.
76  fd_t socket_fd = retired_fd;
77  fd_t waiting_pipe_fd = retired_fd;
78  pgm_socket.get_receiver_fds (&socket_fd, &waiting_pipe_fd);
79  socket_handle = add_fd (socket_fd);
80  pipe_handle = add_fd (waiting_pipe_fd);
81  set_pollin (pipe_handle);
82  set_pollin (socket_handle);
83 
84  session = session_;
85 
86  // If there are any subscriptions already queued in the session, drop them.
87  drop_subscriptions ();
88 }
89 
90 void zmq::pgm_receiver_t::unplug ()
91 {
92  // Delete decoders.
93  for (peers_t::iterator it = peers.begin (); it != peers.end (); ++it) {
94  if (it->second.decoder != NULL) {
95  LIBZMQ_DELETE(it->second.decoder);
96  }
97  }
98  peers.clear ();
99  active_tsi = NULL;
100 
101  if (has_rx_timer) {
102  cancel_timer (rx_timer_id);
103  has_rx_timer = false;
104  }
105 
106  rm_fd (socket_handle);
107  rm_fd (pipe_handle);
108 
109  session = NULL;
110 }
111 
112 void zmq::pgm_receiver_t::terminate ()
113 {
114  unplug ();
115  delete this;
116 }
117 
118 void zmq::pgm_receiver_t::restart_output ()
119 {
120  drop_subscriptions ();
121 }
122 
123 void zmq::pgm_receiver_t::restart_input ()
124 {
125  zmq_assert (session != NULL);
126  zmq_assert (active_tsi != NULL);
127 
128  const peers_t::iterator it = peers.find (*active_tsi);
129  zmq_assert (it != peers.end ());
130  zmq_assert (it->second.joined);
131 
132  // Push the pending message into the session.
133  int rc = session->push_msg (it->second.decoder->msg ());
134  errno_assert (rc == 0);
135 
136  if (insize > 0) {
137  rc = process_input (it->second.decoder);
138  if (rc == -1) {
139  // HWM reached; we will try later.
140  if (errno == EAGAIN) {
141  session->flush ();
142  return;
143  }
144  // Data error. Delete message decoder, mark the
145  // peer as not joined and drop remaining data.
146  it->second.joined = false;
147  LIBZMQ_DELETE(it->second.decoder);
148  insize = 0;
149  }
150  }
151 
152  // Resume polling.
153  set_pollin (pipe_handle);
154  set_pollin (socket_handle);
155 
156  active_tsi = NULL;
157  in_event ();
158 }
159 
160 void zmq::pgm_receiver_t::in_event ()
161 {
162  // Read data from the underlying pgm_socket.
163  const pgm_tsi_t *tsi = NULL;
164 
165  if (has_rx_timer) {
166  cancel_timer (rx_timer_id);
167  has_rx_timer = false;
168  }
169 
170  // TODO: This loop can effectively block other engines in the same I/O
171  // thread in the case of high load.
172  while (true) {
173 
174  // Get new batch of data.
175  // Note the workaround made not to break strict-aliasing rules.
176  void *tmp = NULL;
177  ssize_t received = pgm_socket.receive (&tmp, &tsi);
178  inpos = (unsigned char*) tmp;
179 
180  // No data to process. This may happen if the packet received is
181  // neither ODATA nor ODATA.
182  if (received == 0) {
183  if (errno == ENOMEM || errno == EBUSY) {
184  const long timeout = pgm_socket.get_rx_timeout ();
185  add_timer (timeout, rx_timer_id);
186  has_rx_timer = true;
187  }
188  break;
189  }
190 
191  // Find the peer based on its TSI.
192  peers_t::iterator it = peers.find (*tsi);
193 
194  // Data loss. Delete decoder and mark the peer as disjoint.
195  if (received == -1) {
196  if (it != peers.end ()) {
197  it->second.joined = false;
198  if (it->second.decoder != NULL) {
199  LIBZMQ_DELETE(it->second.decoder);
200  }
201  }
202  break;
203  }
204 
205  // New peer. Add it to the list of know but unjoint peers.
206  if (it == peers.end ()) {
207  peer_info_t peer_info = {false, NULL};
208  it = peers.insert (peers_t::value_type (*tsi, peer_info)).first;
209  }
210 
211  insize = static_cast <size_t> (received);
212 
213  // Read the offset of the fist message in the current packet.
214  zmq_assert (insize >= sizeof (uint16_t));
215  uint16_t offset = get_uint16 (inpos);
216  inpos += sizeof (uint16_t);
217  insize -= sizeof (uint16_t);
218 
219  // Join the stream if needed.
220  if (!it->second.joined) {
221 
222  // There is no beginning of the message in current packet.
223  // Ignore the data.
224  if (offset == 0xffff)
225  continue;
226 
227  zmq_assert (offset <= insize);
228  zmq_assert (it->second.decoder == NULL);
229 
230  // We have to move data to the beginning of the first message.
231  inpos += offset;
232  insize -= offset;
233 
234  // Mark the stream as joined.
235  it->second.joined = true;
236 
237  // Create and connect decoder for the peer.
238  it->second.decoder = new (std::nothrow)
239  v1_decoder_t (0, options.maxmsgsize);
240  alloc_assert (it->second.decoder);
241  }
242 
243  int rc = process_input (it->second.decoder);
244  if (rc == -1) {
245  if (errno == EAGAIN) {
246  active_tsi = tsi;
247 
248  // Stop polling.
249  reset_pollin (pipe_handle);
250  reset_pollin (socket_handle);
251 
252  break;
253  }
254 
255  it->second.joined = false;
256  LIBZMQ_DELETE(it->second.decoder);
257  insize = 0;
258  }
259  }
260 
261  // Flush any messages decoder may have produced.
262  session->flush ();
263 }
264 
265 int zmq::pgm_receiver_t::process_input (v1_decoder_t *decoder)
266 {
267  zmq_assert (session != NULL);
268 
269  while (insize > 0) {
270  size_t n = 0;
271  int rc = decoder->decode (inpos, insize, n);
272  if (rc == -1)
273  return -1;
274  inpos += n;
275  insize -= n;
276  if (rc == 0)
277  break;
278  rc = session->push_msg (decoder->msg ());
279  if (rc == -1) {
280  errno_assert (errno == EAGAIN);
281  return -1;
282  }
283  }
284  return 0;
285 }
286 
287 
288 void zmq::pgm_receiver_t::timer_event (int token)
289 {
290  zmq_assert (token == rx_timer_id);
291 
292  // Timer cancels on return by poller_base.
293  has_rx_timer = false;
294  in_event ();
295 }
296 
297 void zmq::pgm_receiver_t::drop_subscriptions ()
298 {
299  msg_t msg;
300  msg.init ();
301  while (session->pull_msg (&msg) == 0)
302  msg.close ();
303 }
304 
305 #endif
306 
#define LIBZMQ_DELETE(p_object)
Definition: macros.hpp:7
Definition: command.hpp:81
int fd_t
Definition: fd.hpp:50
#define zmq_assert(x)
Definition: err.hpp:119
int64_t maxmsgsize
Definition: options.hpp:124
uint16_t get_uint16(const unsigned char *buffer_)
Definition: wire.hpp:57
#define alloc_assert(x)
Definition: err.hpp:159
#define errno_assert(x)
Definition: err.hpp:129
options_t options
Definition: own.hpp:109
static uint8_t decoder[96]
Definition: zmq_utils.cpp:103