libzmq  master
ZeroMQ C++ Core Engine (LIBZMQ)
msg.cpp
Go to the documentation of this file.
1 /*
2  Copyright (c) 2007-2016 Contributors as noted in the AUTHORS file
3 
4  This file is part of libzmq, the ZeroMQ core engine in C++.
5 
6  libzmq is free software; you can redistribute it and/or modify it under
7  the terms of the GNU Lesser General Public License (LGPL) as published
8  by the Free Software Foundation; either version 3 of the License, or
9  (at your option) any later version.
10 
11  As a special exception, the Contributors give you permission to link
12  this library with independent modules to produce an executable,
13  regardless of the license terms of these independent modules, and to
14  copy and distribute the resulting executable under terms of your choice,
15  provided that you also meet, for each linked independent module, the
16  terms and conditions of the license of that module. An independent
17  module is a module which is not derived from or based on this library.
18  If you modify this library, you must extend this exception to your
19  version of the library.
20 
21  libzmq is distributed in the hope that it will be useful, but WITHOUT
22  ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
23  FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public
24  License for more details.
25 
26  You should have received a copy of the GNU Lesser General Public License
27  along with this program. If not, see <http://www.gnu.org/licenses/>.
28 */
29 
30 #include "precompiled.hpp"
31 #include "macros.hpp"
32 #include "msg.hpp"
33 #include "../include/zmq.h"
34 
35 #include <string.h>
36 #include <stdlib.h>
37 #include <new>
38 
39 #include "stdint.hpp"
40 #include "likely.hpp"
41 #include "metadata.hpp"
42 #include "err.hpp"
43 
44 // Check whether the sizes of public representation of the message (zmq_msg_t)
45 // and private representation of the message (zmq::msg_t) match.
46 
47 typedef char zmq_msg_size_check
48  [2 * ((sizeof (zmq::msg_t) == sizeof (zmq_msg_t)) != 0) - 1];
49 
51 {
52  return u.base.type >= type_min && u.base.type <= type_max;
53 }
54 
55 int zmq::msg_t::init (void* data_, size_t size_,
56  msg_free_fn* ffn_, void* hint,
57  content_t* content_)
58 {
59  if (size_ < max_vsm_size) {
60  int const rc = init_size(size_);
61 
62  if (rc != -1)
63  {
64  memcpy(data(), data_, size_);
65  return 0;
66  }
67  else
68  {
69  return -1;
70  }
71  }
72  else if(content_)
73  {
74  return init_external_storage(content_, data_, size_, ffn_, hint);
75  }
76  else
77  {
78  return init_data(data_, size_, ffn_, hint);
79  }
80 }
81 
83 {
84  u.vsm.metadata = NULL;
85  u.vsm.type = type_vsm;
86  u.vsm.flags = 0;
87  u.vsm.size = 0;
88  u.vsm.group[0] = '\0';
89  u.vsm.routing_id = 0;
90  return 0;
91 }
92 
93 int zmq::msg_t::init_size (size_t size_)
94 {
95  if (size_ <= max_vsm_size) {
96  u.vsm.metadata = NULL;
97  u.vsm.type = type_vsm;
98  u.vsm.flags = 0;
99  u.vsm.size = (unsigned char) size_;
100  u.vsm.group[0] = '\0';
101  u.vsm.routing_id = 0;
102  }
103  else {
104  u.lmsg.metadata = NULL;
105  u.lmsg.type = type_lmsg;
106  u.lmsg.flags = 0;
107  u.lmsg.group[0] = '\0';
108  u.lmsg.routing_id = 0;
109  u.lmsg.content = NULL;
110  if (sizeof (content_t) + size_ > size_)
111  u.lmsg.content = (content_t*) malloc (sizeof (content_t) + size_);
112  if (unlikely (!u.lmsg.content)) {
113  errno = ENOMEM;
114  return -1;
115  }
116 
117  u.lmsg.content->data = u.lmsg.content + 1;
118  u.lmsg.content->size = size_;
119  u.lmsg.content->ffn = NULL;
120  u.lmsg.content->hint = NULL;
121  new (&u.lmsg.content->refcnt) zmq::atomic_counter_t ();
122  }
123  return 0;
124 }
125 
126 int zmq::msg_t::init_external_storage(content_t* content_, void* data_, size_t size_,
127  msg_free_fn *ffn_, void* hint_)
128 {
129  zmq_assert(NULL != data_);
130  zmq_assert(NULL != content_);
131 
132  u.zclmsg.metadata = NULL;
133  u.zclmsg.type = type_zclmsg;
134  u.zclmsg.flags = 0;
135  u.zclmsg.group[0] = '\0';
136  u.zclmsg.routing_id = 0;
137 
138  u.zclmsg.content = content_;
139  u.zclmsg.content->data = data_;
140  u.zclmsg.content->size = size_;
141  u.zclmsg.content->ffn = ffn_;
142  u.zclmsg.content->hint = hint_;
143  new (&u.zclmsg.content->refcnt) zmq::atomic_counter_t();
144 
145  return 0;
146 }
147 
148 int zmq::msg_t::init_data (void *data_, size_t size_,
149  msg_free_fn *ffn_, void *hint_)
150 {
151  // If data is NULL and size is not 0, a segfault
152  // would occur once the data is accessed
153  zmq_assert (data_ != NULL || size_ == 0);
154 
155  // Initialize constant message if there's no need to deallocate
156  if (ffn_ == NULL) {
157  u.cmsg.metadata = NULL;
158  u.cmsg.type = type_cmsg;
159  u.cmsg.flags = 0;
160  u.cmsg.data = data_;
161  u.cmsg.size = size_;
162  u.cmsg.group[0] = '\0';
163  u.cmsg.routing_id = 0;
164  }
165  else {
166  u.lmsg.metadata = NULL;
167  u.lmsg.type = type_lmsg;
168  u.lmsg.flags = 0;
169  u.lmsg.group[0] = '\0';
170  u.lmsg.routing_id = 0;
171  u.lmsg.content = (content_t*) malloc (sizeof (content_t));
172  if (!u.lmsg.content) {
173  errno = ENOMEM;
174  return -1;
175  }
176 
177  u.lmsg.content->data = data_;
178  u.lmsg.content->size = size_;
179  u.lmsg.content->ffn = ffn_;
180  u.lmsg.content->hint = hint_;
181  new (&u.lmsg.content->refcnt) zmq::atomic_counter_t ();
182  }
183  return 0;
184 
185 }
186 
188 {
189  u.delimiter.metadata = NULL;
190  u.delimiter.type = type_delimiter;
191  u.delimiter.flags = 0;
192  u.delimiter.group[0] = '\0';
193  u.delimiter.routing_id = 0;
194  return 0;
195 }
196 
198 {
199  u.base.metadata = NULL;
200  u.base.type = type_join;
201  u.base.flags = 0;
202  u.base.group[0] = '\0';
203  u.base.routing_id = 0;
204  return 0;
205 }
206 
208 {
209  u.base.metadata = NULL;
210  u.base.type = type_leave;
211  u.base.flags = 0;
212  u.base.group[0] = '\0';
213  u.base.routing_id = 0;
214  return 0;
215 }
216 
218 {
219  // Check the validity of the message.
220  if (unlikely (!check ())) {
221  errno = EFAULT;
222  return -1;
223  }
224 
225  if (u.base.type == type_lmsg) {
226 
227  // If the content is not shared, or if it is shared and the reference
228  // count has dropped to zero, deallocate it.
229  if (!(u.lmsg.flags & msg_t::shared) ||
230  !u.lmsg.content->refcnt.sub (1)) {
231 
232  // We used "placement new" operator to initialize the reference
233  // counter so we call the destructor explicitly now.
234  u.lmsg.content->refcnt.~atomic_counter_t ();
235 
236  if (u.lmsg.content->ffn)
237  u.lmsg.content->ffn (u.lmsg.content->data,
238  u.lmsg.content->hint);
239  free (u.lmsg.content);
240  }
241  }
242 
243  if (is_zcmsg())
244  {
245  zmq_assert( u.zclmsg.content->ffn );
246 
247  // If the content is not shared, or if it is shared and the reference
248  // count has dropped to zero, deallocate it.
249  if (!(u.zclmsg.flags & msg_t::shared) ||
250  !u.zclmsg.content->refcnt.sub (1)) {
251 
252  // We used "placement new" operator to initialize the reference
253  // counter so we call the destructor explicitly now.
254  u.zclmsg.content->refcnt.~atomic_counter_t ();
255 
256  u.zclmsg.content->ffn (u.zclmsg.content->data,
257  u.zclmsg.content->hint);
258  }
259  }
260 
261  if (u.base.metadata != NULL) {
262  if (u.base.metadata->drop_ref ()) {
263  LIBZMQ_DELETE(u.base.metadata);
264  }
265  u.base.metadata = NULL;
266  }
267 
268  // Make the message invalid.
269  u.base.type = 0;
270 
271  return 0;
272 }
273 
275 {
276  // Check the validity of the source.
277  if (unlikely (!src_.check ())) {
278  errno = EFAULT;
279  return -1;
280  }
281 
282  int rc = close ();
283  if (unlikely (rc < 0))
284  return rc;
285 
286  *this = src_;
287 
288  rc = src_.init ();
289  if (unlikely (rc < 0))
290  return rc;
291 
292  return 0;
293 }
294 
296 {
297  // Check the validity of the source.
298  if (unlikely (!src_.check ())) {
299  errno = EFAULT;
300  return -1;
301  }
302 
303  int rc = close ();
304  if (unlikely (rc < 0))
305  return rc;
306 
307  if (src_.u.base.type == type_lmsg ) {
308 
309  // One reference is added to shared messages. Non-shared messages
310  // are turned into shared messages and reference count is set to 2.
311  if (src_.u.lmsg.flags & msg_t::shared)
312  src_.u.lmsg.content->refcnt.add (1);
313  else {
314  src_.u.lmsg.flags |= msg_t::shared;
315  src_.u.lmsg.content->refcnt.set (2);
316  }
317  }
318 
319  if (src_.is_zcmsg()) {
320 
321  // One reference is added to shared messages. Non-shared messages
322  // are turned into shared messages and reference count is set to 2.
323  if (src_.u.zclmsg.flags & msg_t::shared)
324  src_.refcnt()->add (1);
325  else {
326  src_.u.zclmsg.flags |= msg_t::shared;
327  src_.refcnt()->set (2);
328  }
329  }
330  if (src_.u.base.metadata != NULL)
331  src_.u.base.metadata->add_ref ();
332 
333  *this = src_;
334 
335  return 0;
336 
337 }
338 
339 void *zmq::msg_t::data ()
340 {
341  // Check the validity of the message.
342  zmq_assert (check ());
343 
344  switch (u.base.type) {
345  case type_vsm:
346  return u.vsm.data;
347  case type_lmsg:
348  return u.lmsg.content->data;
349  case type_cmsg:
350  return u.cmsg.data;
351  case type_zclmsg:
352  return u.zclmsg.content->data;
353  default:
354  zmq_assert (false);
355  return NULL;
356  }
357 }
358 
359 size_t zmq::msg_t::size ()
360 {
361  // Check the validity of the message.
362  zmq_assert (check ());
363 
364  switch (u.base.type) {
365  case type_vsm:
366  return u.vsm.size;
367  case type_lmsg:
368  return u.lmsg.content->size;
369  case type_zclmsg:
370  return u.zclmsg.content->size;
371  case type_cmsg:
372  return u.cmsg.size;
373  default:
374  zmq_assert (false);
375  return 0;
376  }
377 }
378 
379 unsigned char zmq::msg_t::flags ()
380 {
381  return u.base.flags;
382 }
383 
384 void zmq::msg_t::set_flags (unsigned char flags_)
385 {
386  u.base.flags |= flags_;
387 }
388 
389 void zmq::msg_t::reset_flags (unsigned char flags_)
390 {
391  u.base.flags &= ~flags_;
392 }
393 
395 {
396  return u.base.metadata;
397 }
398 
400 {
401  assert (metadata_ != NULL);
402  assert (u.base.metadata == NULL);
403  metadata_->add_ref ();
404  u.base.metadata = metadata_;
405 }
406 
408 {
409  if (u.base.metadata) {
410  if (u.base.metadata->drop_ref ()) {
411  LIBZMQ_DELETE(u.base.metadata);
412  }
413  u.base.metadata = NULL;
414  }
415 }
416 
418 {
419  return (u.base.flags & identity) == identity;
420 }
421 
423 {
424  return (u.base.flags & credential) == credential;
425 }
426 
428 {
429  return u.base.type == type_delimiter;
430 }
431 
432 bool zmq::msg_t::is_vsm () const
433 {
434  return u.base.type == type_vsm;
435 }
436 
437 bool zmq::msg_t::is_cmsg () const
438 {
439  return u.base.type == type_cmsg;
440 }
441 
443 {
444  return u.base.type == type_zclmsg;
445 }
446 
448 {
449  return u.base.type == type_join;
450 }
451 
453 {
454  return u.base.type == type_leave;
455 }
456 
457 void zmq::msg_t::add_refs (int refs_)
458 {
459  zmq_assert (refs_ >= 0);
460 
461  // Operation not supported for messages with metadata.
462  zmq_assert (u.base.metadata == NULL);
463 
464  // No copies required.
465  if (!refs_)
466  return;
467 
468  // VSMs, CMSGS and delimiters can be copied straight away. The only
469  // message type that needs special care are long messages.
470  if (u.base.type == type_lmsg || is_zcmsg() ) {
471  if (u.base.flags & msg_t::shared)
472  refcnt()->add (refs_);
473  else {
474  refcnt()->set (refs_ + 1);
475  u.base.flags |= msg_t::shared;
476  }
477  }
478 }
479 
480 bool zmq::msg_t::rm_refs (int refs_)
481 {
482  zmq_assert (refs_ >= 0);
483 
484  // Operation not supported for messages with metadata.
485  zmq_assert (u.base.metadata == NULL);
486 
487  // No copies required.
488  if (!refs_)
489  return true;
490 
491  // If there's only one reference close the message.
492  if ( (u.base.type != type_zclmsg && u.base.type != type_lmsg) || !(u.base.flags & msg_t::shared)) {
493  close ();
494  return false;
495  }
496 
497  // The only message type that needs special care are long and zcopy messages.
498  if (u.base.type == type_lmsg && !u.lmsg.content->refcnt.sub(refs_)) {
499  // We used "placement new" operator to initialize the reference
500  // counter so we call the destructor explicitly now.
501  u.lmsg.content->refcnt.~atomic_counter_t ();
502 
503  if (u.lmsg.content->ffn)
504  u.lmsg.content->ffn (u.lmsg.content->data, u.lmsg.content->hint);
505  free (u.lmsg.content);
506 
507  return false;
508  }
509 
510  if (is_zcmsg() && !u.zclmsg.content->refcnt.sub(refs_)) {
511  // storage for rfcnt is provided externally
512  if (u.zclmsg.content->ffn) {
513  u.zclmsg.content->ffn(u.zclmsg.content->data, u.zclmsg.content->hint);
514  }
515 
516  return false;
517  }
518 
519  return true;
520 }
521 
523 {
524  return u.base.routing_id;
525 }
526 
527 int zmq::msg_t::set_routing_id (uint32_t routing_id_)
528 {
529  if (routing_id_) {
530  u.base.routing_id = routing_id_;
531  return 0;
532  }
533  errno = EINVAL;
534  return -1;
535 }
536 
538 {
539  u.base.routing_id = 0;
540  return 0;
541 }
542 
543 const char * zmq::msg_t::group ()
544 {
545  return u.base.group;
546 }
547 
548 int zmq::msg_t::set_group (const char * group_)
549 {
550  return set_group (group_, strlen (group_));
551 }
552 
553 int zmq::msg_t::set_group (const char * group_, size_t length_)
554 {
555  if (length_> ZMQ_GROUP_MAX_LENGTH)
556  {
557  errno = EINVAL;
558  return -1;
559  }
560 
561  strncpy (u.base.group, group_, length_);
562  u.base.group[length_] = '\0';
563 
564  return 0;
565 }
566 
568 {
569  switch(u.base.type)
570  {
571  case type_lmsg:
572  return &u.lmsg.content->refcnt;
573  case type_zclmsg:
574  return &u.zclmsg.content->refcnt;
575  default:
576  zmq_assert(false);
577  return NULL;
578  }
579 }
void add_ref()
Definition: metadata.cpp:48
#define LIBZMQ_DELETE(p_object)
Definition: macros.hpp:7
int close()
Definition: msg.cpp:217
int set_group(const char *group_)
Definition: msg.cpp:548
bool rm_refs(int refs_)
Definition: msg.cpp:480
#define zmq_assert(x)
Definition: err.hpp:119
int move(msg_t &src_)
Definition: msg.cpp:274
bool is_identity() const
Definition: msg.cpp:417
void( msg_free_fn)(void *data, void *hint)
Definition: msg.hpp:46
int init_external_storage(content_t *content_, void *data_, size_t size_, msg_free_fn *ffn_, void *hint_)
Definition: msg.cpp:126
void add_refs(int refs_)
Definition: msg.cpp:457
void set(integer_t value_)
#define ZMQ_GROUP_MAX_LENGTH
Definition: zmq.h:355
int init_size(size_t size_)
Definition: msg.cpp:93
struct zmq::msg_t::@41::@44 lmsg
struct zmq::msg_t::@41::@45 zclmsg
void reset_flags(unsigned char flags_)
Definition: msg.cpp:389
bool check()
Definition: msg.cpp:50
bool is_join() const
Definition: msg.cpp:447
int init_leave()
Definition: msg.cpp:207
size_t size()
#define unlikely(x)
Definition: likely.hpp:38
int init()
Definition: msg.cpp:82
union zmq::msg_t::@41 u
int init_delimiter()
Definition: msg.cpp:187
void * data()
int init_join()
Definition: msg.cpp:197
int copy(msg_t &src_)
Definition: msg.cpp:295
unsigned char flags()
int reset_routing_id()
Definition: msg.cpp:537
const char * group()
Definition: zmq.h:221
integer_t add(integer_t increment_)
bool is_vsm() const
Definition: msg.cpp:432
void set_flags(unsigned char flags_)
Definition: msg.cpp:384
bool is_zcmsg() const
Definition: msg.cpp:442
bool is_cmsg() const
Definition: msg.cpp:437
bool is_leave() const
Definition: msg.cpp:452
zmq::atomic_counter_t * refcnt()
Definition: msg.cpp:567
int set_routing_id(uint32_t routing_id_)
Definition: msg.cpp:527
bool is_delimiter() const
Definition: msg.cpp:427
char zmq_msg_size_check[2 *((sizeof(zmq::msg_t)==sizeof(zmq_msg_t))!=0)-1]
Definition: msg.cpp:48
void reset_metadata()
Definition: msg.cpp:407
uint32_t get_routing_id()
Definition: msg.cpp:522
bool is_credential() const
Definition: msg.cpp:422
metadata_t * metadata() const
int init_data(void *data_, size_t size_, msg_free_fn *ffn_, void *hint_)
Definition: msg.cpp:148
struct zmq::msg_t::@41::@42 base
void set_metadata(metadata_t *metadata_)
Definition: msg.cpp:399