// libzmq (master) — ZeroMQ core engine in C++: yqueue.hpp
// (Doxygen page header: "Go to the documentation of this file.")
1 /*
2  Copyright (c) 2007-2016 Contributors as noted in the AUTHORS file
3 
4  This file is part of libzmq, the ZeroMQ core engine in C++.
5 
6  libzmq is free software; you can redistribute it and/or modify it under
7  the terms of the GNU Lesser General Public License (LGPL) as published
8  by the Free Software Foundation; either version 3 of the License, or
9  (at your option) any later version.
10 
11  As a special exception, the Contributors give you permission to link
12  this library with independent modules to produce an executable,
13  regardless of the license terms of these independent modules, and to
14  copy and distribute the resulting executable under terms of your choice,
15  provided that you also meet, for each linked independent module, the
16  terms and conditions of the license of that module. An independent
17  module is a module which is not derived from or based on this library.
18  If you modify this library, you must extend this exception to your
19  version of the library.
20 
21  libzmq is distributed in the hope that it will be useful, but WITHOUT
22  ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
23  FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public
24  License for more details.
25 
26  You should have received a copy of the GNU Lesser General Public License
27  along with this program. If not, see <http://www.gnu.org/licenses/>.
28 */
29 
30 #ifndef __ZMQ_YQUEUE_HPP_INCLUDED__
31 #define __ZMQ_YQUEUE_HPP_INCLUDED__
32 
33 #include <stdlib.h>
34 #include <stddef.h>
35 
36 #include "err.hpp"
37 #include "atomic_ptr.hpp"
38 
39 namespace zmq
40 {
41 
42  // yqueue is an efficient queue implementation. The main goal is
43  // to minimise number of allocations/deallocations needed. Thus yqueue
44  // allocates/deallocates elements in batches of N.
45  //
46  // yqueue allows one thread to use push/back function and another one
47  // to use pop/front functions. However, user must ensure that there's no
48  // pop on the empty queue and that both threads don't access the same
49  // element in unsynchronised manner.
50  //
51  // T is the type of the object in the queue.
52  // N is granularity of the queue (how many pushes have to be done till
53  // actual memory allocation is required).
54 #ifdef HAVE_POSIX_MEMALIGN
55  // ALIGN is the memory alignment size to use in the case where we have
56  // posix_memalign available. Default value is 64, this alignment will
57  // prevent two queue chunks from occupying the same CPU cache line on
58  // architectures where cache lines are <= 64 bytes (e.g. most things
59  // except POWER).
60  template <typename T, int N, size_t ALIGN = 64> class yqueue_t
61 #else
62  template <typename T, int N> class yqueue_t
63 #endif
64  {
65  public:
66 
67  // Create the queue.
68  inline yqueue_t ()
69  {
72  begin_pos = 0;
73  back_chunk = NULL;
74  back_pos = 0;
76  end_pos = 0;
77  }
78 
79  // Destroy the queue.
80  inline ~yqueue_t ()
81  {
82  while (true) {
83  if (begin_chunk == end_chunk) {
84  free (begin_chunk);
85  break;
86  }
87  chunk_t *o = begin_chunk;
89  free (o);
90  }
91 
92  chunk_t *sc = spare_chunk.xchg (NULL);
93  free (sc);
94  }
95 
96  // Returns reference to the front element of the queue.
97  // If the queue is empty, behaviour is undefined.
98  inline T &front ()
99  {
100  return begin_chunk->values [begin_pos];
101  }
102 
103  // Returns reference to the back element of the queue.
104  // If the queue is empty, behaviour is undefined.
105  inline T &back ()
106  {
107  return back_chunk->values [back_pos];
108  }
109 
110  // Adds an element to the back end of the queue.
111  inline void push ()
112  {
114  back_pos = end_pos;
115 
116  if (++end_pos != N)
117  return;
118 
119  chunk_t *sc = spare_chunk.xchg (NULL);
120  if (sc) {
121  end_chunk->next = sc;
122  sc->prev = end_chunk;
123  } else {
127  }
129  end_pos = 0;
130  }
131 
132  // Removes element from the back end of the queue. In other words
133  // it rollbacks last push to the queue. Take care: Caller is
134  // responsible for destroying the object being unpushed.
135  // The caller must also guarantee that the queue isn't empty when
136  // unpush is called. It cannot be done automatically as the read
137  // side of the queue can be managed by different, completely
138  // unsynchronised thread.
139  inline void unpush ()
140  {
141  // First, move 'back' one position backwards.
142  if (back_pos)
143  --back_pos;
144  else {
145  back_pos = N - 1;
147  }
148 
149  // Now, move 'end' position backwards. Note that obsolete end chunk
150  // is not used as a spare chunk. The analysis shows that doing so
151  // would require free and atomic operation per chunk deallocated
152  // instead of a simple free.
153  if (end_pos)
154  --end_pos;
155  else {
156  end_pos = N - 1;
158  free (end_chunk->next);
159  end_chunk->next = NULL;
160  }
161  }
162 
163  // Removes an element from the front end of the queue.
164  inline void pop ()
165  {
166  if (++ begin_pos == N) {
167  chunk_t *o = begin_chunk;
169  begin_chunk->prev = NULL;
170  begin_pos = 0;
171 
172  // 'o' has been more recently used than spare_chunk,
173  // so for cache reasons we'll get rid of the spare and
174  // use 'o' as the spare.
175  chunk_t *cs = spare_chunk.xchg (o);
176  free (cs);
177  }
178  }
179 
180  private:
181 
182  // Individual memory chunk to hold N elements.
183  struct chunk_t
184  {
185  T values [N];
188  };
189 
190  inline chunk_t *allocate_chunk ()
191  {
192 #ifdef HAVE_POSIX_MEMALIGN
193  void *pv;
194  if (posix_memalign(&pv, ALIGN, sizeof (chunk_t)) == 0)
195  return (chunk_t*) pv;
196  return NULL;
197 #else
198  return (chunk_t*) malloc (sizeof (chunk_t));
199 #endif
200  }
201 
202  // Back position may point to invalid memory if the queue is empty,
203  // while begin & end positions are always valid. Begin position is
204  // accessed exclusively be queue reader (front/pop), while back and
205  // end positions are accessed exclusively by queue writer (back/push).
206  chunk_t *begin_chunk;
208  chunk_t *back_chunk;
209  int back_pos;
210  chunk_t *end_chunk;
211  int end_pos;
212 
213  // People are likely to produce and consume at similar rates. In
214  // this scenario holding onto the most recently freed chunk saves
215  // us from having to call malloc/free.
217 
218  // Disable copying of yqueue.
219  yqueue_t (const yqueue_t&);
220  const yqueue_t &operator = (const yqueue_t&);
221  };
222 
223 }
224 
225 #endif
// --- Doxygen cross-reference residue (member index for yqueue_t) ---
//   const yqueue_t &operator=(const yqueue_t &)
//   chunk_t *back_chunk                 (yqueue.hpp:208)
//   atomic_ptr_t<chunk_t> spare_chunk   (yqueue.hpp:216)
//   chunk_t *allocate_chunk()           (yqueue.hpp:190)
//   void pop()                          (yqueue.hpp:164)
//   chunk_t *end_chunk                  (yqueue.hpp:210)
//   T &back()                           (yqueue.hpp:105)
//   void unpush()                       (yqueue.hpp:139)
//   #define alloc_assert(x)             (err.hpp:159)
//   chunk_t *begin_chunk                (yqueue.hpp:206)
//   T &front()                          (yqueue.hpp:98)
//   zmq namespace                       (address.hpp:35)
//   void push()                         (yqueue.hpp:111)