libzmq  master
ZeroMQ C++ Core Engine (LIBZMQ)
tcp_listener.cpp
Go to the documentation of this file.
1 /*
2  Copyright (c) 2007-2016 Contributors as noted in the AUTHORS file
3 
4  This file is part of libzmq, the ZeroMQ core engine in C++.
5 
6  libzmq is free software; you can redistribute it and/or modify it under
7  the terms of the GNU Lesser General Public License (LGPL) as published
8  by the Free Software Foundation; either version 3 of the License, or
9  (at your option) any later version.
10 
11  As a special exception, the Contributors give you permission to link
12  this library with independent modules to produce an executable,
13  regardless of the license terms of these independent modules, and to
14  copy and distribute the resulting executable under terms of your choice,
15  provided that you also meet, for each linked independent module, the
16  terms and conditions of the license of that module. An independent
17  module is a module which is not derived from or based on this library.
18  If you modify this library, you must extend this exception to your
19  version of the library.
20 
21  libzmq is distributed in the hope that it will be useful, but WITHOUT
22  ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
23  FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public
24  License for more details.
25 
26  You should have received a copy of the GNU Lesser General Public License
27  along with this program. If not, see <http://www.gnu.org/licenses/>.
28 */
29 
30 #include "precompiled.hpp"
31 #include <new>
32 
33 #include <string>
34 #include <stdio.h>
35 
36 #include "platform.hpp"
37 #include "tcp_listener.hpp"
38 #include "stream_engine.hpp"
39 #include "io_thread.hpp"
40 #include "session_base.hpp"
41 #include "config.hpp"
42 #include "err.hpp"
43 #include "ip.hpp"
44 #include "tcp.hpp"
45 #include "socket_base.hpp"
46 
47 #ifdef ZMQ_HAVE_WINDOWS
48 #include "windows.hpp"
49 #else
50 #include <unistd.h>
51 #include <sys/socket.h>
52 #include <arpa/inet.h>
53 #include <netinet/tcp.h>
54 #include <netinet/in.h>
55 #include <netdb.h>
56 #include <fcntl.h>
57 #endif
58 
59 #ifdef ZMQ_HAVE_OPENVMS
60 #include <ioctl.h>
61 #endif
62 
64  socket_base_t *socket_, const options_t &options_) :
65  own_t (io_thread_, options_),
66  io_object_t (io_thread_),
67  s (retired_fd),
68  handle(NULL),
69  socket (socket_)
70 {
71 }
72 
74 {
75  zmq_assert (s == retired_fd);
76 }
77 
79 {
80  // Start polling for incoming connections.
81  handle = add_fd (s);
83 }
84 
86 {
87  rm_fd (handle);
88  close ();
89  own_t::process_term (linger_);
90 }
91 
93 {
94  fd_t fd = accept ();
95 
96  // If connection was reset by the peer in the meantime, just ignore it.
97  // TODO: Handle specific errors like ENFILE/EMFILE etc.
98  if (fd == retired_fd) {
100  return;
101  }
102 
103  tune_tcp_socket (fd);
107 
108  // Create the engine object for this connection.
109  stream_engine_t *engine = new (std::nothrow)
111  alloc_assert (engine);
112 
113  // Choose I/O thread to run connecter in. Given that we are already
114  // running in an I/O thread, there must be at least one available.
116  zmq_assert (io_thread);
117 
118  // Create and launch a session object.
119  session_base_t *session = session_base_t::create (io_thread, false, socket,
120  options, NULL);
121  errno_assert (session);
122  session->inc_seqnum ();
123  launch_child (session);
124  send_attach (session, engine, false);
125  socket->event_accepted (endpoint, (int) fd);
126 }
127 
129 {
130  zmq_assert (s != retired_fd);
131 #ifdef ZMQ_HAVE_WINDOWS
132  int rc = closesocket (s);
133  wsa_assert (rc != SOCKET_ERROR);
134 #else
135  int rc = ::close (s);
136  errno_assert (rc == 0);
137 #endif
138  socket->event_closed (endpoint, (int) s);
139  s = retired_fd;
140 }
141 
142 int zmq::tcp_listener_t::get_address (std::string &addr_)
143 {
144  // Get the details of the TCP socket
145  struct sockaddr_storage ss;
146 #ifdef ZMQ_HAVE_HPUX
147  int sl = sizeof (ss);
148 #else
149  socklen_t sl = sizeof (ss);
150 #endif
151  int rc = getsockname (s, (struct sockaddr *) &ss, &sl);
152 
153  if (rc != 0) {
154  addr_.clear ();
155  return rc;
156  }
157 
158  tcp_address_t addr ((struct sockaddr *) &ss, sl);
159  return addr.to_string (addr_);
160 }
161 
162 int zmq::tcp_listener_t::set_address (const char *addr_)
163 {
164  // Convert the textual address into address structure.
165  int rc = address.resolve (addr_, true, options.ipv6);
166  if (rc != 0)
167  return -1;
168 
170 
171  if (options.use_fd != -1) {
172  s = options.use_fd;
173  socket->event_listening (endpoint, (int) s);
174  return 0;
175  }
176 
177  // Create a listening socket.
178  s = open_socket (address.family (), SOCK_STREAM, IPPROTO_TCP);
179 
180  // IPv6 address family not supported, try automatic downgrade to IPv4.
181  if (s == zmq::retired_fd && address.family () == AF_INET6
182  && errno == EAFNOSUPPORT
183  && options.ipv6) {
184  rc = address.resolve (addr_, true, false);
185  if (rc != 0)
186  return rc;
187  s = open_socket (AF_INET, SOCK_STREAM, IPPROTO_TCP);
188  }
189 
190 #ifdef ZMQ_HAVE_WINDOWS
191  if (s == INVALID_SOCKET) {
192  errno = wsa_error_to_errno (WSAGetLastError ());
193  return -1;
194  }
195 #if !defined _WIN32_WCE
196  // On Windows, preventing sockets to be inherited by child processes.
197  BOOL brc = SetHandleInformation ((HANDLE) s, HANDLE_FLAG_INHERIT, 0);
198  win_assert (brc);
199 #endif
200 #else
201  if (s == -1)
202  return -1;
203 #endif
204 
205  // On some systems, IPv4 mapping in IPv6 sockets is disabled by default.
206  // Switch it on in such cases.
207  if (address.family () == AF_INET6)
209 
210  // Set the IP Type-Of-Service for the underlying socket
211  if (options.tos != 0)
213 
214  // Set the socket buffer limits for the underlying socket.
215  if (options.sndbuf >= 0)
217  if (options.rcvbuf >= 0)
219 
220  // Allow reusing of the address.
221  int flag = 1;
222 #ifdef ZMQ_HAVE_WINDOWS
223  rc = setsockopt (s, SOL_SOCKET, SO_EXCLUSIVEADDRUSE,
224  (const char*) &flag, sizeof (int));
225  wsa_assert (rc != SOCKET_ERROR);
226 #else
227  rc = setsockopt (s, SOL_SOCKET, SO_REUSEADDR, &flag, sizeof (int));
228  errno_assert (rc == 0);
229 #endif
230 
231  // Bind the socket to the network interface and port.
232  rc = bind (s, address.addr (), address.addrlen ());
233 #ifdef ZMQ_HAVE_WINDOWS
234  if (rc == SOCKET_ERROR) {
235  errno = wsa_error_to_errno (WSAGetLastError ());
236  goto error;
237  }
238 #else
239  if (rc != 0)
240  goto error;
241 #endif
242 
243  // Listen for incoming connections.
244  rc = listen (s, options.backlog);
245 #ifdef ZMQ_HAVE_WINDOWS
246  if (rc == SOCKET_ERROR) {
247  errno = wsa_error_to_errno (WSAGetLastError ());
248  goto error;
249  }
250 #else
251  if (rc != 0)
252  goto error;
253 #endif
254 
255  socket->event_listening (endpoint, (int) s);
256  return 0;
257 
258 error:
259  int err = errno;
260  close ();
261  errno = err;
262  return -1;
263 }
264 
266 {
267  // The situation where connection cannot be accepted due to insufficient
268  // resources is considered valid and treated by ignoring the connection.
269  // Accept one connection and deal with different failure modes.
270  zmq_assert (s != retired_fd);
271 
272  struct sockaddr_storage ss;
273  memset (&ss, 0, sizeof (ss));
274 #ifdef ZMQ_HAVE_HPUX
275  int ss_len = sizeof (ss);
276 #else
277  socklen_t ss_len = sizeof (ss);
278 #endif
279  fd_t sock = ::accept (s, (struct sockaddr *) &ss, &ss_len);
280 
281 #ifdef ZMQ_HAVE_WINDOWS
282  if (sock == INVALID_SOCKET) {
283  const int last_error = WSAGetLastError();
284  wsa_assert (last_error == WSAEWOULDBLOCK ||
285  last_error == WSAECONNRESET ||
286  last_error == WSAEMFILE ||
287  last_error == WSAENOBUFS);
288  return retired_fd;
289  }
290 #if !defined _WIN32_WCE
291  // On Windows, preventing sockets to be inherited by child processes.
292  BOOL brc = SetHandleInformation ((HANDLE) sock, HANDLE_FLAG_INHERIT, 0);
293  win_assert (brc);
294 #endif
295 #else
296  if (sock == -1) {
297  errno_assert (errno == EAGAIN || errno == EWOULDBLOCK ||
298  errno == EINTR || errno == ECONNABORTED || errno == EPROTO ||
299  errno == ENOBUFS || errno == ENOMEM || errno == EMFILE ||
300  errno == ENFILE);
301  return retired_fd;
302  }
303 #endif
304 
305  // Race condition can cause socket not to be closed (if fork happens
306  // between accept and this point).
307 #ifdef FD_CLOEXEC
308  int rc = fcntl (sock, F_SETFD, FD_CLOEXEC);
309  errno_assert (rc != -1);
310 #endif
311 
312  if (!options.tcp_accept_filters.empty ()) {
313  bool matched = false;
314  for (options_t::tcp_accept_filters_t::size_type i = 0; i != options.tcp_accept_filters.size (); ++i) {
315  if (options.tcp_accept_filters[i].match_address ((struct sockaddr *) &ss, ss_len)) {
316  matched = true;
317  break;
318  }
319  }
320  if (!matched) {
321 #ifdef ZMQ_HAVE_WINDOWS
322  int rc = closesocket (sock);
323  wsa_assert (rc != SOCKET_ERROR);
324 #else
325  int rc = ::close (sock);
326  errno_assert (rc == 0);
327 #endif
328  return retired_fd;
329  }
330  }
331 
332  // Set the IP Type-Of-Service priority for this client socket
333  if (options.tos != 0)
335 
336  return sock;
337 }
static session_base_t * create(zmq::io_thread_t *io_thread_, bool active_, zmq::socket_base_t *socket_, const options_t &options_, address_t *addr_)
virtual int to_string(std::string &addr_)
void send_attach(zmq::session_base_t *destination_, zmq::i_engine *engine_, bool inc_seqnum_=true)
Definition: object.cpp:225
void tune_tcp_keepalives(fd_t s_, int keepalive_, int keepalive_cnt_, int keepalive_idle_, int keepalive_intvl_)
Definition: tcp.cpp:96
void process_term(int linger_)
Definition: own.cpp:158
void event_accept_failed(const std::string &addr_, int err_)
fd_t open_socket(int domain_, int type_, int protocol_)
Definition: ip.cpp:50
#define zmq_assert(x)
Definition: err.hpp:119
Definition: command.hpp:84
std::string endpoint
zmq::io_thread_t * choose_io_thread(uint64_t affinity_)
Definition: object.cpp:189
tcp_address_t address
#define ENOBUFS
Definition: zmq.h:118
void event_listening(const std::string &addr_, int fd_)
ZMQ_EXPORT int zmq_errno(void)
Definition: zmq.cpp:107
int get_address(std::string &addr_)
uint64_t affinity
Definition: options.hpp:70
int set_address(const char *addr_)
void inc_seqnum()
Definition: own.cpp:66
int resolve(const char *name_, bool local_, bool ipv6_, bool is_src_=false)
void set_pollin(handle_t handle_)
Definition: io_object.cpp:74
int tcp_keepalive_idle
Definition: options.hpp:159
void set_tcp_receive_buffer(fd_t sockfd_, int bufsize_)
Definition: tcp.cpp:85
void tune_tcp_maxrt(fd_t sockfd_, int timeout_)
Definition: tcp.cpp:166
void tune_tcp_socket(fd_t s_)
Definition: tcp.cpp:51
void enable_ipv4_mapping(fd_t s_)
Definition: ip.cpp:103
void set_tcp_send_buffer(fd_t sockfd_, int bufsize_)
Definition: tcp.cpp:74
tcp_accept_filters_t tcp_accept_filters
Definition: options.hpp:164
#define EPROTO
Definition: err.hpp:58
void event_closed(const std::string &addr_, int fd_)
const sockaddr * addr() const
socklen_t addrlen() const
int tcp_keepalive_cnt
Definition: options.hpp:158
void launch_child(own_t *object_)
Definition: own.cpp:81
sa_family_t family() const
#define alloc_assert(x)
Definition: err.hpp:159
#define errno_assert(x)
Definition: err.hpp:129
#define ECONNABORTED
Definition: zmq.h:148
handle_t add_fd(fd_t fd_)
Definition: io_object.cpp:64
void process_term(int linger_)
options_t options
Definition: own.hpp:109
void event_accepted(const std::string &addr_, int fd_)
zmq::socket_base_t * socket
void rm_fd(handle_t handle_)
Definition: io_object.cpp:69
tcp_listener_t(zmq::io_thread_t *io_thread_, zmq::socket_base_t *socket_, const options_t &options_)
int tcp_keepalive_intvl
Definition: options.hpp:160
#define EAFNOSUPPORT
Definition: zmq.h:142
void set_ip_type_of_service(fd_t s_, int iptos)
Definition: ip.cpp:169