LCOV - code coverage report
Current view: top level - home/h/core/forks/m4-libzmq/src - stream.cpp (source / functions) Hit Total Coverage
Test: zeromq-4.2.0 Code Coverage Lines: 133 149 89.3 %
Date: 2016-05-09 Functions: 12 13 92.3 %
Legend: Lines: hit not hit

          Line data    Source code
       1             : /*
       2             :     Copyright (c) 2007-2016 Contributors as noted in the AUTHORS file
       3             : 
       4             :     This file is part of libzmq, the ZeroMQ core engine in C++.
       5             : 
       6             :     libzmq is free software; you can redistribute it and/or modify it under
       7             :     the terms of the GNU Lesser General Public License (LGPL) as published
       8             :     by the Free Software Foundation; either version 3 of the License, or
       9             :     (at your option) any later version.
      10             : 
      11             :     As a special exception, the Contributors give you permission to link
      12             :     this library with independent modules to produce an executable,
      13             :     regardless of the license terms of these independent modules, and to
      14             :     copy and distribute the resulting executable under terms of your choice,
      15             :     provided that you also meet, for each linked independent module, the
      16             :     terms and conditions of the license of that module. An independent
      17             :     module is a module which is not derived from or based on this library.
      18             :     If you modify this library, you must extend this exception to your
      19             :     version of the library.
      20             : 
      21             :     libzmq is distributed in the hope that it will be useful, but WITHOUT
      22             :     ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
      23             :     FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public
      24             :     License for more details.
      25             : 
      26             :     You should have received a copy of the GNU Lesser General Public License
      27             :     along with this program.  If not, see <http://www.gnu.org/licenses/>.
      28             : */
      29             : 
      30             : #include "precompiled.hpp"
      31             : #include "macros.hpp"
      32             : #include "stream.hpp"
      33             : #include "pipe.hpp"
      34             : #include "wire.hpp"
      35             : #include "random.hpp"
      36             : #include "likely.hpp"
      37             : #include "err.hpp"
      38             : 
      39          33 : zmq::stream_t::stream_t (class ctx_t *parent_, uint32_t tid_, int sid_) :
      40             :     socket_base_t (parent_, tid_, sid_),
      41             :     prefetched (false),
      42             :     identity_sent (false),
      43             :     current_out (NULL),
      44             :     more_out (false),
      45          66 :     next_rid (generate_random ())
      46             : {
      47          33 :     options.type = ZMQ_STREAM;
      48          33 :     options.raw_socket = true;
      49             : 
      50          33 :     prefetched_id.init ();
      51          33 :     prefetched_msg.init ();
      52          33 : }
      53             : 
      54         132 : zmq::stream_t::~stream_t ()
      55             : {
      56          66 :     zmq_assert (outpipes.empty ());
      57          33 :     prefetched_id.close ();
      58          33 :     prefetched_msg.close ();
      59          66 : }
      60             : 
      61          33 : void zmq::stream_t::xattach_pipe (pipe_t *pipe_, bool subscribe_to_all_)
      62             : {
      63             :         LIBZMQ_UNUSED(subscribe_to_all_);
      64             : 
      65          33 :     zmq_assert (pipe_);
      66             : 
      67          33 :     identify_peer (pipe_);
      68          33 :     fq.attach (pipe_);
      69          33 : }
      70             : 
      71          33 : void zmq::stream_t::xpipe_terminated (pipe_t *pipe_)
      72             : {
      73          99 :     outpipes_t::iterator it = outpipes.find (pipe_->get_identity ());
      74          66 :     zmq_assert (it != outpipes.end ());
      75          33 :     outpipes.erase (it);
      76          33 :     fq.pipe_terminated (pipe_);
      77          33 :     if (pipe_ == current_out)
      78           0 :         current_out = NULL;
      79          33 : }
      80             : 
      81          36 : void zmq::stream_t::xread_activated (pipe_t *pipe_)
      82             : {
      83          36 :     fq.activated (pipe_);
      84          36 : }
      85             : 
      86           0 : void zmq::stream_t::xwrite_activated (pipe_t *pipe_)
      87             : {
      88             :     outpipes_t::iterator it;
      89           0 :     for (it = outpipes.begin (); it != outpipes.end (); ++it)
      90           0 :         if (it->second.pipe == pipe_)
      91             :             break;
      92             : 
      93           0 :     zmq_assert (it != outpipes.end ());
      94           0 :     zmq_assert (!it->second.active);
      95           0 :     it->second.active = true;
      96           0 : }
      97             : 
      98          66 : int zmq::stream_t::xsend (msg_t *msg_)
      99             : {
     100             :     //  If this is the first part of the message it's the ID of the
     101             :     //  peer to send the message to.
     102          66 :     if (!more_out) {
     103          33 :         zmq_assert (!current_out);
     104             : 
     105             :         //  If we have malformed message (prefix with no subsequent message)
     106             :         //  then just silently ignore it.
     107             :         //  TODO: The connections should be killed instead.
     108          33 :         if (msg_->flags () & msg_t::more) {
     109             : 
     110             :             //  Find the pipe associated with the identity stored in the prefix.
     111             :             //  If there's no such pipe return an error
     112          66 :             blob_t identity ((unsigned char*) msg_->data (), msg_->size ());
     113          66 :             outpipes_t::iterator it = outpipes.find (identity);
     114             : 
     115          66 :             if (it != outpipes.end ()) {
     116          33 :                 current_out = it->second.pipe;
     117          33 :                 if (!current_out->check_write ()) {
     118           0 :                     it->second.active = false;
     119           0 :                     current_out = NULL;
     120           0 :                     errno = EAGAIN;
     121           0 :                     return -1;
     122             :                 }
     123             :             }
     124             :             else {
     125           0 :                 errno = EHOSTUNREACH;
     126           0 :                 return -1;
     127             :             }
     128             :         }
     129             : 
     130             :         //  Expect one more message frame.
     131          33 :         more_out = true;
     132             : 
     133          33 :         int rc = msg_->close ();
     134          33 :         errno_assert (rc == 0);
     135          33 :         rc = msg_->init ();
     136          33 :         errno_assert (rc == 0);
     137             :         return 0;
     138             :     }
     139             : 
     140             :     //  Ignore the MORE flag
     141          33 :     msg_->reset_flags (msg_t::more);
     142             : 
     143             :     //  This is the last part of the message.
     144          33 :     more_out = false;
     145             : 
     146             :     //  Push the message into the pipe. If there's no out pipe, just drop it.
     147          33 :     if (current_out) {
     148             : 
     149             :         // Close the remote connection if user has asked to do so
     150             :         // by sending zero length message.
     151             :         // Pending messages in the pipe will be dropped (on receiving term- ack)
     152          33 :         if (msg_->size () == 0) {
     153           9 :             current_out->terminate (false);
     154           9 :             int rc = msg_->close ();
     155           9 :             errno_assert (rc == 0);
     156           9 :             rc = msg_->init ();
     157           9 :             errno_assert (rc == 0);
     158           9 :             current_out = NULL;
     159           9 :             return 0;
     160             :         }
     161          24 :         bool ok = current_out->write (msg_);
     162          24 :         if (likely (ok))
     163          24 :             current_out->flush ();
     164          24 :         current_out = NULL;
     165             :     }
     166             :     else {
     167           0 :         int rc = msg_->close ();
     168           0 :         errno_assert (rc == 0);
     169             :     }
     170             : 
     171             :     //  Detach the message from the data buffer.
     172          24 :     int rc = msg_->init ();
     173          24 :     errno_assert (rc == 0);
     174             : 
     175             :     return 0;
     176             : }
     177             : 
     178          39 : int zmq::stream_t::xsetsockopt (int option_, const void *optval_,
     179             :     size_t optvallen_)
     180             : {
     181          39 :     bool is_int = (optvallen_ == sizeof (int));
     182          39 :     int value = 0;
     183          39 :     if (is_int) memcpy(&value, optval_, sizeof (int));
     184             : 
     185          39 :     switch (option_) {
     186             :         case ZMQ_CONNECT_RID:
     187           3 :             if (optval_ && optvallen_) {
     188           3 :                 connect_rid.assign ((char*) optval_, optvallen_);
     189             :                 return 0;
     190             :             }
     191             :             break;
     192             : 
     193             :         case ZMQ_STREAM_NOTIFY:
     194          18 :             if (is_int && (value == 0 || value == 1)) {
     195          18 :                 options.raw_notify = (value != 0);
     196          18 :                 return 0;
     197             :             }
     198             :             break;
     199             : 
     200             :         default:
     201             :             break;
     202             :     }
     203          18 :     errno = EINVAL;
     204          18 :     return -1;
     205             : }
     206             : 
     207         206 : int zmq::stream_t::xrecv (msg_t *msg_)
     208             : {
     209         206 :     if (prefetched) {
     210          81 :         if (!identity_sent) {
     211           9 :             int rc = msg_->move (prefetched_id);
     212           9 :             errno_assert (rc == 0);
     213           9 :             identity_sent = true;
     214             :         }
     215             :         else {
     216          72 :             int rc = msg_->move (prefetched_msg);
     217          72 :             errno_assert (rc == 0);
     218          72 :             prefetched = false;
     219             :         }
     220             :         return 0;
     221             :     }
     222             : 
     223         125 :     pipe_t *pipe = NULL;
     224         125 :     int rc = fq.recvpipe (&prefetched_msg, &pipe);
     225         125 :     if (rc != 0)
     226             :         return -1;
     227             : 
     228          66 :     zmq_assert (pipe != NULL);
     229          66 :     zmq_assert ((prefetched_msg.flags () & msg_t::more) == 0);
     230             : 
     231             :     //  We have received a frame with TCP data.
     232             :     //  Rather than sending this frame, we keep it in prefetched
     233             :     //  buffer and send a frame with peer's ID.
     234          66 :     blob_t identity = pipe->get_identity ();
     235          66 :     rc = msg_->close();
     236          66 :     errno_assert (rc == 0);
     237          66 :     rc = msg_->init_size (identity.size ());
     238          66 :     errno_assert (rc == 0);
     239             : 
     240             :     // forward metadata (if any)
     241          66 :     metadata_t *metadata = prefetched_msg.metadata();
     242          66 :     if (metadata)
     243          66 :         msg_->set_metadata(metadata);
     244             : 
     245          66 :     memcpy (msg_->data (), identity.data (), identity.size ());
     246          66 :     msg_->set_flags (msg_t::more);
     247             : 
     248          66 :     prefetched = true;
     249          66 :     identity_sent = true;
     250             : 
     251          66 :     return 0;
     252             : }
     253             : 
     254          42 : bool zmq::stream_t::xhas_in ()
     255             : {
     256             :     //  We may already have a message pre-fetched.
     257          42 :     if (prefetched)
     258             :         return true;
     259             : 
     260             :     //  Try to read the next message.
     261             :     //  The message, if read, is kept in the pre-fetch buffer.
     262          42 :     pipe_t *pipe = NULL;
     263          42 :     int rc = fq.recvpipe (&prefetched_msg, &pipe);
     264          42 :     if (rc != 0)
     265             :         return false;
     266             : 
     267           9 :     zmq_assert (pipe != NULL);
     268           9 :     zmq_assert ((prefetched_msg.flags () & msg_t::more) == 0);
     269             : 
     270           9 :     blob_t identity = pipe->get_identity ();
     271           9 :     rc = prefetched_id.init_size (identity.size ());
     272           9 :     errno_assert (rc == 0);
     273             : 
     274             :     // forward metadata (if any)
     275           9 :     metadata_t *metadata = prefetched_msg.metadata();
     276           9 :     if (metadata)
     277           9 :         prefetched_id.set_metadata(metadata);
     278             : 
     279           9 :     memcpy (prefetched_id.data (), identity.data (), identity.size ());
     280           9 :     prefetched_id.set_flags (msg_t::more);
     281             : 
     282           9 :     prefetched = true;
     283           9 :     identity_sent = false;
     284             : 
     285           9 :     return true;
     286             : }
     287             : 
     288          42 : bool zmq::stream_t::xhas_out ()
     289             : {
     290             :     //  In theory, STREAM socket is always ready for writing. Whether actual
     291             :     //  attempt to write succeeds depends on which pipe the message is going
     292             :     //  to be routed to.
     293          42 :     return true;
     294             : }
     295             : 
     296          33 : void zmq::stream_t::identify_peer (pipe_t *pipe_)
     297             : {
     298             :     //  Always assign identity for raw-socket
     299             :     unsigned char buffer [5];
     300          33 :     buffer [0] = 0;
     301             :     blob_t identity;
     302          66 :     if (connect_rid.length ()) {
     303           6 :         identity = blob_t ((unsigned char*) connect_rid.c_str(),
     304             :             connect_rid.length ());
     305           3 :         connect_rid.clear ();
     306           6 :         outpipes_t::iterator it = outpipes.find (identity);
     307           6 :         zmq_assert (it == outpipes.end ());
     308             :     }
     309             :     else {
     310          30 :         put_uint32 (buffer + 1, next_rid++);
     311          30 :         identity = blob_t (buffer, sizeof buffer);
     312          30 :         memcpy (options.identity, identity.data (), identity.size ());
     313          30 :         options.identity_size = (unsigned char) identity.size ();
     314             :     }
     315          33 :     pipe_->set_identity (identity);
     316             :     //  Add the record into output pipes lookup table
     317          33 :     outpipe_t outpipe = {pipe_, true};
     318             :     const bool ok = outpipes.insert (
     319          99 :         outpipes_t::value_type (identity, outpipe)).second;
     320          33 :     zmq_assert (ok);
     321          33 : }

Generated by: LCOV version 1.10