libzmq  master
ZeroMQ C++ Core Engine (LIBZMQ)
lb.cpp
Go to the documentation of this file.
1 /*
2  Copyright (c) 2007-2016 Contributors as noted in the AUTHORS file
3 
4  This file is part of libzmq, the ZeroMQ core engine in C++.
5 
6  libzmq is free software; you can redistribute it and/or modify it under
7  the terms of the GNU Lesser General Public License (LGPL) as published
8  by the Free Software Foundation; either version 3 of the License, or
9  (at your option) any later version.
10 
11  As a special exception, the Contributors give you permission to link
12  this library with independent modules to produce an executable,
13  regardless of the license terms of these independent modules, and to
14  copy and distribute the resulting executable under terms of your choice,
15  provided that you also meet, for each linked independent module, the
16  terms and conditions of the license of that module. An independent
17  module is a module which is not derived from or based on this library.
18  If you modify this library, you must extend this exception to your
19  version of the library.
20 
21  libzmq is distributed in the hope that it will be useful, but WITHOUT
22  ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
23  FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public
24  License for more details.
25 
26  You should have received a copy of the GNU Lesser General Public License
27  along with this program. If not, see <http://www.gnu.org/licenses/>.
28 */
29 
30 #include "precompiled.hpp"
31 #include "lb.hpp"
32 #include "pipe.hpp"
33 #include "err.hpp"
34 #include "msg.hpp"
35 
37  active (0),
38  current (0),
39  more (false),
40  dropping (false)
41 {
42 }
43 
45 {
46  zmq_assert (pipes.empty ());
47 }
48 
50 {
51  pipes.push_back (pipe_);
52  activated (pipe_);
53 }
54 
56 {
57  pipes_t::size_type index = pipes.index (pipe_);
58 
59  // If we are in the middle of multipart message and current pipe
60  // have disconnected, we have to drop the remainder of the message.
61  if (index == current && more)
62  dropping = true;
63 
64  // Remove the pipe from the list; adjust number of active pipes
65  // accordingly.
66  if (index < active) {
67  active--;
68  pipes.swap (index, active);
69  if (current == active)
70  current = 0;
71  }
72  pipes.erase (pipe_);
73 }
74 
76 {
77  // Move the pipe to the list of active pipes.
78  pipes.swap (pipes.index (pipe_), active);
79  active++;
80 }
81 
83 {
84  return sendpipe (msg_, NULL);
85 }
86 
87 int zmq::lb_t::sendpipe (msg_t *msg_, pipe_t **pipe_)
88 {
89  // Drop the message if required. If we are at the end of the message
90  // switch back to non-dropping mode.
91  if (dropping) {
92 
93  more = msg_->flags () & msg_t::more ? true : false;
94  dropping = more;
95 
96  int rc = msg_->close ();
97  errno_assert (rc == 0);
98  rc = msg_->init ();
99  errno_assert (rc == 0);
100  return 0;
101  }
102 
103  while (active > 0) {
104  if (pipes [current]->write (msg_))
105  {
106  if (pipe_)
107  *pipe_ = pipes [current];
108  break;
109  }
110 
111  // If send fails for multi-part msg rollback other
112  // parts sent earlier and return EAGAIN.
113  // Application should handle this as suitable
114  if (more)
115  {
116  pipes [current]->rollback ();
117  more = 0;
118  errno = EAGAIN;
119  return -1;
120  }
121 
122  active--;
123  if (current < active)
125  else
126  current = 0;
127  }
128 
129  // If there are no pipes we cannot send the message.
130  if (active == 0) {
131  errno = EAGAIN;
132  return -1;
133  }
134 
135  // If it's final part of the message we can flush it downstream and
136  // continue round-robining (load balance).
137  more = msg_->flags () & msg_t::more? true: false;
138  if (!more) {
139  pipes [current]->flush ();
140  current = (current + 1) % active;
141  }
142 
143  // Detach the message from the data buffer.
144  int rc = msg_->init ();
145  errno_assert (rc == 0);
146 
147  return 0;
148 }
149 
151 {
152  // If one part of the message was already written we can definitely
153  // write the rest of the message.
154  if (more)
155  return true;
156 
157  while (active > 0) {
158 
159  // Check whether a pipe has room for another message.
160  if (pipes [current]->check_write ())
161  return true;
162 
163  // Deactivate the pipe.
164  active--;
166  if (current == active)
167  current = 0;
168  }
169 
170  return false;
171 }
int close()
Definition: msg.cpp:217
std::vector< pipe_t * >::size_type size_type
Definition: array.hpp:93
#define zmq_assert(x)
Definition: err.hpp:119
void swap(size_type index1_, size_type index2_)
Definition: array.hpp:136
void pipe_terminated(pipe_t *pipe_)
Definition: lb.cpp:55
pipes_t::size_type current
Definition: lb.hpp:74
void push_back(T *item_)
Definition: array.hpp:118
bool has_out()
Definition: lb.cpp:150
pipes_t pipes
Definition: lb.hpp:67
void erase(T *item_)
Definition: array.hpp:125
lb_t()
Definition: lb.cpp:36
int init()
Definition: msg.cpp:82
int sendpipe(msg_t *msg_, pipe_t **pipe_)
Definition: lb.cpp:87
size_type index(T *item_)
Definition: array.hpp:150
~lb_t()
Definition: lb.cpp:44
void attach(pipe_t *pipe_)
Definition: lb.cpp:49
int send(msg_t *msg_)
Definition: lb.cpp:82
pipes_t::size_type active
Definition: lb.hpp:71
#define errno_assert(x)
Definition: err.hpp:129
bool dropping
Definition: lb.hpp:80
bool empty()
Definition: array.hpp:108
bool more
Definition: lb.hpp:77
void activated(pipe_t *pipe_)
Definition: lb.cpp:75
unsigned char flags
Definition: msg.hpp:181