LCOV - code coverage report
Current view: top level - home/h/core/forks/m4-libzmq/src - radio.cpp (source / functions) Hit Total Coverage
Test: zeromq-4.2.0 Code Coverage Lines: 83 101 82.2 %
Date: 2016-05-09 Functions: 11 17 64.7 %
Legend: Lines: hit not hit

          Line data    Source code
       1             : /*
       2             :     Copyright (c) 2007-2016 Contributors as noted in the AUTHORS file
       3             : 
       4             :     This file is part of libzmq, the ZeroMQ core engine in C++.
       5             : 
       6             :     libzmq is free software; you can redistribute it and/or modify it under
       7             :     the terms of the GNU Lesser General Public License (LGPL) as published
       8             :     by the Free Software Foundation; either version 3 of the License, or
       9             :     (at your option) any later version.
      10             : 
      11             :     As a special exception, the Contributors give you permission to link
      12             :     this library with independent modules to produce an executable,
      13             :     regardless of the license terms of these independent modules, and to
      14             :     copy and distribute the resulting executable under terms of your choice,
      15             :     provided that you also meet, for each linked independent module, the
      16             :     terms and conditions of the license of that module. An independent
      17             :     module is a module which is not derived from or based on this library.
      18             :     If you modify this library, you must extend this exception to your
      19             :     version of the library.
      20             : 
      21             :     libzmq is distributed in the hope that it will be useful, but WITHOUT
      22             :     ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
      23             :     FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public
      24             :     License for more details.
      25             : 
      26             :     You should have received a copy of the GNU Lesser General Public License
      27             :     along with this program.  If not, see <http://www.gnu.org/licenses/>.
      28             : */
      29             : 
      30             : #include "precompiled.hpp"
      31             : #include <string.h>
      32             : 
      33             : #include "radio.hpp"
      34             : #include "macros.hpp"
      35             : #include "pipe.hpp"
      36             : #include "err.hpp"
      37             : #include "msg.hpp"
      38             : 
      39           6 : zmq::radio_t::radio_t (class ctx_t *parent_, uint32_t tid_, int sid_) :
      40          18 :     socket_base_t (parent_, tid_, sid_, true)
      41             : {
      42           6 :     options.type = ZMQ_RADIO;
      43           6 : }
      44             : 
      45          24 : zmq::radio_t::~radio_t ()
      46             : {
      47          12 : }
      48             : 
      49           6 : void zmq::radio_t::xattach_pipe (pipe_t *pipe_, bool subscribe_to_all_)
      50             : {
      51             :     LIBZMQ_UNUSED (subscribe_to_all_);
      52             : 
      53           6 :     zmq_assert (pipe_);
      54             : 
      55             :     //  Don't delay pipe termination as there is no one
      56             :     //  to receive the delimiter.
      57           6 :     pipe_->set_nodelay ();
      58             : 
      59           6 :     dist.attach (pipe_);
      60             : 
      61           6 :     if (subscribe_to_all_)
      62           3 :         udp_pipes.push_back (pipe_);
      63             :     //  The pipe is active when attached. Let's read the subscriptions from
      64             :     //  it, if any.
      65             :     else
      66           3 :         xread_activated (pipe_);
      67           6 : }
      68             : 
      69           9 : void zmq::radio_t::xread_activated (pipe_t *pipe_)
      70             : {
      71             :     //  There are some subscriptions waiting. Let's process them.
      72             :     msg_t msg;
      73          27 :     while (pipe_->read (&msg)) {
      74             :         //  Apply the subscription to the trie
      75           9 :         if (msg.is_join () || msg.is_leave ()) {
      76           9 :             std::string group = std::string (msg.group ());
      77             : 
      78           9 :             if (msg.is_join ())
      79          12 :                 subscriptions.insert (subscriptions_t::value_type (group, pipe_));
      80             :             else {
      81             :                 std::pair<subscriptions_t::iterator, subscriptions_t::iterator> range =
      82           6 :                     subscriptions.equal_range (group);
      83             : 
      84           6 :                 for (subscriptions_t::iterator it = range.first; it != range.second; ++it) {
      85           3 :                     if (it->second == pipe_) {
      86           3 :                         subscriptions.erase (it);
      87             :                         break;
      88             :                     }
      89             :                 }
      90             :             }
      91             :         }
      92           9 :         msg.close ();
      93             :     }
      94           9 : }
      95             : 
      96           0 : void zmq::radio_t::xwrite_activated (pipe_t *pipe_)
      97             : {
      98           0 :     dist.activated (pipe_);
      99           0 : }
     100             : 
     101           6 : void zmq::radio_t::xpipe_terminated (pipe_t *pipe_)
     102             : {
     103          30 :     for (subscriptions_t::iterator it = subscriptions.begin (); it != subscriptions.end (); ) {
     104           3 :         if (it->second == pipe_) {
     105           6 :             subscriptions.erase (it++);
     106             :         } else {
     107             :             ++it;
     108             :         }
     109             :     }
     110             : 
     111             :     udp_pipes_t::iterator it = std::find(udp_pipes.begin(),
     112          18 :         udp_pipes.end (), pipe_);
     113          18 :     if (it != udp_pipes.end ())
     114           3 :         udp_pipes.erase (it);
     115             : 
     116           6 :     dist.pipe_terminated (pipe_);
     117           6 : }
     118             : 
     119          18 : int zmq::radio_t::xsend (msg_t *msg_)
     120             : {
     121             :     //  Radio sockets do not allow multipart data (ZMQ_SNDMORE)
     122          18 :     if (msg_->flags () & msg_t::more) {
     123           0 :         errno = EINVAL;
     124           0 :         return -1;
     125             :     }
     126             : 
     127          18 :     dist.unmatch ();
     128             : 
     129             :     std::pair<subscriptions_t::iterator, subscriptions_t::iterator> range =
     130          54 :         subscriptions.equal_range (std::string(msg_->group ()));
     131             : 
     132          45 :     for (subscriptions_t::iterator it = range.first; it != range.second; ++it)
     133           9 :         dist.match (it-> second);
     134             : 
     135          99 :     for (udp_pipes_t::iterator it = udp_pipes.begin (); it != udp_pipes.end (); ++it)
     136           3 :         dist.match (*it);
     137             : 
     138          18 :     int rc = dist.send_to_matching (msg_);
     139             : 
     140             :     return rc;
     141             : }
     142             : 
     143           0 : bool zmq::radio_t::xhas_out ()
     144             : {
     145           0 :     return dist.has_out ();
     146             : }
     147             : 
     148           0 : int zmq::radio_t::xrecv (msg_t *msg_)
     149             : {
     150             :     //  Messages cannot be received from PUB socket.
     151             :     LIBZMQ_UNUSED (msg_);
     152           0 :     errno = ENOTSUP;
     153           0 :     return -1;
     154             : }
     155             : 
     156           0 : bool zmq::radio_t::xhas_in ()
     157             : {
     158           0 :     return false;
     159             : }
     160             : 
     161           6 : zmq::radio_session_t::radio_session_t (io_thread_t *io_thread_, bool connect_,
     162             :       socket_base_t *socket_, const options_t &options_,
     163             :       address_t *addr_) :
     164             :     session_base_t (io_thread_, connect_, socket_, options_, addr_),
     165           6 :     state (group)
     166             : {
     167           6 : }
     168             : 
     169           6 : zmq::radio_session_t::~radio_session_t ()
     170             : {
     171           6 : }
     172             : 
     173           9 : int zmq::radio_session_t::push_msg (msg_t *msg_)
     174             : {
     175           9 :     if (msg_->flags() & msg_t::command) {
     176             :         char *command_data =
     177           9 :             static_cast <char *> (msg_->data ());
     178           9 :         const size_t data_size = msg_->size ();
     179             : 
     180             :         int group_length;
     181             :         char * group;
     182             : 
     183             :         msg_t join_leave_msg;
     184             :         int rc;
     185             : 
     186             :         //  Set the msg type to either JOIN or LEAVE
     187           9 :         if (data_size >= 5 && memcmp (command_data, "\4JOIN", 5) == 0) {
     188           6 :             group_length = (int) data_size - 5;
     189           6 :             group = command_data + 5;
     190           6 :             rc = join_leave_msg.init_join ();
     191             :         }
     192           3 :         else if (data_size >= 6 && memcmp (command_data, "\5LEAVE", 6) == 0) {
     193           3 :             group_length = (int) data_size - 6;
     194           3 :             group = command_data + 6;
     195           3 :             rc = join_leave_msg.init_leave ();
     196             :         }
     197             :         //  If it is not a JOIN or LEAVE just push the message
     198             :         else
     199           0 :             return session_base_t::push_msg (msg_);
     200             : 
     201           9 :         errno_assert (rc == 0);
     202             : 
     203             :         //  Set the group
     204           9 :         rc = join_leave_msg.set_group (group, group_length);
     205           9 :         errno_assert (rc == 0);
     206             : 
     207             :         //  Close the current command
     208           9 :         rc = msg_->close ();
     209           9 :         errno_assert (rc == 0);
     210             : 
     211             :         //  Push the join or leave command
     212           9 :         *msg_ = join_leave_msg;
     213           9 :         return session_base_t::push_msg (msg_);
     214             :     }
     215             :     else
     216           0 :         return session_base_t::push_msg (msg_);
     217             : }
     218             : 
     219          57 : int zmq::radio_session_t::pull_msg (msg_t *msg_)
     220             : {
     221          57 :     if (state == group) {
     222          45 :         int rc = session_base_t::pull_msg (&pending_msg);
     223          45 :         if (rc != 0)
     224             :             return rc;
     225             : 
     226          12 :         const char *group = pending_msg.group ();
     227          12 :         int length = (int) strlen (group);
     228             : 
     229             :         //  First frame is the group
     230          12 :         rc = msg_->init_size (length);
     231          12 :         errno_assert(rc == 0);
     232          12 :         msg_->set_flags(msg_t::more);
     233          12 :         memcpy (msg_->data (), group, length);
     234             : 
     235             :         //  Next status is the body
     236          12 :         state = body;
     237          12 :         return 0;
     238             :     }
     239             :     else {
     240          12 :         *msg_ = pending_msg;
     241          12 :         state = group;
     242          12 :         return 0;
     243             :     }
     244             : }
     245             : 
     246           0 : void zmq::radio_session_t::reset ()
     247             : {
     248           0 :     session_base_t::reset ();
     249           0 :     state = group;
     250           0 : }

Generated by: LCOV version 1.10