Line data Source code
1 : /*
2 : Copyright (c) 2007-2016 Contributors as noted in the AUTHORS file
3 :
4 : This file is part of libzmq, the ZeroMQ core engine in C++.
5 :
6 : libzmq is free software; you can redistribute it and/or modify it under
7 : the terms of the GNU Lesser General Public License (LGPL) as published
8 : by the Free Software Foundation; either version 3 of the License, or
9 : (at your option) any later version.
10 :
11 : As a special exception, the Contributors give you permission to link
12 : this library with independent modules to produce an executable,
13 : regardless of the license terms of these independent modules, and to
14 : copy and distribute the resulting executable under terms of your choice,
15 : provided that you also meet, for each linked independent module, the
16 : terms and conditions of the license of that module. An independent
17 : module is a module which is not derived from or based on this library.
18 : If you modify this library, you must extend this exception to your
19 : version of the library.
20 :
21 : libzmq is distributed in the hope that it will be useful, but WITHOUT
22 : ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
23 : FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public
24 : License for more details.
25 :
26 : You should have received a copy of the GNU Lesser General Public License
27 : along with this program. If not, see <http://www.gnu.org/licenses/>.
28 : */
29 :
30 : #ifndef __ZMQ_YPIPE_HPP_INCLUDED__
31 : #define __ZMQ_YPIPE_HPP_INCLUDED__
32 :
33 : #include "atomic_ptr.hpp"
34 : #include "yqueue.hpp"
35 : #include "platform.hpp"
36 : #include "ypipe_base.hpp"
37 :
38 : namespace zmq
39 : {
40 :
41 : // Lock-free queue implementation.
42 : // Only a single thread can read from the pipe at any specific moment.
43 : // Only a single thread can write to the pipe at any specific moment.
44 : // T is the type of the object in the queue.
45 : // N is granularity of the pipe, i.e. how many items are needed to
46 : // perform next memory allocation.
47 :
48 : template <typename T, int N> class ypipe_t : public ypipe_base_t <T>
49 : {
50 : public:
51 :
52 : // Initialises the pipe.
53 28343 : inline ypipe_t ()
54 113372 : {
55 : // Insert terminator element into the queue.
56 28343 : queue.push ();
57 :
58 : // Let all the pointers to point to the terminator.
59 : // (unless pipe is dead, in which case c is set to NULL).
60 85029 : r = w = f = &queue.back ();
61 28343 : c.set (&queue.back ());
62 28343 : }
63 :
64 : // The destructor doesn't have to be virtual. It is made virtual
65 : // just to keep ICC and code checking tools from complaining.
66 15687 : inline virtual ~ypipe_t ()
67 : {
68 44025 : }
69 :
70 : // Following function (write) deliberately copies uninitialised data
71 : // when used with zmq_msg. Initialising the VSM body for
72 : // non-VSM messages won't be good for performance.
73 :
74 : #ifdef ZMQ_HAVE_OPENVMS
75 : #pragma message save
76 : #pragma message disable(UNINIT)
77 : #endif
78 :
79 : // Write an item to the pipe. Don't flush it yet. If incomplete is
80 : // set to true the item is assumed to be continued by items
81 : // subsequently written to the pipe. Incomplete items are never
82 : // flushed down the stream.
83 1625549 : inline void write (const T &value_, bool incomplete_)
84 : {
85 : // Place the value to the queue, add new terminator element.
86 4872845 : queue.back () = value_;
87 1625549 : queue.push ();
88 :
89 : // Move the "flush up to here" poiter.
90 1625376 : if (!incomplete_)
91 4870944 : f = &queue.back ();
92 1625376 : }
93 :
94 : #ifdef ZMQ_HAVE_OPENVMS
95 : #pragma message restore
96 : #endif
97 :
98 : // Pop an incomplete item from the pipe. Returns true if such
99 : // item exists, false otherwise.
100 11878 : inline bool unwrite (T *value_)
101 : {
102 11956 : if (f == &queue.back ())
103 : return false;
104 39 : queue.unpush ();
105 117 : *value_ = queue.back ();
106 39 : return true;
107 : }
108 :
109 : // Flush all the completed items into the pipe. Returns false if
110 : // the reader thread is sleeping. In that case, caller is obliged to
111 : // wake the reader up before using the pipe again.
112 1009988 : inline bool flush ()
113 : {
114 : // If there are no un-flushed items, do nothing.
115 1009988 : if (w == f)
116 : return true;
117 :
118 : // Try to set 'c' to 'f'.
119 2000554 : if (c.cas (w, f) != w) {
120 :
121 : // Compare-and-swap was unseccessful because 'c' is NULL.
122 : // This means that the reader is asleep. Therefore we don't
123 : // care about thread-safeness and update c in non-atomic
124 : // manner. We'll return false to let the caller know
125 : // that reader is sleeping.
126 50173 : c.set (f);
127 50173 : w = f;
128 50173 : return false;
129 : }
130 :
131 : // Reader is alive. Nothing special to do now. Just move
132 : // the 'first un-flushed item' pointer to 'f'.
133 950104 : w = f;
134 950104 : return true;
135 : }
136 :
137 : // Check whether item is available for reading.
138 1697964 : inline bool check_read ()
139 : {
140 : // Was the value prefetched already? If so, return.
141 2039502 : if (&queue.front () != r && r)
142 : return true;
143 :
144 : // There's no prefetched value, so let us prefetch more values.
145 : // Prefetching is to simply retrieve the
146 : // pointer from c in atomic fashion. If there are no
147 : // items to prefetch, set c to NULL (using compare-and-swap).
148 341538 : r = c.cas (&queue.front (), NULL);
149 :
150 : // If there are no elements prefetched, exit.
151 : // During pipe's lifetime r should never be NULL, however,
152 : // it can happen during pipe shutdown when items
153 : // are being deallocated.
154 512307 : if (&queue.front () == r || !r)
155 : return false;
156 :
157 : // There was at least one value prefetched.
158 88916 : return true;
159 : }
160 :
161 : // Reads an item from the pipe. Returns false if there is no value.
162 : // available.
163 1695796 : inline bool read (T *value_)
164 : {
165 : // Try to prefetch a value.
166 1695796 : if (!check_read ())
167 : return false;
168 :
169 : // There was at least one value prefetched.
170 : // Return it to the caller.
171 1616721 : *value_ = queue.front ();
172 1616721 : queue.pop ();
173 1616819 : return true;
174 : }
175 :
176 : // Applies the function fn to the first elemenent in the pipe
177 : // and returns the value returned by the fn.
178 : // The pipe mustn't be empty or the function crashes.
179 377 : inline bool probe (bool (*fn)(const T &))
180 : {
181 377 : bool rc = check_read ();
182 377 : zmq_assert (rc);
183 :
184 377 : return (*fn) (queue.front ());
185 : }
186 :
187 : protected:
188 :
189 : // Allocation-efficient queue to store pipe items.
190 : // Front of the queue points to the first prefetched item, back of
191 : // the pipe points to last un-flushed item. Front is used only by
192 : // reader thread, while back is used only by writer thread.
193 : yqueue_t <T, N> queue;
194 :
195 : // Points to the first un-flushed item. This variable is used
196 : // exclusively by writer thread.
197 : T *w;
198 :
199 : // Points to the first un-prefetched item. This variable is used
200 : // exclusively by reader thread.
201 : T *r;
202 :
203 : // Points to the first item to be flushed in the future.
204 : T *f;
205 :
206 : // The single point of contention between writer and reader thread.
207 : // Points past the last flushed item. If it is NULL,
208 : // reader is asleep. This pointer should be always accessed using
209 : // atomic operations.
210 : atomic_ptr_t <T> c;
211 :
212 : // Disable copying of ypipe object.
213 : ypipe_t (const ypipe_t&);
214 : const ypipe_t &operator = (const ypipe_t&);
215 : };
216 :
217 : }
218 :
219 : #endif
|