LCOV - code coverage report
Current view: top level - home/h/core/forks/m4-libzmq/src - dish.cpp (source / functions) Hit Total Coverage
Test: zeromq-4.2.0 Code Coverage Lines: 126 164 76.8 %
Date: 2016-05-09 Functions: 15 22 68.2 %
Legend: Lines: hit not hit

          Line data    Source code
       1             : /*
       2             :     Copyright (c) 2007-2016 Contributors as noted in the AUTHORS file
       3             : 
       4             :     This file is part of libzmq, the ZeroMQ core engine in C++.
       5             : 
       6             :     libzmq is free software; you can redistribute it and/or modify it under
       7             :     the terms of the GNU Lesser General Public License (LGPL) as published
       8             :     by the Free Software Foundation; either version 3 of the License, or
       9             :     (at your option) any later version.
      10             : 
      11             :     As a special exception, the Contributors give you permission to link
      12             :     this library with independent modules to produce an executable,
      13             :     regardless of the license terms of these independent modules, and to
      14             :     copy and distribute the resulting executable under terms of your choice,
      15             :     provided that you also meet, for each linked independent module, the
      16             :     terms and conditions of the license of that module. An independent
      17             :     module is a module which is not derived from or based on this library.
      18             :     If you modify this library, you must extend this exception to your
      19             :     version of the library.
      20             : 
      21             :     libzmq is distributed in the hope that it will be useful, but WITHOUT
      22             :     ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
      23             :     FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public
      24             :     License for more details.
      25             : 
      26             :     You should have received a copy of the GNU Lesser General Public License
      27             :     along with this program.  If not, see <http://www.gnu.org/licenses/>.
      28             : */
      29             : 
      30             : #include "precompiled.hpp"
      31             : #include <string.h>
      32             : 
      33             : #include "platform.hpp"
      34             : 
      35             : #ifdef ZMQ_HAVE_WINDOWS
      36             : #include "windows.hpp"
      37             : #endif
      38             : 
      39             : #include "../include/zmq.h"
      40             : #include "macros.hpp"
      41             : #include "dish.hpp"
      42             : #include "err.hpp"
      43             : 
      44           6 : zmq::dish_t::dish_t (class ctx_t *parent_, uint32_t tid_, int sid_) :
      45             :     socket_base_t (parent_, tid_, sid_, true),
      46          12 :     has_message (false)
      47             : {
      48           6 :     options.type = ZMQ_DISH;
      49             : 
      50             :     //  When socket is being closed down we don't want to wait till pending
      51             :     //  subscription commands are sent to the wire.
      52           6 :     options.linger = 0;
      53             : 
      54           6 :     int rc = message.init ();
      55           6 :     errno_assert (rc == 0);
      56           6 : }
      57             : 
      58          24 : zmq::dish_t::~dish_t ()
      59             : {
      60           6 :     int rc = message.close ();
      61           6 :     errno_assert (rc == 0);
      62          12 : }
      63             : 
      64           6 : void zmq::dish_t::xattach_pipe (pipe_t *pipe_, bool subscribe_to_all_)
      65             : {
      66             :     LIBZMQ_UNUSED (subscribe_to_all_);
      67             : 
      68           6 :     zmq_assert (pipe_);
      69           6 :     fq.attach (pipe_);
      70           6 :     dist.attach (pipe_);
      71             : 
      72             :     //  Send all the cached subscriptions to the new upstream peer.
      73           6 :     send_subscriptions (pipe_);
      74           6 : }
      75             : 
      76          12 : void zmq::dish_t::xread_activated (pipe_t *pipe_)
      77             : {
      78          12 :     fq.activated (pipe_);
      79          12 : }
      80             : 
      81           0 : void zmq::dish_t::xwrite_activated (pipe_t *pipe_)
      82             : {
      83           0 :     dist.activated (pipe_);
      84           0 : }
      85             : 
      86           6 : void zmq::dish_t::xpipe_terminated (pipe_t *pipe_)
      87             : {
      88           6 :     fq.pipe_terminated (pipe_);
      89           6 :     dist.pipe_terminated (pipe_);
      90           6 : }
      91             : 
      92           0 : void zmq::dish_t::xhiccuped (pipe_t *pipe_)
      93             : {
      94             :     //  Send all the cached subscriptions to the hiccuped pipe.
      95           0 :     send_subscriptions (pipe_);
      96           0 : }
      97             : 
      98          15 : int zmq::dish_t::xjoin (const char* group_)
      99             : {
     100          15 :     std::string group = std::string (group_);
     101             : 
     102          15 :     if (group.length () > ZMQ_GROUP_MAX_LENGTH) {
     103           3 :         errno = EINVAL;
     104           3 :         return -1;
     105             :     }
     106             : 
     107          24 :     subscriptions_t::iterator it = subscriptions.find (group);
     108             : 
     109             :     //  User cannot join same group twice
     110          24 :     if (it != subscriptions.end ()) {
     111           3 :         errno = EINVAL;
     112           3 :         return -1;
     113             :     }
     114             : 
     115           9 :     subscriptions.insert (group);
     116             : 
     117             :     msg_t msg;
     118           9 :     int rc = msg.init_join ();
     119           9 :     errno_assert (rc == 0);
     120             : 
     121           9 :     rc = msg.set_group (group_);
     122           9 :     errno_assert (rc == 0);
     123             : 
     124           9 :     int err = 0;
     125           9 :     rc = dist.send_to_all (&msg);
     126           9 :     if (rc != 0)
     127           0 :         err = errno;
     128           9 :     int rc2 = msg.close ();
     129           9 :     errno_assert (rc2 == 0);
     130           9 :     if (rc != 0)
     131           0 :         errno = err;
     132           9 :     return rc;
     133             : }
     134             : 
     135           6 : int zmq::dish_t::xleave (const char* group_)
     136             : {
     137           6 :     std::string group = std::string (group_);
     138             : 
     139           6 :     if (group.length () > ZMQ_GROUP_MAX_LENGTH) {
     140           0 :         errno = EINVAL;
     141           0 :         return -1;
     142             :     }
     143             : 
     144          18 :     subscriptions_t::iterator it =  std::find (subscriptions.begin (), subscriptions.end (), group);
     145             : 
     146          12 :     if (it == subscriptions.end ()) {
     147           3 :         errno = EINVAL;
     148           3 :         return -1;
     149             :     }
     150             : 
     151           3 :     subscriptions.erase (it);
     152             : 
     153             :     msg_t msg;
     154           3 :     int rc = msg.init_leave ();
     155           3 :     errno_assert (rc == 0);
     156             : 
     157           3 :     rc = msg.set_group (group_);
     158           3 :     errno_assert (rc == 0);
     159             : 
     160           3 :     int err = 0;
     161           3 :     rc = dist.send_to_all (&msg);
     162           3 :     if (rc != 0)
     163           0 :         err = errno;
     164           3 :     int rc2 = msg.close ();
     165           3 :     errno_assert (rc2 == 0);
     166           3 :     if (rc != 0)
     167           0 :         errno = err;
     168           3 :     return rc;
     169             : }
     170             : 
     171           0 : int zmq::dish_t::xsend (msg_t *msg_)
     172             : {
     173             :     LIBZMQ_UNUSED (msg_);
     174           0 :     errno = ENOTSUP;
     175           0 :     return -1;
     176             : }
     177             : 
     178           0 : bool zmq::dish_t::xhas_out ()
     179             : {
     180             :     //  Subscription can be added/removed anytime.
     181           0 :     return true;
     182             : }
     183             : 
     184          30 : int zmq::dish_t::xrecv (msg_t *msg_)
     185             : {
     186             :     //  If there's already a message prepared by a previous call to zmq_poll,
     187             :     //  return it straight ahead.
     188          30 :     if (has_message) {
     189           0 :         int rc = msg_->move (message);
     190           0 :         errno_assert (rc == 0);
     191           0 :         has_message = false;
     192           0 :         return 0;
     193             :     }
     194             : 
     195             :     while (true) {
     196             : 
     197             :         //  Get a message using fair queueing algorithm.
     198          30 :         int rc = fq.recv (msg_);
     199             : 
     200             :         //  If there's no message available, return immediately.
     201             :         //  The same when error occurs.
     202          30 :         if (rc != 0)
     203             :             return -1;
     204             : 
     205             :         //  Filtering non matching messages
     206          36 :         subscriptions_t::iterator it = subscriptions.find (std::string(msg_->group ()));
     207          24 :         if (it != subscriptions.end ())
     208             :             return 0;
     209             :     }
     210             : }
     211             : 
     212           0 : bool zmq::dish_t::xhas_in ()
     213             : {
     214             :     //  If there's already a message prepared by a previous call to zmq_poll,
     215             :     //  return straight ahead.
     216           0 :     if (has_message)
     217             :         return true;
     218             : 
     219             :     while (true) {
     220             :         //  Get a message using fair queueing algorithm.
     221           0 :         int rc = fq.recv (&message);
     222             : 
     223             :         //  If there's no message available, return immediately.
     224             :         //  The same when error occurs.
     225           0 :         if (rc != 0) {
     226           0 :             errno_assert (errno == EAGAIN);
     227             :             return false;
     228             :         }
     229             : 
     230             :         //  Filtering non matching messages
     231           0 :         subscriptions_t::iterator it = subscriptions.find (std::string(message.group ()));
     232           0 :         if (it != subscriptions.end ()) {
     233           0 :             has_message = true;
     234           0 :             return true;
     235             :         }
     236             :     }
     237             : }
     238             : 
     239           0 : zmq::blob_t zmq::dish_t::get_credential () const
     240             : {
     241           0 :     return fq.get_credential ();
     242             : }
     243             : 
     244           6 : void zmq::dish_t::send_subscriptions (pipe_t *pipe_)
     245             : {
     246          30 :     for (subscriptions_t::iterator it = subscriptions.begin (); it != subscriptions.end (); ++it) {
     247             :         msg_t msg;
     248           3 :         int rc = msg.init_join ();
     249           3 :         errno_assert (rc == 0);
     250             : 
     251           6 :         rc = msg.set_group (it->c_str());
     252           3 :         errno_assert (rc == 0);
     253             : 
     254             :         //  Send it to the pipe.
     255           3 :         pipe_->write (&msg);
     256           3 :         msg.close ();
     257             :     }
     258             : 
     259           6 :     pipe_->flush ();
     260           6 : }
     261             : 
     262           6 : zmq::dish_session_t::dish_session_t (io_thread_t *io_thread_, bool connect_,
     263             :       socket_base_t *socket_, const options_t &options_,
     264             :       address_t *addr_) :
     265             :     session_base_t (io_thread_, connect_, socket_, options_, addr_),
     266           6 :     state (group)
     267             : {
     268           6 : }
     269             : 
     270           6 : zmq::dish_session_t::~dish_session_t ()
     271             : {
     272           6 : }
     273             : 
     274          24 : int zmq::dish_session_t::push_msg (msg_t *msg_)
     275             : {
     276          24 :     if (state == group) {
     277          12 :         if ((msg_->flags() & msg_t::more) != msg_t::more) {
     278           0 :             errno = EFAULT;
     279           0 :             return -1;
     280             :         }
     281             : 
     282          12 :         if (msg_->size() > ZMQ_GROUP_MAX_LENGTH) {
     283           0 :             errno = EFAULT;
     284           0 :             return -1;
     285             :         }
     286             : 
     287          12 :         group_msg = *msg_;
     288          12 :         state = body;
     289             : 
     290          12 :         int rc = msg_->init ();
     291          12 :         errno_assert (rc == 0);
     292             :         return 0;
     293             :     }
     294             :     else {
     295             :         //  Set the message group
     296          12 :         int rc = msg_->set_group ((char*)group_msg.data (), group_msg. size());
     297          12 :         errno_assert (rc == 0);
     298             : 
     299             :         //  We set the group, so we don't need the group_msg anymore
     300          12 :         rc = group_msg.close ();
     301          12 :         errno_assert (rc == 0);
     302             : 
     303             :         //  Thread safe socket doesn't support multipart messages
     304          12 :         if ((msg_->flags() & msg_t::more) == msg_t::more) {
     305           0 :             errno = EFAULT;
     306           0 :             return -1;
     307             :         }
     308             : 
     309             :         //  Push message to dish socket
     310          12 :         rc = session_base_t::push_msg (msg_);
     311             : 
     312          12 :         if (rc == 0)
     313          12 :             state = group;
     314             : 
     315          12 :         return rc;
     316             :     }
     317             : }
     318             : 
     319          42 : int zmq::dish_session_t::pull_msg (msg_t *msg_)
     320             : {
     321          42 :     int rc = session_base_t::pull_msg (msg_);
     322             : 
     323          42 :     if (rc != 0)
     324             :         return rc;
     325             : 
     326          12 :     if (!msg_->is_join () && !msg_->is_leave ())
     327             :         return rc;
     328             :     else {
     329          12 :         int group_length = (int) strlen (msg_->group ());
     330             : 
     331             :         msg_t command;
     332             :         int offset;
     333             : 
     334          12 :         if (msg_->is_join ()) {
     335           9 :             rc = command.init_size (group_length + 5);
     336           9 :             errno_assert(rc == 0);
     337           9 :             offset = 5;
     338           9 :             memcpy (command.data (), "\4JOIN", 5);
     339             :         }
     340             :         else {
     341           3 :             rc = command.init_size (group_length + 6);
     342           3 :             errno_assert(rc == 0);
     343           3 :             offset = 6;
     344           3 :             memcpy (command.data (), "\5LEAVE", 6);
     345             :         }
     346             : 
     347          12 :         command.set_flags (msg_t::command);
     348          12 :         char* command_data = (char*)command.data ();
     349             : 
     350             :         //  Copy the group
     351          12 :         memcpy (command_data + offset, msg_->group (), group_length);
     352             : 
     353             :         //  Close the join message
     354          12 :         rc = msg_->close ();
     355          12 :         errno_assert (rc == 0);
     356             : 
     357          12 :         *msg_ = command;
     358             : 
     359             :         return 0;
     360             :     }
     361             : }
     362             : 
     363           1 : void zmq::dish_session_t::reset ()
     364             : {
     365           1 :     session_base_t::reset ();
     366           1 :     state = group;
     367           1 : }

Generated by: LCOV version 1.10