Line data Source code
1 : /*
2 : Copyright (c) 2007-2016 Contributors as noted in the AUTHORS file
3 :
4 : This file is part of libzmq, the ZeroMQ core engine in C++.
5 :
6 : libzmq is free software; you can redistribute it and/or modify it under
7 : the terms of the GNU Lesser General Public License (LGPL) as published
8 : by the Free Software Foundation; either version 3 of the License, or
9 : (at your option) any later version.
10 :
11 : As a special exception, the Contributors give you permission to link
12 : this library with independent modules to produce an executable,
13 : regardless of the license terms of these independent modules, and to
14 : copy and distribute the resulting executable under terms of your choice,
15 : provided that you also meet, for each linked independent module, the
16 : terms and conditions of the license of that module. An independent
17 : module is a module which is not derived from or based on this library.
18 : If you modify this library, you must extend this exception to your
19 : version of the library.
20 :
21 : libzmq is distributed in the hope that it will be useful, but WITHOUT
22 : ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
23 : FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public
24 : License for more details.
25 :
26 : You should have received a copy of the GNU Lesser General Public License
27 : along with this program. If not, see <http://www.gnu.org/licenses/>.
28 : */
29 :
30 : #include "precompiled.hpp"
31 : #include <stdlib.h>
32 :
33 : #include <new>
34 : #include <algorithm>
35 :
36 : #include "macros.hpp"
37 : #include "platform.hpp"
38 : #if defined ZMQ_HAVE_WINDOWS
39 : #include "windows.hpp"
40 : #endif
41 :
42 : #include "err.hpp"
43 : #include "pipe.hpp"
44 : #include "mtrie.hpp"
45 :
46 258 : zmq::mtrie_t::mtrie_t () :
47 : pipes (0),
48 : min (0),
49 : count (0),
50 339 : live_nodes (0)
51 : {
52 258 : }
53 :
54 339 : zmq::mtrie_t::~mtrie_t ()
55 : {
56 339 : if (pipes) {
57 0 : LIBZMQ_DELETE(pipes);
58 : }
59 :
60 339 : if (count == 1) {
61 0 : zmq_assert (next.node);
62 0 : LIBZMQ_DELETE(next.node);
63 : }
64 339 : else if (count > 1) {
65 0 : for (unsigned short i = 0; i != count; ++i) {
66 0 : LIBZMQ_DELETE(next.table[i]);
67 : }
68 0 : free (next.table);
69 : }
70 339 : }
71 :
72 120 : bool zmq::mtrie_t::add (unsigned char *prefix_, size_t size_, pipe_t *pipe_)
73 : {
74 120 : return add_helper (prefix_, size_, pipe_);
75 : }
76 :
77 216 : bool zmq::mtrie_t::add_helper (unsigned char *prefix_, size_t size_,
78 : pipe_t *pipe_)
79 : {
80 : // We are at the node corresponding to the prefix. We are done.
81 216 : if (!size_) {
82 120 : bool result = !pipes;
83 120 : if (!pipes) {
84 168 : pipes = new (std::nothrow) pipes_t;
85 84 : alloc_assert (pipes);
86 : }
87 120 : pipes->insert (pipe_);
88 120 : return result;
89 : }
90 :
91 96 : unsigned char c = *prefix_;
92 96 : if (c < min || c >= min + count) {
93 :
94 : // The character is out of range of currently handled
95 : // characters. We have to extend the table.
96 81 : if (!count) {
97 66 : min = c;
98 66 : count = 1;
99 66 : next.node = NULL;
100 : }
101 : else
102 15 : if (count == 1) {
103 15 : unsigned char oldc = min;
104 15 : mtrie_t *oldp = next.node;
105 15 : count = (min < c ? c - min : min - c) + 1;
106 : next.table = (mtrie_t**)
107 15 : malloc (sizeof (mtrie_t*) * count);
108 15 : alloc_assert (next.table);
109 219 : for (unsigned short i = 0; i != count; ++i)
110 219 : next.table [i] = 0;
111 30 : min = std::min (min, c);
112 15 : next.table [oldc - min] = oldp;
113 : }
114 : else
115 0 : if (min < c) {
116 : // The new character is above the current character range.
117 0 : unsigned short old_count = count;
118 0 : count = c - min + 1;
119 : next.table = (mtrie_t**) realloc (next.table,
120 0 : sizeof (mtrie_t*) * count);
121 0 : alloc_assert (next.table);
122 0 : for (unsigned short i = old_count; i != count; i++)
123 0 : next.table [i] = NULL;
124 : }
125 : else {
126 : // The new character is below the current character range.
127 0 : unsigned short old_count = count;
128 0 : count = (min + old_count) - c;
129 : next.table = (mtrie_t**) realloc (next.table,
130 0 : sizeof (mtrie_t*) * count);
131 0 : alloc_assert (next.table);
132 0 : memmove (next.table + min - c, next.table,
133 0 : old_count * sizeof (mtrie_t*));
134 0 : for (unsigned short i = 0; i != min - c; i++)
135 0 : next.table [i] = NULL;
136 0 : min = c;
137 : }
138 : }
139 :
140 : // If next node does not exist, create one.
141 96 : if (count == 1) {
142 81 : if (!next.node) {
143 132 : next.node = new (std::nothrow) mtrie_t;
144 66 : alloc_assert (next.node);
145 66 : ++live_nodes;
146 : }
147 81 : return next.node->add_helper (prefix_ + 1, size_ - 1, pipe_);
148 : }
149 : else {
150 15 : if (!next.table [c - min]) {
151 30 : next.table [c - min] = new (std::nothrow) mtrie_t;
152 15 : alloc_assert (next.table [c - min]);
153 15 : ++live_nodes;
154 : }
155 15 : return next.table [c - min]->add_helper (prefix_ + 1, size_ - 1, pipe_);
156 : }
157 : }
158 :
159 :
160 3028 : void zmq::mtrie_t::rm (pipe_t *pipe_,
161 : void (*func_) (unsigned char *data_, size_t size_, void *arg_),
162 : void *arg_, bool call_on_uniq_)
163 : {
164 3028 : unsigned char *buff = NULL;
165 3028 : rm_helper (pipe_, &buff, 0, 0, func_, arg_, call_on_uniq_);
166 3028 : free (buff);
167 3028 : }
168 :
169 3124 : void zmq::mtrie_t::rm_helper (pipe_t *pipe_, unsigned char **buff_,
170 : size_t buffsize_, size_t maxbuffsize_,
171 : void (*func_) (unsigned char *data_, size_t size_, void *arg_),
172 : void *arg_, bool call_on_uniq_)
173 : {
174 : // Remove the subscription from this node.
175 3241 : if (pipes && pipes->erase (pipe_)) {
176 189 : if (!call_on_uniq_ || pipes->empty ()) {
177 84 : func_ (*buff_, buffsize_, arg_);
178 : }
179 :
180 216 : if (pipes->empty ()) {
181 156 : LIBZMQ_DELETE(pipes);
182 : }
183 : }
184 :
185 : // Adjust the buffer.
186 3124 : if (buffsize_ >= maxbuffsize_) {
187 3028 : maxbuffsize_ = buffsize_ + 256;
188 3028 : *buff_ = (unsigned char*) realloc (*buff_, maxbuffsize_);
189 3028 : alloc_assert (*buff_);
190 : }
191 :
192 : // If there are no subnodes in the trie, return.
193 3124 : if (count == 0)
194 : return;
195 :
196 : // If there's one subnode (optimisation).
197 81 : if (count == 1) {
198 66 : (*buff_) [buffsize_] = min;
199 66 : buffsize_++;
200 : next.node->rm_helper (pipe_, buff_, buffsize_, maxbuffsize_,
201 66 : func_, arg_, call_on_uniq_);
202 :
203 : // Prune the node if it was made redundant by the removal
204 132 : if (next.node->is_redundant ()) {
205 54 : LIBZMQ_DELETE(next.node);
206 54 : count = 0;
207 54 : --live_nodes;
208 54 : zmq_assert (live_nodes == 0);
209 : }
210 : return;
211 : }
212 :
213 : // If there are multiple subnodes.
214 : //
215 : // New min non-null character in the node table after the removal
216 15 : unsigned char new_min = min + count - 1;
217 : // New max non-null character in the node table after the removal
218 15 : unsigned char new_max = min;
219 234 : for (unsigned short c = 0; c != count; c++) {
220 219 : (*buff_) [buffsize_] = min + c;
221 219 : if (next.table [c]) {
222 : next.table [c]->rm_helper (pipe_, buff_, buffsize_ + 1,
223 30 : maxbuffsize_, func_, arg_, call_on_uniq_);
224 :
225 : // Prune redundant nodes from the mtrie
226 60 : if (next.table [c]->is_redundant ()) {
227 21 : LIBZMQ_DELETE(next.table[c]);
228 :
229 21 : zmq_assert (live_nodes > 0);
230 21 : --live_nodes;
231 : }
232 : else {
233 : // The node is not redundant, so it's a candidate for being
234 : // the new min/max node.
235 : //
236 : // We loop through the node array from left to right, so the
237 : // first non-null, non-redundant node encountered is the new
238 : // minimum index. Conversely, the last non-redundant, non-null
239 : // node encountered is the new maximum index.
240 9 : if (c + min < new_min)
241 3 : new_min = c + min;
242 9 : if (c + min > new_max)
243 6 : new_max = c + min;
244 : }
245 : }
246 : }
247 :
248 15 : zmq_assert (count > 1);
249 :
250 : // Free the node table if it's no longer used.
251 15 : if (live_nodes == 0) {
252 6 : free (next.table);
253 6 : next.table = NULL;
254 6 : count = 0;
255 : }
256 : // Compact the node table if possible
257 : else
258 9 : if (live_nodes == 1) {
259 : // If there's only one live node in the table we can
260 : // switch to using the more compact single-node
261 : // representation
262 9 : zmq_assert (new_min == new_max);
263 9 : zmq_assert (new_min >= min && new_min < min + count);
264 9 : mtrie_t *node = next.table [new_min - min];
265 9 : zmq_assert (node);
266 9 : free (next.table);
267 9 : next.node = node;
268 9 : count = 1;
269 9 : min = new_min;
270 : }
271 : else
272 0 : if (new_min > min || new_max < min + count - 1) {
273 0 : zmq_assert (new_max - new_min + 1 > 1);
274 :
275 0 : mtrie_t **old_table = next.table;
276 0 : zmq_assert (new_min > min || new_max < min + count - 1);
277 0 : zmq_assert (new_min >= min);
278 0 : zmq_assert (new_max <= min + count - 1);
279 0 : zmq_assert (new_max - new_min + 1 < count);
280 :
281 0 : count = new_max - new_min + 1;
282 0 : next.table = (mtrie_t**) malloc (sizeof (mtrie_t*) * count);
283 0 : alloc_assert (next.table);
284 :
285 0 : memmove (next.table, old_table + (new_min - min),
286 0 : sizeof (mtrie_t*) * count);
287 0 : free (old_table);
288 :
289 0 : min = new_min;
290 : }
291 : }
292 :
293 6 : bool zmq::mtrie_t::rm (unsigned char *prefix_, size_t size_, pipe_t *pipe_)
294 : {
295 6 : return rm_helper (prefix_, size_, pipe_);
296 : }
297 :
298 12 : bool zmq::mtrie_t::rm_helper (unsigned char *prefix_, size_t size_,
299 : pipe_t *pipe_)
300 : {
301 12 : if (!size_) {
302 6 : if (pipes) {
303 12 : pipes_t::size_type erased = pipes->erase (pipe_);
304 6 : zmq_assert (erased == 1);
305 12 : if (pipes->empty ()) {
306 12 : LIBZMQ_DELETE(pipes);
307 : }
308 : }
309 6 : return !pipes;
310 : }
311 :
312 6 : unsigned char c = *prefix_;
313 6 : if (!count || c < min || c >= min + count)
314 : return false;
315 :
316 : mtrie_t *next_node =
317 6 : count == 1 ? next.node : next.table [c - min];
318 :
319 6 : if (!next_node)
320 : return false;
321 :
322 6 : bool ret = next_node->rm_helper (prefix_ + 1, size_ - 1, pipe_);
323 :
324 6 : if (next_node->is_redundant ()) {
325 6 : LIBZMQ_DELETE(next_node);
326 6 : zmq_assert (count > 0);
327 :
328 6 : if (count == 1) {
329 6 : next.node = 0;
330 6 : count = 0;
331 6 : --live_nodes;
332 6 : zmq_assert (live_nodes == 0);
333 : }
334 : else {
335 0 : next.table [c - min] = 0;
336 0 : zmq_assert (live_nodes > 1);
337 0 : --live_nodes;
338 :
339 : // Compact the table if possible
340 0 : if (live_nodes == 1) {
341 : // If there's only one live node in the table we can
342 : // switch to using the more compact single-node
343 : // representation
344 : unsigned short i;
345 0 : for (i = 0; i < count; ++i)
346 0 : if (next.table [i])
347 : break;
348 :
349 0 : zmq_assert (i < count);
350 0 : min += i;
351 0 : count = 1;
352 0 : mtrie_t *oldp = next.table [i];
353 0 : free (next.table);
354 0 : next.node = oldp;
355 : }
356 : else
357 0 : if (c == min) {
358 : // We can compact the table "from the left"
359 : unsigned short i;
360 0 : for (i = 1; i < count; ++i)
361 0 : if (next.table [i])
362 : break;
363 :
364 0 : zmq_assert (i < count);
365 0 : min += i;
366 0 : count -= i;
367 0 : mtrie_t **old_table = next.table;
368 0 : next.table = (mtrie_t**) malloc (sizeof (mtrie_t*) * count);
369 0 : alloc_assert (next.table);
370 0 : memmove (next.table, old_table + i, sizeof (mtrie_t*) * count);
371 0 : free (old_table);
372 : }
373 : else
374 0 : if (c == min + count - 1) {
375 : // We can compact the table "from the right"
376 : unsigned short i;
377 0 : for (i = 1; i < count; ++i)
378 0 : if (next.table [count - 1 - i])
379 : break;
380 :
381 0 : zmq_assert (i < count);
382 0 : count -= i;
383 0 : mtrie_t **old_table = next.table;
384 0 : next.table = (mtrie_t**) malloc (sizeof (mtrie_t*) * count);
385 0 : alloc_assert (next.table);
386 0 : memmove (next.table, old_table, sizeof (mtrie_t*) * count);
387 0 : free (old_table);
388 : }
389 : }
390 : }
391 :
392 6 : return ret;
393 : }
394 :
395 75206 : void zmq::mtrie_t::match (unsigned char *data_, size_t size_,
396 : void (*func_) (pipe_t *pipe_, void *arg_), void *arg_)
397 : {
398 75206 : mtrie_t *current = this;
399 : while (true) {
400 :
401 : // Signal the pipes attached to this node.
402 75347 : if (current->pipes) {
403 300782 : for (pipes_t::iterator it = current->pipes->begin ();
404 300812 : it != current->pipes->end (); ++it)
405 75218 : func_ (*it, arg_);
406 : }
407 :
408 : // If we are at the end of the message, there's nothing more to match.
409 75347 : if (!size_)
410 : break;
411 :
412 : // If there are no subnodes in the trie, return.
413 192 : if (current->count == 0)
414 : break;
415 :
416 : // If there's one subnode (optimisation).
417 144 : if (current->count == 1) {
418 108 : if (data_ [0] != current->min)
419 : break;
420 105 : current = current->next.node;
421 105 : data_++;
422 105 : size_--;
423 105 : continue;
424 : }
425 :
426 : // If there are multiple subnodes.
427 72 : if (data_ [0] < current->min || data_ [0] >=
428 36 : current->min + current->count)
429 : break;
430 36 : if (!current->next.table [data_ [0] - current->min])
431 : break;
432 36 : current = current->next.table [data_ [0] - current->min];
433 36 : data_++;
434 36 : size_--;
435 : }
436 75206 : }
437 :
438 0 : bool zmq::mtrie_t::is_redundant () const
439 : {
440 102 : return !pipes && live_nodes == 0;
441 : }
|