libzmq  master
ZeroMQ C++ Core Engine (LIBZMQ)
dbuffer.hpp
Go to the documentation of this file.
1 /*
2  Copyright (c) 2007-2016 Contributors as noted in the AUTHORS file
3 
4  This file is part of libzmq, the ZeroMQ core engine in C++.
5 
6  libzmq is free software; you can redistribute it and/or modify it under
7  the terms of the GNU Lesser General Public License (LGPL) as published
8  by the Free Software Foundation; either version 3 of the License, or
9  (at your option) any later version.
10 
11  As a special exception, the Contributors give you permission to link
12  this library with independent modules to produce an executable,
13  regardless of the license terms of these independent modules, and to
14  copy and distribute the resulting executable under terms of your choice,
15  provided that you also meet, for each linked independent module, the
16  terms and conditions of the license of that module. An independent
17  module is a module which is not derived from or based on this library.
18  If you modify this library, you must extend this exception to your
19  version of the library.
20 
21  libzmq is distributed in the hope that it will be useful, but WITHOUT
22  ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
23  FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public
24  License for more details.
25 
26  You should have received a copy of the GNU Lesser General Public License
27  along with this program. If not, see <http://www.gnu.org/licenses/>.
28 */
29 
30 #ifndef __ZMQ_DBUFFER_HPP_INCLUDED__
31 #define __ZMQ_DBUFFER_HPP_INCLUDED__
32 
33 #include <stdlib.h>
34 #include <stddef.h>
35 #include <algorithm>
36 
37 #include "mutex.hpp"
38 #include "msg.hpp"
39 
40 namespace zmq
41 {
42 
43  // dbuffer is a single-producer single-consumer double-buffer
44  // implementation.
45  //
46  // The producer writes to a back buffer and then tries to swap
47  // pointers between the back and front buffers. If it fails,
48  // due to the consumer reading from the front buffer, it just
49  // gives up, which is ok since writes are many and redundant.
50  //
51  // The reader simply reads from the front buffer.
52  //
53  // has_msg records whether a value has been written that has not
54  // yet been read. It is used by ypipe_conflate to mimic ypipe
55  // functionality regarding a reader being asleep.
56 
57  template <typename T> class dbuffer_t;
58 
59  template <> class dbuffer_t<msg_t>
60  {
61  public:
62 
63  inline dbuffer_t ()
64  : back (&storage[0])
65  , front (&storage[1])
66  , has_msg (false)
67  {
68  back->init ();
69  front->init ();
70  }
71 
72  inline ~dbuffer_t()
73  {
74  back->close ();
75  front->close ();
76  }
77 
78  inline void write (const msg_t &value_)
79  {
80  msg_t& xvalue = const_cast<msg_t&>(value_);
81 
82  zmq_assert (xvalue.check ());
83  back->move (xvalue); // cannot just overwrite, might leak
84 
85  zmq_assert (back->check ());
86 
87  if (sync.try_lock ())
88  {
89  std::swap (back, front);
90  has_msg = true;
91 
92  sync.unlock ();
93  }
94  }
95 
96  inline bool read (msg_t *value_)
97  {
98  if (!value_)
99  return false;
100 
101  {
102  scoped_lock_t lock (sync);
103  if (!has_msg)
104  return false;
105 
106  zmq_assert (front->check ());
107 
108  *value_ = *front;
109  front->init (); // avoid double free
110 
111  has_msg = false;
112  return true;
113  }
114  }
115 
116 
117  inline bool check_read ()
118  {
119  scoped_lock_t lock (sync);
120 
121  return has_msg;
122  }
123 
124  inline bool probe (bool (*fn)(const msg_t &))
125  {
126  scoped_lock_t lock (sync);
127  return (*fn) (*front);
128  }
129 
130 
131  private:
132  msg_t storage[2];
133  msg_t *back, *front;
134 
136  bool has_msg;
137 
138  // Disable copying of dbuffer.
139  dbuffer_t (const dbuffer_t&);
140  const dbuffer_t &operator = (const dbuffer_t&);
141  };
142 }
143 
144 #endif
bool probe(bool(*fn)(const msg_t &))
Definition: dbuffer.hpp:124
#define zmq_assert(x)
Definition: err.hpp:119
void write(const msg_t &value_)
Definition: dbuffer.hpp:78
bool check()
Definition: msg.cpp:50
int init()
Definition: msg.cpp:82
bool read(msg_t *value_)
Definition: dbuffer.hpp:96
Definition: address.hpp:35