libzmq  master
ZeroMQ C++ Core Engine (LIBZMQ)
pgm_sender.cpp
Go to the documentation of this file.
1 /*
2  Copyright (c) 2007-2016 Contributors as noted in the AUTHORS file
3 
4  This file is part of libzmq, the ZeroMQ core engine in C++.
5 
6  libzmq is free software; you can redistribute it and/or modify it under
7  the terms of the GNU Lesser General Public License (LGPL) as published
8  by the Free Software Foundation; either version 3 of the License, or
9  (at your option) any later version.
10 
11  As a special exception, the Contributors give you permission to link
12  this library with independent modules to produce an executable,
13  regardless of the license terms of these independent modules, and to
14  copy and distribute the resulting executable under terms of your choice,
15  provided that you also meet, for each linked independent module, the
16  terms and conditions of the license of that module. An independent
17  module is a module which is not derived from or based on this library.
18  If you modify this library, you must extend this exception to your
19  version of the library.
20 
21  libzmq is distributed in the hope that it will be useful, but WITHOUT
22  ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
23  FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public
24  License for more details.
25 
26  You should have received a copy of the GNU Lesser General Public License
27  along with this program. If not, see <http://www.gnu.org/licenses/>.
28 */
29 
30 #include "precompiled.hpp"
31 #include "platform.hpp"
32 
33 #if defined ZMQ_HAVE_OPENPGM
34 
35 #ifdef ZMQ_HAVE_WINDOWS
36 #include "windows.hpp"
37 #endif
38 
39 #include <stdlib.h>
40 
41 #include "io_thread.hpp"
42 #include "pgm_sender.hpp"
43 #include "session_base.hpp"
44 #include "err.hpp"
45 #include "wire.hpp"
46 #include "stdint.hpp"
47 
48 zmq::pgm_sender_t::pgm_sender_t (io_thread_t *parent_,
49  const options_t &options_) :
50  io_object_t (parent_),
51  has_tx_timer (false),
52  has_rx_timer (false),
53  session (NULL),
54  encoder (0),
55  more_flag (false),
56  pgm_socket (false, options_),
57  options (options_),
58  out_buffer (NULL),
59  out_buffer_size (0),
60  write_size (0)
61 {
62  int rc = msg.init ();
63  errno_assert (rc == 0);
64 }
65 
66 int zmq::pgm_sender_t::init (bool udp_encapsulation_, const char *network_)
67 {
68  int rc = pgm_socket.init (udp_encapsulation_, network_);
69  if (rc != 0)
70  return rc;
71 
72  out_buffer_size = pgm_socket.get_max_tsdu_size ();
73  out_buffer = (unsigned char*) malloc (out_buffer_size);
74  alloc_assert (out_buffer);
75 
76  return rc;
77 }
78 
79 void zmq::pgm_sender_t::plug (io_thread_t *io_thread_, session_base_t *session_)
80 {
81  // Allocate 2 fds for PGM socket.
82  fd_t downlink_socket_fd = retired_fd;
83  fd_t uplink_socket_fd = retired_fd;
84  fd_t rdata_notify_fd = retired_fd;
85  fd_t pending_notify_fd = retired_fd;
86 
87  session = session_;
88 
89  // Fill fds from PGM transport and add them to the poller.
90  pgm_socket.get_sender_fds (&downlink_socket_fd, &uplink_socket_fd,
91  &rdata_notify_fd, &pending_notify_fd);
92 
93  handle = add_fd (downlink_socket_fd);
94  uplink_handle = add_fd (uplink_socket_fd);
95  rdata_notify_handle = add_fd (rdata_notify_fd);
96  pending_notify_handle = add_fd (pending_notify_fd);
97 
98  // Set POLLIN. We wont never want to stop polling for uplink = we never
99  // want to stop processing NAKs.
100  set_pollin (uplink_handle);
101  set_pollin (rdata_notify_handle);
102  set_pollin (pending_notify_handle);
103 
104  // Set POLLOUT for downlink_socket_handle.
105  set_pollout (handle);
106 }
107 
108 void zmq::pgm_sender_t::unplug ()
109 {
110  if (has_rx_timer) {
111  cancel_timer (rx_timer_id);
112  has_rx_timer = false;
113  }
114 
115  if (has_tx_timer) {
116  cancel_timer (tx_timer_id);
117  has_tx_timer = false;
118  }
119 
120  rm_fd (handle);
121  rm_fd (uplink_handle);
122  rm_fd (rdata_notify_handle);
123  rm_fd (pending_notify_handle);
124  session = NULL;
125 }
126 
127 void zmq::pgm_sender_t::terminate ()
128 {
129  unplug ();
130  delete this;
131 }
132 
133 void zmq::pgm_sender_t::restart_output ()
134 {
135  set_pollout (handle);
136  out_event ();
137 }
138 
139 void zmq::pgm_sender_t::restart_input ()
140 {
141  zmq_assert (false);
142 }
143 
144 zmq::pgm_sender_t::~pgm_sender_t ()
145 {
146  int rc = msg.close ();
147  errno_assert (rc == 0);
148 
149  if (out_buffer) {
150  free (out_buffer);
151  out_buffer = NULL;
152  }
153 }
154 
155 void zmq::pgm_sender_t::in_event ()
156 {
157  if (has_rx_timer) {
158  cancel_timer (rx_timer_id);
159  has_rx_timer = false;
160  }
161 
162  // In-event on sender side means NAK or SPMR receiving from some peer.
163  pgm_socket.process_upstream ();
164  if (errno == ENOMEM || errno == EBUSY) {
165  const long timeout = pgm_socket.get_rx_timeout ();
166  add_timer (timeout, rx_timer_id);
167  has_rx_timer = true;
168  }
169 }
170 
171 void zmq::pgm_sender_t::out_event ()
172 {
173  // POLLOUT event from send socket. If write buffer is empty,
174  // try to read new data from the encoder.
175  if (write_size == 0) {
176 
177  // First two bytes (sizeof uint16_t) are used to store message
178  // offset in following steps. Note that by passing our buffer to
179  // the get data function we prevent it from returning its own buffer.
180  unsigned char *bf = out_buffer + sizeof (uint16_t);
181  size_t bfsz = out_buffer_size - sizeof (uint16_t);
182  uint16_t offset = 0xffff;
183 
184  size_t bytes = encoder.encode (&bf, bfsz);
185  while (bytes < bfsz) {
186  if (!more_flag && offset == 0xffff)
187  offset = static_cast <uint16_t> (bytes);
188  int rc = session->pull_msg (&msg);
189  if (rc == -1)
190  break;
191  more_flag = msg.flags () & msg_t::more;
192  encoder.load_msg (&msg);
193  bf = out_buffer + sizeof (uint16_t) + bytes;
194  bytes += encoder.encode (&bf, bfsz - bytes);
195  }
196 
197  // If there are no data to write stop polling for output.
198  if (bytes == 0) {
199  reset_pollout (handle);
200  return;
201  }
202 
203  write_size = sizeof (uint16_t) + bytes;
204 
205  // Put offset information in the buffer.
206  put_uint16 (out_buffer, offset);
207  }
208 
209  if (has_tx_timer) {
210  cancel_timer (tx_timer_id);
211  set_pollout (handle);
212  has_tx_timer = false;
213  }
214 
215  // Send the data.
216  size_t nbytes = pgm_socket.send (out_buffer, write_size);
217 
218  // We can write either all data or 0 which means rate limit reached.
219  if (nbytes == write_size)
220  write_size = 0;
221  else {
222  zmq_assert (nbytes == 0);
223 
224  if (errno == ENOMEM) {
225  // Stop polling handle and wait for tx timeout
226  const long timeout = pgm_socket.get_tx_timeout ();
227  add_timer (timeout, tx_timer_id);
228  reset_pollout (handle);
229  has_tx_timer = true;
230  }
231  else
232  errno_assert (errno == EBUSY);
233  }
234 }
235 
236 void zmq::pgm_sender_t::timer_event (int token)
237 {
238  // Timer cancels on return by poller_base.
239  if (token == rx_timer_id) {
240  has_rx_timer = false;
241  in_event ();
242  }
243  else
244  if (token == tx_timer_id) {
245  // Restart polling handle and retry sending
246  has_tx_timer = false;
247  set_pollout (handle);
248  out_event ();
249  }
250  else
251  zmq_assert (false);
252 }
253 
254 #endif
255 
Definition: command.hpp:81
int fd_t
Definition: fd.hpp:50
#define zmq_assert(x)
Definition: err.hpp:119
#define alloc_assert(x)
Definition: err.hpp:159
#define errno_assert(x)
Definition: err.hpp:129
void put_uint16(unsigned char *buffer_, uint16_t value)
Definition: wire.hpp:51
static char encoder[85+1]
Definition: zmq_utils.cpp:95
poller_t::handle_t handle