LCOV - code coverage report
Current view: top level - home/h/core/forks/m4-libzmq/src - xsub.cpp (source / functions) Hit Total Coverage
Test: zeromq-4.2.0 Code Coverage Lines: 73 92 79.3 %
Date: 2016-05-09 Functions: 11 15 73.3 %
Legend: Lines: hit not hit

          Line data    Source code
       1             : /*
       2             :     Copyright (c) 2007-2016 Contributors as noted in the AUTHORS file
       3             : 
       4             :     This file is part of libzmq, the ZeroMQ core engine in C++.
       5             : 
       6             :     libzmq is free software; you can redistribute it and/or modify it under
       7             :     the terms of the GNU Lesser General Public License (LGPL) as published
       8             :     by the Free Software Foundation; either version 3 of the License, or
       9             :     (at your option) any later version.
      10             : 
      11             :     As a special exception, the Contributors give you permission to link
      12             :     this library with independent modules to produce an executable,
      13             :     regardless of the license terms of these independent modules, and to
      14             :     copy and distribute the resulting executable under terms of your choice,
      15             :     provided that you also meet, for each linked independent module, the
      16             :     terms and conditions of the license of that module. An independent
      17             :     module is a module which is not derived from or based on this library.
      18             :     If you modify this library, you must extend this exception to your
      19             :     version of the library.
      20             : 
      21             :     libzmq is distributed in the hope that it will be useful, but WITHOUT
      22             :     ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
      23             :     FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public
      24             :     License for more details.
      25             : 
      26             :     You should have received a copy of the GNU Lesser General Public License
      27             :     along with this program.  If not, see <http://www.gnu.org/licenses/>.
      28             : */
      29             : 
      30             : #include "precompiled.hpp"
      31             : #include <string.h>
      32             : 
      33             : #include "macros.hpp"
      34             : #include "xsub.hpp"
      35             : #include "err.hpp"
      36             : 
      37        3261 : zmq::xsub_t::xsub_t (class ctx_t *parent_, uint32_t tid_, int sid_) :
      38             :     socket_base_t (parent_, tid_, sid_),
      39             :     has_message (false),
      40        3261 :     more (false)
      41             : {
      42        3261 :     options.type = ZMQ_XSUB;
      43             : 
      44             :     //  When socket is being closed down we don't want to wait till pending
      45             :     //  subscription commands are sent to the wire.
      46        3261 :     options.linger = 0;
      47             : 
      48        3261 :     int rc = message.init ();
      49        3261 :     errno_assert (rc == 0);
      50        3261 : }
      51             : 
      52        6540 : zmq::xsub_t::~xsub_t ()
      53             : {
      54        3261 :     int rc = message.close ();
      55        3261 :     errno_assert (rc == 0);
      56        3279 : }
      57             : 
      58        3260 : void zmq::xsub_t::xattach_pipe (pipe_t *pipe_, bool subscribe_to_all_)
      59             : {
      60             :     LIBZMQ_UNUSED (subscribe_to_all_);
      61             : 
      62        3260 :     zmq_assert (pipe_);
      63        3260 :     fq.attach (pipe_);
      64        3261 :     dist.attach (pipe_);
      65             : 
      66             :     //  Send all the cached subscriptions to the new upstream peer.
      67        3261 :     subscriptions.apply (send_subscription, pipe_);
      68        3261 :     pipe_->flush ();
      69        3261 : }
      70             : 
      71          60 : void zmq::xsub_t::xread_activated (pipe_t *pipe_)
      72             : {
      73          60 :     fq.activated (pipe_);
      74          60 : }
      75             : 
      76           0 : void zmq::xsub_t::xwrite_activated (pipe_t *pipe_)
      77             : {
      78           0 :     dist.activated (pipe_);
      79           0 : }
      80             : 
      81        3261 : void zmq::xsub_t::xpipe_terminated (pipe_t *pipe_)
      82             : {
      83        3261 :     fq.pipe_terminated (pipe_);
      84        3261 :     dist.pipe_terminated (pipe_);
      85        3261 : }
      86             : 
      87           0 : void zmq::xsub_t::xhiccuped (pipe_t *pipe_)
      88             : {
      89             :     //  Send all the cached subscriptions to the hiccuped pipe.
      90           0 :     subscriptions.apply (send_subscription, pipe_);
      91           0 :     pipe_->flush ();
      92           0 : }
      93             : 
      94         132 : int zmq::xsub_t::xsend (msg_t *msg_)
      95             : {
      96         132 :     size_t size = msg_->size ();
      97         132 :     unsigned char *data = (unsigned char *) msg_->data ();
      98             : 
      99         132 :     if (size > 0 && *data == 1) {
     100             :         //  Process subscribe message
     101             :         //  This used to filter out duplicate subscriptions,
      102             :         //  however this is already done on the XPUB side and
     103             :         //  doing it here as well breaks ZMQ_XPUB_VERBOSE
     104             :         //  when there are forwarding devices involved.
     105         120 :         subscriptions.add (data + 1, size - 1);
     106         120 :         return dist.send_to_all (msg_);
     107             :     }
     108             :     else
     109          12 :     if (size > 0 && *data == 0) {
     110             :         //  Process unsubscribe message
     111          12 :         if (subscriptions.rm (data + 1, size - 1))
     112           6 :             return dist.send_to_all (msg_);
     113             :     }
     114             :     else
     115             :         //  User message sent upstream to XPUB socket
     116           0 :         return dist.send_to_all (msg_);
     117             : 
     118           6 :     int rc = msg_->close ();
     119           6 :     errno_assert (rc == 0);
     120           6 :     rc = msg_->init ();
     121           6 :     errno_assert (rc == 0);
     122             : 
     123             :     return 0;
     124             : }
     125             : 
     126           0 : bool zmq::xsub_t::xhas_out ()
     127             : {
     128             :     //  Subscription can be added/removed anytime.
     129           0 :     return true;
     130             : }
     131             : 
     132     3230792 : int zmq::xsub_t::xrecv (msg_t *msg_)
     133             : {
      134             :     //  If there's already a message prepared by a previous call to zmq_poll,
      135             :     //  return it straight away.
     136     3230792 :     if (has_message) {
     137          27 :         int rc = msg_->move (message);
     138          27 :         errno_assert (rc == 0);
     139          27 :         has_message = false;
     140          27 :         more = msg_->flags () & msg_t::more ? true : false;
     141          27 :         return 0;
     142             :     }
     143             : 
      144             :     //  TODO: This can result in an infinite loop in the case of a continuous
      145             :     //  stream of non-matching messages, which breaks the non-blocking recv
      146             :     //  semantics.
     147             :     while (true) {
     148             : 
     149             :         //  Get a message using fair queueing algorithm.
     150     3230765 :         int rc = fq.recv (msg_);
     151             : 
     152             :         //  If there's no message available, return immediately.
     153             :         //  The same when error occurs.
     154     3229620 :         if (rc != 0)
     155             :             return -1;
     156             : 
     157             :         //  Check whether the message matches at least one subscription.
      158             :         //  Non-initial parts of the message are passed through unfiltered.
     159       63432 :         if (more || !options.filter || match (msg_)) {
     160       63432 :             more = msg_->flags () & msg_t::more ? true : false;
     161       63432 :             return 0;
     162             :         }
     163             : 
     164             :         //  Message doesn't match. Pop any remaining parts of the message
     165             :         //  from the pipe.
     166           0 :         while (msg_->flags () & msg_t::more) {
     167           0 :             rc = fq.recv (msg_);
     168           0 :             errno_assert (rc == 0);
     169             :         }
     170             :     }
     171             : }
     172             : 
     173      795892 : bool zmq::xsub_t::xhas_in ()
     174             : {
     175             :     //  There are subsequent parts of the partly-read message available.
     176      795892 :     if (more)
     177             :         return true;
     178             : 
      179             :     //  If there's already a message prepared by a previous call to zmq_poll,
      180             :     //  return straight away.
     181      795892 :     if (has_message)
     182             :         return true;
     183             : 
      184             :     //  TODO: This can result in an infinite loop in the case of a continuous
      185             :     //  stream of non-matching messages.
     186             :     while (true) {
     187             : 
     188             :         //  Get a message using fair queueing algorithm.
     189      266051 :         int rc = fq.recv (&message);
     190             : 
     191             :         //  If there's no message available, return immediately.
     192             :         //  The same when error occurs.
     193      266051 :         if (rc != 0) {
     194      266021 :             errno_assert (errno == EAGAIN);
     195             :             return false;
     196             :         }
     197             : 
     198             :         //  Check whether the message matches at least one subscription.
     199          30 :         if (!options.filter || match (&message)) {
     200          30 :             has_message = true;
     201          30 :             return true;
     202             :         }
     203             : 
     204             :         //  Message doesn't match. Pop any remaining parts of the message
     205             :         //  from the pipe.
     206           0 :         while (message.flags () & msg_t::more) {
     207           0 :             rc = fq.recv (&message);
     208           0 :             errno_assert (rc == 0);
     209             :         }
     210             :     }
     211             : }
     212             : 
     213           0 : zmq::blob_t zmq::xsub_t::get_credential () const
     214             : {
     215           0 :     return fq.get_credential ();
     216             : }
     217             : 
     218       63387 : bool zmq::xsub_t::match (msg_t *msg_)
     219             : {
     220       63387 :     bool matching = subscriptions.check ((unsigned char*) msg_->data (), msg_->size ());
     221             : 
     222       63387 :     return matching ^ options.invert_matching;
     223             : }
     224             : 
     225          45 : void zmq::xsub_t::send_subscription (unsigned char *data_, size_t size_,
     226             :     void *arg_)
     227             : {
     228          45 :     pipe_t *pipe = (pipe_t*) arg_;
     229             : 
     230             :     //  Create the subscription message.
     231             :     msg_t msg;
     232          45 :     int rc = msg.init_size (size_ + 1);
     233          45 :     errno_assert (rc == 0);
     234          45 :     unsigned char *data = (unsigned char*) msg.data ();
     235          45 :     data [0] = 1;
     236             : 
     237             :     //  We explicitly allow a NULL subscription with size zero
     238          45 :     if (size_) {
     239             :         assert (data_);
     240           6 :         memcpy (data + 1, data_, size_);
     241             :     }
     242             : 
     243             :     //  Send it to the pipe.
     244          45 :     bool sent = pipe->write (&msg);
     245             :     //  If we reached the SNDHWM, and thus cannot send the subscription, drop
     246             :     //  the subscription message instead. This matches the behaviour of
     247             :     //  zmq_setsockopt(ZMQ_SUBSCRIBE, ...), which also drops subscriptions
     248             :     //  when the SNDHWM is reached.
     249          45 :     if (!sent)
     250           0 :         msg.close ();
     251          45 : }

Generated by: LCOV version 1.10