Line data Source code
1 : /*
2 : Copyright (c) 2007-2016 Contributors as noted in the AUTHORS file
3 :
4 : This file is part of libzmq, the ZeroMQ core engine in C++.
5 :
6 : libzmq is free software; you can redistribute it and/or modify it under
7 : the terms of the GNU Lesser General Public License (LGPL) as published
8 : by the Free Software Foundation; either version 3 of the License, or
9 : (at your option) any later version.
10 :
11 : As a special exception, the Contributors give you permission to link
12 : this library with independent modules to produce an executable,
13 : regardless of the license terms of these independent modules, and to
14 : copy and distribute the resulting executable under terms of your choice,
15 : provided that you also meet, for each linked independent module, the
16 : terms and conditions of the license of that module. An independent
17 : module is a module which is not derived from or based on this library.
18 : If you modify this library, you must extend this exception to your
19 : version of the library.
20 :
21 : libzmq is distributed in the hope that it will be useful, but WITHOUT
22 : ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
23 : FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public
24 : License for more details.
25 :
26 : You should have received a copy of the GNU Lesser General Public License
27 : along with this program. If not, see <http://www.gnu.org/licenses/>.
28 : */
29 :
30 : #include "precompiled.hpp"
31 : #include "platform.hpp"
32 :
33 : #if defined ZMQ_HAVE_WINDOWS
34 : #include "windows.hpp"
35 : #else
36 : #include <sys/types.h>
37 : #include <unistd.h>
38 : #include <sys/socket.h>
39 : #include <netinet/in.h>
40 : #include <arpa/inet.h>
41 : #endif
42 :
43 : #include "udp_engine.hpp"
44 : #include "session_base.hpp"
45 : #include "v2_protocol.hpp"
46 : #include "err.hpp"
47 : #include "ip.hpp"
48 :
49 6 : zmq::udp_engine_t::udp_engine_t() :
50 : plugged (false),
51 : fd(-1),
52 : session(NULL),
53 : handle(NULL),
54 : address(NULL),
55 : send_enabled(false),
56 12 : recv_enabled(false)
57 : {
58 6 : }
59 :
60 24 : zmq::udp_engine_t::~udp_engine_t()
61 : {
62 6 : zmq_assert (!plugged);
63 :
64 6 : if (fd != retired_fd) {
65 : #ifdef ZMQ_HAVE_WINDOWS
66 : int rc = closesocket (fd);
67 : wsa_assert (rc != SOCKET_ERROR);
68 : #else
69 6 : int rc = close (fd);
70 6 : errno_assert (rc == 0);
71 : #endif
72 6 : fd = retired_fd;
73 : }
74 12 : }
75 :
76 6 : int zmq::udp_engine_t::init (address_t *address_, bool send_, bool recv_)
77 : {
78 6 : zmq_assert (address_);
79 6 : zmq_assert (send_ || recv_);
80 6 : send_enabled = send_;
81 6 : recv_enabled = recv_;
82 6 : address = address_;
83 :
84 6 : fd = open_socket (address->resolved.udp_addr->family (), SOCK_DGRAM, IPPROTO_UDP);
85 6 : if (fd == retired_fd)
86 : return -1;
87 :
88 6 : unblock_socket (fd);
89 :
90 6 : return 0;
91 : }
92 :
93 6 : void zmq::udp_engine_t::plug (io_thread_t* io_thread_, session_base_t *session_)
94 : {
95 6 : zmq_assert (!plugged);
96 6 : plugged = true;
97 :
98 6 : zmq_assert (!session);
99 6 : zmq_assert (session_);
100 6 : session = session_;
101 :
102 : // Connect to I/O threads poller object.
103 6 : io_object_t::plug (io_thread_);
104 6 : handle = add_fd (fd);
105 :
106 6 : if (send_enabled)
107 3 : set_pollout (handle);
108 :
109 6 : if (recv_enabled) {
110 3 : int on = 1;
111 3 : int rc = setsockopt (fd, SOL_SOCKET, SO_REUSEADDR, (char *) &on, sizeof (on));
112 : #ifdef ZMQ_HAVE_WINDOWS
113 : wsa_assert (rc != SOCKET_ERROR);
114 : #else
115 3 : errno_assert (rc == 0);
116 : #endif
117 :
118 : rc = bind (fd, address->resolved.udp_addr->bind_addr (),
119 3 : address->resolved.udp_addr->bind_addrlen ());
120 : #ifdef ZMQ_HAVE_WINDOWS
121 : wsa_assert (rc != SOCKET_ERROR);
122 : #else
123 3 : errno_assert (rc == 0);
124 : #endif
125 :
126 3 : if (address->resolved.udp_addr->is_mcast ()) {
127 : struct ip_mreq mreq;
128 0 : mreq.imr_multiaddr = address->resolved.udp_addr->multicast_ip ();
129 0 : mreq.imr_interface = address->resolved.udp_addr->interface_ip ();
130 0 : rc = setsockopt (fd, IPPROTO_IP, IP_ADD_MEMBERSHIP, (char*) &mreq, sizeof (mreq));
131 : #ifdef ZMQ_HAVE_WINDOWS
132 : wsa_assert (rc != SOCKET_ERROR);
133 : #else
134 0 : errno_assert (rc == 0);
135 : #endif
136 : }
137 3 : set_pollin (handle);
138 :
139 : // Call restart output to drop all join/leave commands
140 3 : restart_output ();
141 : }
142 6 : }
143 :
144 6 : void zmq::udp_engine_t::terminate()
145 : {
146 6 : zmq_assert (plugged);
147 6 : plugged = false;
148 :
149 6 : rm_fd (handle);
150 :
151 : // Disconnect from I/O threads poller object.
152 6 : io_object_t::unplug ();
153 :
154 6 : delete this;
155 6 : }
156 :
157 12 : void zmq::udp_engine_t::out_event()
158 : {
159 : msg_t group_msg;
160 12 : int rc = session->pull_msg (&group_msg);
161 12 : errno_assert (rc == 0 || (rc == -1 && errno == EAGAIN));
162 :
163 12 : if (rc == 0) {
164 : msg_t body_msg;
165 3 : rc = session->pull_msg (&body_msg);
166 :
167 3 : size_t group_size = group_msg.size ();
168 3 : size_t body_size = body_msg.size ();
169 3 : size_t size = group_size + body_size + 1;
170 :
171 : // TODO: check if larger than maximum size
172 3 : out_buffer[0] = (unsigned char) group_size;
173 3 : memcpy (out_buffer + 1, group_msg.data (), group_size);
174 3 : memcpy (out_buffer + 1 + group_size, body_msg.data (), body_size);
175 :
176 3 : rc = group_msg.close ();
177 3 : errno_assert (rc == 0);
178 :
179 3 : body_msg.close ();
180 3 : errno_assert (rc == 0);
181 :
182 : #ifdef ZMQ_HAVE_WINDOWS
183 : rc = sendto (fd, (const char *) out_buffer, (int) size, 0,
184 : address->resolved.udp_addr->dest_addr (),
185 : (int) address->resolved.udp_addr->dest_addrlen ());
186 : wsa_assert (rc != SOCKET_ERROR);
187 : #else
188 : rc = sendto (fd, out_buffer, size, 0,
189 : address->resolved.udp_addr->dest_addr (),
190 3 : address->resolved.udp_addr->dest_addrlen ());
191 3 : errno_assert (rc != -1);
192 : #endif
193 : }
194 : else
195 9 : reset_pollout (handle);
196 12 : }
197 :
198 15 : void zmq::udp_engine_t::restart_output()
199 : {
200 : // If we don't support send we just drop all messages
201 15 : if (!send_enabled) {
202 : msg_t msg;
203 12 : while (session->pull_msg (&msg) == 0)
204 3 : msg.close ();
205 : }
206 : else {
207 6 : set_pollout(handle);
208 6 : out_event ();
209 : }
210 15 : }
211 :
212 3 : void zmq::udp_engine_t::in_event()
213 : {
214 : #ifdef ZMQ_HAVE_WINDOWS
215 : int nbytes = recv(fd, (char*) in_buffer, MAX_UDP_MSG, 0);
216 : const int last_error = WSAGetLastError();
217 : if (nbytes == SOCKET_ERROR) {
218 : wsa_assert(
219 : last_error == WSAENETDOWN ||
220 : last_error == WSAENETRESET ||
221 : last_error == WSAEWOULDBLOCK);
222 : return;
223 : }
224 : #else
225 6 : int nbytes = recv(fd, in_buffer, MAX_UDP_MSG, 0);
226 3 : if (nbytes == -1) {
227 0 : errno_assert(errno != EBADF
228 : && errno != EFAULT
229 : && errno != ENOMEM
230 : && errno != ENOTSOCK);
231 0 : return;
232 : }
233 : #endif
234 :
235 3 : int group_size = in_buffer[0];
236 :
237 : // This doesn't fit, just ingore
238 3 : if (nbytes - 1 < group_size)
239 : return;
240 :
241 3 : int body_size = nbytes - 1 - group_size;
242 :
243 : msg_t msg;
244 3 : int rc = msg.init_size (group_size);
245 3 : errno_assert (rc == 0);
246 3 : msg.set_flags (msg_t::more);
247 3 : memcpy (msg.data (), in_buffer + 1, group_size);
248 :
249 3 : rc = session->push_msg (&msg);
250 3 : errno_assert (rc == 0 || (rc == -1 && errno == EAGAIN));
251 :
252 : // Pipe is full
253 3 : if (rc != 0) {
254 0 : rc = msg.close ();
255 0 : errno_assert (rc == 0);
256 :
257 0 : reset_pollin (handle);
258 : return;
259 : }
260 :
261 3 : rc = msg.close ();
262 3 : errno_assert (rc == 0);
263 3 : rc = msg.init_size (body_size);
264 3 : errno_assert (rc == 0);
265 3 : memcpy (msg.data (), in_buffer + 1 + group_size, body_size);
266 3 : rc = session->push_msg (&msg);
267 3 : errno_assert (rc == 0);
268 3 : rc = msg.close ();
269 3 : errno_assert (rc == 0);
270 3 : session->flush ();
271 : }
272 :
273 0 : void zmq::udp_engine_t::restart_input()
274 : {
275 0 : if (!recv_enabled)
276 0 : return;
277 :
278 0 : set_pollin (handle);
279 0 : in_event ();
280 : }
|