Line data Source code
1 : /*
2 : Copyright (c) 2007-2016 Contributors as noted in the AUTHORS file
3 :
4 : This file is part of libzmq, the ZeroMQ core engine in C++.
5 :
6 : libzmq is free software; you can redistribute it and/or modify it under
7 : the terms of the GNU Lesser General Public License (LGPL) as published
8 : by the Free Software Foundation; either version 3 of the License, or
9 : (at your option) any later version.
10 :
11 : As a special exception, the Contributors give you permission to link
12 : this library with independent modules to produce an executable,
13 : regardless of the license terms of these independent modules, and to
14 : copy and distribute the resulting executable under terms of your choice,
15 : provided that you also meet, for each linked independent module, the
16 : terms and conditions of the license of that module. An independent
17 : module is a module which is not derived from or based on this library.
18 : If you modify this library, you must extend this exception to your
19 : version of the library.
20 :
21 : libzmq is distributed in the hope that it will be useful, but WITHOUT
22 : ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
23 : FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public
24 : License for more details.
25 :
26 : You should have received a copy of the GNU Lesser General Public License
27 : along with this program. If not, see <http://www.gnu.org/licenses/>.
28 : */
29 :
30 : #include "precompiled.hpp"
31 : #include "macros.hpp"
32 : #include "msg.hpp"
33 : #include "../include/zmq.h"
34 :
35 : #include <string.h>
36 : #include <stdlib.h>
37 : #include <new>
38 :
39 : #include "stdint.hpp"
40 : #include "likely.hpp"
41 : #include "metadata.hpp"
42 : #include "err.hpp"
43 :
44 : // Check whether the sizes of public representation of the message (zmq_msg_t)
45 : // and private representation of the message (zmq::msg_t) match.
46 :
47 : typedef char zmq_msg_size_check
48 : [2 * ((sizeof (zmq::msg_t) == sizeof (zmq_msg_t)) != 0) - 1];
49 :
50 4835318 : bool zmq::msg_t::check ()
51 : {
52 22860178 : return u.base.type >= type_min && u.base.type <= type_max;
53 : }
54 :
55 639175 : int zmq::msg_t::init (void* data_, size_t size_,
56 : msg_free_fn* ffn_, void* hint,
57 : content_t* content_)
58 : {
59 639175 : if (size_ < max_vsm_size) {
60 637942 : int const rc = init_size(size_);
61 :
62 637942 : if (rc != -1)
63 : {
64 637942 : memcpy(data(), data_, size_);
65 637942 : return 0;
66 : }
67 : else
68 : {
69 : return -1;
70 : }
71 : }
72 1233 : else if(content_)
73 : {
74 1233 : return init_external_storage(content_, data_, size_, ffn_, hint);
75 : }
76 : else
77 : {
78 0 : return init_data(data_, size_, ffn_, hint);
79 : }
80 : }
81 :
82 9607074 : int zmq::msg_t::init ()
83 : {
84 9607830 : u.vsm.metadata = NULL;
85 9607830 : u.vsm.type = type_vsm;
86 9607830 : u.vsm.flags = 0;
87 9607830 : u.vsm.size = 0;
88 9607830 : u.vsm.group[0] = '\0';
89 9607830 : u.vsm.routing_id = 0;
90 9607074 : return 0;
91 : }
92 :
93 1493429 : int zmq::msg_t::init_size (size_t size_)
94 : {
95 1493429 : if (size_ <= max_vsm_size) {
96 1492178 : u.vsm.metadata = NULL;
97 1492178 : u.vsm.type = type_vsm;
98 1492178 : u.vsm.flags = 0;
99 1492178 : u.vsm.size = (unsigned char) size_;
100 1492178 : u.vsm.group[0] = '\0';
101 1492178 : u.vsm.routing_id = 0;
102 : }
103 : else {
104 1251 : u.lmsg.metadata = NULL;
105 1251 : u.lmsg.type = type_lmsg;
106 1251 : u.lmsg.flags = 0;
107 1251 : u.lmsg.group[0] = '\0';
108 1251 : u.lmsg.routing_id = 0;
109 1251 : u.lmsg.content = NULL;
110 1251 : if (sizeof (content_t) + size_ > size_)
111 1251 : u.lmsg.content = (content_t*) malloc (sizeof (content_t) + size_);
112 1251 : if (unlikely (!u.lmsg.content)) {
113 0 : errno = ENOMEM;
114 0 : return -1;
115 : }
116 :
117 1251 : u.lmsg.content->data = u.lmsg.content + 1;
118 1251 : u.lmsg.content->size = size_;
119 1251 : u.lmsg.content->ffn = NULL;
120 1251 : u.lmsg.content->hint = NULL;
121 1251 : new (&u.lmsg.content->refcnt) zmq::atomic_counter_t ();
122 : }
123 : return 0;
124 : }
125 :
126 1233 : int zmq::msg_t::init_external_storage(content_t* content_, void* data_, size_t size_,
127 : msg_free_fn *ffn_, void* hint_)
128 : {
129 1233 : zmq_assert(NULL != data_);
130 1233 : zmq_assert(NULL != content_);
131 :
132 1233 : u.zclmsg.metadata = NULL;
133 1233 : u.zclmsg.type = type_zclmsg;
134 1233 : u.zclmsg.flags = 0;
135 1233 : u.zclmsg.group[0] = '\0';
136 1233 : u.zclmsg.routing_id = 0;
137 :
138 1233 : u.zclmsg.content = content_;
139 1233 : u.zclmsg.content->data = data_;
140 1233 : u.zclmsg.content->size = size_;
141 1233 : u.zclmsg.content->ffn = ffn_;
142 1233 : u.zclmsg.content->hint = hint_;
143 1233 : new (&u.zclmsg.content->refcnt) zmq::atomic_counter_t();
144 :
145 1233 : return 0;
146 : }
147 :
148 222 : int zmq::msg_t::init_data (void *data_, size_t size_,
149 : msg_free_fn *ffn_, void *hint_)
150 : {
151 : // If data is NULL and size is not 0, a segfault
152 : // would occur once the data is accessed
153 222 : zmq_assert (data_ != NULL || size_ == 0);
154 :
155 : // Initialize constant message if there's no need to deallocate
156 222 : if (ffn_ == NULL) {
157 183 : u.cmsg.metadata = NULL;
158 183 : u.cmsg.type = type_cmsg;
159 183 : u.cmsg.flags = 0;
160 183 : u.cmsg.data = data_;
161 183 : u.cmsg.size = size_;
162 183 : u.cmsg.group[0] = '\0';
163 183 : u.cmsg.routing_id = 0;
164 : }
165 : else {
166 39 : u.lmsg.metadata = NULL;
167 39 : u.lmsg.type = type_lmsg;
168 39 : u.lmsg.flags = 0;
169 39 : u.lmsg.group[0] = '\0';
170 39 : u.lmsg.routing_id = 0;
171 39 : u.lmsg.content = (content_t*) malloc (sizeof (content_t));
172 39 : if (!u.lmsg.content) {
173 0 : errno = ENOMEM;
174 0 : return -1;
175 : }
176 :
177 39 : u.lmsg.content->data = data_;
178 39 : u.lmsg.content->size = size_;
179 39 : u.lmsg.content->ffn = ffn_;
180 39 : u.lmsg.content->hint = hint_;
181 39 : new (&u.lmsg.content->refcnt) zmq::atomic_counter_t ();
182 : }
183 : return 0;
184 :
185 : }
186 :
187 9763 : int zmq::msg_t::init_delimiter ()
188 : {
189 9763 : u.delimiter.metadata = NULL;
190 9763 : u.delimiter.type = type_delimiter;
191 9763 : u.delimiter.flags = 0;
192 9763 : u.delimiter.group[0] = '\0';
193 9763 : u.delimiter.routing_id = 0;
194 9763 : return 0;
195 : }
196 :
197 18 : int zmq::msg_t::init_join ()
198 : {
199 18 : u.base.metadata = NULL;
200 18 : u.base.type = type_join;
201 18 : u.base.flags = 0;
202 18 : u.base.group[0] = '\0';
203 18 : u.base.routing_id = 0;
204 18 : return 0;
205 : }
206 :
207 6 : int zmq::msg_t::init_leave ()
208 : {
209 6 : u.base.metadata = NULL;
210 6 : u.base.type = type_leave;
211 6 : u.base.flags = 0;
212 6 : u.base.group[0] = '\0';
213 6 : u.base.routing_id = 0;
214 6 : return 0;
215 : }
216 :
217 10662193 : int zmq::msg_t::close ()
218 : {
219 : // Check the validity of the message.
220 10662193 : if (unlikely (!check ())) {
221 0 : errno = EFAULT;
222 0 : return -1;
223 : }
224 :
225 10662193 : if (u.base.type == type_lmsg) {
226 :
227 : // If the content is not shared, or if it is shared and the reference
228 : // count has dropped to zero, deallocate it.
229 1317 : if (!(u.lmsg.flags & msg_t::shared) ||
230 18 : !u.lmsg.content->refcnt.sub (1)) {
231 :
232 : // We used "placement new" operator to initialize the reference
233 : // counter so we call the destructor explicitly now.
234 1290 : u.lmsg.content->refcnt.~atomic_counter_t ();
235 :
236 1290 : if (u.lmsg.content->ffn)
237 : u.lmsg.content->ffn (u.lmsg.content->data,
238 39 : u.lmsg.content->hint);
239 1290 : free (u.lmsg.content);
240 : }
241 : }
242 :
243 10662193 : if (is_zcmsg())
244 : {
245 1233 : zmq_assert( u.zclmsg.content->ffn );
246 :
247 : // If the content is not shared, or if it is shared and the reference
248 : // count has dropped to zero, deallocate it.
249 34363 : if (!(u.zclmsg.flags & msg_t::shared) ||
250 0 : !u.zclmsg.content->refcnt.sub (1)) {
251 :
252 : // We used "placement new" operator to initialize the reference
253 : // counter so we call the destructor explicitly now.
254 1233 : u.zclmsg.content->refcnt.~atomic_counter_t ();
255 :
256 : u.zclmsg.content->ffn (u.zclmsg.content->data,
257 1233 : u.zclmsg.content->hint);
258 : }
259 : }
260 :
261 10695323 : if (u.base.metadata != NULL) {
262 635509 : if (u.base.metadata->drop_ref ()) {
263 124 : LIBZMQ_DELETE(u.base.metadata);
264 : }
265 635509 : u.base.metadata = NULL;
266 : }
267 :
268 : // Make the message invalid.
269 10695323 : u.base.type = 0;
270 :
271 10695323 : return 0;
272 : }
273 :
274 756 : int zmq::msg_t::move (msg_t &src_)
275 : {
276 : // Check the validity of the source.
277 756 : if (unlikely (!src_.check ())) {
278 0 : errno = EFAULT;
279 0 : return -1;
280 : }
281 :
282 756 : int rc = close ();
283 756 : if (unlikely (rc < 0))
284 : return rc;
285 :
286 756 : *this = src_;
287 :
288 756 : rc = src_.init ();
289 : if (unlikely (rc < 0))
290 : return rc;
291 :
292 756 : return 0;
293 : }
294 :
295 57 : int zmq::msg_t::copy (msg_t &src_)
296 : {
297 : // Check the validity of the source.
298 57 : if (unlikely (!src_.check ())) {
299 0 : errno = EFAULT;
300 0 : return -1;
301 : }
302 :
303 57 : int rc = close ();
304 57 : if (unlikely (rc < 0))
305 : return rc;
306 :
307 57 : if (src_.u.base.type == type_lmsg ) {
308 :
309 : // One reference is added to shared messages. Non-shared messages
310 : // are turned into shared messages and reference count is set to 2.
311 9 : if (src_.u.lmsg.flags & msg_t::shared)
312 0 : src_.u.lmsg.content->refcnt.add (1);
313 : else {
314 9 : src_.u.lmsg.flags |= msg_t::shared;
315 9 : src_.u.lmsg.content->refcnt.set (2);
316 : }
317 : }
318 :
319 57 : if (src_.is_zcmsg()) {
320 :
321 : // One reference is added to shared messages. Non-shared messages
322 : // are turned into shared messages and reference count is set to 2.
323 0 : if (src_.u.zclmsg.flags & msg_t::shared)
324 0 : src_.refcnt()->add (1);
325 : else {
326 0 : src_.u.zclmsg.flags |= msg_t::shared;
327 0 : src_.refcnt()->set (2);
328 : }
329 : }
330 57 : if (src_.u.base.metadata != NULL)
331 42 : src_.u.base.metadata->add_ref ();
332 :
333 57 : *this = src_;
334 :
335 57 : return 0;
336 :
337 : }
338 :
339 3206322 : void *zmq::msg_t::data ()
340 : {
341 : // Check the validity of the message.
342 3206322 : zmq_assert (check ());
343 :
344 3205789 : switch (u.base.type) {
345 : case type_vsm:
346 3200632 : return u.vsm.data;
347 : case type_lmsg:
348 2523 : return u.lmsg.content->data;
349 : case type_cmsg:
350 201 : return u.cmsg.data;
351 : case type_zclmsg:
352 2433 : return u.zclmsg.content->data;
353 : default:
354 0 : zmq_assert (false);
355 0 : return NULL;
356 : }
357 : }
358 :
359 4155532 : size_t zmq::msg_t::size ()
360 : {
361 : // Check the validity of the message.
362 4155532 : zmq_assert (check ());
363 :
364 4154665 : switch (u.base.type) {
365 : case type_vsm:
366 4147693 : return u.vsm.size;
367 : case type_lmsg:
368 3846 : return u.lmsg.content->size;
369 : case type_zclmsg:
370 2697 : return u.zclmsg.content->size;
371 : case type_cmsg:
372 429 : return u.cmsg.size;
373 : default:
374 0 : zmq_assert (false);
375 0 : return 0;
376 : }
377 : }
378 :
379 10153584 : unsigned char zmq::msg_t::flags ()
380 : {
381 10153584 : return u.base.flags;
382 : }
383 :
384 646492 : void zmq::msg_t::set_flags (unsigned char flags_)
385 : {
386 646492 : u.base.flags |= flags_;
387 646492 : }
388 :
389 864555 : void zmq::msg_t::reset_flags (unsigned char flags_)
390 : {
391 864555 : u.base.flags &= ~flags_;
392 864555 : }
393 :
394 691 : zmq::metadata_t *zmq::msg_t::metadata () const
395 : {
396 691 : return u.base.metadata;
397 : }
398 :
399 635569 : void zmq::msg_t::set_metadata (zmq::metadata_t *metadata_)
400 : {
401 : assert (metadata_ != NULL);
402 : assert (u.base.metadata == NULL);
403 635569 : metadata_->add_ref ();
404 635569 : u.base.metadata = metadata_;
405 635569 : }
406 :
407 864522 : void zmq::msg_t::reset_metadata ()
408 : {
409 864522 : if (u.base.metadata) {
410 102 : if (u.base.metadata->drop_ref ()) {
411 0 : LIBZMQ_DELETE(u.base.metadata);
412 : }
413 102 : u.base.metadata = NULL;
414 : }
415 864522 : }
416 :
417 2871352 : bool zmq::msg_t::is_identity () const
418 : {
419 2871352 : return (u.base.flags & identity) == identity;
420 : }
421 :
422 1480851 : bool zmq::msg_t::is_credential () const
423 : {
424 1480851 : return (u.base.flags & credential) == credential;
425 : }
426 :
427 1481289 : bool zmq::msg_t::is_delimiter () const
428 : {
429 1481289 : return u.base.type == type_delimiter;
430 : }
431 :
432 63537 : bool zmq::msg_t::is_vsm () const
433 : {
434 63537 : return u.base.type == type_vsm;
435 : }
436 :
437 9 : bool zmq::msg_t::is_cmsg () const
438 : {
439 9 : return u.base.type == type_cmsg;
440 : }
441 :
442 639175 : bool zmq::msg_t::is_zcmsg() const
443 : {
444 11301425 : return u.base.type == type_zclmsg;
445 : }
446 :
447 42 : bool zmq::msg_t::is_join() const
448 : {
449 42 : return u.base.type == type_join;
450 : }
451 :
452 6 : bool zmq::msg_t::is_leave() const
453 : {
454 6 : return u.base.type == type_leave;
455 : }
456 :
457 30 : void zmq::msg_t::add_refs (int refs_)
458 : {
459 30 : zmq_assert (refs_ >= 0);
460 :
461 : // Operation not supported for messages with metadata.
462 30 : zmq_assert (u.base.metadata == NULL);
463 :
464 : // No copies required.
465 30 : if (!refs_)
466 30 : return;
467 :
468 : // VSMs, CMSGS and delimiters can be copied straight away. The only
469 : // message type that needs special care are long messages.
470 0 : if (u.base.type == type_lmsg || is_zcmsg() ) {
471 0 : if (u.base.flags & msg_t::shared)
472 0 : refcnt()->add (refs_);
473 : else {
474 0 : refcnt()->set (refs_ + 1);
475 0 : u.base.flags |= msg_t::shared;
476 : }
477 : }
478 : }
479 :
480 0 : bool zmq::msg_t::rm_refs (int refs_)
481 : {
482 0 : zmq_assert (refs_ >= 0);
483 :
484 : // Operation not supported for messages with metadata.
485 0 : zmq_assert (u.base.metadata == NULL);
486 :
487 : // No copies required.
488 0 : if (!refs_)
489 : return true;
490 :
491 : // If there's only one reference close the message.
492 0 : if ( (u.base.type != type_zclmsg && u.base.type != type_lmsg) || !(u.base.flags & msg_t::shared)) {
493 0 : close ();
494 0 : return false;
495 : }
496 :
497 : // The only message type that needs special care are long and zcopy messages.
498 0 : if (u.base.type == type_lmsg && !u.lmsg.content->refcnt.sub(refs_)) {
499 : // We used "placement new" operator to initialize the reference
500 : // counter so we call the destructor explicitly now.
501 0 : u.lmsg.content->refcnt.~atomic_counter_t ();
502 :
503 0 : if (u.lmsg.content->ffn)
504 0 : u.lmsg.content->ffn (u.lmsg.content->data, u.lmsg.content->hint);
505 0 : free (u.lmsg.content);
506 :
507 0 : return false;
508 : }
509 :
510 0 : if (is_zcmsg() && !u.zclmsg.content->refcnt.sub(refs_)) {
511 : // storage for rfcnt is provided externally
512 0 : if (u.zclmsg.content->ffn) {
513 0 : u.zclmsg.content->ffn(u.zclmsg.content->data, u.zclmsg.content->hint);
514 : }
515 :
516 : return false;
517 : }
518 :
519 : return true;
520 : }
521 :
522 27 : uint32_t zmq::msg_t::get_routing_id ()
523 : {
524 27 : return u.base.routing_id;
525 : }
526 :
527 600027 : int zmq::msg_t::set_routing_id (uint32_t routing_id_)
528 : {
529 600027 : if (routing_id_) {
530 600027 : u.base.routing_id = routing_id_;
531 600027 : return 0;
532 : }
533 0 : errno = EINVAL;
534 0 : return -1;
535 : }
536 :
537 9 : int zmq::msg_t::reset_routing_id ()
538 : {
539 9 : u.base.routing_id = 0;
540 9 : return 0;
541 : }
542 :
543 87 : const char * zmq::msg_t::group ()
544 : {
545 87 : return u.base.group;
546 : }
547 :
548 33 : int zmq::msg_t::set_group (const char * group_)
549 : {
550 33 : return set_group (group_, strlen (group_));
551 : }
552 :
553 54 : int zmq::msg_t::set_group (const char * group_, size_t length_)
554 : {
555 54 : if (length_> ZMQ_GROUP_MAX_LENGTH)
556 : {
557 0 : errno = EINVAL;
558 0 : return -1;
559 : }
560 :
561 54 : strncpy (u.base.group, group_, length_);
562 54 : u.base.group[length_] = '\0';
563 :
564 54 : return 0;
565 : }
566 :
567 0 : zmq::atomic_counter_t *zmq::msg_t::refcnt()
568 : {
569 0 : switch(u.base.type)
570 : {
571 : case type_lmsg:
572 0 : return &u.lmsg.content->refcnt;
573 : case type_zclmsg:
574 0 : return &u.zclmsg.content->refcnt;
575 : default:
576 0 : zmq_assert(false);
577 0 : return NULL;
578 : }
579 : }
|