libzmq  master
ZeroMQ C++ Core Engine (LIBZMQ)
dist.cpp
Go to the documentation of this file.
1 /*
2  Copyright (c) 2007-2016 Contributors as noted in the AUTHORS file
3 
4  This file is part of libzmq, the ZeroMQ core engine in C++.
5 
6  libzmq is free software; you can redistribute it and/or modify it under
7  the terms of the GNU Lesser General Public License (LGPL) as published
8  by the Free Software Foundation; either version 3 of the License, or
9  (at your option) any later version.
10 
11  As a special exception, the Contributors give you permission to link
12  this library with independent modules to produce an executable,
13  regardless of the license terms of these independent modules, and to
14  copy and distribute the resulting executable under terms of your choice,
15  provided that you also meet, for each linked independent module, the
16  terms and conditions of the license of that module. An independent
17  module is a module which is not derived from or based on this library.
18  If you modify this library, you must extend this exception to your
19  version of the library.
20 
21  libzmq is distributed in the hope that it will be useful, but WITHOUT
22  ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
23  FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public
24  License for more details.
25 
26  You should have received a copy of the GNU Lesser General Public License
27  along with this program. If not, see <http://www.gnu.org/licenses/>.
28 */
29 
30 #include "precompiled.hpp"
31 #include "dist.hpp"
32 #include "pipe.hpp"
33 #include "err.hpp"
34 #include "msg.hpp"
35 #include "likely.hpp"
36 
38  matching (0),
39  active (0),
40  eligible (0),
41  more (false)
42 {
43 }
44 
46 {
47  zmq_assert (pipes.empty ());
48 }
49 
51 {
52  // If we are in the middle of sending a message, we'll add new pipe
53  // into the list of eligible pipes. Otherwise we add it to the list
54  // of active pipes.
55  if (more) {
56  pipes.push_back (pipe_);
57  pipes.swap (eligible, pipes.size () - 1);
58  eligible++;
59  }
60  else {
61  pipes.push_back (pipe_);
62  pipes.swap (active, pipes.size () - 1);
63  active++;
64  eligible++;
65  }
66 }
67 
69 {
70  // If pipe is already matching do nothing.
71  if (pipes.index (pipe_) < matching)
72  return;
73 
74  // If the pipe isn't eligible, ignore it.
75  if (pipes.index (pipe_) >= eligible)
76  return;
77 
78  // Mark the pipe as matching.
79  pipes.swap (pipes.index (pipe_), matching);
80  matching++;
81 }
82 
84 {
85  pipes_t::size_type prev_matching = matching;
86 
87  // Reset matching to 0
88  unmatch();
89 
90  // Mark all matching pipes as not matching and vice-versa.
91  // To do this, push all pipes that are eligible but not
92  // matched - i.e. between "matching" and "eligible" -
93  // to the beginning of the queue.
94  for (pipes_t::size_type i = prev_matching; i < eligible; ++i) {
95  pipes.swap(i, matching++);
96  }
97 }
98 
100 {
101  matching = 0;
102 }
103 
105 {
106  // Remove the pipe from the list; adjust number of matching, active and/or
107  // eligible pipes accordingly.
108  if (pipes.index (pipe_) < matching) {
109  pipes.swap (pipes.index (pipe_), matching - 1);
110  matching--;
111  }
112  if (pipes.index (pipe_) < active) {
113  pipes.swap (pipes.index (pipe_), active - 1);
114  active--;
115  }
116  if (pipes.index (pipe_) < eligible) {
117  pipes.swap (pipes.index (pipe_), eligible - 1);
118  eligible--;
119  }
120 
121  pipes.erase (pipe_);
122 }
123 
125 {
126  // Move the pipe from passive to eligible state.
127  if (eligible < pipes.size ()) {
128  pipes.swap (pipes.index (pipe_), eligible);
129  eligible++;
130  }
131 
132  // If there's no message being sent at the moment, move it to
133  // the active state.
134  if (!more && active < pipes.size ()) {
135  pipes.swap (eligible - 1, active);
136  active++;
137  }
138 }
139 
141 {
142  matching = active;
143  return send_to_matching (msg_);
144 }
145 
147 {
148  // Is this end of a multipart message?
149  bool msg_more = msg_->flags () & msg_t::more ? true : false;
150 
151  // Push the message to matching pipes.
152  distribute (msg_);
153 
154  // If multipart message is fully sent, activate all the eligible pipes.
155  if (!msg_more)
156  active = eligible;
157 
158  more = msg_more;
159 
160  return 0;
161 }
162 
164 {
165  // If there are no matching pipes available, simply drop the message.
166  if (matching == 0) {
167  int rc = msg_->close ();
168  errno_assert (rc == 0);
169  rc = msg_->init ();
170  errno_assert (rc == 0);
171  return;
172  }
173 
174  if (msg_->is_vsm ()) {
175  for (pipes_t::size_type i = 0; i < matching; ++i)
176  if(!write (pipes [i], msg_))
177  --i; // Retry last write because index will have been swapped
178  int rc = msg_->close();
179  errno_assert (rc == 0);
180  rc = msg_->init ();
181  errno_assert (rc == 0);
182  return;
183  }
184 
185  // Add matching-1 references to the message. We already hold one reference,
186  // that's why -1.
187  msg_->add_refs ((int) matching - 1);
188 
189  // Push copy of the message to each matching pipe.
190  int failed = 0;
191  for (pipes_t::size_type i = 0; i < matching; ++i)
192  if (!write (pipes [i], msg_)) {
193  ++failed;
194  --i; // Retry last write because index will have been swapped
195  }
196  if (unlikely (failed))
197  msg_->rm_refs (failed);
198 
199  // Detach the original message from the data buffer. Note that we don't
200  // close the message. That's because we've already used all the references.
201  int rc = msg_->init ();
202  errno_assert (rc == 0);
203 }
204 
206 {
207  return true;
208 }
209 
210 bool zmq::dist_t::write (pipe_t *pipe_, msg_t *msg_)
211 {
212  if (!pipe_->write (msg_)) {
213  pipes.swap (pipes.index (pipe_), matching - 1);
214  matching--;
215  pipes.swap (pipes.index (pipe_), active - 1);
216  active--;
217  pipes.swap (active, eligible - 1);
218  eligible--;
219  return false;
220  }
221  if (!(msg_->flags () & msg_t::more))
222  pipe_->flush ();
223  return true;
224 }
225 
227 {
228  for (pipes_t::size_type i = 0; i < matching; ++i)
229  if (!pipes [i]->check_hwm ())
230  return false;
231 
232  return true;
233 }
234 
235 
int close()
Definition: msg.cpp:217
size_type size()
Definition: array.hpp:103
bool write(msg_t *msg_)
Definition: pipe.cpp:221
std::vector< zmq::pipe_t * >::size_type size_type
Definition: array.hpp:93
int send_to_all(zmq::msg_t *msg_)
Definition: dist.cpp:140
bool rm_refs(int refs_)
Definition: msg.cpp:480
#define zmq_assert(x)
Definition: err.hpp:119
void swap(size_type index1_, size_type index2_)
Definition: array.hpp:136
void activated(zmq::pipe_t *pipe_)
Definition: dist.cpp:124
void add_refs(int refs_)
Definition: msg.cpp:457
void push_back(T *item_)
Definition: array.hpp:118
void match(zmq::pipe_t *pipe_)
Definition: dist.cpp:68
pipes_t::size_type eligible
Definition: dist.hpp:109
pipes_t::size_type matching
Definition: dist.hpp:97
void erase(T *item_)
Definition: array.hpp:125
bool check_hwm()
Definition: dist.cpp:226
#define unlikely(x)
Definition: likely.hpp:38
int send_to_matching(zmq::msg_t *msg_)
Definition: dist.cpp:146
void attach(zmq::pipe_t *pipe_)
Definition: dist.cpp:50
int init()
Definition: msg.cpp:82
bool more
Definition: dist.hpp:112
size_type index(T *item_)
Definition: array.hpp:150
void flush()
Definition: pipe.cpp:248
bool has_out()
Definition: dist.cpp:205
~dist_t()
Definition: dist.cpp:45
pipes_t pipes
Definition: dist.hpp:94
bool write(zmq::pipe_t *pipe_, zmq::msg_t *msg_)
Definition: dist.cpp:210
pipes_t::size_type active
Definition: dist.hpp:102
bool is_vsm() const
Definition: msg.cpp:432
#define errno_assert(x)
Definition: err.hpp:129
void unmatch()
Definition: dist.cpp:99
dist_t()
Definition: dist.cpp:37
void pipe_terminated(zmq::pipe_t *pipe_)
Definition: dist.cpp:104
void reverse_match()
Definition: dist.cpp:83
bool empty()
Definition: array.hpp:108
void distribute(zmq::msg_t *msg_)
Definition: dist.cpp:163
unsigned char flags
Definition: msg.hpp:181