LCOV - code coverage report
Current view: top level - home/h/core/forks/m4-libzmq/src - ypipe.hpp (source / functions) Hit Total Coverage
Test: zeromq-4.2.0 Code Coverage Lines: 41 41 100.0 %
Date: 2016-05-09 Functions: 13 18 72.2 %
Legend: Lines: hit not hit

          Line data    Source code
       1             : /*
       2             :     Copyright (c) 2007-2016 Contributors as noted in the AUTHORS file
       3             : 
       4             :     This file is part of libzmq, the ZeroMQ core engine in C++.
       5             : 
       6             :     libzmq is free software; you can redistribute it and/or modify it under
       7             :     the terms of the GNU Lesser General Public License (LGPL) as published
       8             :     by the Free Software Foundation; either version 3 of the License, or
       9             :     (at your option) any later version.
      10             : 
      11             :     As a special exception, the Contributors give you permission to link
      12             :     this library with independent modules to produce an executable,
      13             :     regardless of the license terms of these independent modules, and to
      14             :     copy and distribute the resulting executable under terms of your choice,
      15             :     provided that you also meet, for each linked independent module, the
      16             :     terms and conditions of the license of that module. An independent
      17             :     module is a module which is not derived from or based on this library.
      18             :     If you modify this library, you must extend this exception to your
      19             :     version of the library.
      20             : 
      21             :     libzmq is distributed in the hope that it will be useful, but WITHOUT
      22             :     ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
      23             :     FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public
      24             :     License for more details.
      25             : 
      26             :     You should have received a copy of the GNU Lesser General Public License
      27             :     along with this program.  If not, see <http://www.gnu.org/licenses/>.
      28             : */
      29             : 
      30             : #ifndef __ZMQ_YPIPE_HPP_INCLUDED__
      31             : #define __ZMQ_YPIPE_HPP_INCLUDED__
      32             : 
      33             : #include "atomic_ptr.hpp"
      34             : #include "yqueue.hpp"
      35             : #include "platform.hpp"
      36             : #include "ypipe_base.hpp"
      37             : 
      38             : namespace zmq
      39             : {
      40             : 
      41             :     //  Lock-free queue implementation.
      42             :     //  Only a single thread can read from the pipe at any specific moment.
      43             :     //  Only a single thread can write to the pipe at any specific moment.
      44             :     //  T is the type of the object in the queue.
      45             :     //  N is granularity of the pipe, i.e. how many items are needed to
      46             :     //  perform next memory allocation.
      47             : 
      48             :     template <typename T, int N> class ypipe_t : public ypipe_base_t <T>
      49             :     {
      50             :     public:
      51             : 
      52             :         //  Initialises the pipe.
      53       28343 :         inline ypipe_t ()
      54      113372 :         {
      55             :             //  Insert terminator element into the queue.
      56       28343 :             queue.push ();
      57             : 
      58             :             //  Let all the pointers point to the terminator.
      59             :             //  (unless pipe is dead, in which case c is set to NULL).
      60       85029 :             r = w = f = &queue.back ();
      61       28343 :             c.set (&queue.back ());
      62       28343 :         }
      63             : 
      64             :         //  The destructor doesn't have to be virtual. It is made virtual
      65             :         //  just to keep ICC and code checking tools from complaining.
      66       15687 :         inline virtual ~ypipe_t ()
      67             :         {
      68       44025 :         }
      69             : 
      70             :         //  Following function (write) deliberately copies uninitialised data
      71             :         //  when used with zmq_msg. Initialising the VSM body for
      72             :         //  non-VSM messages won't be good for performance.
      73             : 
      74             : #ifdef ZMQ_HAVE_OPENVMS
      75             : #pragma message save
      76             : #pragma message disable(UNINIT)
      77             : #endif
      78             : 
      79             :         //  Write an item to the pipe.  Don't flush it yet. If incomplete is
      80             :         //  set to true the item is assumed to be continued by items
      81             :         //  subsequently written to the pipe. Incomplete items are never
      82             :         //  flushed down the stream.
      83     1625549 :         inline void write (const T &value_, bool incomplete_)
      84             :         {
      85             :             //  Place the value to the queue, add new terminator element.
      86     4872845 :             queue.back () = value_;
      87     1625549 :             queue.push ();
      88             : 
      89             :             //  Move the "flush up to here" pointer.
      90     1625376 :             if (!incomplete_)
      91     4870944 :                 f = &queue.back ();
      92     1625376 :         }
      93             : 
      94             : #ifdef ZMQ_HAVE_OPENVMS
      95             : #pragma message restore
      96             : #endif
      97             : 
      98             :         //  Pop an incomplete item from the pipe. Returns true if such
      99             :         //  item exists, false otherwise.
     100       11878 :         inline bool unwrite (T *value_)
     101             :         {
     102       11956 :             if (f == &queue.back ())
     103             :                 return false;
     104          39 :             queue.unpush ();
     105         117 :             *value_ = queue.back ();
     106          39 :             return true;
     107             :         }
     108             : 
     109             :         //  Flush all the completed items into the pipe. Returns false if
     110             :         //  the reader thread is sleeping. In that case, caller is obliged to
     111             :         //  wake the reader up before using the pipe again.
     112     1009988 :         inline bool flush ()
     113             :         {
     114             :             //  If there are no un-flushed items, do nothing.
     115     1009988 :             if (w == f)
     116             :                 return true;
     117             : 
     118             :             //  Try to set 'c' to 'f'.
     119     2000554 :             if (c.cas (w, f) != w) {
     120             : 
     121             :                 //  Compare-and-swap was unsuccessful because 'c' is NULL.
     122             :                 //  This means that the reader is asleep. Therefore we don't
     123             :                 //  care about thread-safeness and update c in non-atomic
     124             :                 //  manner. We'll return false to let the caller know
     125             :                 //  that reader is sleeping.
     126       50173 :                 c.set (f);
     127       50173 :                 w = f;
     128       50173 :                 return false;
     129             :             }
     130             : 
     131             :             //  Reader is alive. Nothing special to do now. Just move
     132             :             //  the 'first un-flushed item' pointer to 'f'.
     133      950104 :             w = f;
     134      950104 :             return true;
     135             :         }
     136             : 
     137             :         //  Check whether item is available for reading.
     138     1697964 :         inline bool check_read ()
     139             :         {
     140             :             //  Was the value prefetched already? If so, return.
     141     2039502 :             if (&queue.front () != r && r)
     142             :                  return true;
     143             : 
     144             :             //  There's no prefetched value, so let us prefetch more values.
     145             :             //  Prefetching is to simply retrieve the
     146             :             //  pointer from c in atomic fashion. If there are no
     147             :             //  items to prefetch, set c to NULL (using compare-and-swap).
     148      341538 :             r = c.cas (&queue.front (), NULL);
     149             : 
     150             :             //  If there are no elements prefetched, exit.
     151             :             //  During pipe's lifetime r should never be NULL, however,
     152             :             //  it can happen during pipe shutdown when items
     153             :             //  are being deallocated.
     154      512307 :             if (&queue.front () == r || !r)
     155             :                 return false;
     156             : 
     157             :             //  There was at least one value prefetched.
     158       88916 :             return true;
     159             :         }
     160             : 
     161             :         //  Reads an item from the pipe. Returns false if there is no value
     162             :         //  available.
     163     1695796 :         inline bool read (T *value_)
     164             :         {
     165             :             //  Try to prefetch a value.
     166     1695796 :             if (!check_read ())
     167             :                 return false;
     168             : 
     169             :             //  There was at least one value prefetched.
     170             :             //  Return it to the caller.
     171     1616721 :             *value_ = queue.front ();
     172     1616721 :             queue.pop ();
     173     1616819 :             return true;
     174             :         }
     175             : 
     176             :         //  Applies the function fn to the first element in the pipe
     177             :         //  and returns the value returned by the fn.
     178             :         //  The pipe mustn't be empty or the function crashes.
     179         377 :         inline bool probe (bool (*fn)(const T &))
     180             :         {
     181         377 :             bool rc = check_read ();
     182         377 :             zmq_assert (rc);
     183             : 
     184         377 :             return (*fn) (queue.front ());
     185             :         }
     186             : 
     187             :     protected:
     188             : 
     189             :         //  Allocation-efficient queue to store pipe items.
     190             :         //  Front of the queue points to the first prefetched item, back of
     191             :         //  the pipe points to last un-flushed item. Front is used only by
     192             :         //  reader thread, while back is used only by writer thread.
     193             :         yqueue_t <T, N> queue;
     194             : 
     195             :         //  Points to the first un-flushed item. This variable is used
     196             :         //  exclusively by writer thread.
     197             :         T *w;
     198             : 
     199             :         //  Points to the first un-prefetched item. This variable is used
     200             :         //  exclusively by reader thread.
     201             :         T *r;
     202             : 
     203             :         //  Points to the first item to be flushed in the future.
     204             :         T *f;
     205             : 
     206             :         //  The single point of contention between writer and reader thread.
     207             :         //  Points past the last flushed item. If it is NULL,
     208             :         //  reader is asleep. This pointer should be always accessed using
     209             :         //  atomic operations.
     210             :         atomic_ptr_t <T> c;
     211             : 
     212             :         //  Disable copying of ypipe object.
     213             :         ypipe_t (const ypipe_t&);
     214             :         const ypipe_t &operator = (const ypipe_t&);
     215             :     };
     216             : 
     217             : }
     218             : 
     219             : #endif

Generated by: LCOV version 1.10