LCOV - code coverage report
Current view: top level - home/h/core/forks/m4-libzmq/src - proxy.cpp (source / functions) Hit Total Coverage
Test: zeromq-4.2.0 Code Coverage Lines: 48 57 84.2 %
Date: 2016-05-09 Functions: 3 3 100.0 %
Legend: Lines: hit not hit

          Line data    Source code
       1             : /*
       2             :     Copyright (c) 2007-2016 Contributors as noted in the AUTHORS file
       3             : 
       4             :     This file is part of libzmq, the ZeroMQ core engine in C++.
       5             : 
       6             :     libzmq is free software; you can redistribute it and/or modify it under
       7             :     the terms of the GNU Lesser General Public License (LGPL) as published
       8             :     by the Free Software Foundation; either version 3 of the License, or
       9             :     (at your option) any later version.
      10             : 
      11             :     As a special exception, the Contributors give you permission to link
      12             :     this library with independent modules to produce an executable,
      13             :     regardless of the license terms of these independent modules, and to
      14             :     copy and distribute the resulting executable under terms of your choice,
      15             :     provided that you also meet, for each linked independent module, the
      16             :     terms and conditions of the license of that module. An independent
      17             :     module is a module which is not derived from or based on this library.
      18             :     If you modify this library, you must extend this exception to your
      19             :     version of the library.
      20             : 
      21             :     libzmq is distributed in the hope that it will be useful, but WITHOUT
      22             :     ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
      23             :     FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public
      24             :     License for more details.
      25             : 
      26             :     You should have received a copy of the GNU Lesser General Public License
      27             :     along with this program.  If not, see <http://www.gnu.org/licenses/>.
      28             : */
      29             : 
      30             : #include "precompiled.hpp"
      31             : #include <stddef.h>
      32             : #include "poller.hpp"
      33             : #include "proxy.hpp"
      34             : #include "likely.hpp"
      35             : 
      36             : //  On AIX platform, poll.h has to be included first to get consistent
      37             : //  definition of pollfd structure (AIX uses 'reqevents' and 'retnevents'
      38             : //  instead of 'events' and 'revents' and defines macros to map from POSIX-y
      39             : //  names to AIX-specific names).
      40             : #if defined ZMQ_POLL_BASED_ON_POLL
      41             : #include <poll.h>
      42             : #endif
      43             : 
      44             : // These headers end up pulling in zmq.h somewhere in their include
      45             : // dependency chain
      46             : #include "socket_base.hpp"
      47             : #include "err.hpp"
      48             : 
      49             : // zmq.h must be included *after* poll.h for AIX to build properly
      50             : #include "../include/zmq.h"
      51             : 
      52         111 : int capture(  //  Sends a copy of msg_ to capture_ when one is given. Returns 0 on success or when capture_ is null, -1 if init, copy or send fails.
      53             :         class zmq::socket_base_t *capture_,  //  Capture socket; may be null, in which case nothing is sent.
      54             :         zmq::msg_t& msg_,  //  Message to duplicate; used here only as the source of ctrl.copy ().
      55             :         int more_ = 0)  //  Non-zero makes the copy go out with ZMQ_SNDMORE.
      56             : {
      57             :     //  Duplicate the message onto the capture socket, if there is one.
      58         111 :     if (capture_) {
      59             :         zmq::msg_t ctrl;  //  Scratch message that receives the copy.
      60           0 :         int rc = ctrl.init ();
      61           0 :         if (unlikely (rc < 0))
      62           0 :             return -1;
      63           0 :         rc = ctrl.copy (msg_);
      64           0 :         if (unlikely (rc < 0))
      65             :             return -1;  //  NOTE(review): ctrl is not explicitly closed on this or the following error return -- confirm whether that is needed.
      66           0 :         rc = capture_->send (&ctrl, more_? ZMQ_SNDMORE: 0);  //  The copy is sent, not msg_ itself.
      67           0 :         if (unlikely (rc < 0))
      68             :             return -1;
      69             :     }
      70             :     return 0;
      71             : }
      72             : 
      73          54 : int forward(  //  Relays message parts from from_ to to_ until ZMQ_RCVMORE reports 0. Returns 0 after the last part, -1 as soon as any call fails.
      74             :         class zmq::socket_base_t *from_,  //  Socket the parts are received from.
      75             :         class zmq::socket_base_t *to_,  //  Socket the parts are sent to.
      76             :         class zmq::socket_base_t *capture_,  //  Passed through to capture (); may be null.
      77             :         zmq::msg_t& msg_)  //  Caller-supplied message object reused for every part.
      78             : {
      79             :     int more;
      80             :     size_t moresz;
      81             :     while (true) {
      82         102 :         int rc = from_->recv (&msg_, 0);
      83         102 :         if (unlikely (rc < 0))
      84             :             return -1;
      85             : 
      86         102 :         moresz = sizeof more;  //  Reset on every pass; getsockopt takes the size by pointer.
      87         102 :         rc = from_->getsockopt (ZMQ_RCVMORE, &more, &moresz);
      88         102 :         if (unlikely (rc < 0))
      89             :             return -1;
      90             : 
      91             :         //  Hand a copy of this part to the capture socket, if there is one.
      92         102 :         rc = capture(capture_, msg_, more);
      93         102 :         if (unlikely (rc < 0))
      94             :             return -1;
      95             : 
      96         102 :         rc = to_->send (&msg_, more? ZMQ_SNDMORE: 0);  //  Mirror the "more" flag of the received part on the outgoing one.
      97         102 :         if (unlikely (rc < 0))
      98             :             return -1;
      99         102 :         if (more == 0)  //  No further parts pending: this message is done.
     100             :             break;
     101             :     }
     102             :     return 0;
     103             : }
     104             : 
     105           9 : int zmq::proxy (  //  Shuttles messages between frontend_ and backend_ until a TERMINATE command arrives on control_. Returns 0 on termination, -1 on any failure.
     106             :     class socket_base_t *frontend_,  //  Polled for input; its messages are forwarded to backend_.
     107             :     class socket_base_t *backend_,  //  Polled for input; its messages are forwarded to frontend_. May be the same socket as frontend_.
     108             :     class socket_base_t *capture_,  //  Optional; receives a copy of every forwarded part and of every control command.
     109             :     class socket_base_t *control_)  //  Optional; source of PAUSE / RESUME / TERMINATE commands.
     110             : {
     111             :     msg_t msg;  //  NOTE(review): msg is not explicitly closed on any return path of this function -- confirm whether it should be.
     112           9 :     int rc = msg.init ();
     113           9 :     if (rc != 0)
     114             :         return -1;
     115             : 
     116             :     //  The algorithm below assumes the ratio of requests to replies
     117             :     //  processed under full load to be 1:1.
     118             : 
     119             :     int more;
     120             :     size_t moresz;
     121             :     zmq_pollitem_t items [] = {
     122             :         { frontend_, 0, ZMQ_POLLIN, 0 },
     123             :         { backend_, 0, ZMQ_POLLIN, 0 },
     124             :         { control_, 0, ZMQ_POLLIN, 0 }  //  Only handed to zmq_poll when control_ is non-null (see qt_poll_items).
     125           9 :     };
     126           9 :     int qt_poll_items = (control_ ? 3 : 2);
     127             :     zmq_pollitem_t itemsout [] = {  //  Index 0 = frontend_, index 1 = backend_, matching items [].
     128             :         { frontend_, 0, ZMQ_POLLOUT, 0 },
     129             :         { backend_, 0, ZMQ_POLLOUT, 0 }
     130           9 :     };
     131             : 
     132             :     //  The proxy is always in exactly one of these three states.
     133             :     enum {
     134             :         active,
     135             :         paused,
     136             :         terminated
     137           9 :     } state = active;
     138             : 
     139      264999 :     while (state != terminated) {
     140             :         //  Wait until there is a request, a reply or a control command to process.
     141      264981 :         rc = zmq_poll (&items [0], qt_poll_items, -1);
     142      264981 :         if (unlikely (rc < 0))
     143             :             return -1;
     144             : 
     145             :         //  POLLOUT is polled separately: combined with POLLIN in the poll above it maxes out
     146             :         //  the CPU, because POLLOUT is ready most of the time and the poll returns at once.
     147             :         //  POLLOUT is only checked when the frontend and backend sockets are not the same.
     148      264981 :         if (frontend_ != backend_) {
     149      264972 :             rc = zmq_poll (&itemsout [0], 2, 0);  //  Timeout 0: a readiness snapshot, no waiting.
     150      264972 :             if (unlikely (rc < 0)) {
     151             :                 return -1;
     152             :             }
     153             :         }
     154             : 
     155             :         //  Process a control command, if one is pending.
     156      264981 :         if (control_ && items [2].revents & ZMQ_POLLIN) {
     157           9 :             rc = control_->recv (&msg, 0);
     158           9 :             if (unlikely (rc < 0))
     159             :                 return -1;
     160             : 
     161           9 :             moresz = sizeof more;
     162           9 :             rc = control_->getsockopt (ZMQ_RCVMORE, &more, &moresz);
     163           9 :             if (unlikely (rc < 0) || more)  //  A command with further parts pending is rejected as well.
     164             :                 return -1;
     165             : 
     166             :             //  Hand a copy of the command to the capture socket, if there is one.
     167           9 :             rc = capture(capture_, msg);  //  more_ defaults to 0, so the copy is sent without ZMQ_SNDMORE.
     168           9 :             if (unlikely (rc < 0))
     169             :                 return -1;
     170             : 
     171           9 :             if (msg.size () == 5 && memcmp (msg.data (), "PAUSE", 5) == 0)  //  NOTE(review): memcmp / puts have no direct <string.h> / <stdio.h> include in the visible list -- presumably supplied transitively; confirm.
     172             :                 state = paused;
     173             :             else
     174           9 :             if (msg.size () == 6 && memcmp (msg.data (), "RESUME", 6) == 0)
     175             :                 state = active;
     176             :             else
     177           9 :             if (msg.size () == 9 && memcmp (msg.data (), "TERMINATE", 9) == 0)
     178             :                 state = terminated;  //  Ends the loop at the next condition check; the forwarding tests below are skipped since state != active.
     179             :             else {
     180             :                 //  Any other command is an API error: report it and assert.
     181           0 :                 puts ("E: invalid command sent to proxy");
     182           0 :                 zmq_assert (false);
     183             :             }
     184             :         }
     185             :         //  Process a request: frontend_ has input and backend_ can take output.
     186      264981 :         if (state == active
     187      264972 :         &&  items [0].revents & ZMQ_POLLIN
     188      264943 :         &&  (frontend_ == backend_ || itemsout [1].revents & ZMQ_POLLOUT)) {  //  With one shared socket itemsout was never polled, so its POLLOUT test is bypassed.
     189          24 :             rc = forward(frontend_, backend_, capture_,msg);
     190          24 :             if (unlikely (rc < 0))
     191             :                 return -1;
     192             :         }
     193             :         //  Process a reply: backend_ has input and frontend_ can take output.
     194      529962 :         if (state == active
     195      264981 :         &&  frontend_ != backend_  //  With one shared socket the request branch above already handles its input.
     196      264966 :         &&  items [1].revents & ZMQ_POLLIN
     197          30 :         &&  itemsout [0].revents & ZMQ_POLLOUT) {
     198          30 :             rc = forward(backend_, frontend_, capture_,msg);
     199          30 :             if (unlikely (rc < 0))
     200             :                 return -1;
     201             :         }
     202             :     }
     203             :     return 0;
     204             : }

Generated by: LCOV version 1.10