Branch data Line data Source code
1 : : /*
2 : : Copyright (c) 2007-2016 Contributors as noted in the AUTHORS file
3 : :
4 : : This file is part of libzmq, the ZeroMQ core engine in C++.
5 : :
6 : : libzmq is free software; you can redistribute it and/or modify it under
7 : : the terms of the GNU Lesser General Public License (LGPL) as published
8 : : by the Free Software Foundation; either version 3 of the License, or
9 : : (at your option) any later version.
10 : :
11 : : As a special exception, the Contributors give you permission to link
12 : : this library with independent modules to produce an executable,
13 : : regardless of the license terms of these independent modules, and to
14 : : copy and distribute the resulting executable under terms of your choice,
15 : : provided that you also meet, for each linked independent module, the
16 : : terms and conditions of the license of that module. An independent
17 : : module is a module which is not derived from or based on this library.
18 : : If you modify this library, you must extend this exception to your
19 : : version of the library.
20 : :
21 : : libzmq is distributed in the hope that it will be useful, but WITHOUT
22 : : ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
23 : : FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public
24 : : License for more details.
25 : :
26 : : You should have received a copy of the GNU Lesser General Public License
27 : : along with this program. If not, see <http://www.gnu.org/licenses/>.
28 : : */
29 : :
30 : : #include "precompiled.hpp"
31 : : #include "macros.hpp"
32 : : #include "msg.hpp"
33 : : #include "../include/zmq.h"
34 : :
35 : : #include <string.h>
36 : : #include <stdlib.h>
37 : : #include <new>
38 : :
39 : : #include "stdint.hpp"
40 : : #include "likely.hpp"
41 : : #include "metadata.hpp"
42 : : #include "err.hpp"
43 : :
44 : : // Check whether the sizes of public representation of the message (zmq_msg_t)
45 : : // and private representation of the message (zmq::msg_t) match.
46 : :
47 : : typedef char zmq_msg_size_check
48 : : [2 * ((sizeof (zmq::msg_t) == sizeof (zmq_msg_t)) != 0) - 1];
49 : :
50 : 6138147 : bool zmq::msg_t::check ()
51 : : {
52 : 6138147 : return u.base.type >= type_min && u.base.type <= type_max;
53 : : }
54 : :
55 : 212206 : int zmq::msg_t::init (void* data_, size_t size_,
56 : : msg_free_fn* ffn_, void* hint,
57 : : content_t* content_)
58 : : {
59 : 212206 : if (size_ < max_vsm_size) {
60 : 211718 : int const rc = init_size(size_);
61 : :
62 : 211718 : if (rc != -1)
63 : : {
64 : 211718 : memcpy(data(), data_, size_);
65 : 211718 : return 0;
66 : : }
67 : : else
68 : : {
69 : 0 : return -1;
70 : : }
71 : : }
72 : 488 : else if(content_)
73 : : {
74 : 488 : return init_external_storage(content_, data_, size_, ffn_, hint);
75 : : }
76 : : else
77 : : {
78 : 0 : return init_data(data_, size_, ffn_, hint);
79 : : }
80 : : }
81 : :
82 : 3153642 : int zmq::msg_t::init ()
83 : : {
84 : 3153642 : u.vsm.metadata = NULL;
85 : 3153642 : u.vsm.type = type_vsm;
86 : 3153642 : u.vsm.flags = 0;
87 : 3153642 : u.vsm.size = 0;
88 : 3153642 : u.vsm.group[0] = '\0';
89 : 3153642 : u.vsm.routing_id = 0;
90 : 3153642 : u.vsm.fd = retired_fd;
91 : 3153642 : return 0;
92 : : }
93 : :
94 : 495945 : int zmq::msg_t::init_size (size_t size_)
95 : : {
96 : 495945 : if (size_ <= max_vsm_size) {
97 : 495395 : u.vsm.metadata = NULL;
98 : 495395 : u.vsm.type = type_vsm;
99 : 495395 : u.vsm.flags = 0;
100 : 495395 : u.vsm.size = (unsigned char) size_;
101 : 495395 : u.vsm.group[0] = '\0';
102 : 495395 : u.vsm.routing_id = 0;
103 : 495395 : u.vsm.fd = retired_fd;
104 : : }
105 : : else {
106 : 550 : u.lmsg.metadata = NULL;
107 : 550 : u.lmsg.type = type_lmsg;
108 : 550 : u.lmsg.flags = 0;
109 : 550 : u.lmsg.group[0] = '\0';
110 : 550 : u.lmsg.routing_id = 0;
111 : 550 : u.lmsg.fd = retired_fd;
112 : 550 : u.lmsg.content = NULL;
113 : 550 : if (sizeof (content_t) + size_ > size_)
114 : 550 : u.lmsg.content = (content_t*) malloc (sizeof (content_t) + size_);
115 : 550 : if (unlikely (!u.lmsg.content)) {
116 : 0 : errno = ENOMEM;
117 : 0 : return -1;
118 : : }
119 : :
120 : 550 : u.lmsg.content->data = u.lmsg.content + 1;
121 : 550 : u.lmsg.content->size = size_;
122 : 550 : u.lmsg.content->ffn = NULL;
123 : 550 : u.lmsg.content->hint = NULL;
124 : 550 : new (&u.lmsg.content->refcnt) zmq::atomic_counter_t ();
125 : : }
126 : 495944 : return 0;
127 : : }
128 : :
129 : 488 : int zmq::msg_t::init_external_storage(content_t* content_, void* data_, size_t size_,
130 : : msg_free_fn *ffn_, void* hint_)
131 : : {
132 : 488 : zmq_assert(NULL != data_);
133 : 488 : zmq_assert(NULL != content_);
134 : :
135 : 488 : u.zclmsg.metadata = NULL;
136 : 488 : u.zclmsg.type = type_zclmsg;
137 : 488 : u.zclmsg.flags = 0;
138 : 488 : u.zclmsg.group[0] = '\0';
139 : 488 : u.zclmsg.routing_id = 0;
140 : 488 : u.zclmsg.fd = retired_fd;
141 : :
142 : 488 : u.zclmsg.content = content_;
143 : 488 : u.zclmsg.content->data = data_;
144 : 488 : u.zclmsg.content->size = size_;
145 : 488 : u.zclmsg.content->ffn = ffn_;
146 : 488 : u.zclmsg.content->hint = hint_;
147 : 488 : new (&u.zclmsg.content->refcnt) zmq::atomic_counter_t();
148 : :
149 : 488 : return 0;
150 : : }
151 : :
152 : 70 : int zmq::msg_t::init_data (void *data_, size_t size_,
153 : : msg_free_fn *ffn_, void *hint_)
154 : : {
155 : : // If data is NULL and size is not 0, a segfault
156 : : // would occur once the data is accessed
157 : 70 : zmq_assert (data_ != NULL || size_ == 0);
158 : :
159 : : // Initialize constant message if there's no need to deallocate
160 : 70 : if (ffn_ == NULL) {
161 : 57 : u.cmsg.metadata = NULL;
162 : 57 : u.cmsg.type = type_cmsg;
163 : 57 : u.cmsg.flags = 0;
164 : 57 : u.cmsg.data = data_;
165 : 57 : u.cmsg.size = size_;
166 : 57 : u.cmsg.group[0] = '\0';
167 : 57 : u.cmsg.routing_id = 0;
168 : 57 : u.cmsg.fd = retired_fd;
169 : : }
170 : : else {
171 : 13 : u.lmsg.metadata = NULL;
172 : 13 : u.lmsg.type = type_lmsg;
173 : 13 : u.lmsg.flags = 0;
174 : 13 : u.lmsg.group[0] = '\0';
175 : 13 : u.lmsg.routing_id = 0;
176 : 13 : u.lmsg.fd = retired_fd;
177 : 13 : u.lmsg.content = (content_t*) malloc (sizeof (content_t));
178 : 13 : if (!u.lmsg.content) {
179 : 0 : errno = ENOMEM;
180 : 0 : return -1;
181 : : }
182 : :
183 : 13 : u.lmsg.content->data = data_;
184 : 13 : u.lmsg.content->size = size_;
185 : 13 : u.lmsg.content->ffn = ffn_;
186 : 13 : u.lmsg.content->hint = hint_;
187 : 13 : new (&u.lmsg.content->refcnt) zmq::atomic_counter_t ();
188 : : }
189 : 70 : return 0;
190 : :
191 : : }
192 : :
193 : 2930 : int zmq::msg_t::init_delimiter ()
194 : : {
195 : 2930 : u.delimiter.metadata = NULL;
196 : 2930 : u.delimiter.type = type_delimiter;
197 : 2930 : u.delimiter.flags = 0;
198 : 2930 : u.delimiter.group[0] = '\0';
199 : 2930 : u.delimiter.routing_id = 0;
200 : 2930 : u.delimiter.fd = retired_fd;
201 : 2930 : return 0;
202 : : }
203 : :
204 : 6 : int zmq::msg_t::init_join ()
205 : : {
206 : 6 : u.base.metadata = NULL;
207 : 6 : u.base.type = type_join;
208 : 6 : u.base.flags = 0;
209 : 6 : u.base.group[0] = '\0';
210 : 6 : u.base.routing_id = 0;
211 : 6 : u.base.fd = retired_fd;
212 : 6 : return 0;
213 : : }
214 : :
215 : 2 : int zmq::msg_t::init_leave ()
216 : : {
217 : 2 : u.base.metadata = NULL;
218 : 2 : u.base.type = type_leave;
219 : 2 : u.base.flags = 0;
220 : 2 : u.base.group[0] = '\0';
221 : 2 : u.base.routing_id = 0;
222 : 2 : u.base.fd = retired_fd;
223 : 2 : return 0;
224 : : }
225 : :
226 : 2768844 : int zmq::msg_t::close ()
227 : : {
228 : : // Check the validity of the message.
229 : 2768844 : if (unlikely (!check ())) {
230 : 0 : errno = EFAULT;
231 : 0 : return -1;
232 : : }
233 : :
234 : 3028369 : if (u.base.type == type_lmsg) {
235 : :
236 : : // If the content is not shared, or if it is shared and the reference
237 : : // count has dropped to zero, deallocate it.
238 : 572 : if (!(u.lmsg.flags & msg_t::shared) ||
239 : 6 : !u.lmsg.content->refcnt.sub (1)) {
240 : :
241 : : // We used "placement new" operator to initialize the reference
242 : : // counter so we call the destructor explicitly now.
243 : 563 : u.lmsg.content->refcnt.~atomic_counter_t ();
244 : :
245 : 563 : if (u.lmsg.content->ffn)
246 : : u.lmsg.content->ffn (u.lmsg.content->data,
247 : 13 : u.lmsg.content->hint);
248 : 563 : free (u.lmsg.content);
249 : : }
250 : : }
251 : :
252 : 3028369 : if (is_zcmsg())
253 : : {
254 : 488 : zmq_assert( u.zclmsg.content->ffn );
255 : :
256 : : // If the content is not shared, or if it is shared and the reference
257 : : // count has dropped to zero, deallocate it.
258 : 488 : if (!(u.zclmsg.flags & msg_t::shared) ||
259 : 0 : !u.zclmsg.content->refcnt.sub (1)) {
260 : :
261 : : // We used "placement new" operator to initialize the reference
262 : : // counter so we call the destructor explicitly now.
263 : 488 : u.zclmsg.content->refcnt.~atomic_counter_t ();
264 : :
265 : : u.zclmsg.content->ffn (u.zclmsg.content->data,
266 : 488 : u.zclmsg.content->hint);
267 : : }
268 : : }
269 : :
270 : 3059387 : if (u.base.metadata != NULL) {
271 : 211766 : if (u.base.metadata->drop_ref ()) {
272 : 22 : LIBZMQ_DELETE(u.base.metadata);
273 : : }
274 : 211766 : u.base.metadata = NULL;
275 : : }
276 : :
277 : : // Make the message invalid.
278 : 3059387 : u.base.type = 0;
279 : :
280 : 3059387 : return 0;
281 : : }
282 : :
283 : 243 : int zmq::msg_t::move (msg_t &src_)
284 : : {
285 : : // Check the validity of the source.
286 : 243 : if (unlikely (!src_.check ())) {
287 : 0 : errno = EFAULT;
288 : 0 : return -1;
289 : : }
290 : :
291 : 243 : int rc = close ();
292 : 243 : if (unlikely (rc < 0))
293 : 0 : return rc;
294 : :
295 : 243 : *this = src_;
296 : :
297 : 243 : rc = src_.init ();
298 : 243 : if (unlikely (rc < 0))
299 : 0 : return rc;
300 : :
301 : 243 : return 0;
302 : : }
303 : :
304 : 19 : int zmq::msg_t::copy (msg_t &src_)
305 : : {
306 : : // Check the validity of the source.
307 : 19 : if (unlikely (!src_.check ())) {
308 : 0 : errno = EFAULT;
309 : 0 : return -1;
310 : : }
311 : :
312 : 19 : int rc = close ();
313 : 19 : if (unlikely (rc < 0))
314 : 0 : return rc;
315 : :
316 : 19 : if (src_.u.base.type == type_lmsg ) {
317 : :
318 : : // One reference is added to shared messages. Non-shared messages
319 : : // are turned into shared messages and reference count is set to 2.
320 : 3 : if (src_.u.lmsg.flags & msg_t::shared)
321 : 0 : src_.u.lmsg.content->refcnt.add (1);
322 : : else {
323 : 3 : src_.u.lmsg.flags |= msg_t::shared;
324 : 3 : src_.u.lmsg.content->refcnt.set (2);
325 : : }
326 : : }
327 : :
328 : 19 : if (src_.is_zcmsg()) {
329 : :
330 : : // One reference is added to shared messages. Non-shared messages
331 : : // are turned into shared messages and reference count is set to 2.
332 : 0 : if (src_.u.zclmsg.flags & msg_t::shared)
333 : 0 : src_.refcnt()->add (1);
334 : : else {
335 : 0 : src_.u.zclmsg.flags |= msg_t::shared;
336 : 0 : src_.refcnt()->set (2);
337 : : }
338 : : }
339 : 19 : if (src_.u.base.metadata != NULL)
340 : 14 : src_.u.base.metadata->add_ref ();
341 : :
342 : 19 : *this = src_;
343 : :
344 : 19 : return 0;
345 : :
346 : : }
347 : :
348 : 1027067 : void *zmq::msg_t::data ()
349 : : {
350 : : // Check the validity of the message.
351 : 1027067 : zmq_assert (check ());
352 : :
353 : 1028457 : switch (u.base.type) {
354 : : case type_vsm:
355 : 1026379 : return u.vsm.data;
356 : : case type_lmsg:
357 : 1061 : return u.lmsg.content->data;
358 : : case type_cmsg:
359 : 59 : return u.cmsg.data;
360 : : case type_zclmsg:
361 : 958 : return u.zclmsg.content->data;
362 : : default:
363 : 0 : zmq_assert (false);
364 : 0 : return NULL;
365 : : }
366 : : }
367 : :
368 : 1375401 : size_t zmq::msg_t::size ()
369 : : {
370 : : // Check the validity of the message.
371 : 1375401 : zmq_assert (check ());
372 : :
373 : 1399775 : switch (u.base.type) {
374 : : case type_vsm:
375 : 1396952 : return u.vsm.size;
376 : : case type_lmsg:
377 : 1654 : return u.lmsg.content->size;
378 : : case type_zclmsg:
379 : 1042 : return u.zclmsg.content->size;
380 : : case type_cmsg:
381 : 127 : return u.cmsg.size;
382 : : default:
383 : 0 : zmq_assert (false);
384 : 0 : return 0;
385 : : }
386 : : }
387 : :
388 : 3166579 : unsigned char zmq::msg_t::flags ()
389 : : {
390 : 3166579 : return u.base.flags;
391 : : }
392 : :
393 : 213786 : void zmq::msg_t::set_flags (unsigned char flags_)
394 : : {
395 : 213786 : u.base.flags |= flags_;
396 : 213786 : }
397 : :
398 : 285695 : void zmq::msg_t::reset_flags (unsigned char flags_)
399 : : {
400 : 285695 : u.base.flags &= ~flags_;
401 : 285695 : }
402 : :
403 : 1 : zmq::fd_t zmq::msg_t::fd ()
404 : : {
405 : 1 : return u.base.fd;
406 : : }
407 : :
408 : 211497 : void zmq::msg_t::set_fd (fd_t fd_)
409 : : {
410 : 211497 : u.base.fd = fd_;
411 : 211497 : }
412 : :
413 : 223 : zmq::metadata_t *zmq::msg_t::metadata () const
414 : : {
415 : 223 : return u.base.metadata;
416 : : }
417 : :
418 : 211786 : void zmq::msg_t::set_metadata (zmq::metadata_t *metadata_)
419 : : {
420 : : assert (metadata_ != NULL);
421 : : assert (u.base.metadata == NULL);
422 : 211786 : metadata_->add_ref ();
423 : 211786 : u.base.metadata = metadata_;
424 : 211786 : }
425 : :
426 : 285684 : void zmq::msg_t::reset_metadata ()
427 : : {
428 : 285684 : if (u.base.metadata) {
429 : 34 : if (u.base.metadata->drop_ref ()) {
430 : 0 : LIBZMQ_DELETE(u.base.metadata);
431 : : }
432 : 34 : u.base.metadata = NULL;
433 : : }
434 : 285684 : }
435 : :
436 : 961585 : bool zmq::msg_t::is_identity () const
437 : : {
438 : 961585 : return (u.base.flags & identity) == identity;
439 : : }
440 : :
441 : 496933 : bool zmq::msg_t::is_credential () const
442 : : {
443 : 496933 : return (u.base.flags & credential) == credential;
444 : : }
445 : :
446 : 497135 : bool zmq::msg_t::is_delimiter () const
447 : : {
448 : 497135 : return u.base.type == type_delimiter;
449 : : }
450 : :
451 : 22652 : bool zmq::msg_t::is_vsm () const
452 : : {
453 : 22652 : return u.base.type == type_vsm;
454 : : }
455 : :
456 : 3 : bool zmq::msg_t::is_cmsg () const
457 : : {
458 : 3 : return u.base.type == type_cmsg;
459 : : }
460 : :
461 : 2968764 : bool zmq::msg_t::is_zcmsg() const
462 : : {
463 : 2968764 : return u.base.type == type_zclmsg;
464 : : }
465 : :
466 : 14 : bool zmq::msg_t::is_join() const
467 : : {
468 : 14 : return u.base.type == type_join;
469 : : }
470 : :
471 : 2 : bool zmq::msg_t::is_leave() const
472 : : {
473 : 2 : return u.base.type == type_leave;
474 : : }
475 : :
476 : 6 : void zmq::msg_t::add_refs (int refs_)
477 : : {
478 : 6 : zmq_assert (refs_ >= 0);
479 : :
480 : : // Operation not supported for messages with metadata.
481 : 6 : zmq_assert (u.base.metadata == NULL);
482 : :
483 : : // No copies required.
484 : 6 : if (!refs_)
485 : 12 : return;
486 : :
487 : : // VSMs, CMSGS and delimiters can be copied straight away. The only
488 : : // message type that needs special care are long messages.
489 : 0 : if (u.base.type == type_lmsg || is_zcmsg() ) {
490 : 0 : if (u.base.flags & msg_t::shared)
491 : 0 : refcnt()->add (refs_);
492 : : else {
493 : 0 : refcnt()->set (refs_ + 1);
494 : 0 : u.base.flags |= msg_t::shared;
495 : : }
496 : : }
497 : : }
498 : :
499 : 0 : bool zmq::msg_t::rm_refs (int refs_)
500 : : {
501 : 0 : zmq_assert (refs_ >= 0);
502 : :
503 : : // Operation not supported for messages with metadata.
504 : 0 : zmq_assert (u.base.metadata == NULL);
505 : :
506 : : // No copies required.
507 : 0 : if (!refs_)
508 : 0 : return true;
509 : :
510 : : // If there's only one reference close the message.
511 : 0 : if ( (u.base.type != type_zclmsg && u.base.type != type_lmsg) || !(u.base.flags & msg_t::shared)) {
512 : 0 : close ();
513 : 0 : return false;
514 : : }
515 : :
516 : : // The only message type that needs special care are long and zcopy messages.
517 : 0 : if (u.base.type == type_lmsg && !u.lmsg.content->refcnt.sub(refs_)) {
518 : : // We used "placement new" operator to initialize the reference
519 : : // counter so we call the destructor explicitly now.
520 : 0 : u.lmsg.content->refcnt.~atomic_counter_t ();
521 : :
522 : 0 : if (u.lmsg.content->ffn)
523 : 0 : u.lmsg.content->ffn (u.lmsg.content->data, u.lmsg.content->hint);
524 : 0 : free (u.lmsg.content);
525 : :
526 : 0 : return false;
527 : : }
528 : :
529 : 0 : if (is_zcmsg() && !u.zclmsg.content->refcnt.sub(refs_)) {
530 : : // storage for rfcnt is provided externally
531 : 0 : if (u.zclmsg.content->ffn) {
532 : 0 : u.zclmsg.content->ffn(u.zclmsg.content->data, u.zclmsg.content->hint);
533 : : }
534 : :
535 : 0 : return false;
536 : : }
537 : :
538 : 0 : return true;
539 : : }
540 : :
541 : 9 : uint32_t zmq::msg_t::get_routing_id ()
542 : : {
543 : 9 : return u.base.routing_id;
544 : : }
545 : :
546 : 200009 : int zmq::msg_t::set_routing_id (uint32_t routing_id_)
547 : : {
548 : 200009 : if (routing_id_) {
549 : 200009 : u.base.routing_id = routing_id_;
550 : 200009 : return 0;
551 : : }
552 : 0 : errno = EINVAL;
553 : 0 : return -1;
554 : : }
555 : :
556 : 3 : int zmq::msg_t::reset_routing_id ()
557 : : {
558 : 3 : u.base.routing_id = 0;
559 : 3 : return 0;
560 : : }
561 : :
562 : 29 : const char * zmq::msg_t::group ()
563 : : {
564 : 29 : return u.base.group;
565 : : }
566 : :
567 : 11 : int zmq::msg_t::set_group (const char * group_)
568 : : {
569 : 11 : return set_group (group_, strlen (group_));
570 : : }
571 : :
572 : 18 : int zmq::msg_t::set_group (const char * group_, size_t length_)
573 : : {
574 : 18 : if (length_> ZMQ_GROUP_MAX_LENGTH)
575 : : {
576 : 0 : errno = EINVAL;
577 : 0 : return -1;
578 : : }
579 : :
580 : 18 : strncpy (u.base.group, group_, length_);
581 : 18 : u.base.group[length_] = '\0';
582 : :
583 : 18 : return 0;
584 : : }
585 : :
586 : 0 : zmq::atomic_counter_t *zmq::msg_t::refcnt()
587 : : {
588 : 0 : switch(u.base.type)
589 : : {
590 : : case type_lmsg:
591 : 0 : return &u.lmsg.content->refcnt;
592 : : case type_zclmsg:
593 : 0 : return &u.zclmsg.content->refcnt;
594 : : default:
595 : 0 : zmq_assert(false);
596 : 0 : return NULL;
597 : : }
598 : : }
|