LCOV - code coverage report
Current view: top level - home/h/core/forks/m4-libzmq/src - xpub.cpp (source / functions) Hit Total Coverage
Test: zeromq-4.2.0 Code Coverage Lines: 125 141 88.7 %
Date: 2016-05-09 Functions: 13 14 92.9 %
Legend: Lines: hit not hit

          Line data    Source code
       1             : /*
       2             :     Copyright (c) 2007-2016 Contributors as noted in the AUTHORS file
       3             : 
       4             :     This file is part of libzmq, the ZeroMQ core engine in C++.
       5             : 
       6             :     libzmq is free software; you can redistribute it and/or modify it under
       7             :     the terms of the GNU Lesser General Public License (LGPL) as published
       8             :     by the Free Software Foundation; either version 3 of the License, or
       9             :     (at your option) any later version.
      10             : 
      11             :     As a special exception, the Contributors give you permission to link
      12             :     this library with independent modules to produce an executable,
      13             :     regardless of the license terms of these independent modules, and to
      14             :     copy and distribute the resulting executable under terms of your choice,
      15             :     provided that you also meet, for each linked independent module, the
      16             :     terms and conditions of the license of that module. An independent
      17             :     module is a module which is not derived from or based on this library.
      18             :     If you modify this library, you must extend this exception to your
      19             :     version of the library.
      20             : 
      21             :     libzmq is distributed in the hope that it will be useful, but WITHOUT
      22             :     ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
      23             :     FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public
      24             :     License for more details.
      25             : 
      26             :     You should have received a copy of the GNU Lesser General Public License
      27             :     along with this program.  If not, see <http://www.gnu.org/licenses/>.
      28             : */
      29             : 
      30             : #include "precompiled.hpp"
      31             : #include <string.h>
      32             : 
      33             : #include "xpub.hpp"
      34             : #include "pipe.hpp"
      35             : #include "err.hpp"
      36             : #include "msg.hpp"
      37             : 
      38         258 : zmq::xpub_t::xpub_t (class ctx_t *parent_, uint32_t tid_, int sid_) :
      39             :     socket_base_t (parent_, tid_, sid_),
      40             :     verbose_subs (false),
      41             :     verbose_unsubs (false),
      42             :     more (false),
      43             :     lossy (true),
      44             :     manual(false),
      45             :     pending_pipes (),
      46        1032 :     welcome_msg ()
      47             : {
      48         258 :     last_pipe = NULL;
      49         258 :     options.type = ZMQ_XPUB;
      50         258 :     welcome_msg.init();
      51         258 : }
      52             : 
      53        1314 : zmq::xpub_t::~xpub_t ()
      54             : {
      55         258 :     welcome_msg.close();
      56         282 : }
      57             : 
      58        3028 : void zmq::xpub_t::xattach_pipe (pipe_t *pipe_, bool subscribe_to_all_)
      59             : {
      60        3028 :     zmq_assert (pipe_);
      61        3028 :     dist.attach (pipe_);
      62             : 
      63             :     //  If subscribe_to_all_ is specified, the caller would like to subscribe
      64             :     //  to all data on this pipe, implicitly.
      65        3028 :     if (subscribe_to_all_)
      66           0 :         subscriptions.add (NULL, 0, pipe_);
      67             : 
      68             :     // if welcome message exist
      69        3028 :     if (welcome_msg.size() > 0)
      70             :     {
      71             :         msg_t copy;
      72           3 :         copy.init();
      73           3 :         copy.copy(welcome_msg);
      74             : 
      75           3 :         pipe_->write(&copy);
      76           3 :         pipe_->flush();
      77             :     }
      78             : 
      79             :     //  The pipe is active when attached. Let's read the subscriptions from
      80             :     //  it, if any.
      81        3028 :     xread_activated (pipe_);
      82        3028 : }
      83             : 
      84        3073 : void zmq::xpub_t::xread_activated (pipe_t *pipe_)
      85             : {
      86             :     //  There are some subscriptions waiting. Let's process them.
      87             :     msg_t sub;
      88        6272 :     while (pipe_->read (&sub)) {
      89             :         //  Apply the subscription to the trie
      90         126 :         unsigned char *const data = (unsigned char *) sub.data ();
      91         126 :         const size_t size = sub.size ();
      92         126 :         if (size > 0 && (*data == 0 || *data == 1)) {
      93         126 :             if (manual)
      94             :             {
      95          27 :                 pending_pipes.push_back(pipe_);
      96          54 :                 pending_data.push_back(blob_t(data, size));
      97          54 :                 pending_metadata.push_back(sub.metadata());
      98          54 :                 pending_flags.push_back(0);
      99             :             }
     100             :             else
     101             :             {
     102             :                 bool unique;
     103          99 :                 if (*data == 0)
     104           6 :                     unique = subscriptions.rm(data + 1, size - 1, pipe_);
     105             :                 else
     106          93 :                     unique = subscriptions.add(data + 1, size - 1, pipe_);
     107             : 
     108             :                 //  If the (un)subscription is not a duplicate store it so that it can be
     109             :                 //  passed to the user on next recv call unless verbose mode is enabled
     110             :                 //  which makes to pass always these messages.
     111          99 :                 if (options.type == ZMQ_XPUB && (unique || (*data == 1 && verbose_subs) ||
     112           0 :                         (*data == 0 && verbose_unsubs && verbose_subs))) {
     113          18 :                     pending_data.push_back(blob_t(data, size));
     114          18 :                     pending_metadata.push_back(sub.metadata());
     115          18 :                     pending_flags.push_back(0);
     116             :                 }
     117             :             }
     118             :         }
     119             :         else {
     120             :             //  Process user message coming upstream from xsub socket
     121           0 :             pending_data.push_back (blob_t (data, size));
     122           0 :             pending_metadata.push_back (sub.metadata ());
     123           0 :             pending_flags.push_back (sub.flags ());
     124             :         }
     125         126 :         sub.close ();
     126             :     }
     127        3073 : }
     128             : 
     129           0 : void zmq::xpub_t::xwrite_activated (pipe_t *pipe_)
     130             : {
     131           0 :     dist.activated (pipe_);
     132           0 : }
     133             : 
     134          84 : int zmq::xpub_t::xsetsockopt (int option_, const void *optval_,
     135             :     size_t optvallen_)
     136             : {
     137         168 :     if (option_ == ZMQ_XPUB_VERBOSE
     138          84 :      || option_ == ZMQ_XPUB_VERBOSER
     139          84 :      || option_ == ZMQ_XPUB_NODROP
     140          84 :      || option_ == ZMQ_XPUB_MANUAL) {
     141          21 :         if (optvallen_ != sizeof(int) || *static_cast <const int*> (optval_) < 0) {
     142           0 :             errno = EINVAL;
     143           0 :             return -1;
     144             :         }
     145          21 :         if (option_ == ZMQ_XPUB_VERBOSE) {
     146           0 :             verbose_subs = (*static_cast <const int*> (optval_) != 0);
     147           0 :             verbose_unsubs = 0;
     148             :         }
     149             :         else
     150          21 :         if (option_ == ZMQ_XPUB_VERBOSER) {
     151           0 :             verbose_subs = (*static_cast <const int*> (optval_) != 0);
     152           0 :             verbose_unsubs = verbose_subs;
     153             :         }
     154             :         else
     155          21 :         if (option_ == ZMQ_XPUB_NODROP)
     156           6 :             lossy = (*static_cast <const int*> (optval_) == 0);
     157             :         else
     158          15 :         if (option_ == ZMQ_XPUB_MANUAL)
     159          15 :             manual = (*static_cast <const int*> (optval_) != 0);
     160             :     }
     161             :     else
     162          63 :     if (option_ == ZMQ_SUBSCRIBE && manual) {
     163          27 :         if (last_pipe != NULL)
     164          27 :             subscriptions.add ((unsigned char *)optval_, optvallen_, last_pipe);
     165             :     }
     166             :     else
     167          36 :     if (option_ == ZMQ_UNSUBSCRIBE && manual) {
     168          12 :         if (last_pipe != NULL)
     169           0 :             subscriptions.rm ((unsigned char *)optval_, optvallen_, last_pipe);
     170             :     }
     171             :     else
     172          24 :     if (option_ == ZMQ_XPUB_WELCOME_MSG) {
     173           3 :         welcome_msg.close();
     174             : 
     175           3 :         if (optvallen_ > 0) {
     176           3 :             int rc = welcome_msg.init_size(optvallen_);
     177           3 :             errno_assert(rc == 0);
     178             : 
     179           3 :             unsigned char *data = (unsigned char*)welcome_msg.data();
     180             :             memcpy(data, optval_, optvallen_);
     181             :         }
     182             :         else
     183           0 :             welcome_msg.init();
     184             :     }
     185             :     else {
     186          21 :         errno = EINVAL;
     187          21 :         return -1;
     188             :     }
     189             :     return 0;
     190             : }
     191             : 
     192        3028 : void zmq::xpub_t::xpipe_terminated (pipe_t *pipe_)
     193             : {
     194             :     //  Remove the pipe from the trie. If there are topics that nobody
     195             :     //  is interested in anymore, send corresponding unsubscriptions
     196             :     //  upstream.
     197        3028 :     subscriptions.rm (pipe_, send_unsubscription, this, !(verbose_unsubs || manual));
     198             : 
     199        3028 :     dist.pipe_terminated (pipe_);
     200        3028 : }
     201             : 
     202       75218 : void zmq::xpub_t::mark_as_matching (pipe_t *pipe_, void *arg_)
     203             : {
     204       75218 :     xpub_t *self = (xpub_t*) arg_;
     205       75218 :     self->dist.match (pipe_);
     206       75218 : }
     207             : 
     208       75266 : int zmq::xpub_t::xsend (msg_t *msg_)
     209             : {
     210       75266 :     bool msg_more = msg_->flags () & msg_t::more ? true : false;
     211             : 
     212             :     //  For the first part of multi-part message, find the matching pipes.
     213       75266 :     if (!more) {
     214       75206 :         subscriptions.match ((unsigned char*) msg_->data (), msg_->size (),
     215      150412 :             mark_as_matching, this);
     216             :         // If inverted matching is used, reverse the selection now
     217       75206 :         if (options.invert_matching) {
     218           6 :             dist.reverse_match();
     219             :         }
     220             :     }
     221             : 
     222       75266 :     int rc = -1;            //  Assume we fail
     223       75266 :     if (lossy || dist.check_hwm ()) {
     224       63465 :         if (dist.send_to_matching (msg_) == 0) {
     225             :             //  If we are at the end of multi-part message we can mark
     226             :             //  all the pipes as non-matching.
     227       63465 :             if (!msg_more)
     228       63405 :                 dist.unmatch ();
     229       63465 :             more = msg_more;
     230       63465 :             rc = 0;         //  Yay, sent successfully
     231             :         }
     232             :     }
     233             :     else
     234       11801 :         errno = EAGAIN;
     235       75266 :     return rc;
     236             : }
     237             : 
     238          33 : bool zmq::xpub_t::xhas_out ()
     239             : {
     240          33 :     return dist.has_out ();
     241             : }
     242             : 
     243          96 : int zmq::xpub_t::xrecv (msg_t *msg_)
     244             : {
     245             :     //  If there is at least one
     246         192 :     if (pending_data.empty ()) {
     247          45 :         errno = EAGAIN;
     248          45 :         return -1;
     249             :     }
     250             : 
     251             :     // User is reading a message, set last_pipe and remove it from the deque
     252          90 :     if (manual && !pending_pipes.empty ()) {
     253          78 :         last_pipe = pending_pipes.front ();
     254          39 :         pending_pipes.pop_front ();
     255             :     }
     256             : 
     257          51 :     int rc = msg_->close ();
     258          51 :     errno_assert (rc == 0);
     259         153 :     rc = msg_->init_size (pending_data.front ().size ());
     260          51 :     errno_assert (rc == 0);
     261             :     memcpy (msg_->data (),
     262         102 :         pending_data.front ().data (),
     263         153 :         pending_data.front ().size ());
     264             : 
     265             :     // set metadata only if there is some
     266         102 :     if (metadata_t* metadata = pending_metadata.front ()) {
     267          27 :         msg_->set_metadata (metadata);
     268             :     }
     269             : 
     270         102 :     msg_->set_flags (pending_flags.front ());
     271          51 :     pending_data.pop_front ();
     272          51 :     pending_metadata.pop_front ();
     273          51 :     pending_flags.pop_front ();
     274          51 :     return 0;
     275             : }
     276             : 
     277          33 : bool zmq::xpub_t::xhas_in ()
     278             : {
     279          66 :     return !pending_data.empty ();
     280             : }
     281             : 
     282          84 : void zmq::xpub_t::send_unsubscription (unsigned char *data_, size_t size_,
     283             :     void *arg_)
     284             : {
     285          84 :     xpub_t *self = (xpub_t*) arg_;
     286             : 
     287          84 :     if (self->options.type != ZMQ_PUB) {
     288             :         //  Place the unsubscription to the queue of pending (un)subscriptions
     289             :         //  to be retrieved by the user later on.
     290          72 :         blob_t unsub (size_ + 1, 0);
     291          36 :         unsub [0] = 0;
     292          36 :         if (size_ > 0)
     293             :             memcpy (&unsub [1], data_, size_);
     294          36 :         self->pending_data.push_back (unsub);
     295          72 :         self->pending_metadata.push_back (NULL);
     296          72 :         self->pending_flags.push_back (0);
     297             : 
     298          36 :         if (self->manual) {
     299          27 :             self->last_pipe = NULL;
     300          54 :             self->pending_pipes.push_back (NULL);
     301             :         }
     302             :     }
     303          84 : }

Generated by: LCOV version 1.10