LCOV - code coverage report
Current view: top level - home/h/core/forks/m4-libzmq/src - udp_engine.cpp (source / functions) Hit Total Coverage
Test: zeromq-4.2.0 Code Coverage Lines: 96 110 87.3 %
Date: 2016-05-09 Functions: 9 10 90.0 %
Legend: Lines: hit not hit

          Line data    Source code
       1             : /*
       2             : Copyright (c) 2007-2016 Contributors as noted in the AUTHORS file
       3             : 
       4             : This file is part of libzmq, the ZeroMQ core engine in C++.
       5             : 
       6             : libzmq is free software; you can redistribute it and/or modify it under
       7             : the terms of the GNU Lesser General Public License (LGPL) as published
       8             : by the Free Software Foundation; either version 3 of the License, or
       9             : (at your option) any later version.
      10             : 
      11             : As a special exception, the Contributors give you permission to link
      12             : this library with independent modules to produce an executable,
      13             : regardless of the license terms of these independent modules, and to
      14             : copy and distribute the resulting executable under terms of your choice,
      15             : provided that you also meet, for each linked independent module, the
      16             : terms and conditions of the license of that module. An independent
      17             : module is a module which is not derived from or based on this library.
      18             : If you modify this library, you must extend this exception to your
      19             : version of the library.
      20             : 
      21             : libzmq is distributed in the hope that it will be useful, but WITHOUT
      22             : ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
      23             : FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public
      24             : License for more details.
      25             : 
      26             : You should have received a copy of the GNU Lesser General Public License
      27             : along with this program.  If not, see <http://www.gnu.org/licenses/>.
      28             : */
      29             : 
      30             : #include "precompiled.hpp"
      31             : #include "platform.hpp"
      32             : 
      33             : #if defined ZMQ_HAVE_WINDOWS
      34             : #include "windows.hpp"
      35             : #else
      36             : #include <sys/types.h>
      37             : #include <unistd.h>
      38             : #include <sys/socket.h>
      39             : #include <netinet/in.h>
      40             : #include <arpa/inet.h>
      41             : #endif
      42             : 
      43             : #include "udp_engine.hpp"
      44             : #include "session_base.hpp"
      45             : #include "v2_protocol.hpp"
      46             : #include "err.hpp"
      47             : #include "ip.hpp"
      48             : 
      49           6 : zmq::udp_engine_t::udp_engine_t() :
      50             :     plugged (false),
      51             :     fd(-1),
      52             :     session(NULL),
      53             :     handle(NULL),
      54             :     address(NULL),
      55             :     send_enabled(false),
      56          12 :     recv_enabled(false)
      57             : {
      58           6 : }
      59             : 
      60          24 : zmq::udp_engine_t::~udp_engine_t()
      61             : {
      62           6 :     zmq_assert (!plugged);
      63             : 
      64           6 :     if (fd != retired_fd) {
      65             : #ifdef ZMQ_HAVE_WINDOWS
      66             :         int rc = closesocket (fd);
      67             :         wsa_assert (rc != SOCKET_ERROR);
      68             : #else
      69           6 :         int rc = close (fd);
      70           6 :         errno_assert (rc == 0);
      71             : #endif
      72           6 :         fd = retired_fd;
      73             :     }
      74          12 : }
      75             : 
      76           6 : int zmq::udp_engine_t::init (address_t *address_, bool send_, bool recv_)
      77             : {
      78           6 :     zmq_assert (address_);
      79           6 :     zmq_assert (send_ || recv_);
      80           6 :     send_enabled = send_;
      81           6 :     recv_enabled = recv_;
      82           6 :     address = address_;
      83             : 
      84           6 :     fd = open_socket (address->resolved.udp_addr->family (), SOCK_DGRAM, IPPROTO_UDP);
      85           6 :     if (fd == retired_fd)
      86             :         return -1;
      87             : 
      88           6 :     unblock_socket (fd);
      89             : 
      90           6 :     return 0;
      91             : }
      92             : 
      93           6 : void zmq::udp_engine_t::plug (io_thread_t* io_thread_, session_base_t *session_)
      94             : {
      95           6 :     zmq_assert (!plugged);
      96           6 :     plugged = true;
      97             : 
      98           6 :     zmq_assert (!session);
      99           6 :     zmq_assert (session_);
     100           6 :     session = session_;
     101             : 
     102             :     //  Connect to I/O threads poller object.
     103           6 :     io_object_t::plug (io_thread_);
     104           6 :     handle = add_fd (fd);
     105             : 
     106           6 :     if (send_enabled)
     107           3 :         set_pollout (handle);
     108             : 
     109           6 :     if (recv_enabled) {
     110           3 :         int on = 1;
     111           3 :         int rc = setsockopt (fd, SOL_SOCKET, SO_REUSEADDR, (char *) &on, sizeof (on));
     112             : #ifdef ZMQ_HAVE_WINDOWS
     113             :         wsa_assert (rc != SOCKET_ERROR);
     114             : #else
     115           3 :         errno_assert (rc == 0);
     116             : #endif
     117             : 
     118             :         rc = bind (fd, address->resolved.udp_addr->bind_addr (),
     119           3 :                        address->resolved.udp_addr->bind_addrlen ());
     120             : #ifdef ZMQ_HAVE_WINDOWS
     121             :         wsa_assert (rc != SOCKET_ERROR);
     122             : #else
     123           3 :         errno_assert (rc == 0);
     124             : #endif
     125             : 
     126           3 :         if (address->resolved.udp_addr->is_mcast ()) {
     127             :             struct ip_mreq mreq;
     128           0 :             mreq.imr_multiaddr = address->resolved.udp_addr->multicast_ip ();
     129           0 :             mreq.imr_interface = address->resolved.udp_addr->interface_ip ();
     130           0 :             rc = setsockopt (fd, IPPROTO_IP, IP_ADD_MEMBERSHIP, (char*) &mreq, sizeof (mreq));
     131             : #ifdef ZMQ_HAVE_WINDOWS
     132             :             wsa_assert (rc != SOCKET_ERROR);
     133             : #else
     134           0 :             errno_assert (rc == 0);
     135             : #endif
     136             :         }
     137           3 :         set_pollin (handle);
     138             : 
     139             :         //  Call restart output to drop all join/leave commands
     140           3 :         restart_output ();
     141             :     }
     142           6 : }
     143             : 
     144           6 : void zmq::udp_engine_t::terminate()
     145             : {
     146           6 :     zmq_assert (plugged);
     147           6 :     plugged = false;
     148             : 
     149           6 :     rm_fd (handle);
     150             : 
     151             :     //  Disconnect from I/O threads poller object.
     152           6 :     io_object_t::unplug ();
     153             : 
     154           6 :     delete this;
     155           6 : }
     156             : 
     157          12 : void zmq::udp_engine_t::out_event()
     158             : {
     159             :     msg_t group_msg;
     160          12 :     int rc = session->pull_msg (&group_msg);
     161          12 :     errno_assert (rc == 0 || (rc == -1 && errno == EAGAIN));
     162             : 
     163          12 :     if (rc == 0) {
     164             :         msg_t body_msg;
     165           3 :         rc = session->pull_msg (&body_msg);
     166             : 
     167           3 :         size_t group_size = group_msg.size ();
     168           3 :         size_t body_size = body_msg.size ();
     169           3 :         size_t size = group_size + body_size + 1;
     170             : 
     171             :         // TODO: check if larger than maximum size
     172           3 :         out_buffer[0] = (unsigned char) group_size;
     173           3 :         memcpy (out_buffer + 1, group_msg.data (), group_size);
     174           3 :         memcpy (out_buffer + 1 + group_size, body_msg.data (), body_size);
     175             : 
     176           3 :         rc = group_msg.close ();
     177           3 :         errno_assert (rc == 0);
     178             : 
     179           3 :         body_msg.close ();
     180           3 :         errno_assert (rc == 0);
     181             : 
     182             : #ifdef ZMQ_HAVE_WINDOWS
     183             :         rc = sendto (fd, (const char *) out_buffer, (int) size, 0,
     184             :             address->resolved.udp_addr->dest_addr (),
     185             :             (int) address->resolved.udp_addr->dest_addrlen ());
     186             :         wsa_assert (rc != SOCKET_ERROR);
     187             : #else
     188             :         rc = sendto (fd, out_buffer, size, 0,
     189             :             address->resolved.udp_addr->dest_addr (),
     190           3 :             address->resolved.udp_addr->dest_addrlen ());
     191           3 :         errno_assert (rc != -1);
     192             : #endif
     193             :     }
     194             :     else
     195           9 :        reset_pollout (handle);
     196          12 : }
     197             : 
     198          15 : void zmq::udp_engine_t::restart_output()
     199             : {
     200             :     //  If we don't support send we just drop all messages
     201          15 :     if (!send_enabled) {
     202             :         msg_t msg;
     203          12 :         while (session->pull_msg (&msg) == 0)
     204           3 :             msg.close ();
     205             :     }
     206             :     else {
     207           6 :         set_pollout(handle);
     208           6 :         out_event ();
     209             :     }
     210          15 : }
     211             : 
     212           3 : void zmq::udp_engine_t::in_event()
     213             : {
     214             : #ifdef ZMQ_HAVE_WINDOWS
     215             :     int nbytes = recv(fd, (char*) in_buffer, MAX_UDP_MSG, 0);
     216             :     const int last_error = WSAGetLastError();
     217             :     if (nbytes == SOCKET_ERROR) {
     218             :         wsa_assert(
     219             :             last_error == WSAENETDOWN ||
     220             :             last_error == WSAENETRESET ||
     221             :             last_error == WSAEWOULDBLOCK);
     222             :         return;
     223             :     }
     224             : #else
     225           6 :     int nbytes = recv(fd, in_buffer, MAX_UDP_MSG, 0);
     226           3 :     if (nbytes == -1) {
     227           0 :         errno_assert(errno != EBADF
     228             :             && errno != EFAULT
     229             :             && errno != ENOMEM
     230             :             && errno != ENOTSOCK);
     231           0 :         return;
     232             :     }
     233             : #endif
     234             : 
     235           3 :     int group_size = in_buffer[0];
     236             : 
     237             :     //  This doesn't fit, just ingore
     238           3 :     if (nbytes - 1 < group_size)
     239             :         return;
     240             : 
     241           3 :     int body_size = nbytes - 1 - group_size;
     242             : 
     243             :     msg_t msg;
     244           3 :     int rc = msg.init_size (group_size);
     245           3 :     errno_assert (rc == 0);
     246           3 :     msg.set_flags (msg_t::more);
     247           3 :     memcpy (msg.data (), in_buffer + 1, group_size);
     248             : 
     249           3 :     rc = session->push_msg (&msg);
     250           3 :     errno_assert (rc == 0 || (rc == -1 && errno == EAGAIN));
     251             : 
     252             :     //  Pipe is full
     253           3 :     if (rc != 0) {
     254           0 :         rc = msg.close ();
     255           0 :         errno_assert (rc == 0);
     256             : 
     257           0 :         reset_pollin (handle);
     258             :         return;
     259             :     }
     260             : 
     261           3 :     rc = msg.close ();
     262           3 :     errno_assert (rc == 0);
     263           3 :     rc = msg.init_size (body_size);
     264           3 :     errno_assert (rc == 0);
     265           3 :     memcpy (msg.data (), in_buffer + 1 + group_size, body_size);
     266           3 :     rc = session->push_msg (&msg);
     267           3 :     errno_assert (rc == 0);
     268           3 :     rc = msg.close ();
     269           3 :     errno_assert (rc == 0);
     270           3 :     session->flush ();
     271             : }
     272             : 
     273           0 : void zmq::udp_engine_t::restart_input()
     274             : {
     275           0 :     if (!recv_enabled)
     276           0 :         return;
     277             : 
     278           0 :     set_pollin (handle);
     279           0 :     in_event ();
     280             : }

Generated by: LCOV version 1.10