Line data Source code
1 : /*
2 : Copyright (c) 2007-2016 Contributors as noted in the AUTHORS file
3 :
4 : This file is part of libzmq, the ZeroMQ core engine in C++.
5 :
6 : libzmq is free software; you can redistribute it and/or modify it under
7 : the terms of the GNU Lesser General Public License (LGPL) as published
8 : by the Free Software Foundation; either version 3 of the License, or
9 : (at your option) any later version.
10 :
11 : As a special exception, the Contributors give you permission to link
12 : this library with independent modules to produce an executable,
13 : regardless of the license terms of these independent modules, and to
14 : copy and distribute the resulting executable under terms of your choice,
15 : provided that you also meet, for each linked independent module, the
16 : terms and conditions of the license of that module. An independent
17 : module is a module which is not derived from or based on this library.
18 : If you modify this library, you must extend this exception to your
19 : version of the library.
20 :
21 : libzmq is distributed in the hope that it will be useful, but WITHOUT
22 : ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
23 : FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public
24 : License for more details.
25 :
26 : You should have received a copy of the GNU Lesser General Public License
27 : along with this program. If not, see <http://www.gnu.org/licenses/>.
28 : */
29 :
30 : #include "precompiled.hpp"
31 : #include <new>
32 :
33 : #include <string>
34 : #include <stdio.h>
35 :
36 : #include "platform.hpp"
37 : #include "tcp_listener.hpp"
38 : #include "stream_engine.hpp"
39 : #include "io_thread.hpp"
40 : #include "session_base.hpp"
41 : #include "config.hpp"
42 : #include "err.hpp"
43 : #include "ip.hpp"
44 : #include "tcp.hpp"
45 : #include "socket_base.hpp"
46 :
47 : #ifdef ZMQ_HAVE_WINDOWS
48 : #include "windows.hpp"
49 : #else
50 : #include <unistd.h>
51 : #include <sys/socket.h>
52 : #include <arpa/inet.h>
53 : #include <netinet/tcp.h>
54 : #include <netinet/in.h>
55 : #include <netdb.h>
56 : #include <fcntl.h>
57 : #endif
58 :
59 : #ifdef ZMQ_HAVE_OPENVMS
60 : #include <ioctl.h>
61 : #endif
62 :
63 279 : zmq::tcp_listener_t::tcp_listener_t (io_thread_t *io_thread_,
64 : socket_base_t *socket_, const options_t &options_) :
65 : own_t (io_thread_, options_),
66 : io_object_t (io_thread_),
67 : s (retired_fd),
68 : handle(NULL),
69 279 : socket (socket_)
70 : {
71 279 : }
72 :
73 1116 : zmq::tcp_listener_t::~tcp_listener_t ()
74 : {
75 279 : zmq_assert (s == retired_fd);
76 558 : }
77 :
78 279 : void zmq::tcp_listener_t::process_plug ()
79 : {
80 : // Start polling for incoming connections.
81 279 : handle = add_fd (s);
82 279 : set_pollin (handle);
83 279 : }
84 :
85 279 : void zmq::tcp_listener_t::process_term (int linger_)
86 : {
87 279 : rm_fd (handle);
88 279 : close ();
89 279 : own_t::process_term (linger_);
90 279 : }
91 :
92 3461 : void zmq::tcp_listener_t::in_event ()
93 : {
94 3461 : fd_t fd = accept ();
95 :
96 : // If connection was reset by the peer in the meantime, just ignore it.
97 : // TODO: Handle specific errors like ENFILE/EMFILE etc.
98 3461 : if (fd == retired_fd) {
99 0 : socket->event_accept_failed (endpoint, zmq_errno());
100 3461 : return;
101 : }
102 :
103 3461 : tune_tcp_socket (fd);
104 : tune_tcp_keepalives (fd, options.tcp_keepalive, options.tcp_keepalive_cnt,
105 3461 : options.tcp_keepalive_idle, options.tcp_keepalive_intvl);
106 3461 : tune_tcp_maxrt (fd, options.tcp_maxrt);
107 :
108 : // Create the engine object for this connection.
109 : stream_engine_t *engine = new (std::nothrow)
110 3461 : stream_engine_t (fd, options, endpoint);
111 3461 : alloc_assert (engine);
112 :
113 : // Choose I/O thread to run connecter in. Given that we are already
114 : // running in an I/O thread, there must be at least one available.
115 3461 : io_thread_t *io_thread = choose_io_thread (options.affinity);
116 3461 : zmq_assert (io_thread);
117 :
118 : // Create and launch a session object.
119 : session_base_t *session = session_base_t::create (io_thread, false, socket,
120 3461 : options, NULL);
121 3461 : errno_assert (session);
122 3461 : session->inc_seqnum ();
123 3461 : launch_child (session);
124 3461 : send_attach (session, engine, false);
125 3461 : socket->event_accepted (endpoint, (int) fd);
126 : }
127 :
128 279 : void zmq::tcp_listener_t::close ()
129 : {
130 279 : zmq_assert (s != retired_fd);
131 : #ifdef ZMQ_HAVE_WINDOWS
132 : int rc = closesocket (s);
133 : wsa_assert (rc != SOCKET_ERROR);
134 : #else
135 279 : int rc = ::close (s);
136 279 : errno_assert (rc == 0);
137 : #endif
138 279 : socket->event_closed (endpoint, (int) s);
139 279 : s = retired_fd;
140 279 : }
141 :
142 279 : int zmq::tcp_listener_t::get_address (std::string &addr_)
143 : {
144 : // Get the details of the TCP socket
145 : struct sockaddr_storage ss;
146 : #ifdef ZMQ_HAVE_HPUX
147 : int sl = sizeof (ss);
148 : #else
149 279 : socklen_t sl = sizeof (ss);
150 : #endif
151 279 : int rc = getsockname (s, (struct sockaddr *) &ss, &sl);
152 :
153 279 : if (rc != 0) {
154 : addr_.clear ();
155 0 : return rc;
156 : }
157 :
158 279 : tcp_address_t addr ((struct sockaddr *) &ss, sl);
159 279 : return addr.to_string (addr_);
160 : }
161 :
162 279 : int zmq::tcp_listener_t::set_address (const char *addr_)
163 : {
164 : // Convert the textual address into address structure.
165 279 : int rc = address.resolve (addr_, true, options.ipv6);
166 279 : if (rc != 0)
167 : return -1;
168 :
169 279 : address.to_string (endpoint);
170 :
171 279 : if (options.use_fd != -1) {
172 9 : s = options.use_fd;
173 9 : socket->event_listening (endpoint, (int) s);
174 : return 0;
175 : }
176 :
177 : // Create a listening socket.
178 270 : s = open_socket (address.family (), SOCK_STREAM, IPPROTO_TCP);
179 :
180 : // IPv6 address family not supported, try automatic downgrade to IPv4.
181 270 : if (s == zmq::retired_fd && address.family () == AF_INET6
182 0 : && errno == EAFNOSUPPORT
183 270 : && options.ipv6) {
184 0 : rc = address.resolve (addr_, true, false);
185 0 : if (rc != 0)
186 : return rc;
187 0 : s = open_socket (AF_INET, SOCK_STREAM, IPPROTO_TCP);
188 : }
189 :
190 : #ifdef ZMQ_HAVE_WINDOWS
191 : if (s == INVALID_SOCKET) {
192 : errno = wsa_error_to_errno (WSAGetLastError ());
193 : return -1;
194 : }
195 : #if !defined _WIN32_WCE
196 : // On Windows, preventing sockets to be inherited by child processes.
197 : BOOL brc = SetHandleInformation ((HANDLE) s, HANDLE_FLAG_INHERIT, 0);
198 : win_assert (brc);
199 : #endif
200 : #else
201 270 : if (s == -1)
202 : return -1;
203 : #endif
204 :
205 : // On some systems, IPv4 mapping in IPv6 sockets is disabled by default.
206 : // Switch it on in such cases.
207 270 : if (address.family () == AF_INET6)
208 15 : enable_ipv4_mapping (s);
209 :
210 : // Set the IP Type-Of-Service for the underlying socket
211 270 : if (options.tos != 0)
212 3 : set_ip_type_of_service (s, options.tos);
213 :
214 : // Set the socket buffer limits for the underlying socket.
215 270 : if (options.sndbuf >= 0)
216 0 : set_tcp_send_buffer (s, options.sndbuf);
217 270 : if (options.rcvbuf >= 0)
218 0 : set_tcp_receive_buffer (s, options.rcvbuf);
219 :
220 : // Allow reusing of the address.
221 270 : int flag = 1;
222 : #ifdef ZMQ_HAVE_WINDOWS
223 : rc = setsockopt (s, SOL_SOCKET, SO_EXCLUSIVEADDRUSE,
224 : (const char*) &flag, sizeof (int));
225 : wsa_assert (rc != SOCKET_ERROR);
226 : #else
227 270 : rc = setsockopt (s, SOL_SOCKET, SO_REUSEADDR, &flag, sizeof (int));
228 270 : errno_assert (rc == 0);
229 : #endif
230 :
231 : // Bind the socket to the network interface and port.
232 270 : rc = bind (s, address.addr (), address.addrlen ());
233 : #ifdef ZMQ_HAVE_WINDOWS
234 : if (rc == SOCKET_ERROR) {
235 : errno = wsa_error_to_errno (WSAGetLastError ());
236 : goto error;
237 : }
238 : #else
239 270 : if (rc != 0)
240 : goto error;
241 : #endif
242 :
243 : // Listen for incoming connections.
244 270 : rc = listen (s, options.backlog);
245 : #ifdef ZMQ_HAVE_WINDOWS
246 : if (rc == SOCKET_ERROR) {
247 : errno = wsa_error_to_errno (WSAGetLastError ());
248 : goto error;
249 : }
250 : #else
251 270 : if (rc != 0)
252 : goto error;
253 : #endif
254 :
255 270 : socket->event_listening (endpoint, (int) s);
256 : return 0;
257 :
258 : error:
259 0 : int err = errno;
260 0 : close ();
261 0 : errno = err;
262 0 : return -1;
263 : }
264 :
265 3461 : zmq::fd_t zmq::tcp_listener_t::accept ()
266 : {
267 : // The situation where connection cannot be accepted due to insufficient
268 : // resources is considered valid and treated by ignoring the connection.
269 : // Accept one connection and deal with different failure modes.
270 3461 : zmq_assert (s != retired_fd);
271 :
272 : struct sockaddr_storage ss;
273 : memset (&ss, 0, sizeof (ss));
274 : #ifdef ZMQ_HAVE_HPUX
275 : int ss_len = sizeof (ss);
276 : #else
277 3461 : socklen_t ss_len = sizeof (ss);
278 : #endif
279 3461 : fd_t sock = ::accept (s, (struct sockaddr *) &ss, &ss_len);
280 :
281 : #ifdef ZMQ_HAVE_WINDOWS
282 : if (sock == INVALID_SOCKET) {
283 : const int last_error = WSAGetLastError();
284 : wsa_assert (last_error == WSAEWOULDBLOCK ||
285 : last_error == WSAECONNRESET ||
286 : last_error == WSAEMFILE ||
287 : last_error == WSAENOBUFS);
288 : return retired_fd;
289 : }
290 : #if !defined _WIN32_WCE
291 : // On Windows, preventing sockets to be inherited by child processes.
292 : BOOL brc = SetHandleInformation ((HANDLE) sock, HANDLE_FLAG_INHERIT, 0);
293 : win_assert (brc);
294 : #endif
295 : #else
296 3461 : if (sock == -1) {
297 0 : errno_assert (errno == EAGAIN || errno == EWOULDBLOCK ||
298 : errno == EINTR || errno == ECONNABORTED || errno == EPROTO ||
299 : errno == ENOBUFS || errno == ENOMEM || errno == EMFILE ||
300 : errno == ENFILE);
301 : return retired_fd;
302 : }
303 : #endif
304 :
305 : // Race condition can cause socket not to be closed (if fork happens
306 : // between accept and this point).
307 : #ifdef FD_CLOEXEC
308 3461 : int rc = fcntl (sock, F_SETFD, FD_CLOEXEC);
309 3461 : errno_assert (rc != -1);
310 : #endif
311 :
312 6922 : if (!options.tcp_accept_filters.empty ()) {
313 : bool matched = false;
314 0 : for (options_t::tcp_accept_filters_t::size_type i = 0; i != options.tcp_accept_filters.size (); ++i) {
315 0 : if (options.tcp_accept_filters[i].match_address ((struct sockaddr *) &ss, ss_len)) {
316 : matched = true;
317 : break;
318 : }
319 : }
320 0 : if (!matched) {
321 : #ifdef ZMQ_HAVE_WINDOWS
322 : int rc = closesocket (sock);
323 : wsa_assert (rc != SOCKET_ERROR);
324 : #else
325 0 : int rc = ::close (sock);
326 0 : errno_assert (rc == 0);
327 : #endif
328 : return retired_fd;
329 : }
330 : }
331 :
332 : // Set the IP Type-Of-Service priority for this client socket
333 3461 : if (options.tos != 0)
334 3 : set_ip_type_of_service (sock, options.tos);
335 :
336 3461 : return sock;
337 : }
|