Line data Source code
1 : /*
2 : Copyright (c) 2007-2016 Contributors as noted in the AUTHORS file
3 :
4 : This file is part of libzmq, the ZeroMQ core engine in C++.
5 :
6 : libzmq is free software; you can redistribute it and/or modify it under
7 : the terms of the GNU Lesser General Public License (LGPL) as published
8 : by the Free Software Foundation; either version 3 of the License, or
9 : (at your option) any later version.
10 :
11 : As a special exception, the Contributors give you permission to link
12 : this library with independent modules to produce an executable,
13 : regardless of the license terms of these independent modules, and to
14 : copy and distribute the resulting executable under terms of your choice,
15 : provided that you also meet, for each linked independent module, the
16 : terms and conditions of the license of that module. An independent
17 : module is a module which is not derived from or based on this library.
18 : If you modify this library, you must extend this exception to your
19 : version of the library.
20 :
21 : libzmq is distributed in the hope that it will be useful, but WITHOUT
22 : ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
23 : FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public
24 : License for more details.
25 :
26 : You should have received a copy of the GNU Lesser General Public License
27 : along with this program. If not, see <http://www.gnu.org/licenses/>.
28 : */
29 :
30 : #ifndef __ZMQ_DBUFFER_HPP_INCLUDED__
31 : #define __ZMQ_DBUFFER_HPP_INCLUDED__
32 :
33 : #include <stdlib.h>
34 : #include <stddef.h>
35 : #include <algorithm>
36 :
37 : #include "mutex.hpp"
38 : #include "msg.hpp"
39 :
40 : namespace zmq
41 : {
42 :
43 : // dbuffer is a single-producer single-consumer double-buffer
44 : // implementation.
45 : //
46 : // The producer writes to a back buffer and then tries to swap
47 : // pointers between the back and front buffers. If it fails,
48 : // due to the consumer reading from the front buffer, it just
49 : // gives up, which is ok since writes are many and redundant.
50 : //
51 : // The reader simply reads from the front buffer.
52 : //
53 : // has_msg tracks whether a value has been written that has not
54 : // yet been read. It is used by ypipe_conflate to mimic ypipe
55 : // functionality regarding a reader being asleep.
56 :
57 : template <typename T> class dbuffer_t;
58 :
59 : template <> class dbuffer_t<msg_t>
60 : {
61 : public:
62 :
63 6 : inline dbuffer_t ()
64 : : back (&storage[0])
65 : , front (&storage[1])
66 6 : , has_msg (false)
67 : {
68 6 : back->init ();
69 6 : front->init ();
70 6 : }
71 :
72 6 : inline ~dbuffer_t()
73 6 : {
74 6 : back->close ();
75 6 : front->close ();
76 6 : }
77 :
78 63 : inline void write (const msg_t &value_)
79 : {
80 63 : msg_t& xvalue = const_cast<msg_t&>(value_);
81 :
82 63 : zmq_assert (xvalue.check ());
83 63 : back->move (xvalue); // cannot just overwrite, might leak
84 :
85 63 : zmq_assert (back->check ());
86 :
87 63 : if (sync.try_lock ())
88 : {
89 63 : std::swap (back, front);
90 63 : has_msg = true;
91 :
92 63 : sync.unlock ();
93 : }
94 63 : }
95 :
96 6 : inline bool read (msg_t *value_)
97 : {
98 6 : if (!value_)
99 : return false;
100 :
101 : {
102 6 : scoped_lock_t lock (sync);
103 6 : if (!has_msg)
104 : return false;
105 :
106 6 : zmq_assert (front->check ());
107 :
108 6 : *value_ = *front;
109 6 : front->init (); // avoid double free
110 :
111 6 : has_msg = false;
112 6 : return true;
113 : }
114 : }
115 :
116 :
117 : inline bool check_read ()
118 : {
119 9 : scoped_lock_t lock (sync);
120 :
121 9 : return has_msg;
122 : }
123 :
124 0 : inline bool probe (bool (*fn)(const msg_t &))
125 : {
126 0 : scoped_lock_t lock (sync);
127 0 : return (*fn) (*front);
128 : }
129 :
130 :
131 : private:
132 : msg_t storage[2];
133 : msg_t *back, *front;
134 :
135 : mutex_t sync;
136 : bool has_msg;
137 :
138 : // Disable copying of dbuffer.
139 : dbuffer_t (const dbuffer_t&);
140 : const dbuffer_t &operator = (const dbuffer_t&);
141 : };
142 : }
143 :
144 : #endif
|