LCOV - code coverage report
Current view: top level - home/h/core/forks/m4-libzmq/src - req.cpp (source / functions) Hit Total Coverage
Test: zeromq-4.2.0 Code Coverage Lines: 109 129 84.5 %
Date: 2016-05-09 Functions: 11 16 68.8 %
Legend: Lines: hit not hit

          Line data    Source code
       1             : /*
       2             :     Copyright (c) 2007-2016 Contributors as noted in the AUTHORS file
       3             : 
       4             :     This file is part of libzmq, the ZeroMQ core engine in C++.
       5             : 
       6             :     libzmq is free software; you can redistribute it and/or modify it under
       7             :     the terms of the GNU Lesser General Public License (LGPL) as published
       8             :     by the Free Software Foundation; either version 3 of the License, or
       9             :     (at your option) any later version.
      10             : 
      11             :     As a special exception, the Contributors give you permission to link
      12             :     this library with independent modules to produce an executable,
      13             :     regardless of the license terms of these independent modules, and to
      14             :     copy and distribute the resulting executable under terms of your choice,
      15             :     provided that you also meet, for each linked independent module, the
      16             :     terms and conditions of the license of that module. An independent
      17             :     module is a module which is not derived from or based on this library.
      18             :     If you modify this library, you must extend this exception to your
      19             :     version of the library.
      20             : 
      21             :     libzmq is distributed in the hope that it will be useful, but WITHOUT
      22             :     ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
      23             :     FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public
      24             :     License for more details.
      25             : 
      26             :     You should have received a copy of the GNU Lesser General Public License
      27             :     along with this program.  If not, see <http://www.gnu.org/licenses/>.
      28             : */
      29             : 
      30             : #include "precompiled.hpp"
      31             : #include "req.hpp"
      32             : #include "err.hpp"
      33             : #include "msg.hpp"
      34             : #include "wire.hpp"
      35             : #include "random.hpp"
      36             : #include "likely.hpp"
      37             : 
      38             : extern "C"
      39             : {
      40          27 :     static void free_id (void *data, void *hint)
      41             :     {
      42          27 :         free (data);
      43          27 :     }
      44             : }
      45             : 
      46         108 : zmq::req_t::req_t (class ctx_t *parent_, uint32_t tid_, int sid_) :
      47             :     dealer_t (parent_, tid_, sid_),
      48             :     receiving_reply (false),
      49             :     message_begins (true),
      50             :     reply_pipe (NULL),
      51             :     request_id_frames_enabled (false),
      52         108 :     request_id (generate_random ()),
      53         216 :     strict (true)
      54             : {
      55         108 :     options.type = ZMQ_REQ;
      56         108 : }
      57             : 
      58         108 : zmq::req_t::~req_t ()
      59             : {
      60         108 : }
      61             : 
      62         207 : int zmq::req_t::xsend (msg_t *msg_)
      63             : {
      64             :     //  If we've sent a request and we still haven't got the reply,
      65             :     //  we can't send another request unless the strict option is disabled.
      66         207 :     if (receiving_reply) {
      67           9 :         if (strict) {
      68           0 :             errno = EFSM;
      69           0 :             return -1;
      70             :         }
      71             : 
      72           9 :         receiving_reply = false;
      73           9 :         message_begins = true;
      74             :     }
      75             : 
      76             :     //  First part of the request is the request id (if enabled) and an empty frame.
      77         207 :     if (message_begins) {
      78         150 :         reply_pipe = NULL;
      79             : 
      80         150 :         if (request_id_frames_enabled) {
      81          27 :             request_id++;
      82             : 
      83             :             //  Copy request id before sending (see issue #1695 for details).
      84          27 :             uint32_t *request_id_copy = (uint32_t *) malloc (sizeof (uint32_t));
      85          27 :             *request_id_copy = request_id;
      86             : 
      87             :             msg_t id;
      88             :             int rc = id.init_data (request_id_copy, sizeof (uint32_t),
      89          27 :                 free_id, NULL);
      90          27 :             errno_assert (rc == 0);
      91          27 :             id.set_flags (msg_t::more);
      92             : 
      93          27 :             rc = dealer_t::sendpipe (&id, &reply_pipe);
      94          27 :             if (rc != 0)
      95           0 :                 return -1;
      96             :         }
      97             : 
      98             :         msg_t bottom;
      99         150 :         int rc = bottom.init ();
     100         150 :         errno_assert (rc == 0);
     101         150 :         bottom.set_flags (msg_t::more);
     102             : 
     103         150 :         rc = dealer_t::sendpipe (&bottom, &reply_pipe);
     104         150 :         if (rc != 0)
     105             :             return -1;
     106         129 :         zmq_assert (reply_pipe);
     107             : 
     108         129 :         message_begins = false;
     109             : 
     110             :         // Eat all currently available messages before the request is fully
     111             :         // sent. This is done to avoid:
     112             :         //   REQ sends request to A, A replies, B replies too.
     113             :         //   A's reply was first and matches, that is used.
     114             :         //   An hour later REQ sends a request to B. B's old reply is used.
     115             :         msg_t drop;
     116             :         while (true) {
     117         138 :             rc = drop.init ();
     118         138 :             errno_assert (rc == 0);
     119         138 :             rc = dealer_t::xrecv (&drop);
     120         138 :             if (rc != 0)
     121             :                 break;
     122           9 :             drop.close ();
     123             :         }
     124             :     }
     125             : 
     126         186 :     bool more = msg_->flags () & msg_t::more ? true : false;
     127             : 
     128         186 :     int rc = dealer_t::xsend (msg_);
     129         186 :     if (rc != 0)
     130             :         return rc;
     131             : 
     132             :     //  If the request was fully sent, flip the FSM into reply-receiving state.
     133         186 :     if (!more) {
     134         129 :         receiving_reply = true;
     135         129 :         message_begins = true;
     136             :     }
     137             : 
     138             :     return 0;
     139             : }
     140             : 
     141         351 : int zmq::req_t::xrecv (msg_t *msg_)
     142             : {
     143             :     //  If the request wasn't sent, we can't wait for a reply.
     144         351 :     if (!receiving_reply) {
     145           0 :         errno = EFSM;
     146           0 :         return -1;
     147             :     }
     148             : 
     149             :     //  Skip messages until one with the right first frames is found.
     150         471 :     while (message_begins) {
     151             :         //  If enabled, the first frame must have the correct request_id.
     152         312 :         if (request_id_frames_enabled) {
     153          51 :             int rc = recv_reply_pipe (msg_);
     154          51 :             if (rc != 0)
     155             :                 return rc;
     156             : 
     157          21 :             if (unlikely (!(msg_->flags () & msg_t::more) ||
     158             :                           msg_->size () != sizeof (request_id) ||
     159             :                           *static_cast<uint32_t *> (msg_->data ()) != request_id)) {
     160             :                 //  Skip the remaining frames and try the next message
     161          18 :                 while (msg_->flags () & msg_t::more) {
     162          12 :                     rc = recv_reply_pipe (msg_);
     163          12 :                     errno_assert (rc == 0);
     164             :                 }
     165             :                 continue;
     166             :             }
     167             :         }
     168             : 
     169             :         //  The next frame must be empty (size 0) with the more flag set.
     170             :         // TODO: Failing this check should also close the connection with the peer!
     171         276 :         int rc = recv_reply_pipe (msg_);
     172         276 :         if (rc != 0)
     173             :             return rc;
     174             : 
     175         114 :         if (unlikely (!(msg_->flags () & msg_t::more) || msg_->size () != 0)) {
     176             :             //  Skip the remaining frames and try the next message
     177           0 :             while (msg_->flags () & msg_t::more) {
     178           0 :                 rc = recv_reply_pipe (msg_);
     179           0 :                 errno_assert (rc == 0);
     180             :             }
     181             :             continue;
     182             :         }
     183             : 
     184         114 :         message_begins = false;
     185             :     }
     186             : 
     187         159 :     int rc = recv_reply_pipe (msg_);
     188         159 :     if (rc != 0)
     189             :         return rc;
     190             : 
     191             :     //  If the reply is fully received, flip the FSM into request-sending state.
     192         159 :     if (!(msg_->flags () & msg_t::more)) {
     193         114 :         receiving_reply = false;
     194         114 :         message_begins = true;
     195             :     }
     196             : 
     197             :     return 0;
     198             : }
     199             : 
     200           0 : bool zmq::req_t::xhas_in ()
     201             : {
     202             :     //  TODO: Duplicates should be removed here.
     203             : 
     204           0 :     if (!receiving_reply)
     205             :         return false;
     206             : 
     207           0 :     return dealer_t::xhas_in ();
     208             : }
     209             : 
     210           0 : bool zmq::req_t::xhas_out ()
     211             : {
     212           0 :     if (receiving_reply)
     213             :         return false;
     214             : 
     215           0 :     return dealer_t::xhas_out ();
     216             : }
     217             : 
     218         123 : int zmq::req_t::xsetsockopt (int option_, const void *optval_, size_t optvallen_)
     219             : {
     220         123 :     bool is_int = (optvallen_ == sizeof (int));
     221         123 :     int value = 0;
     222         123 :     if (is_int)
     223             :         memcpy (&value, optval_, sizeof (int));
     224             : 
     225         123 :     switch (option_) {
     226             :         case ZMQ_REQ_CORRELATE:
     227           9 :             if (is_int && value >= 0) {
     228           9 :                 request_id_frames_enabled = (value != 0);
     229           9 :                 return 0;
     230             :             }
     231             :             break;
     232             : 
     233             :         case ZMQ_REQ_RELAXED:
     234           6 :             if (is_int && value >= 0) {
     235           6 :                 strict = (value == 0);
     236           6 :                 return 0;
     237             :             }
     238             :             break;
     239             : 
     240             :         default:
     241             :             break;
     242             :     }
     243             : 
     244         108 :     return dealer_t::xsetsockopt (option_, optval_, optvallen_);
     245             : }
     246             : 
     247         138 : void zmq::req_t::xpipe_terminated (pipe_t *pipe_)
     248             : {
     249         138 :     if (reply_pipe == pipe_)
     250          78 :         reply_pipe = NULL;
     251         138 :     dealer_t::xpipe_terminated (pipe_);
     252         138 : }
     253             : 
     254         498 : int zmq::req_t::recv_reply_pipe (msg_t *msg_)
     255             : {
     256             :     while (true) {
     257         507 :         pipe_t *pipe = NULL;
     258         507 :         int rc = dealer_t::recvpipe (msg_, &pipe);
     259         507 :         if (rc != 0)
     260         498 :             return rc;
     261         315 :         if (!reply_pipe || pipe == reply_pipe)
     262             :             return 0;
     263           9 :     }
     264             : }
     265             : 
     266          99 : zmq::req_session_t::req_session_t (io_thread_t *io_thread_, bool connect_,
     267             :       socket_base_t *socket_, const options_t &options_,
     268             :       address_t *addr_) :
     269             :     session_base_t (io_thread_, connect_, socket_, options_, addr_),
     270          99 :     state (bottom)
     271             : {
     272          99 : }
     273             : 
     274          99 : zmq::req_session_t::~req_session_t ()
     275             : {
     276          99 : }
     277             : 
     278         261 : int zmq::req_session_t::push_msg (msg_t *msg_)
     279             : {
     280         261 :     switch (state) {
     281             :     case bottom:
     282          96 :         if (msg_->flags () == msg_t::more) {
     283             :             //  In case option ZMQ_REQ_CORRELATE is on, allow request_id to be
     284             :             //  transferred as first frame (would be too cumbersome to check
     285             :             //  whether the option is actually on or not).
     286          96 :             if (msg_->size () == sizeof (uint32_t)) {
     287          27 :                 state = request_id;
     288          27 :                 return session_base_t::push_msg (msg_);
     289             :             }
     290          69 :             else if (msg_->size () == 0) {
     291          69 :                 state = body;
     292          69 :                 return session_base_t::push_msg (msg_);
     293             :             }
     294             :         }
     295             :         break;
     296             :     case request_id:
     297          27 :         if (msg_->flags () == msg_t::more && msg_->size () == 0) {
     298          27 :             state = body;
     299          27 :             return session_base_t::push_msg (msg_);
     300             :         }
     301             :         break;
     302             :     case body:
     303         138 :         if (msg_->flags () == msg_t::more)
     304          42 :             return session_base_t::push_msg (msg_);
     305          96 :         if (msg_->flags () == 0) {
     306          96 :             state = bottom;
     307          96 :             return session_base_t::push_msg (msg_);
     308             :         }
     309             :         break;
     310             :     }
     311           0 :     errno = EFAULT;
     312           0 :     return -1;
     313             : }
     314             : 
     315           0 : void zmq::req_session_t::reset ()
     316             : {
     317           0 :     session_base_t::reset ();
     318           0 :     state = bottom;
     319           0 : }

Generated by: LCOV version 1.10