libzmq  master
ZeroMQ C++ Core Engine (LIBZMQ)
mtrie.cpp
Go to the documentation of this file.
1 /*
2  Copyright (c) 2007-2016 Contributors as noted in the AUTHORS file
3 
4  This file is part of libzmq, the ZeroMQ core engine in C++.
5 
6  libzmq is free software; you can redistribute it and/or modify it under
7  the terms of the GNU Lesser General Public License (LGPL) as published
8  by the Free Software Foundation; either version 3 of the License, or
9  (at your option) any later version.
10 
11  As a special exception, the Contributors give you permission to link
12  this library with independent modules to produce an executable,
13  regardless of the license terms of these independent modules, and to
14  copy and distribute the resulting executable under terms of your choice,
15  provided that you also meet, for each linked independent module, the
16  terms and conditions of the license of that module. An independent
17  module is a module which is not derived from or based on this library.
18  If you modify this library, you must extend this exception to your
19  version of the library.
20 
21  libzmq is distributed in the hope that it will be useful, but WITHOUT
22  ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
23  FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public
24  License for more details.
25 
26  You should have received a copy of the GNU Lesser General Public License
27  along with this program. If not, see <http://www.gnu.org/licenses/>.
28 */
29 
30 #include "precompiled.hpp"
31 #include <stdlib.h>
32 
33 #include <new>
34 #include <algorithm>
35 
36 #include "macros.hpp"
37 #include "platform.hpp"
38 #if defined ZMQ_HAVE_WINDOWS
39 #include "windows.hpp"
40 #endif
41 
42 #include "err.hpp"
43 #include "pipe.hpp"
44 #include "mtrie.hpp"
45 
47  pipes (0),
48  min (0),
49  count (0),
50  live_nodes (0)
51 {
52 }
53 
55 {
56  if (pipes) {
58  }
59 
60  if (count == 1) {
61  zmq_assert (next.node);
62  LIBZMQ_DELETE(next.node);
63  }
64  else if (count > 1) {
65  for (unsigned short i = 0; i != count; ++i) {
66  LIBZMQ_DELETE(next.table[i]);
67  }
68  free (next.table);
69  }
70 }
71 
72 bool zmq::mtrie_t::add (unsigned char *prefix_, size_t size_, pipe_t *pipe_)
73 {
74  return add_helper (prefix_, size_, pipe_);
75 }
76 
77 bool zmq::mtrie_t::add_helper (unsigned char *prefix_, size_t size_,
78  pipe_t *pipe_)
79 {
80  // We are at the node corresponding to the prefix. We are done.
81  if (!size_) {
82  bool result = !pipes;
83  if (!pipes) {
84  pipes = new (std::nothrow) pipes_t;
86  }
87  pipes->insert (pipe_);
88  return result;
89  }
90 
91  unsigned char c = *prefix_;
92  if (c < min || c >= min + count) {
93 
94  // The character is out of range of currently handled
95  // characters. We have to extend the table.
96  if (!count) {
97  min = c;
98  count = 1;
99  next.node = NULL;
100  }
101  else
102  if (count == 1) {
103  unsigned char oldc = min;
104  mtrie_t *oldp = next.node;
105  count = (min < c ? c - min : min - c) + 1;
106  next.table = (mtrie_t**)
107  malloc (sizeof (mtrie_t*) * count);
108  alloc_assert (next.table);
109  for (unsigned short i = 0; i != count; ++i)
110  next.table [i] = 0;
111  min = std::min (min, c);
112  next.table [oldc - min] = oldp;
113  }
114  else
115  if (min < c) {
116  // The new character is above the current character range.
117  unsigned short old_count = count;
118  count = c - min + 1;
119  next.table = (mtrie_t**) realloc (next.table,
120  sizeof (mtrie_t*) * count);
121  alloc_assert (next.table);
122  for (unsigned short i = old_count; i != count; i++)
123  next.table [i] = NULL;
124  }
125  else {
126  // The new character is below the current character range.
127  unsigned short old_count = count;
128  count = (min + old_count) - c;
129  next.table = (mtrie_t**) realloc (next.table,
130  sizeof (mtrie_t*) * count);
131  alloc_assert (next.table);
132  memmove (next.table + min - c, next.table,
133  old_count * sizeof (mtrie_t*));
134  for (unsigned short i = 0; i != min - c; i++)
135  next.table [i] = NULL;
136  min = c;
137  }
138  }
139 
140  // If next node does not exist, create one.
141  if (count == 1) {
142  if (!next.node) {
143  next.node = new (std::nothrow) mtrie_t;
144  alloc_assert (next.node);
145  ++live_nodes;
146  }
147  return next.node->add_helper (prefix_ + 1, size_ - 1, pipe_);
148  }
149  else {
150  if (!next.table [c - min]) {
151  next.table [c - min] = new (std::nothrow) mtrie_t;
152  alloc_assert (next.table [c - min]);
153  ++live_nodes;
154  }
155  return next.table [c - min]->add_helper (prefix_ + 1, size_ - 1, pipe_);
156  }
157 }
158 
159 
161  void (*func_) (unsigned char *data_, size_t size_, void *arg_),
162  void *arg_, bool call_on_uniq_)
163 {
164  unsigned char *buff = NULL;
165  rm_helper (pipe_, &buff, 0, 0, func_, arg_, call_on_uniq_);
166  free (buff);
167 }
168 
169 void zmq::mtrie_t::rm_helper (pipe_t *pipe_, unsigned char **buff_,
170  size_t buffsize_, size_t maxbuffsize_,
171  void (*func_) (unsigned char *data_, size_t size_, void *arg_),
172  void *arg_, bool call_on_uniq_)
173 {
174  // Remove the subscription from this node.
175  if (pipes && pipes->erase (pipe_)) {
176  if (!call_on_uniq_ || pipes->empty ()) {
177  func_ (*buff_, buffsize_, arg_);
178  }
179 
180  if (pipes->empty ()) {
182  }
183  }
184 
185  // Adjust the buffer.
186  if (buffsize_ >= maxbuffsize_) {
187  maxbuffsize_ = buffsize_ + 256;
188  *buff_ = (unsigned char*) realloc (*buff_, maxbuffsize_);
189  alloc_assert (*buff_);
190  }
191 
192  // If there are no subnodes in the trie, return.
193  if (count == 0)
194  return;
195 
196  // If there's one subnode (optimisation).
197  if (count == 1) {
198  (*buff_) [buffsize_] = min;
199  buffsize_++;
200  next.node->rm_helper (pipe_, buff_, buffsize_, maxbuffsize_,
201  func_, arg_, call_on_uniq_);
202 
203  // Prune the node if it was made redundant by the removal
204  if (next.node->is_redundant ()) {
205  LIBZMQ_DELETE(next.node);
206  count = 0;
207  --live_nodes;
208  zmq_assert (live_nodes == 0);
209  }
210  return;
211  }
212 
213  // If there are multiple subnodes.
214  //
215  // New min non-null character in the node table after the removal
216  unsigned char new_min = min + count - 1;
217  // New max non-null character in the node table after the removal
218  unsigned char new_max = min;
219  for (unsigned short c = 0; c != count; c++) {
220  (*buff_) [buffsize_] = min + c;
221  if (next.table [c]) {
222  next.table [c]->rm_helper (pipe_, buff_, buffsize_ + 1,
223  maxbuffsize_, func_, arg_, call_on_uniq_);
224 
225  // Prune redundant nodes from the mtrie
226  if (next.table [c]->is_redundant ()) {
227  LIBZMQ_DELETE(next.table[c]);
228 
229  zmq_assert (live_nodes > 0);
230  --live_nodes;
231  }
232  else {
233  // The node is not redundant, so it's a candidate for being
234  // the new min/max node.
235  //
236  // We loop through the node array from left to right, so the
237  // first non-null, non-redundant node encountered is the new
238  // minimum index. Conversely, the last non-redundant, non-null
239  // node encountered is the new maximum index.
240  if (c + min < new_min)
241  new_min = c + min;
242  if (c + min > new_max)
243  new_max = c + min;
244  }
245  }
246  }
247 
248  zmq_assert (count > 1);
249 
250  // Free the node table if it's no longer used.
251  if (live_nodes == 0) {
252  free (next.table);
253  next.table = NULL;
254  count = 0;
255  }
256  // Compact the node table if possible
257  else
258  if (live_nodes == 1) {
259  // If there's only one live node in the table we can
260  // switch to using the more compact single-node
261  // representation
262  zmq_assert (new_min == new_max);
263  zmq_assert (new_min >= min && new_min < min + count);
264  mtrie_t *node = next.table [new_min - min];
265  zmq_assert (node);
266  free (next.table);
267  next.node = node;
268  count = 1;
269  min = new_min;
270  }
271  else
272  if (new_min > min || new_max < min + count - 1) {
273  zmq_assert (new_max - new_min + 1 > 1);
274 
275  mtrie_t **old_table = next.table;
276  zmq_assert (new_min > min || new_max < min + count - 1);
277  zmq_assert (new_min >= min);
278  zmq_assert (new_max <= min + count - 1);
279  zmq_assert (new_max - new_min + 1 < count);
280 
281  count = new_max - new_min + 1;
282  next.table = (mtrie_t**) malloc (sizeof (mtrie_t*) * count);
283  alloc_assert (next.table);
284 
285  memmove (next.table, old_table + (new_min - min),
286  sizeof (mtrie_t*) * count);
287  free (old_table);
288 
289  min = new_min;
290  }
291 }
292 
293 bool zmq::mtrie_t::rm (unsigned char *prefix_, size_t size_, pipe_t *pipe_)
294 {
295  return rm_helper (prefix_, size_, pipe_);
296 }
297 
298 bool zmq::mtrie_t::rm_helper (unsigned char *prefix_, size_t size_,
299  pipe_t *pipe_)
300 {
301  if (!size_) {
302  if (pipes) {
303  pipes_t::size_type erased = pipes->erase (pipe_);
304  zmq_assert (erased == 1);
305  if (pipes->empty ()) {
307  }
308  }
309  return !pipes;
310  }
311 
312  unsigned char c = *prefix_;
313  if (!count || c < min || c >= min + count)
314  return false;
315 
316  mtrie_t *next_node =
317  count == 1 ? next.node : next.table [c - min];
318 
319  if (!next_node)
320  return false;
321 
322  bool ret = next_node->rm_helper (prefix_ + 1, size_ - 1, pipe_);
323 
324  if (next_node->is_redundant ()) {
325  LIBZMQ_DELETE(next_node);
326  zmq_assert (count > 0);
327 
328  if (count == 1) {
329  next.node = 0;
330  count = 0;
331  --live_nodes;
332  zmq_assert (live_nodes == 0);
333  }
334  else {
335  next.table [c - min] = 0;
336  zmq_assert (live_nodes > 1);
337  --live_nodes;
338 
339  // Compact the table if possible
340  if (live_nodes == 1) {
341  // If there's only one live node in the table we can
342  // switch to using the more compact single-node
343  // representation
344  unsigned short i;
345  for (i = 0; i < count; ++i)
346  if (next.table [i])
347  break;
348 
349  zmq_assert (i < count);
350  min += i;
351  count = 1;
352  mtrie_t *oldp = next.table [i];
353  free (next.table);
354  next.node = oldp;
355  }
356  else
357  if (c == min) {
358  // We can compact the table "from the left"
359  unsigned short i;
360  for (i = 1; i < count; ++i)
361  if (next.table [i])
362  break;
363 
364  zmq_assert (i < count);
365  min += i;
366  count -= i;
367  mtrie_t **old_table = next.table;
368  next.table = (mtrie_t**) malloc (sizeof (mtrie_t*) * count);
369  alloc_assert (next.table);
370  memmove (next.table, old_table + i, sizeof (mtrie_t*) * count);
371  free (old_table);
372  }
373  else
374  if (c == min + count - 1) {
375  // We can compact the table "from the right"
376  unsigned short i;
377  for (i = 1; i < count; ++i)
378  if (next.table [count - 1 - i])
379  break;
380 
381  zmq_assert (i < count);
382  count -= i;
383  mtrie_t **old_table = next.table;
384  next.table = (mtrie_t**) malloc (sizeof (mtrie_t*) * count);
385  alloc_assert (next.table);
386  memmove (next.table, old_table, sizeof (mtrie_t*) * count);
387  free (old_table);
388  }
389  }
390  }
391 
392  return ret;
393 }
394 
395 void zmq::mtrie_t::match (unsigned char *data_, size_t size_,
396  void (*func_) (pipe_t *pipe_, void *arg_), void *arg_)
397 {
398  mtrie_t *current = this;
399  while (true) {
400 
401  // Signal the pipes attached to this node.
402  if (current->pipes) {
403  for (pipes_t::iterator it = current->pipes->begin ();
404  it != current->pipes->end (); ++it)
405  func_ (*it, arg_);
406  }
407 
408  // If we are at the end of the message, there's nothing more to match.
409  if (!size_)
410  break;
411 
412  // If there are no subnodes in the trie, return.
413  if (current->count == 0)
414  break;
415 
416  // If there's one subnode (optimisation).
417  if (current->count == 1) {
418  if (data_ [0] != current->min)
419  break;
420  current = current->next.node;
421  data_++;
422  size_--;
423  continue;
424  }
425 
426  // If there are multiple subnodes.
427  if (data_ [0] < current->min || data_ [0] >=
428  current->min + current->count)
429  break;
430  if (!current->next.table [data_ [0] - current->min])
431  break;
432  current = current->next.table [data_ [0] - current->min];
433  data_++;
434  size_--;
435  }
436 }
437 
439 {
440  return !pipes && live_nodes == 0;
441 }
bool is_redundant() const
Definition: mtrie.cpp:438
void match(unsigned char *data_, size_t size_, void(*func_)(zmq::pipe_t *pipe_, void *arg_), void *arg_)
Definition: mtrie.cpp:395
#define LIBZMQ_DELETE(p_object)
Definition: macros.hpp:7
#define zmq_assert(x)
Definition: err.hpp:119
pipes_t * pipes
Definition: mtrie.hpp:85
union zmq::mtrie_t::@48 next
class mtrie_t * node
Definition: mtrie.hpp:91
class mtrie_t ** table
Definition: mtrie.hpp:92
unsigned char min
Definition: mtrie.hpp:87
unsigned short live_nodes
Definition: mtrie.hpp:89
bool add(unsigned char *prefix_, size_t size_, zmq::pipe_t *pipe_)
Definition: mtrie.cpp:72
unsigned short count
Definition: mtrie.hpp:88
#define alloc_assert(x)
Definition: err.hpp:159
void rm_helper(zmq::pipe_t *pipe_, unsigned char **buff_, size_t buffsize_, size_t maxbuffsize_, void(*func_)(unsigned char *data_, size_t size_, void *arg_), void *arg_, bool call_on_uniq_)
Definition: mtrie.cpp:169
void rm(zmq::pipe_t *pipe_, void(*func_)(unsigned char *data_, size_t size_, void *arg_), void *arg_, bool call_on_uniq_)
Definition: mtrie.cpp:160
bool add_helper(unsigned char *prefix_, size_t size_, zmq::pipe_t *pipe_)
Definition: mtrie.cpp:77
std::set< zmq::pipe_t * > pipes_t
Definition: mtrie.hpp:84