Line data Source code
1 : /*
2 : Copyright (c) 2007-2016 Contributors as noted in the AUTHORS file
3 :
4 : This file is part of libzmq, the ZeroMQ core engine in C++.
5 :
6 : libzmq is free software; you can redistribute it and/or modify it under
7 : the terms of the GNU Lesser General Public License (LGPL) as published
8 : by the Free Software Foundation; either version 3 of the License, or
9 : (at your option) any later version.
10 :
11 : As a special exception, the Contributors give you permission to link
12 : this library with independent modules to produce an executable,
13 : regardless of the license terms of these independent modules, and to
14 : copy and distribute the resulting executable under terms of your choice,
15 : provided that you also meet, for each linked independent module, the
16 : terms and conditions of the license of that module. An independent
17 : module is a module which is not derived from or based on this library.
18 : If you modify this library, you must extend this exception to your
19 : version of the library.
20 :
21 : libzmq is distributed in the hope that it will be useful, but WITHOUT
22 : ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
23 : FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public
24 : License for more details.
25 :
26 : You should have received a copy of the GNU Lesser General Public License
27 : along with this program. If not, see <http://www.gnu.org/licenses/>.
28 : */
29 :
30 : #ifndef __ZMQ_YQUEUE_HPP_INCLUDED__
31 : #define __ZMQ_YQUEUE_HPP_INCLUDED__
32 :
33 : #include <stdlib.h>
34 : #include <stddef.h>
35 :
36 : #include "err.hpp"
37 : #include "atomic_ptr.hpp"
38 :
39 : namespace zmq
40 : {
41 :
//  yqueue is an efficient queue implementation. The main goal is
//  to minimise number of allocations/deallocations needed. Thus yqueue
//  allocates/deallocates elements in batches of N.
//
//  yqueue allows one thread to use push/back function and another one
//  to use pop/front functions. However, user must ensure that there's no
//  pop on the empty queue and that both threads don't access the same
//  element in unsynchronised manner.
//
//  NOTE: chunks are obtained with malloc/posix_memalign and released
//  with free, so the queue itself never runs constructors or destructors
//  of T — T is expected to be trivially constructible/destructible
//  (true for the element types libzmq stores here; confirm for new uses).
//
//  T is the type of the object in the queue.
//  N is granularity of the queue (how many pushes have to be done till
//  actual memory allocation is required).
#ifdef HAVE_POSIX_MEMALIGN
//  ALIGN is the memory alignment size to use in the case where we have
//  posix_memalign available. Default value is 64, this alignment will
//  prevent two queue chunks from occupying the same CPU cache line on
//  architectures where cache lines are <= 64 bytes (e.g. most things
//  except POWER).
template <typename T, int N, size_t ALIGN = 64> class yqueue_t
#else
template <typename T, int N> class yqueue_t
#endif
{
public:

    //  Create the queue. The queue starts with a single allocated chunk;
    //  'begin' and 'end' both point at its first slot, while 'back' stays
    //  NULL/0 until the first push makes it meaningful.
    inline yqueue_t ()
    {
        begin_chunk = allocate_chunk();
        alloc_assert (begin_chunk);
        begin_pos = 0;
        back_chunk = NULL;
        back_pos = 0;
        end_chunk = begin_chunk;
        end_pos = 0;
    }

    //  Destroy the queue. Walks the chunk list from 'begin' to 'end'
    //  freeing each chunk, then frees the cached spare chunk as well.
    //  free(NULL) is a no-op, so an empty spare slot needs no check.
    //  Elements still stored in the queue are NOT destructed (see the
    //  class-level note about trivially destructible T).
    inline ~yqueue_t ()
    {
        while (true) {
            if (begin_chunk == end_chunk) {
                free (begin_chunk);
                break;
            }
            chunk_t *o = begin_chunk;
            begin_chunk = begin_chunk->next;
            free (o);
        }

        chunk_t *sc = spare_chunk.xchg (NULL);
        free (sc);
    }

    //  Returns reference to the front element of the queue.
    //  If the queue is empty, behaviour is undefined.
    //  May be called only from the reader thread (see class comment).
    inline T &front ()
    {
        return begin_chunk->values [begin_pos];
    }

    //  Returns reference to the back element of the queue.
    //  If the queue is empty, behaviour is undefined.
    //  May be called only from the writer thread (see class comment).
    inline T &back ()
    {
        return back_chunk->values [back_pos];
    }

    //  Adds an element to the back end of the queue.
    //  'back' is advanced to the slot 'end' occupied before the call;
    //  when the current chunk fills up, the next chunk is taken from the
    //  spare slot if the reader has parked one there (atomic xchg), and
    //  only allocated fresh when no spare is available.
    inline void push ()
    {
        back_chunk = end_chunk;
        back_pos = end_pos;

        if (++end_pos != N)
            return;

        //  Try to reuse the chunk most recently retired by pop ();
        //  xchg(NULL) atomically claims it (or yields NULL if none).
        chunk_t *sc = spare_chunk.xchg (NULL);
        if (sc) {
            end_chunk->next = sc;
            sc->prev = end_chunk;
        } else {
            end_chunk->next = allocate_chunk();
            alloc_assert (end_chunk->next);
            end_chunk->next->prev = end_chunk;
        }
        end_chunk = end_chunk->next;
        end_pos = 0;
    }

    //  Removes element from the back end of the queue. In other words
    //  it rollbacks last push to the queue. Take care: Caller is
    //  responsible for destroying the object being unpushed.
    //  The caller must also guarantee that the queue isn't empty when
    //  unpush is called. It cannot be done automatically as the read
    //  side of the queue can be managed by different, completely
    //  unsynchronised thread.
    inline void unpush ()
    {
        //  First, move 'back' one position backwards.
        if (back_pos)
            --back_pos;
        else {
            back_pos = N - 1;
            back_chunk = back_chunk->prev;
        }

        //  Now, move 'end' position backwards. Note that obsolete end chunk
        //  is not used as a spare chunk. The analysis shows that doing so
        //  would require free and atomic operation per chunk deallocated
        //  instead of a simple free.
        if (end_pos)
            --end_pos;
        else {
            end_pos = N - 1;
            end_chunk = end_chunk->prev;
            free (end_chunk->next);
            end_chunk->next = NULL;
        }
    }

    //  Removes an element from the front end of the queue.
    //  May be called only from the reader thread; must not be called on
    //  an empty queue (see class comment).
    inline void pop ()
    {
        if (++ begin_pos == N) {
            chunk_t *o = begin_chunk;
            begin_chunk = begin_chunk->next;
            begin_chunk->prev = NULL;
            begin_pos = 0;

            //  'o' has been more recently used than spare_chunk,
            //  so for cache reasons we'll get rid of the spare and
            //  use 'o' as the spare.
            chunk_t *cs = spare_chunk.xchg (o);
            free (cs);
        }
    }

private:

    //  Individual memory chunk to hold N elements.
    //  prev/next link the chunks into a doubly-linked list, 'begin' to
    //  'end'; 'values' is raw storage — T objects are never constructed
    //  in place by the queue itself.
    struct chunk_t
    {
        T values [N];
        chunk_t *prev;
        chunk_t *next;
    };

    //  Allocate raw, uninitialised memory for one chunk. Returns NULL on
    //  failure (callers wrap the result in alloc_assert). With
    //  posix_memalign available the chunk is ALIGN-aligned to keep
    //  neighbouring chunks out of the same cache line.
    inline chunk_t *allocate_chunk ()
    {
#ifdef HAVE_POSIX_MEMALIGN
        void *pv;
        if (posix_memalign(&pv, ALIGN, sizeof (chunk_t)) == 0)
            return (chunk_t*) pv;
        return NULL;
#else
        return (chunk_t*) malloc (sizeof (chunk_t));
#endif
    }

    //  Back position may point to invalid memory if the queue is empty,
    //  while begin & end positions are always valid. Begin position is
    //  accessed exclusively be queue reader (front/pop), while back and
    //  end positions are accessed exclusively by queue writer (back/push).
    chunk_t *begin_chunk;
    int begin_pos;
    chunk_t *back_chunk;
    int back_pos;
    chunk_t *end_chunk;
    int end_pos;

    //  People are likely to produce and consume at similar rates. In
    //  this scenario holding onto the most recently freed chunk saves
    //  us from having to call malloc/free.
    //  This is the ONLY member touched by both threads; all exchange is
    //  done through the atomic xchg in push()/pop()/~yqueue_t().
    atomic_ptr_t<chunk_t> spare_chunk;

    //  Disable copying of yqueue.
    yqueue_t (const yqueue_t&);
    const yqueue_t &operator = (const yqueue_t&);
};
222 :
223 : }
224 :
225 : #endif
|