Line data Source code
1 : /*
2 : Copyright (c) 2007-2016 Contributors as noted in the AUTHORS file
3 :
4 : This file is part of libzmq, the ZeroMQ core engine in C++.
5 :
6 : libzmq is free software; you can redistribute it and/or modify it under
7 : the terms of the GNU Lesser General Public License (LGPL) as published
8 : by the Free Software Foundation; either version 3 of the License, or
9 : (at your option) any later version.
10 :
11 : As a special exception, the Contributors give you permission to link
12 : this library with independent modules to produce an executable,
13 : regardless of the license terms of these independent modules, and to
14 : copy and distribute the resulting executable under terms of your choice,
15 : provided that you also meet, for each linked independent module, the
16 : terms and conditions of the license of that module. An independent
17 : module is a module which is not derived from or based on this library.
18 : If you modify this library, you must extend this exception to your
19 : version of the library.
20 :
21 : libzmq is distributed in the hope that it will be useful, but WITHOUT
22 : ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
23 : FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public
24 : License for more details.
25 :
26 : You should have received a copy of the GNU Lesser General Public License
27 : along with this program. If not, see <http://www.gnu.org/licenses/>.
28 : */
29 :
30 : #ifndef __ZMQ_PIPE_HPP_INCLUDED__
31 : #define __ZMQ_PIPE_HPP_INCLUDED__
32 :
33 : #include "msg.hpp"
34 : #include "ypipe_base.hpp"
35 : #include "config.hpp"
36 : #include "object.hpp"
37 : #include "stdint.hpp"
38 : #include "array.hpp"
39 : #include "blob.hpp"
40 :
41 : namespace zmq
42 : {
43 :
44 : class object_t;
45 : class pipe_t;
46 :
47 : // Create a pipepair for bi-directional transfer of messages.
48 : // First HWM is for messages passed from first pipe to the second pipe.
49 : // Second HWM is for messages passed from second pipe to the first pipe.
50 : // Delay specifies how the pipe behaves when the peer terminates. If true
51 : // pipe receives all the pending messages before terminating, otherwise it
52 : // terminates straight away. NOTE(review): this declaration takes no delay argument, so the delay sentence looks stale - confirm.
53 : // If conflate is true, only the most recently arrived message can be
54 : // read (older messages are discarded).
55 : int pipepair (zmq::object_t *parents_ [2], zmq::pipe_t* pipes_ [2],
56 : int hwms_ [2], bool conflate_ [2]);
57 :
58 18455 : struct i_pipe_events // Callback interface for pipe events; a receiver is registered with pipe_t::set_event_sink.
59 : {
60 18458 : virtual ~i_pipe_events () {} // Virtual destructor with an empty body.
61 :
62 : virtual void read_activated (zmq::pipe_t *pipe_) = 0; // Presumably signalled when pipe_ becomes readable again - confirm against the implementation.
63 : virtual void write_activated (zmq::pipe_t *pipe_) = 0; // Presumably signalled when pipe_ becomes writable again - confirm against the implementation.
64 : virtual void hiccuped (zmq::pipe_t *pipe_) = 0; // The 'hiccuped' event; pipe_t::hiccup is documented as generating it in the peer.
65 : virtual void pipe_terminated (zmq::pipe_t *pipe_) = 0; // Presumably the 'terminated' event mentioned at pipe_t::terminate - confirm.
66 : };
67 :
68 : // Note that a pipe can be stored in three different arrays:
69 : // the array of inbound pipes (1), the array of outbound pipes (2) and
70 : // the generic array of pipes to be deallocated (3).
71 :
72 : class pipe_t : // One endpoint of a pipepair; instances are created only by pipepair () and hold a pointer to the peer endpoint.
73 : public object_t,
74 : public array_item_t <1>, // (1) slot in the array of inbound pipes
75 : public array_item_t <2>, // (2) slot in the array of outbound pipes
76 : public array_item_t <3> // (3) slot in the generic array of pipes to be deallocated
77 : {
78 : // This allows pipepair to create pipe objects.
79 : friend int pipepair (zmq::object_t *parents_ [2], zmq::pipe_t* pipes_ [2],
80 : int hwms_ [2], bool conflate_ [2]);
81 :
82 : public:
83 :
84 : // Specifies the object to send events to.
85 : void set_event_sink (i_pipe_events *sink_);
86 :
87 : // Pipe endpoint can store a routing ID to be used by its clients.
88 : void set_routing_id (uint32_t routing_id_);
89 : uint32_t get_routing_id ();
90 :
91 : // Pipe endpoint can store an opaque ID to be used by its clients.
92 : void set_identity (const blob_t &identity_);
93 : blob_t get_identity ();
94 :
95 : blob_t get_credential () const;
96 :
97 : // Returns true if there is at least one message to read in the pipe.
98 : bool check_read ();
99 :
100 : // Reads a message from the underlying pipe.
101 : bool read (msg_t *msg_);
102 :
103 : // Checks whether messages can be written to the pipe. If the pipe is
104 : // closed or if writing the message would hit the high watermark the
105 : // function returns false.
106 : bool check_write ();
107 :
108 : // Writes a message to the underlying pipe. Returns false if the
109 : // message does not pass check_write. If false, the message object
110 : // retains ownership of its message buffer.
111 : bool write (msg_t *msg_);
112 :
113 : // Remove unfinished parts of the outbound message from the pipe.
114 : void rollback ();
115 :
116 : // Flush the messages downstream.
117 : void flush ();
118 :
119 : // Temporarily disconnects the inbound message stream and drops
120 : // all the messages on the fly. Causes 'hiccuped' event to be generated
121 : // in the peer.
122 : void hiccup ();
123 :
124 : // Ensure the pipe won't block on receiving pipe_term.
125 : void set_nodelay ();
126 :
127 : // Ask pipe to terminate. The termination will happen asynchronously
128 : // and user will be notified about actual deallocation by 'terminated'
129 : // event. If delay is true, the pending messages will be processed
130 : // before actual shutdown.
131 : void terminate (bool delay_);
132 :
133 : // Set the high water marks.
134 : void set_hwms (int inhwm_, int outhwm_);
135 :
136 : // Set the boost to the high water marks. Used by inproc sockets so that the total HWM is the sum of the connect and bind sockets' watermarks.
137 : void set_hwms_boost(int inhwmboost_, int outhwmboost_);
138 :
139 : // Returns true if HWM is not reached.
140 : bool check_hwm () const;
141 : private:
142 :
143 : // Type of the underlying lock-free pipe.
144 : typedef ypipe_base_t <msg_t> upipe_t;
145 :
146 : // Command handlers.
147 : void process_activate_read ();
148 : void process_activate_write (uint64_t msgs_read_);
149 : void process_hiccup (void *pipe_);
150 : void process_pipe_term ();
151 : void process_pipe_term_ack ();
152 :
153 : // Handler for delimiter read from the pipe.
154 : void process_delimiter ();
155 :
156 : // Constructor is private. Pipe can only be created using
157 : // pipepair function.
158 : pipe_t (object_t *parent_, upipe_t *inpipe_, upipe_t *outpipe_,
159 : int inhwm_, int outhwm_, bool conflate_);
160 :
161 : // Pipepair uses this function to let us know about
162 : // the peer pipe object.
163 : void set_peer (pipe_t *pipe_);
164 :
165 : // Destructor is private. Pipe objects destroy themselves.
166 : ~pipe_t ();
167 :
168 : // Underlying pipes for both directions.
169 : upipe_t *inpipe;
170 : upipe_t *outpipe;
171 :
172 : // Can the pipe be read from / written to?
173 : bool in_active;
174 : bool out_active;
175 :
176 : // High watermark for the outbound pipe.
177 : int hwm;
178 :
179 : // Low watermark for the inbound pipe.
180 : int lwm;
181 :
182 : // Boosts for the high and low watermarks. Used with inproc sockets so that the HWMs are the sum of the send and recv HWMs on each side of the pipe.
183 : int inhwmboost;
184 : int outhwmboost;
185 :
186 : // Number of messages read and written so far.
187 : uint64_t msgs_read;
188 : uint64_t msgs_written;
189 :
190 : // Last received peer's msgs_read. The actual number in the peer
191 : // can be higher at the moment.
192 : uint64_t peers_msgs_read;
193 :
194 : // The pipe object on the other side of the pipepair.
195 : pipe_t *peer;
196 :
197 : // Sink to send events to.
198 : i_pipe_events *sink;
199 :
200 : // States of the pipe endpoint:
201 : // active: common state before any termination begins,
202 : // delimiter_received: delimiter was read from pipe before
203 : // term command was received,
204 : // waiting_for_delimiter: term command was already received
205 : // from the peer but there are still pending messages to read,
206 : // term_ack_sent: all pending messages were already read and
207 : // all we are waiting for is ack from the peer,
208 : // term_req_sent1: 'terminate' was explicitly called by the user,
209 : // term_req_sent2: user called 'terminate' and then we've got
210 : // term command from the peer as well.
211 : enum {
212 : active,
213 : delimiter_received,
214 : waiting_for_delimiter,
215 : term_ack_sent,
216 : term_req_sent1,
217 : term_req_sent2
218 : } state;
219 :
220 : // If true, we receive all the pending inbound messages before
221 : // terminating. If false, we terminate immediately when the peer
222 : // asks us to.
223 : bool delay;
224 :
225 : // Identity of the writer. Used uniquely by the reader side.
226 : blob_t identity;
227 :
228 : // Routing ID stored in this endpoint (see set_routing_id / get_routing_id). NOTE(review): declared int here although the accessors use uint32_t - confirm the intended type.
229 : int routing_id;
230 :
231 : // Pipe's credential.
232 : blob_t credential;
233 :
234 : // Returns true if the message is delimiter; false otherwise.
235 : static bool is_delimiter (const msg_t &msg_);
236 :
237 : // Computes appropriate low watermark from the given high watermark.
238 : static int compute_lwm (int hwm_);
239 :
240 : const bool conflate; // Conflate flag; presumably set from the constructor's conflate_ argument - confirm.
241 :
242 : // Disable copying.
243 : pipe_t (const pipe_t&);
244 : const pipe_t &operator = (const pipe_t&);
245 : };
246 :
247 : }
248 :
249 : #endif
|