libzmq  master
ZeroMQ C++ Core Engine (LIBZMQ)
udp_engine.cpp
Go to the documentation of this file.
1 /*
2 Copyright (c) 2007-2016 Contributors as noted in the AUTHORS file
3 
4 This file is part of libzmq, the ZeroMQ core engine in C++.
5 
6 libzmq is free software; you can redistribute it and/or modify it under
7 the terms of the GNU Lesser General Public License (LGPL) as published
8 by the Free Software Foundation; either version 3 of the License, or
9 (at your option) any later version.
10 
11 As a special exception, the Contributors give you permission to link
12 this library with independent modules to produce an executable,
13 regardless of the license terms of these independent modules, and to
14 copy and distribute the resulting executable under terms of your choice,
15 provided that you also meet, for each linked independent module, the
16 terms and conditions of the license of that module. An independent
17 module is a module which is not derived from or based on this library.
18 If you modify this library, you must extend this exception to your
19 version of the library.
20 
21 libzmq is distributed in the hope that it will be useful, but WITHOUT
22 ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
23 FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public
24 License for more details.
25 
26 You should have received a copy of the GNU Lesser General Public License
27 along with this program. If not, see <http://www.gnu.org/licenses/>.
28 */
29 
30 #include "precompiled.hpp"
31 #include "platform.hpp"
32 
33 #if defined ZMQ_HAVE_WINDOWS
34 #include "windows.hpp"
35 #else
36 #include <sys/types.h>
37 #include <unistd.h>
38 #include <sys/socket.h>
39 #include <netinet/in.h>
40 #include <arpa/inet.h>
41 #endif
42 
43 #include "udp_engine.hpp"
44 #include "session_base.hpp"
45 #include "v2_protocol.hpp"
46 #include "err.hpp"
47 #include "ip.hpp"
48 
50  plugged (false),
51  fd(-1),
52  session(NULL),
53  handle(NULL),
54  address(NULL),
55  send_enabled(false),
56  recv_enabled(false)
57 {
58 }
59 
61 {
63 
64  if (fd != retired_fd) {
65 #ifdef ZMQ_HAVE_WINDOWS
66  int rc = closesocket (fd);
67  wsa_assert (rc != SOCKET_ERROR);
68 #else
69  int rc = close (fd);
70  errno_assert (rc == 0);
71 #endif
72  fd = retired_fd;
73  }
74 }
75 
76 int zmq::udp_engine_t::init (address_t *address_, bool send_, bool recv_)
77 {
78  zmq_assert (address_);
79  zmq_assert (send_ || recv_);
80  send_enabled = send_;
81  recv_enabled = recv_;
82  address = address_;
83 
84  fd = open_socket (address->resolved.udp_addr->family (), SOCK_DGRAM, IPPROTO_UDP);
85  if (fd == retired_fd)
86  return -1;
87 
89 
90  return 0;
91 }
92 
93 void zmq::udp_engine_t::plug (io_thread_t* io_thread_, session_base_t *session_)
94 {
96  plugged = true;
97 
99  zmq_assert (session_);
100  session = session_;
101 
102  // Connect to I/O threads poller object.
103  io_object_t::plug (io_thread_);
104  handle = add_fd (fd);
105 
106  if (send_enabled)
108 
109  if (recv_enabled) {
110  int on = 1;
111  int rc = setsockopt (fd, SOL_SOCKET, SO_REUSEADDR, (char *) &on, sizeof (on));
112 #ifdef ZMQ_HAVE_WINDOWS
113  wsa_assert (rc != SOCKET_ERROR);
114 #else
115  errno_assert (rc == 0);
116 #endif
117 
120 #ifdef ZMQ_HAVE_WINDOWS
121  wsa_assert (rc != SOCKET_ERROR);
122 #else
123  errno_assert (rc == 0);
124 #endif
125 
126  if (address->resolved.udp_addr->is_mcast ()) {
127  struct ip_mreq mreq;
128  mreq.imr_multiaddr = address->resolved.udp_addr->multicast_ip ();
129  mreq.imr_interface = address->resolved.udp_addr->interface_ip ();
130  rc = setsockopt (fd, IPPROTO_IP, IP_ADD_MEMBERSHIP, (char*) &mreq, sizeof (mreq));
131 #ifdef ZMQ_HAVE_WINDOWS
132  wsa_assert (rc != SOCKET_ERROR);
133 #else
134  errno_assert (rc == 0);
135 #endif
136  }
137  set_pollin (handle);
138 
139  // Call restart output to drop all join/leave commands
140  restart_output ();
141  }
142 }
143 
145 {
147  plugged = false;
148 
149  rm_fd (handle);
150 
151  // Disconnect from I/O threads poller object.
153 
154  delete this;
155 }
156 
158 {
159  msg_t group_msg;
160  int rc = session->pull_msg (&group_msg);
161  errno_assert (rc == 0 || (rc == -1 && errno == EAGAIN));
162 
163  if (rc == 0) {
164  msg_t body_msg;
165  rc = session->pull_msg (&body_msg);
166 
167  size_t group_size = group_msg.size ();
168  size_t body_size = body_msg.size ();
169  size_t size = group_size + body_size + 1;
170 
171  // TODO: check if larger than maximum size
172  out_buffer[0] = (unsigned char) group_size;
173  memcpy (out_buffer + 1, group_msg.data (), group_size);
174  memcpy (out_buffer + 1 + group_size, body_msg.data (), body_size);
175 
176  rc = group_msg.close ();
177  errno_assert (rc == 0);
178 
179  body_msg.close ();
180  errno_assert (rc == 0);
181 
182 #ifdef ZMQ_HAVE_WINDOWS
183  rc = sendto (fd, (const char *) out_buffer, (int) size, 0,
186  wsa_assert (rc != SOCKET_ERROR);
187 #else
188  rc = sendto (fd, out_buffer, size, 0,
191  errno_assert (rc != -1);
192 #endif
193  }
194  else
196 }
197 
199 {
200  // If we don't support send we just drop all messages
201  if (!send_enabled) {
202  msg_t msg;
203  while (session->pull_msg (&msg) == 0)
204  msg.close ();
205  }
206  else {
208  out_event ();
209  }
210 }
211 
213 {
214 #ifdef ZMQ_HAVE_WINDOWS
215  int nbytes = recv(fd, (char*) in_buffer, MAX_UDP_MSG, 0);
216  const int last_error = WSAGetLastError();
217  if (nbytes == SOCKET_ERROR) {
218  wsa_assert(
219  last_error == WSAENETDOWN ||
220  last_error == WSAENETRESET ||
221  last_error == WSAEWOULDBLOCK);
222  return;
223  }
224 #else
225  int nbytes = recv(fd, in_buffer, MAX_UDP_MSG, 0);
226  if (nbytes == -1) {
227  errno_assert(errno != EBADF
228  && errno != EFAULT
229  && errno != ENOMEM
230  && errno != ENOTSOCK);
231  return;
232  }
233 #endif
234 
235  int group_size = in_buffer[0];
236 
237  // This doesn't fit, just ingore
238  if (nbytes - 1 < group_size)
239  return;
240 
241  int body_size = nbytes - 1 - group_size;
242 
243  msg_t msg;
244  int rc = msg.init_size (group_size);
245  errno_assert (rc == 0);
246  msg.set_flags (msg_t::more);
247  memcpy (msg.data (), in_buffer + 1, group_size);
248 
249  rc = session->push_msg (&msg);
250  errno_assert (rc == 0 || (rc == -1 && errno == EAGAIN));
251 
252  // Pipe is full
253  if (rc != 0) {
254  rc = msg.close ();
255  errno_assert (rc == 0);
256 
258  return;
259  }
260 
261  rc = msg.close ();
262  errno_assert (rc == 0);
263  rc = msg.init_size (body_size);
264  errno_assert (rc == 0);
265  memcpy (msg.data (), in_buffer + 1 + group_size, body_size);
266  rc = session->push_msg (&msg);
267  errno_assert (rc == 0);
268  rc = msg.close ();
269  errno_assert (rc == 0);
270  session->flush ();
271 }
272 
274 {
275  if (!recv_enabled)
276  return;
277 
278  set_pollin (handle);
279  in_event ();
280 }
#define size
int close()
Definition: msg.cpp:217
void set_pollout(handle_t handle_)
Definition: io_object.cpp:84
void plug(zmq::io_thread_t *io_thread_)
Definition: io_object.cpp:46
fd_t open_socket(int domain_, int type_, int protocol_)
Definition: ip.cpp:50
void unblock_socket(fd_t s_)
Definition: ip.cpp:84
#define zmq_assert(x)
Definition: err.hpp:119
virtual int push_msg(msg_t *msg_)
unsigned char in_buffer[MAX_UDP_MSG]
Definition: udp_engine.hpp:56
Definition: command.hpp:84
int init(address_t *address_, bool send_, bool recv_)
Definition: udp_engine.cpp:76
#define MAX_UDP_MSG
Definition: udp_engine.hpp:10
sa_family_t family() const
socklen_t bind_addrlen() const
unsigned char size
Definition: msg.hpp:188
int init_size(size_t size_)
Definition: msg.cpp:93
address_t * address
Definition: udp_engine.hpp:53
udp_address_t * udp_addr
Definition: address.hpp:61
void set_pollin(handle_t handle_)
Definition: io_object.cpp:74
#define ENOTSOCK
Definition: zmq.h:136
const sockaddr * bind_addr() const
const sockaddr * dest_addr() const
void plug(zmq::io_thread_t *io_thread_, class session_base_t *session_)
Definition: udp_engine.cpp:93
virtual int pull_msg(msg_t *msg_)
void reset_pollout(handle_t handle_)
Definition: io_object.cpp:89
void reset_pollin(handle_t handle_)
Definition: io_object.cpp:79
void set_flags(unsigned char flags_)
Definition: msg.cpp:384
#define errno_assert(x)
Definition: err.hpp:129
unsigned char out_buffer[MAX_UDP_MSG]
Definition: udp_engine.hpp:55
unsigned char data[max_vsm_size]
Definition: msg.hpp:187
session_base_t * session
Definition: udp_engine.hpp:51
handle_t add_fd(fd_t fd_)
Definition: io_object.cpp:64
const in_addr multicast_ip() const
socklen_t dest_addrlen() const
const char * address
Definition: test_fork.cpp:32
union zmq::address_t::@0 resolved
const in_addr interface_ip() const
bool is_mcast() const
void rm_fd(handle_t handle_)
Definition: io_object.cpp:69