LCOV - code coverage report
Current view: top level - home/h/core/forks/m4-libzmq/src - yqueue.hpp (source / functions) Hit Total Coverage
Test: zeromq-4.2.0 Code Coverage Lines: 51 57 89.5 %
Date: 2016-05-09 Functions: 12 13 92.3 %
Legend: Lines: hit not hit

          Line data    Source code
       1             : /*
       2             :     Copyright (c) 2007-2016 Contributors as noted in the AUTHORS file
       3             : 
       4             :     This file is part of libzmq, the ZeroMQ core engine in C++.
       5             : 
       6             :     libzmq is free software; you can redistribute it and/or modify it under
       7             :     the terms of the GNU Lesser General Public License (LGPL) as published
       8             :     by the Free Software Foundation; either version 3 of the License, or
       9             :     (at your option) any later version.
      10             : 
      11             :     As a special exception, the Contributors give you permission to link
      12             :     this library with independent modules to produce an executable,
      13             :     regardless of the license terms of these independent modules, and to
      14             :     copy and distribute the resulting executable under terms of your choice,
      15             :     provided that you also meet, for each linked independent module, the
      16             :     terms and conditions of the license of that module. An independent
      17             :     module is a module which is not derived from or based on this library.
      18             :     If you modify this library, you must extend this exception to your
      19             :     version of the library.
      20             : 
      21             :     libzmq is distributed in the hope that it will be useful, but WITHOUT
      22             :     ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
      23             :     FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public
      24             :     License for more details.
      25             : 
      26             :     You should have received a copy of the GNU Lesser General Public License
      27             :     along with this program.  If not, see <http://www.gnu.org/licenses/>.
      28             : */
      29             : 
      30             : #ifndef __ZMQ_YQUEUE_HPP_INCLUDED__
      31             : #define __ZMQ_YQUEUE_HPP_INCLUDED__
      32             : 
      33             : #include <stdlib.h>
      34             : #include <stddef.h>
      35             : 
      36             : #include "err.hpp"
      37             : #include "atomic_ptr.hpp"
      38             : 
      39             : namespace zmq
      40             : {
      41             : 
      42             :     //  yqueue is an efficient queue implementation. The main goal is
      43             :     //  to minimise number of allocations/deallocations needed. Thus yqueue
      44             :     //  allocates/deallocates elements in batches of N.
      45             :     //
      46             :     //  yqueue allows one thread to use push/back function and another one
      47             :     //  to use pop/front functions. However, user must ensure that there's no
      48             :     //  pop on the empty queue and that both threads don't access the same
      49             :     //  element in unsynchronised manner.
      50             :     //
      51             :     //  T is the type of the object in the queue.
      52             :     //  N is granularity of the queue (how many pushes have to be done till
      53             :     //  actual memory allocation is required).
      54             : #ifdef HAVE_POSIX_MEMALIGN
      55             :     // ALIGN is the memory alignment size to use in the case where we have
      56             :     // posix_memalign available. Default value is 64, this alignment will
      57             :     // prevent two queue chunks from occupying the same CPU cache line on
      58             :     // architectures where cache lines are <= 64 bytes (e.g. most things
      59             :     // except POWER).
      60             :     template <typename T, int N, size_t ALIGN = 64> class yqueue_t
      61             : #else
      62             :     template <typename T, int N> class yqueue_t
      63             : #endif
      64             :     {
      65             :     public:
      66             : 
      67             :         //  Create the queue.
      68       28343 :         inline yqueue_t ()
      69       28343 :         {
      70       28343 :              begin_chunk = allocate_chunk();
      71       28343 :              alloc_assert (begin_chunk);
      72       28343 :              begin_pos = 0;
      73       28343 :              back_chunk = NULL;
      74       28343 :              back_pos = 0;
      75       28343 :              end_chunk = begin_chunk;
      76       28343 :              end_pos = 0;
      77       28343 :         }
      78             : 
      79             :         //  Destroy the queue.
      80       28332 :         inline ~yqueue_t ()
      81             :         {
      82             :             while (true) {
      83       28381 :                 if (begin_chunk == end_chunk) {
      84       28332 :                     free (begin_chunk);
      85             :                     break;
      86             :                 }
      87          49 :                 chunk_t *o = begin_chunk;
      88          49 :                 begin_chunk = begin_chunk->next;
      89          49 :                 free (o);
      90             :             }
      91             : 
      92       56664 :             chunk_t *sc = spare_chunk.xchg (NULL);
      93       28381 :             free (sc);
      94       28332 :         }
      95             : 
      96             :         //  Returns reference to the front element of the queue.
      97             :         //  If the queue is empty, behaviour is undefined.
      98             :         inline T &front ()
      99             :         {
     100             :              return begin_chunk->values [begin_pos];
     101             :         }
     102             : 
     103             :         //  Returns reference to the back element of the queue.
     104             :         //  If the queue is empty, behaviour is undefined.
     105             :         inline T &back ()
     106             :         {
     107             :             return back_chunk->values [back_pos];
     108             :         }
     109             : 
     110             :         //  Adds an element to the back end of the queue.
     111     1653832 :         inline void push ()
     112             :         {
     113     1653832 :             back_chunk = end_chunk;
     114     1653832 :             back_pos = end_pos;
     115             : 
     116     1653832 :             if (++end_pos != N)
     117     1653832 :                 return;
     118             : 
     119       24560 :             chunk_t *sc = spare_chunk.xchg (NULL);
     120       12280 :             if (sc) {
     121        6634 :                 end_chunk->next = sc;
     122        6634 :                 sc->prev = end_chunk;
     123             :             } else {
     124        5646 :                 end_chunk->next = allocate_chunk();
     125        5646 :                 alloc_assert (end_chunk->next);
     126        5646 :                 end_chunk->next->prev = end_chunk;
     127             :             }
     128       12280 :             end_chunk = end_chunk->next;
     129       12280 :             end_pos = 0;
     130             :         }
     131             : 
     132             :         //  Removes element from the back end of the queue. In other words
     133             :         //  it rollbacks last push to the queue. Take care: Caller is
     134             :         //  responsible for destroying the object being unpushed.
     135             :         //  The caller must also guarantee that the queue isn't empty when
     136             :         //  unpush is called. It cannot be done automatically as the read
     137             :         //  side of the queue can be managed by different, completely
     138             :         //  unsynchronised thread.
     139          39 :         inline void unpush ()
     140             :         {
     141             :             //  First, move 'back' one position backwards.
     142          39 :             if (back_pos)
     143          39 :                 --back_pos;
     144             :             else {
     145           0 :                 back_pos = N - 1;
     146           0 :                 back_chunk = back_chunk->prev;
     147             :             }
     148             : 
     149             :             //  Now, move 'end' position backwards. Note that obsolete end chunk
     150             :             //  is not used as a spare chunk. The analysis shows that doing so
     151             :             //  would require free and atomic operation per chunk deallocated
     152             :             //  instead of a simple free.
     153          39 :             if (end_pos)
     154          39 :                 --end_pos;
     155             :             else {
     156           0 :                 end_pos = N - 1;
     157           0 :                 end_chunk = end_chunk->prev;
     158           0 :                 free (end_chunk->next);
     159           0 :                 end_chunk->next = NULL;
     160             :             }
     161          39 :         }
     162             : 
     163             :         //  Removes an element from the front end of the queue.
     164     1616795 :         inline void pop ()
     165             :         {
     166     1616795 :             if (++ begin_pos == N) {
     167       12231 :                 chunk_t *o = begin_chunk;
     168       12231 :                 begin_chunk = begin_chunk->next;
     169       12231 :                 begin_chunk->prev = NULL;
     170       12231 :                 begin_pos = 0;
     171             : 
     172             :                 //  'o' has been more recently used than spare_chunk,
     173             :                 //  so for cache reasons we'll get rid of the spare and
     174             :                 //  use 'o' as the spare.
     175       24462 :                 chunk_t *cs = spare_chunk.xchg (o);
     176       12231 :                 free (cs);
     177             :             }
     178     1616795 :         }
     179             : 
     180             :     private:
     181             : 
     182             :         //  Individual memory chunk to hold N elements.
     183             :         struct chunk_t
     184             :         {
     185             :              T values [N];
     186             :              chunk_t *prev;
     187             :              chunk_t *next;
     188             :         };
     189             : 
     190       33988 :         inline chunk_t *allocate_chunk ()
     191             :         {
     192             : #ifdef HAVE_POSIX_MEMALIGN
     193             :                 void *pv;
     194       33988 :                 if (posix_memalign(&pv, ALIGN, sizeof (chunk_t)) == 0)
     195       33989 :                     return (chunk_t*) pv;
     196             :                 return NULL;
     197             : #else
     198             :                 return (chunk_t*) malloc (sizeof (chunk_t));
     199             : #endif
     200             :         }
     201             : 
     202             :         //  Back position may point to invalid memory if the queue is empty,
     203             :         //  while begin & end positions are always valid. Begin position is
     204             :         //  accessed exclusively be queue reader (front/pop), while back and
     205             :         //  end positions are accessed exclusively by queue writer (back/push).
     206             :         chunk_t *begin_chunk;
     207             :         int begin_pos;
     208             :         chunk_t *back_chunk;
     209             :         int back_pos;
     210             :         chunk_t *end_chunk;
     211             :         int end_pos;
     212             : 
     213             :         //  People are likely to produce and consume at similar rates.  In
     214             :         //  this scenario holding onto the most recently freed chunk saves
     215             :         //  us from having to call malloc/free.
     216             :         atomic_ptr_t<chunk_t> spare_chunk;
     217             : 
     218             :         //  Disable copying of yqueue.
     219             :         yqueue_t (const yqueue_t&);
     220             :         const yqueue_t &operator = (const yqueue_t&);
     221             :     };
     222             : 
     223             : }
     224             : 
     225             : #endif

Generated by: LCOV version 1.10