LCOV - code coverage report
Current view: top level - home/h/core/forks/m4-libzmq/src - tcp_listener.cpp (source / functions) Hit Total Coverage
Test: zeromq-4.2.0 Code Coverage Lines: 81 99 81.8 %
Date: 2016-05-09 Functions: 10 10 100.0 %
Legend: Lines: hit not hit

          Line data    Source code
       1             : /*
       2             :     Copyright (c) 2007-2016 Contributors as noted in the AUTHORS file
       3             : 
       4             :     This file is part of libzmq, the ZeroMQ core engine in C++.
       5             : 
       6             :     libzmq is free software; you can redistribute it and/or modify it under
       7             :     the terms of the GNU Lesser General Public License (LGPL) as published
       8             :     by the Free Software Foundation; either version 3 of the License, or
       9             :     (at your option) any later version.
      10             : 
      11             :     As a special exception, the Contributors give you permission to link
      12             :     this library with independent modules to produce an executable,
      13             :     regardless of the license terms of these independent modules, and to
      14             :     copy and distribute the resulting executable under terms of your choice,
      15             :     provided that you also meet, for each linked independent module, the
      16             :     terms and conditions of the license of that module. An independent
      17             :     module is a module which is not derived from or based on this library.
      18             :     If you modify this library, you must extend this exception to your
      19             :     version of the library.
      20             : 
      21             :     libzmq is distributed in the hope that it will be useful, but WITHOUT
      22             :     ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
      23             :     FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public
      24             :     License for more details.
      25             : 
      26             :     You should have received a copy of the GNU Lesser General Public License
      27             :     along with this program.  If not, see <http://www.gnu.org/licenses/>.
      28             : */
      29             : 
      30             : #include "precompiled.hpp"
      31             : #include <new>
      32             : 
      33             : #include <string>
      34             : #include <stdio.h>
      35             : 
      36             : #include "platform.hpp"
      37             : #include "tcp_listener.hpp"
      38             : #include "stream_engine.hpp"
      39             : #include "io_thread.hpp"
      40             : #include "session_base.hpp"
      41             : #include "config.hpp"
      42             : #include "err.hpp"
      43             : #include "ip.hpp"
      44             : #include "tcp.hpp"
      45             : #include "socket_base.hpp"
      46             : 
      47             : #ifdef ZMQ_HAVE_WINDOWS
      48             : #include "windows.hpp"
      49             : #else
      50             : #include <unistd.h>
      51             : #include <sys/socket.h>
      52             : #include <arpa/inet.h>
      53             : #include <netinet/tcp.h>
      54             : #include <netinet/in.h>
      55             : #include <netdb.h>
      56             : #include <fcntl.h>
      57             : #endif
      58             : 
      59             : #ifdef ZMQ_HAVE_OPENVMS
      60             : #include <ioctl.h>
      61             : #endif
      62             : 
      63         279 : zmq::tcp_listener_t::tcp_listener_t (io_thread_t *io_thread_,
      64             :       socket_base_t *socket_, const options_t &options_) :
      65             :     own_t (io_thread_, options_),
      66             :     io_object_t (io_thread_),
      67             :     s (retired_fd),
      68             :     handle(NULL),
      69         279 :     socket (socket_)
      70             : {
      71         279 : }
      72             : 
      73        1116 : zmq::tcp_listener_t::~tcp_listener_t ()
      74             : {
      75         279 :     zmq_assert (s == retired_fd);
      76         558 : }
      77             : 
      78         279 : void zmq::tcp_listener_t::process_plug ()
      79             : {
      80             :     //  Start polling for incoming connections.
      81         279 :     handle = add_fd (s);
      82         279 :     set_pollin (handle);
      83         279 : }
      84             : 
      85         279 : void zmq::tcp_listener_t::process_term (int linger_)
      86             : {
      87         279 :     rm_fd (handle);
      88         279 :     close ();
      89         279 :     own_t::process_term (linger_);
      90         279 : }
      91             : 
      92        3461 : void zmq::tcp_listener_t::in_event ()
      93             : {
      94        3461 :     fd_t fd = accept ();
      95             : 
      96             :     //  If connection was reset by the peer in the meantime, just ignore it.
      97             :     //  TODO: Handle specific errors like ENFILE/EMFILE etc.
      98        3461 :     if (fd == retired_fd) {
      99           0 :         socket->event_accept_failed (endpoint, zmq_errno());
     100        3461 :         return;
     101             :     }
     102             : 
     103        3461 :     tune_tcp_socket (fd);
     104             :     tune_tcp_keepalives (fd, options.tcp_keepalive, options.tcp_keepalive_cnt,
     105        3461 :             options.tcp_keepalive_idle, options.tcp_keepalive_intvl);
     106        3461 :     tune_tcp_maxrt (fd, options.tcp_maxrt);
     107             : 
     108             :     //  Create the engine object for this connection.
     109             :     stream_engine_t *engine = new (std::nothrow)
     110        3461 :         stream_engine_t (fd, options, endpoint);
     111        3461 :     alloc_assert (engine);
     112             : 
     113             :     //  Choose I/O thread to run connecter in. Given that we are already
     114             :     //  running in an I/O thread, there must be at least one available.
     115        3461 :     io_thread_t *io_thread = choose_io_thread (options.affinity);
     116        3461 :     zmq_assert (io_thread);
     117             : 
     118             :     //  Create and launch a session object.
     119             :     session_base_t *session = session_base_t::create (io_thread, false, socket,
     120        3461 :         options, NULL);
     121        3461 :     errno_assert (session);
     122        3461 :     session->inc_seqnum ();
     123        3461 :     launch_child (session);
     124        3461 :     send_attach (session, engine, false);
     125        3461 :     socket->event_accepted (endpoint, (int) fd);
     126             : }
     127             : 
     128         279 : void zmq::tcp_listener_t::close ()
     129             : {
     130         279 :     zmq_assert (s != retired_fd);
     131             : #ifdef ZMQ_HAVE_WINDOWS
     132             :     int rc = closesocket (s);
     133             :     wsa_assert (rc != SOCKET_ERROR);
     134             : #else
     135         279 :     int rc = ::close (s);
     136         279 :     errno_assert (rc == 0);
     137             : #endif
     138         279 :     socket->event_closed (endpoint, (int) s);
     139         279 :     s = retired_fd;
     140         279 : }
     141             : 
     142         279 : int zmq::tcp_listener_t::get_address (std::string &addr_)
     143             : {
     144             :     // Get the details of the TCP socket
     145             :     struct sockaddr_storage ss;
     146             : #ifdef ZMQ_HAVE_HPUX
     147             :     int sl = sizeof (ss);
     148             : #else
     149         279 :     socklen_t sl = sizeof (ss);
     150             : #endif
     151         279 :     int rc = getsockname (s, (struct sockaddr *) &ss, &sl);
     152             : 
     153         279 :     if (rc != 0) {
     154             :         addr_.clear ();
     155           0 :         return rc;
     156             :     }
     157             : 
     158         279 :     tcp_address_t addr ((struct sockaddr *) &ss, sl);
     159         279 :     return addr.to_string (addr_);
     160             : }
     161             : 
     162         279 : int zmq::tcp_listener_t::set_address (const char *addr_)
     163             : {
     164             :     //  Convert the textual address into address structure.
     165         279 :     int rc = address.resolve (addr_, true, options.ipv6);
     166         279 :     if (rc != 0)
     167             :         return -1;
     168             : 
     169         279 :     address.to_string (endpoint);
     170             : 
     171         279 :     if (options.use_fd != -1) {
     172           9 :         s = options.use_fd;
     173           9 :         socket->event_listening (endpoint, (int) s);
     174             :         return 0;
     175             :     }
     176             : 
     177             :     //  Create a listening socket.
     178         270 :     s = open_socket (address.family (), SOCK_STREAM, IPPROTO_TCP);
     179             : 
     180             :     //  IPv6 address family not supported, try automatic downgrade to IPv4.
     181         270 :     if (s == zmq::retired_fd && address.family () == AF_INET6
     182           0 :     && errno == EAFNOSUPPORT
     183         270 :     && options.ipv6) {
     184           0 :         rc = address.resolve (addr_, true, false);
     185           0 :         if (rc != 0)
     186             :             return rc;
     187           0 :         s = open_socket (AF_INET, SOCK_STREAM, IPPROTO_TCP);
     188             :     }
     189             : 
     190             : #ifdef ZMQ_HAVE_WINDOWS
     191             :     if (s == INVALID_SOCKET) {
     192             :         errno = wsa_error_to_errno (WSAGetLastError ());
     193             :         return -1;
     194             :     }
     195             : #if !defined _WIN32_WCE
     196             :     //  On Windows, preventing sockets to be inherited by child processes.
     197             :     BOOL brc = SetHandleInformation ((HANDLE) s, HANDLE_FLAG_INHERIT, 0);
     198             :     win_assert (brc);
     199             : #endif
     200             : #else
     201         270 :     if (s == -1)
     202             :         return -1;
     203             : #endif
     204             : 
     205             :     //  On some systems, IPv4 mapping in IPv6 sockets is disabled by default.
     206             :     //  Switch it on in such cases.
     207         270 :     if (address.family () == AF_INET6)
     208          15 :         enable_ipv4_mapping (s);
     209             : 
     210             :     // Set the IP Type-Of-Service for the underlying socket
     211         270 :     if (options.tos != 0)
     212           3 :         set_ip_type_of_service (s, options.tos);
     213             : 
     214             :     //  Set the socket buffer limits for the underlying socket.
     215         270 :     if (options.sndbuf >= 0)
     216           0 :         set_tcp_send_buffer (s, options.sndbuf);
     217         270 :     if (options.rcvbuf >= 0)
     218           0 :         set_tcp_receive_buffer (s, options.rcvbuf);
     219             : 
     220             :     //  Allow reusing of the address.
     221         270 :     int flag = 1;
     222             : #ifdef ZMQ_HAVE_WINDOWS
     223             :     rc = setsockopt (s, SOL_SOCKET, SO_EXCLUSIVEADDRUSE,
     224             :         (const char*) &flag, sizeof (int));
     225             :     wsa_assert (rc != SOCKET_ERROR);
     226             : #else
     227         270 :     rc = setsockopt (s, SOL_SOCKET, SO_REUSEADDR, &flag, sizeof (int));
     228         270 :     errno_assert (rc == 0);
     229             : #endif
     230             : 
     231             :     //  Bind the socket to the network interface and port.
     232         270 :     rc = bind (s, address.addr (), address.addrlen ());
     233             : #ifdef ZMQ_HAVE_WINDOWS
     234             :     if (rc == SOCKET_ERROR) {
     235             :         errno = wsa_error_to_errno (WSAGetLastError ());
     236             :         goto error;
     237             :     }
     238             : #else
     239         270 :     if (rc != 0)
     240             :         goto error;
     241             : #endif
     242             : 
     243             :     //  Listen for incoming connections.
     244         270 :     rc = listen (s, options.backlog);
     245             : #ifdef ZMQ_HAVE_WINDOWS
     246             :     if (rc == SOCKET_ERROR) {
     247             :         errno = wsa_error_to_errno (WSAGetLastError ());
     248             :         goto error;
     249             :     }
     250             : #else
     251         270 :     if (rc != 0)
     252             :         goto error;
     253             : #endif
     254             : 
     255         270 :     socket->event_listening (endpoint, (int) s);
     256             :     return 0;
     257             : 
     258             : error:
     259           0 :     int err = errno;
     260           0 :     close ();
     261           0 :     errno = err;
     262           0 :     return -1;
     263             : }
     264             : 
     265        3461 : zmq::fd_t zmq::tcp_listener_t::accept ()
     266             : {
     267             :     //  The situation where connection cannot be accepted due to insufficient
     268             :     //  resources is considered valid and treated by ignoring the connection.
     269             :     //  Accept one connection and deal with different failure modes.
     270        3461 :     zmq_assert (s != retired_fd);
     271             : 
     272             :     struct sockaddr_storage ss;
     273             :     memset (&ss, 0, sizeof (ss));
     274             : #ifdef ZMQ_HAVE_HPUX
     275             :     int ss_len = sizeof (ss);
     276             : #else
     277        3461 :     socklen_t ss_len = sizeof (ss);
     278             : #endif
     279        3461 :     fd_t sock = ::accept (s, (struct sockaddr *) &ss, &ss_len);
     280             : 
     281             : #ifdef ZMQ_HAVE_WINDOWS
     282             :     if (sock == INVALID_SOCKET) {
     283             :                 const int last_error = WSAGetLastError();
     284             :         wsa_assert (last_error == WSAEWOULDBLOCK ||
     285             :             last_error == WSAECONNRESET ||
     286             :             last_error == WSAEMFILE ||
     287             :             last_error == WSAENOBUFS);
     288             :         return retired_fd;
     289             :     }
     290             : #if !defined _WIN32_WCE
     291             :     //  On Windows, preventing sockets to be inherited by child processes.
     292             :     BOOL brc = SetHandleInformation ((HANDLE) sock, HANDLE_FLAG_INHERIT, 0);
     293             :     win_assert (brc);
     294             : #endif
     295             : #else
     296        3461 :     if (sock == -1) {
     297           0 :         errno_assert (errno == EAGAIN || errno == EWOULDBLOCK ||
     298             :             errno == EINTR || errno == ECONNABORTED || errno == EPROTO ||
     299             :             errno == ENOBUFS || errno == ENOMEM || errno == EMFILE ||
     300             :             errno == ENFILE);
     301             :         return retired_fd;
     302             :     }
     303             : #endif
     304             : 
     305             :     //  Race condition can cause socket not to be closed (if fork happens
     306             :     //  between accept and this point).
     307             : #ifdef FD_CLOEXEC
     308        3461 :     int rc = fcntl (sock, F_SETFD, FD_CLOEXEC);
     309        3461 :     errno_assert (rc != -1);
     310             : #endif
     311             : 
     312        6922 :     if (!options.tcp_accept_filters.empty ()) {
     313             :         bool matched = false;
     314           0 :         for (options_t::tcp_accept_filters_t::size_type i = 0; i != options.tcp_accept_filters.size (); ++i) {
     315           0 :             if (options.tcp_accept_filters[i].match_address ((struct sockaddr *) &ss, ss_len)) {
     316             :                 matched = true;
     317             :                 break;
     318             :             }
     319             :         }
     320           0 :         if (!matched) {
     321             : #ifdef ZMQ_HAVE_WINDOWS
     322             :             int rc = closesocket (sock);
     323             :             wsa_assert (rc != SOCKET_ERROR);
     324             : #else
     325           0 :             int rc = ::close (sock);
     326           0 :             errno_assert (rc == 0);
     327             : #endif
     328             :             return retired_fd;
     329             :         }
     330             :     }
     331             : 
     332             :     // Set the IP Type-Of-Service priority for this client socket
     333        3461 :     if (options.tos != 0)
     334           3 :         set_ip_type_of_service (sock, options.tos);
     335             : 
     336        3461 :     return sock;
     337             : }

Generated by: LCOV version 1.10