Line data Source code
1 : /*
2 : Copyright (c) 2007-2016 Contributors as noted in the AUTHORS file
3 :
4 : This file is part of libzmq, the ZeroMQ core engine in C++.
5 :
6 : libzmq is free software; you can redistribute it and/or modify it under
7 : the terms of the GNU Lesser General Public License (LGPL) as published
8 : by the Free Software Foundation; either version 3 of the License, or
9 : (at your option) any later version.
10 :
11 : As a special exception, the Contributors give you permission to link
12 : this library with independent modules to produce an executable,
13 : regardless of the license terms of these independent modules, and to
14 : copy and distribute the resulting executable under terms of your choice,
15 : provided that you also meet, for each linked independent module, the
16 : terms and conditions of the license of that module. An independent
17 : module is a module which is not derived from or based on this library.
18 : If you modify this library, you must extend this exception to your
19 : version of the library.
20 :
21 : libzmq is distributed in the hope that it will be useful, but WITHOUT
22 : ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
23 : FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public
24 : License for more details.
25 :
26 : You should have received a copy of the GNU Lesser General Public License
27 : along with this program. If not, see <http://www.gnu.org/licenses/>.
28 : */
29 :
30 : #include "precompiled.hpp"
31 : #include "lb.hpp"
32 : #include "pipe.hpp"
33 : #include "err.hpp"
34 : #include "msg.hpp"
35 :
36 882 : zmq::lb_t::lb_t () :
37 : active (0),
38 : current (0),
39 : more (false),
40 1764 : dropping (false)
41 : {
42 882 : }
43 :
44 1764 : zmq::lb_t::~lb_t ()
45 : {
46 1764 : zmq_assert (pipes.empty ());
47 882 : }
48 :
49 1011 : void zmq::lb_t::attach (pipe_t *pipe_)
50 : {
51 1011 : pipes.push_back (pipe_);
52 1011 : activated (pipe_);
53 1011 : }
54 :
55 1011 : void zmq::lb_t::pipe_terminated (pipe_t *pipe_)
56 : {
57 2022 : pipes_t::size_type index = pipes.index (pipe_);
58 :
59 : // If we are in the middle of multipart message and current pipe
60 : // have disconnected, we have to drop the remainder of the message.
61 1011 : if (index == current && more)
62 0 : dropping = true;
63 :
64 : // Remove the pipe from the list; adjust number of active pipes
65 : // accordingly.
66 1011 : if (index < active) {
67 989 : active--;
68 989 : pipes.swap (index, active);
69 989 : if (current == active)
70 849 : current = 0;
71 : }
72 1011 : pipes.erase (pipe_);
73 1011 : }
74 :
75 1888 : void zmq::lb_t::activated (pipe_t *pipe_)
76 : {
77 : // Move the pipe to the list of active pipes.
78 3776 : pipes.swap (pipes.index (pipe_), active);
79 1888 : active++;
80 1888 : }
81 :
82 186846 : int zmq::lb_t::send (msg_t *msg_)
83 : {
84 186846 : return sendpipe (msg_, NULL);
85 : }
86 :
87 789284 : int zmq::lb_t::sendpipe (msg_t *msg_, pipe_t **pipe_)
88 : {
89 : // Drop the message if required. If we are at the end of the message
90 : // switch back to non-dropping mode.
91 789284 : if (dropping) {
92 :
93 0 : more = msg_->flags () & msg_t::more ? true : false;
94 0 : dropping = more;
95 :
96 0 : int rc = msg_->close ();
97 0 : errno_assert (rc == 0);
98 0 : rc = msg_->init ();
99 0 : errno_assert (rc == 0);
100 : return 0;
101 : }
102 :
103 790182 : while (active > 0) {
104 1577264 : if (pipes [current]->write (msg_))
105 : {
106 787734 : if (pipe_)
107 312 : *pipe_ = pipes [current];
108 : break;
109 : }
110 :
111 : // If send fails for multi-part msg rollback other
112 : // parts sent earlier and return EAGAIN.
113 : // Application should handle this as suitable
114 898 : if (more)
115 : {
116 0 : pipes [current]->rollback ();
117 0 : more = 0;
118 0 : errno = EAGAIN;
119 0 : return -1;
120 : }
121 :
122 898 : active--;
123 898 : if (current < active)
124 0 : pipes.swap (current, active);
125 : else
126 898 : current = 0;
127 : }
128 :
129 : // If there are no pipes we cannot send the message.
130 789284 : if (active == 0) {
131 1550 : errno = EAGAIN;
132 1550 : return -1;
133 : }
134 :
135 : // If it's final part of the message we can flush it downstream and
136 : // continue round-robining (load balance).
137 787734 : more = msg_->flags () & msg_t::more? true: false;
138 787734 : if (!more) {
139 1574478 : pipes [current]->flush ();
140 787239 : current = (current + 1) % active;
141 : }
142 :
143 : // Detach the message from the data buffer.
144 787734 : int rc = msg_->init ();
145 787734 : errno_assert (rc == 0);
146 :
147 : return 0;
148 : }
149 :
150 530954 : bool zmq::lb_t::has_out ()
151 : {
152 : // If one part of the message was already written we can definitely
153 : // write the rest of the message.
154 530954 : if (more)
155 : return true;
156 :
157 530955 : while (active > 0) {
158 :
159 : // Check whether a pipe has room for another message.
160 2206 : if (pipes [current]->check_write ())
161 : return true;
162 :
163 : // Deactivate the pipe.
164 1 : active--;
165 1 : pipes.swap (current, active);
166 1 : if (current == active)
167 0 : current = 0;
168 : }
169 :
170 : return false;
171 : }
|