Line data Source code
1 : /*
2 : Copyright (c) 2007-2016 Contributors as noted in the AUTHORS file
3 :
4 : This file is part of libzmq, the ZeroMQ core engine in C++.
5 :
6 : libzmq is free software; you can redistribute it and/or modify it under
7 : the terms of the GNU Lesser General Public License (LGPL) as published
8 : by the Free Software Foundation; either version 3 of the License, or
9 : (at your option) any later version.
10 :
11 : As a special exception, the Contributors give you permission to link
12 : this library with independent modules to produce an executable,
13 : regardless of the license terms of these independent modules, and to
14 : copy and distribute the resulting executable under terms of your choice,
15 : provided that you also meet, for each linked independent module, the
16 : terms and conditions of the license of that module. An independent
17 : module is a module which is not derived from or based on this library.
18 : If you modify this library, you must extend this exception to your
19 : version of the library.
20 :
21 : libzmq is distributed in the hope that it will be useful, but WITHOUT
22 : ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
23 : FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public
24 : License for more details.
25 :
26 : You should have received a copy of the GNU Lesser General Public License
27 : along with this program. If not, see <http://www.gnu.org/licenses/>.
28 : */
29 :
30 : #ifndef __ZMQ_YPIPE_CONFLATE_HPP_INCLUDED__
31 : #define __ZMQ_YPIPE_CONFLATE_HPP_INCLUDED__
32 :
33 : #include "platform.hpp"
34 : #include "dbuffer.hpp"
35 : #include "ypipe_base.hpp"
36 :
37 : namespace zmq
38 : {
39 :
40 : // Adapter for dbuffer, to plug it in instead of a queue for the sake
41 : // of implementing the conflate socket option, which, if set, makes
42 : // the receiving side discard all incoming messages but the last one.
43 : //
44 : // The reader_awake flag is needed here to mimic ypipe's delicate behaviour
45 : // around the reader being asleep (see 'c' pointer being NULL in ypipe.hpp).
46 :
47 : template <typename T> class ypipe_conflate_t : public ypipe_base_t <T>
48 : {
49 : public:
50 :
51 : // Initialises the pipe. The reader_awake flag starts out as false.
52 : inline ypipe_conflate_t ()
53 12 : : reader_awake(false)
54 : {
55 : }
56 :
57 : // The destructor doesn't have to be virtual. It is made virtual
58 : // just to keep ICC and code checking tools from complaining.
59 6 : inline virtual ~ypipe_conflate_t ()
60 : {
61 12 : }
62 :
63 : // The following function (write) deliberately copies uninitialised data
64 : // when used with zmq_msg. Initialising the VSM body for
65 : // non-VSM messages won't be good for performance.
66 :
67 : #ifdef ZMQ_HAVE_OPENVMS
68 : #pragma message save
69 : #pragma message disable(UNINIT)
70 : #endif
71 63 : inline void write (const T &value_, bool incomplete_)
72 : {
73 : (void)incomplete_;
74 :
75 63 : dbuffer.write (value_);
76 63 : }
77 :
78 : #ifdef ZMQ_HAVE_OPENVMS
79 : #pragma message restore
80 : #endif
81 :
82 : // There are no incomplete items for conflate ypipe; always returns false.
83 3 : inline bool unwrite (T *)
84 : {
85 3 : return false;
86 : }
87 :
88 : // Flush is a no-op for the conflate ypipe. Reader-asleep behaviour
89 : // is the same as in the usual ypipe.
90 : // Returns false if the reader thread is sleeping. In that case, the
91 : // caller is obliged to wake the reader up before using the pipe again.
92 9 : inline bool flush ()
93 : {
94 9 : return reader_awake;
95 : }
96 :
97 : // Checks whether an item is available for reading; clears reader_awake if not.
98 9 : inline bool check_read ()
99 : {
100 18 : bool res = dbuffer.check_read ();
101 9 : if (!res)
102 3 : reader_awake = false;
103 :
104 9 : return res;
105 : }
106 :
107 : // Reads an item from the pipe. Returns false if there is no value
108 : // available.
109 9 : inline bool read (T *value_)
110 : {
111 9 : if (!check_read ())
112 : return false;
113 :
114 6 : return dbuffer.read (value_);
115 : }
116 :
117 : // Applies the function fn to the first element in the pipe
118 : // and returns the value returned by the fn.
119 : // The pipe mustn't be empty or the function crashes.
120 0 : inline bool probe (bool (*fn)(const T &))
121 : {
122 0 : return dbuffer.probe (fn);
123 : }
124 :
125 : protected:
126 :
127 : dbuffer_t <T> dbuffer;
128 : bool reader_awake;
129 :
130 : // Disable copying of ypipe_conflate_t objects (only declarations appear here).
131 : ypipe_conflate_t (const ypipe_conflate_t&);
132 : const ypipe_conflate_t &operator = (const ypipe_conflate_t&);
133 : };
134 :
135 : }
136 :
137 : #endif
|