libzmq  master
ZeroMQ C++ Core Engine (LIBZMQ)
proxy.cpp
Go to the documentation of this file.
1 /*
2  Copyright (c) 2007-2016 Contributors as noted in the AUTHORS file
3 
4  This file is part of libzmq, the ZeroMQ core engine in C++.
5 
6  libzmq is free software; you can redistribute it and/or modify it under
7  the terms of the GNU Lesser General Public License (LGPL) as published
8  by the Free Software Foundation; either version 3 of the License, or
9  (at your option) any later version.
10 
11  As a special exception, the Contributors give you permission to link
12  this library with independent modules to produce an executable,
13  regardless of the license terms of these independent modules, and to
14  copy and distribute the resulting executable under terms of your choice,
15  provided that you also meet, for each linked independent module, the
16  terms and conditions of the license of that module. An independent
17  module is a module which is not derived from or based on this library.
18  If you modify this library, you must extend this exception to your
19  version of the library.
20 
21  libzmq is distributed in the hope that it will be useful, but WITHOUT
22  ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
23  FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public
24  License for more details.
25 
26  You should have received a copy of the GNU Lesser General Public License
27  along with this program. If not, see <http://www.gnu.org/licenses/>.
28 */
29 
#include "precompiled.hpp"
#include <errno.h>
#include <stddef.h>
#include "poller.hpp"
#include "proxy.hpp"
#include "likely.hpp"

// On AIX platform, poll.h has to be included first to get consistent
// definition of pollfd structure (AIX uses 'reqevents' and 'retnevents'
// instead of 'events' and 'revents' and defines macros to map from POSIX-y
// names to AIX-specific names).
#if defined ZMQ_POLL_BASED_ON_POLL
#include <poll.h>
#endif

// These headers end up pulling in zmq.h somewhere in their include
// dependency chain
#include "socket_base.hpp"
#include "err.hpp"

// zmq.h must be included *after* poll.h for AIX to build properly
#include "../include/zmq.h"
51 
52 int capture(
53  class zmq::socket_base_t *capture_,
54  zmq::msg_t& msg_,
55  int more_ = 0)
56 {
57  // Copy message to capture socket if any
58  if (capture_) {
59  zmq::msg_t ctrl;
60  int rc = ctrl.init ();
61  if (unlikely (rc < 0))
62  return -1;
63  rc = ctrl.copy (msg_);
64  if (unlikely (rc < 0))
65  return -1;
66  rc = capture_->send (&ctrl, more_? ZMQ_SNDMORE: 0);
67  if (unlikely (rc < 0))
68  return -1;
69  }
70  return 0;
71 }
72 
73 int forward(
74  class zmq::socket_base_t *from_,
75  class zmq::socket_base_t *to_,
76  class zmq::socket_base_t *capture_,
77  zmq::msg_t& msg_)
78 {
79  int more;
80  size_t moresz;
81  while (true) {
82  int rc = from_->recv (&msg_, 0);
83  if (unlikely (rc < 0))
84  return -1;
85 
86  moresz = sizeof more;
87  rc = from_->getsockopt (ZMQ_RCVMORE, &more, &moresz);
88  if (unlikely (rc < 0))
89  return -1;
90 
91  // Copy message to capture socket if any
92  rc = capture(capture_, msg_, more);
93  if (unlikely (rc < 0))
94  return -1;
95 
96  rc = to_->send (&msg_, more? ZMQ_SNDMORE: 0);
97  if (unlikely (rc < 0))
98  return -1;
99  if (more == 0)
100  break;
101  }
102  return 0;
103 }
104 
106  class socket_base_t *frontend_,
107  class socket_base_t *backend_,
108  class socket_base_t *capture_,
109  class socket_base_t *control_)
110 {
111  msg_t msg;
112  int rc = msg.init ();
113  if (rc != 0)
114  return -1;
115 
116  // The algorithm below assumes ratio of requests and replies processed
117  // under full load to be 1:1.
118 
119  int more;
120  size_t moresz;
121  zmq_pollitem_t items [] = {
122  { frontend_, 0, ZMQ_POLLIN, 0 },
123  { backend_, 0, ZMQ_POLLIN, 0 },
124  { control_, 0, ZMQ_POLLIN, 0 }
125  };
126  int qt_poll_items = (control_ ? 3 : 2);
127  zmq_pollitem_t itemsout [] = {
128  { frontend_, 0, ZMQ_POLLOUT, 0 },
129  { backend_, 0, ZMQ_POLLOUT, 0 }
130  };
131 
132  // Proxy can be in these three states
133  enum {
134  active,
135  paused,
136  terminated
137  } state = active;
138 
139  while (state != terminated) {
140  // Wait while there are either requests or replies to process.
141  rc = zmq_poll (&items [0], qt_poll_items, -1);
142  if (unlikely (rc < 0))
143  return -1;
144 
145  // Get the pollout separately because when combining this with pollin it maxes the CPU
146  // because pollout shall most of the time return directly.
147  // POLLOUT is only checked when frontend and backend sockets are not the same.
148  if (frontend_ != backend_) {
149  rc = zmq_poll (&itemsout [0], 2, 0);
150  if (unlikely (rc < 0)) {
151  return -1;
152  }
153  }
154 
155  // Process a control command if any
156  if (control_ && items [2].revents & ZMQ_POLLIN) {
157  rc = control_->recv (&msg, 0);
158  if (unlikely (rc < 0))
159  return -1;
160 
161  moresz = sizeof more;
162  rc = control_->getsockopt (ZMQ_RCVMORE, &more, &moresz);
163  if (unlikely (rc < 0) || more)
164  return -1;
165 
166  // Copy message to capture socket if any
167  rc = capture(capture_, msg);
168  if (unlikely (rc < 0))
169  return -1;
170 
171  if (msg.size () == 5 && memcmp (msg.data (), "PAUSE", 5) == 0)
172  state = paused;
173  else
174  if (msg.size () == 6 && memcmp (msg.data (), "RESUME", 6) == 0)
175  state = active;
176  else
177  if (msg.size () == 9 && memcmp (msg.data (), "TERMINATE", 9) == 0)
178  state = terminated;
179  else {
180  // This is an API error, we should assert
181  puts ("E: invalid command sent to proxy");
182  zmq_assert (false);
183  }
184  }
185  // Process a request
186  if (state == active
187  && items [0].revents & ZMQ_POLLIN
188  && (frontend_ == backend_ || itemsout [1].revents & ZMQ_POLLOUT)) {
189  rc = forward(frontend_, backend_, capture_,msg);
190  if (unlikely (rc < 0))
191  return -1;
192  }
193  // Process a reply
194  if (state == active
195  && frontend_ != backend_
196  && items [1].revents & ZMQ_POLLIN
197  && itemsout [0].revents & ZMQ_POLLOUT) {
198  rc = forward(backend_, frontend_, capture_,msg);
199  if (unlikely (rc < 0))
200  return -1;
201  }
202  }
203  return 0;
204 }
#define ZMQ_SNDMORE
Definition: zmq.h:346
int capture(class zmq::socket_base_t *capture_, zmq::msg_t &msg_, int more_=0)
Definition: proxy.cpp:52
#define zmq_assert(x)
Definition: err.hpp:119
int forward(class zmq::socket_base_t *from_, class zmq::socket_base_t *to_, class zmq::socket_base_t *capture_, zmq::msg_t &msg_)
Definition: proxy.cpp:73
unsigned char size
Definition: msg.hpp:188
int getsockopt(int option_, void *optval_, size_t *optvallen_)
int send(zmq::msg_t *msg_, int flags_)
#define unlikely(x)
Definition: likely.hpp:38
int init()
Definition: msg.cpp:82
int copy(msg_t &src_)
Definition: msg.cpp:295
#define ZMQ_RCVMORE
Definition: zmq.h:272
int proxy(class socket_base_t *frontend_, class socket_base_t *backend_, class socket_base_t *capture_, class socket_base_t *control_=NULL)
Definition: proxy.cpp:105
ZMQ_EXPORT int zmq_poll(zmq_pollitem_t *items, int nitems, long timeout)
Definition: zmq.cpp:748
#define ZMQ_POLLOUT
Definition: zmq.h:411
unsigned char data[max_vsm_size]
Definition: msg.hpp:187
#define ZMQ_POLLIN
Definition: zmq.h:410
int recv(zmq::msg_t *msg_, int flags_)