LCOV - code coverage report
Current view: top level - home/h/core/forks/m4-libzmq/src - msg.cpp (source / functions) Hit Total Coverage
Test: zeromq-4.2.0 Code Coverage Lines: 210 263 79.8 %
Date: 2016-05-09 Functions: 35 37 94.6 %
Legend: Lines: hit not hit

          Line data    Source code
       1             : /*
       2             :     Copyright (c) 2007-2016 Contributors as noted in the AUTHORS file
       3             : 
       4             :     This file is part of libzmq, the ZeroMQ core engine in C++.
       5             : 
       6             :     libzmq is free software; you can redistribute it and/or modify it under
       7             :     the terms of the GNU Lesser General Public License (LGPL) as published
       8             :     by the Free Software Foundation; either version 3 of the License, or
       9             :     (at your option) any later version.
      10             : 
      11             :     As a special exception, the Contributors give you permission to link
      12             :     this library with independent modules to produce an executable,
      13             :     regardless of the license terms of these independent modules, and to
      14             :     copy and distribute the resulting executable under terms of your choice,
      15             :     provided that you also meet, for each linked independent module, the
      16             :     terms and conditions of the license of that module. An independent
      17             :     module is a module which is not derived from or based on this library.
      18             :     If you modify this library, you must extend this exception to your
      19             :     version of the library.
      20             : 
      21             :     libzmq is distributed in the hope that it will be useful, but WITHOUT
      22             :     ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
      23             :     FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public
      24             :     License for more details.
      25             : 
      26             :     You should have received a copy of the GNU Lesser General Public License
      27             :     along with this program.  If not, see <http://www.gnu.org/licenses/>.
      28             : */
      29             : 
      30             : #include "precompiled.hpp"
      31             : #include "macros.hpp"
      32             : #include "msg.hpp"
      33             : #include "../include/zmq.h"
      34             : 
      35             : #include <string.h>
      36             : #include <stdlib.h>
      37             : #include <new>
      38             : 
      39             : #include "stdint.hpp"
      40             : #include "likely.hpp"
      41             : #include "metadata.hpp"
      42             : #include "err.hpp"
      43             : 
      44             : //  Check whether the sizes of public representation of the message (zmq_msg_t)
      45             : //  and private representation of the message (zmq::msg_t) match.
      46             : 
      47             : typedef char zmq_msg_size_check
      48             :     [2 * ((sizeof (zmq::msg_t) == sizeof (zmq_msg_t)) != 0) - 1];
      49             : 
      50     4835318 : bool zmq::msg_t::check ()
      51             : {
      52    22860178 :      return u.base.type >= type_min && u.base.type <= type_max;
      53             : }
      54             : 
      55      639175 : int zmq::msg_t::init (void* data_, size_t size_,
      56             :                       msg_free_fn* ffn_, void* hint,
      57             :                       content_t* content_)
      58             : {
      59      639175 :     if (size_ < max_vsm_size) {
      60      637942 :         int const rc = init_size(size_);
      61             : 
      62      637942 :         if (rc != -1)
      63             :         {
      64      637942 :             memcpy(data(), data_, size_);
      65      637942 :             return 0;
      66             :         }
      67             :         else
      68             :         {
      69             :             return -1;
      70             :         }
      71             :     }
      72        1233 :     else if(content_)
      73             :     {
      74        1233 :         return init_external_storage(content_, data_, size_, ffn_, hint);
      75             :     }
      76             :     else
      77             :     {
      78           0 :         return init_data(data_, size_, ffn_, hint);
      79             :     }
      80             : }
      81             : 
      82     9607074 : int zmq::msg_t::init ()
      83             : {
      84     9607830 :     u.vsm.metadata = NULL;
      85     9607830 :     u.vsm.type = type_vsm;
      86     9607830 :     u.vsm.flags = 0;
      87     9607830 :     u.vsm.size = 0;
      88     9607830 :     u.vsm.group[0] = '\0';
      89     9607830 :     u.vsm.routing_id = 0;
      90     9607074 :     return 0;
      91             : }
      92             : 
      93     1493429 : int zmq::msg_t::init_size (size_t size_)
      94             : {
      95     1493429 :     if (size_ <= max_vsm_size) {
      96     1492178 :         u.vsm.metadata = NULL;
      97     1492178 :         u.vsm.type = type_vsm;
      98     1492178 :         u.vsm.flags = 0;
      99     1492178 :         u.vsm.size = (unsigned char) size_;
     100     1492178 :         u.vsm.group[0] = '\0';
     101     1492178 :         u.vsm.routing_id = 0;
     102             :     }
     103             :     else {
     104        1251 :         u.lmsg.metadata = NULL;
     105        1251 :         u.lmsg.type = type_lmsg;
     106        1251 :         u.lmsg.flags = 0;
     107        1251 :         u.lmsg.group[0] = '\0';
     108        1251 :         u.lmsg.routing_id = 0;
     109        1251 :         u.lmsg.content = NULL;
     110        1251 :         if (sizeof (content_t) + size_ > size_)
     111        1251 :             u.lmsg.content = (content_t*) malloc (sizeof (content_t) + size_);
     112        1251 :         if (unlikely (!u.lmsg.content)) {
     113           0 :             errno = ENOMEM;
     114           0 :             return -1;
     115             :         }
     116             : 
     117        1251 :         u.lmsg.content->data = u.lmsg.content + 1;
     118        1251 :         u.lmsg.content->size = size_;
     119        1251 :         u.lmsg.content->ffn = NULL;
     120        1251 :         u.lmsg.content->hint = NULL;
     121        1251 :         new (&u.lmsg.content->refcnt) zmq::atomic_counter_t ();
     122             :     }
     123             :     return 0;
     124             : }
     125             : 
     126        1233 : int zmq::msg_t::init_external_storage(content_t* content_, void* data_, size_t size_,
     127             :                                       msg_free_fn *ffn_, void* hint_)
     128             : {
     129        1233 :     zmq_assert(NULL != data_);
     130        1233 :     zmq_assert(NULL != content_);
     131             : 
     132        1233 :     u.zclmsg.metadata = NULL;
     133        1233 :     u.zclmsg.type = type_zclmsg;
     134        1233 :     u.zclmsg.flags = 0;
     135        1233 :     u.zclmsg.group[0] = '\0';
     136        1233 :     u.zclmsg.routing_id = 0;
     137             : 
     138        1233 :     u.zclmsg.content = content_;
     139        1233 :     u.zclmsg.content->data = data_;
     140        1233 :     u.zclmsg.content->size = size_;
     141        1233 :     u.zclmsg.content->ffn = ffn_;
     142        1233 :     u.zclmsg.content->hint = hint_;
     143        1233 :     new (&u.zclmsg.content->refcnt) zmq::atomic_counter_t();
     144             : 
     145        1233 :     return 0;
     146             : }
     147             : 
     148         222 : int zmq::msg_t::init_data (void *data_, size_t size_,
     149             :                            msg_free_fn *ffn_, void *hint_)
     150             : {
     151             :     //  If data is NULL and size is not 0, a segfault
     152             :     //  would occur once the data is accessed
     153         222 :     zmq_assert (data_ != NULL || size_ == 0);
     154             : 
     155             :     //  Initialize constant message if there's no need to deallocate
     156         222 :     if (ffn_ == NULL) {
     157         183 :         u.cmsg.metadata = NULL;
     158         183 :         u.cmsg.type = type_cmsg;
     159         183 :         u.cmsg.flags = 0;
     160         183 :         u.cmsg.data = data_;
     161         183 :         u.cmsg.size = size_;
     162         183 :         u.cmsg.group[0] = '\0';
     163         183 :         u.cmsg.routing_id = 0;
     164             :     }
     165             :     else {
     166          39 :         u.lmsg.metadata = NULL;
     167          39 :         u.lmsg.type = type_lmsg;
     168          39 :         u.lmsg.flags = 0;
     169          39 :         u.lmsg.group[0] = '\0';
     170          39 :         u.lmsg.routing_id = 0;
     171          39 :         u.lmsg.content = (content_t*) malloc (sizeof (content_t));
     172          39 :         if (!u.lmsg.content) {
     173           0 :             errno = ENOMEM;
     174           0 :             return -1;
     175             :         }
     176             : 
     177          39 :         u.lmsg.content->data = data_;
     178          39 :         u.lmsg.content->size = size_;
     179          39 :         u.lmsg.content->ffn = ffn_;
     180          39 :         u.lmsg.content->hint = hint_;
     181          39 :         new (&u.lmsg.content->refcnt) zmq::atomic_counter_t ();
     182             :     }
     183             :     return 0;
     184             : 
     185             : }
     186             : 
     187        9763 : int zmq::msg_t::init_delimiter ()
     188             : {
     189        9763 :     u.delimiter.metadata = NULL;
     190        9763 :     u.delimiter.type = type_delimiter;
     191        9763 :     u.delimiter.flags = 0;
     192        9763 :     u.delimiter.group[0] = '\0';
     193        9763 :     u.delimiter.routing_id = 0;
     194        9763 :     return 0;
     195             : }
     196             : 
     197          18 : int zmq::msg_t::init_join ()
     198             : {
     199          18 :     u.base.metadata = NULL;
     200          18 :     u.base.type = type_join;
     201          18 :     u.base.flags = 0;
     202          18 :     u.base.group[0] = '\0';
     203          18 :     u.base.routing_id = 0;
     204          18 :     return 0;
     205             : }
     206             : 
     207           6 : int zmq::msg_t::init_leave ()
     208             : {
     209           6 :     u.base.metadata = NULL;
     210           6 :     u.base.type = type_leave;
     211           6 :     u.base.flags = 0;
     212           6 :     u.base.group[0] = '\0';
     213           6 :     u.base.routing_id = 0;
     214           6 :     return 0;
     215             : }
     216             : 
     217    10662193 : int zmq::msg_t::close ()
     218             : {
     219             :     //  Check the validity of the message.
     220    10662193 :     if (unlikely (!check ())) {
     221           0 :         errno = EFAULT;
     222           0 :         return -1;
     223             :     }
     224             : 
     225    10662193 :     if (u.base.type == type_lmsg) {
     226             : 
     227             :         //  If the content is not shared, or if it is shared and the reference
     228             :         //  count has dropped to zero, deallocate it.
     229        1317 :         if (!(u.lmsg.flags & msg_t::shared) ||
     230          18 :               !u.lmsg.content->refcnt.sub (1)) {
     231             : 
     232             :             //  We used "placement new" operator to initialize the reference
     233             :             //  counter so we call the destructor explicitly now.
     234        1290 :             u.lmsg.content->refcnt.~atomic_counter_t ();
     235             : 
     236        1290 :             if (u.lmsg.content->ffn)
     237             :                 u.lmsg.content->ffn (u.lmsg.content->data,
     238          39 :                     u.lmsg.content->hint);
     239        1290 :             free (u.lmsg.content);
     240             :         }
     241             :     }
     242             : 
     243    10662193 :     if (is_zcmsg())
     244             :     {
     245        1233 :         zmq_assert( u.zclmsg.content->ffn );
     246             : 
     247             :         //  If the content is not shared, or if it is shared and the reference
     248             :         //  count has dropped to zero, deallocate it.
     249       34363 :         if (!(u.zclmsg.flags & msg_t::shared) ||
     250           0 :             !u.zclmsg.content->refcnt.sub (1)) {
     251             : 
     252             :             //  We used "placement new" operator to initialize the reference
     253             :             //  counter so we call the destructor explicitly now.
     254        1233 :             u.zclmsg.content->refcnt.~atomic_counter_t ();
     255             : 
     256             :             u.zclmsg.content->ffn (u.zclmsg.content->data,
     257        1233 :                           u.zclmsg.content->hint);
     258             :         }
     259             :     }
     260             : 
     261    10695323 :     if (u.base.metadata != NULL) {
     262      635509 :         if (u.base.metadata->drop_ref ()) {
     263         124 :             LIBZMQ_DELETE(u.base.metadata);
     264             :         }
     265      635509 :         u.base.metadata = NULL;
     266             :     }
     267             : 
     268             :     //  Make the message invalid.
     269    10695323 :     u.base.type = 0;
     270             : 
     271    10695323 :     return 0;
     272             : }
     273             : 
     274         756 : int zmq::msg_t::move (msg_t &src_)
     275             : {
     276             :     //  Check the validity of the source.
     277         756 :     if (unlikely (!src_.check ())) {
     278           0 :         errno = EFAULT;
     279           0 :         return -1;
     280             :     }
     281             : 
     282         756 :     int rc = close ();
     283         756 :     if (unlikely (rc < 0))
     284             :         return rc;
     285             : 
     286         756 :     *this = src_;
     287             : 
     288         756 :     rc = src_.init ();
     289             :     if (unlikely (rc < 0))
     290             :         return rc;
     291             : 
     292         756 :     return 0;
     293             : }
     294             : 
     295          57 : int zmq::msg_t::copy (msg_t &src_)
     296             : {
     297             :     //  Check the validity of the source.
     298          57 :     if (unlikely (!src_.check ())) {
     299           0 :         errno = EFAULT;
     300           0 :         return -1;
     301             :     }
     302             : 
     303          57 :     int rc = close ();
     304          57 :     if (unlikely (rc < 0))
     305             :         return rc;
     306             : 
     307          57 :     if (src_.u.base.type == type_lmsg  ) {
     308             : 
     309             :         //  One reference is added to shared messages. Non-shared messages
     310             :         //  are turned into shared messages and reference count is set to 2.
     311           9 :         if (src_.u.lmsg.flags & msg_t::shared)
     312           0 :             src_.u.lmsg.content->refcnt.add (1);
     313             :         else {
     314           9 :             src_.u.lmsg.flags |= msg_t::shared;
     315           9 :             src_.u.lmsg.content->refcnt.set (2);
     316             :         }
     317             :     }
     318             : 
     319          57 :     if (src_.is_zcmsg()) {
     320             : 
     321             :         //  One reference is added to shared messages. Non-shared messages
     322             :         //  are turned into shared messages and reference count is set to 2.
     323           0 :         if (src_.u.zclmsg.flags & msg_t::shared)
     324           0 :             src_.refcnt()->add (1);
     325             :         else {
     326           0 :             src_.u.zclmsg.flags |= msg_t::shared;
     327           0 :             src_.refcnt()->set (2);
     328             :         }
     329             :     }
     330          57 :     if (src_.u.base.metadata != NULL)
     331          42 :         src_.u.base.metadata->add_ref ();
     332             : 
     333          57 :     *this = src_;
     334             : 
     335          57 :     return 0;
     336             : 
     337             : }
     338             : 
     339     3206322 : void *zmq::msg_t::data ()
     340             : {
     341             :     //  Check the validity of the message.
     342     3206322 :     zmq_assert (check ());
     343             : 
     344     3205789 :     switch (u.base.type) {
     345             :     case type_vsm:
     346     3200632 :         return u.vsm.data;
     347             :     case type_lmsg:
     348        2523 :         return u.lmsg.content->data;
     349             :     case type_cmsg:
     350         201 :         return u.cmsg.data;
     351             :     case type_zclmsg:
     352        2433 :         return u.zclmsg.content->data;
     353             :     default:
     354           0 :         zmq_assert (false);
     355           0 :         return NULL;
     356             :     }
     357             : }
     358             : 
     359     4155532 : size_t zmq::msg_t::size ()
     360             : {
     361             :     //  Check the validity of the message.
     362     4155532 :     zmq_assert (check ());
     363             : 
     364     4154665 :     switch (u.base.type) {
     365             :     case type_vsm:
     366     4147693 :         return u.vsm.size;
     367             :     case type_lmsg:
     368        3846 :         return u.lmsg.content->size;
     369             :     case type_zclmsg:
     370        2697 :         return u.zclmsg.content->size;
     371             :     case type_cmsg:
     372         429 :         return u.cmsg.size;
     373             :     default:
     374           0 :         zmq_assert (false);
     375           0 :         return 0;
     376             :     }
     377             : }
     378             : 
     379    10153584 : unsigned char zmq::msg_t::flags ()
     380             : {
     381    10153584 :     return u.base.flags;
     382             : }
     383             : 
     384      646492 : void zmq::msg_t::set_flags (unsigned char flags_)
     385             : {
     386      646492 :     u.base.flags |= flags_;
     387      646492 : }
     388             : 
     389      864555 : void zmq::msg_t::reset_flags (unsigned char flags_)
     390             : {
     391      864555 :     u.base.flags &= ~flags_;
     392      864555 : }
     393             : 
     394         691 : zmq::metadata_t *zmq::msg_t::metadata () const
     395             : {
     396         691 :     return u.base.metadata;
     397             : }
     398             : 
     399      635569 : void zmq::msg_t::set_metadata (zmq::metadata_t *metadata_)
     400             : {
     401             :     assert (metadata_ != NULL);
     402             :     assert (u.base.metadata == NULL);
     403      635569 :     metadata_->add_ref ();
     404      635569 :     u.base.metadata = metadata_;
     405      635569 : }
     406             : 
     407      864522 : void zmq::msg_t::reset_metadata ()
     408             : {
     409      864522 :     if (u.base.metadata) {
     410         102 :         if (u.base.metadata->drop_ref ()) {
     411           0 :             LIBZMQ_DELETE(u.base.metadata);
     412             :         }
     413         102 :         u.base.metadata = NULL;
     414             :     }
     415      864522 : }
     416             : 
     417     2871352 : bool zmq::msg_t::is_identity () const
     418             : {
     419     2871352 :     return (u.base.flags & identity) == identity;
     420             : }
     421             : 
     422     1480851 : bool zmq::msg_t::is_credential () const
     423             : {
     424     1480851 :     return (u.base.flags & credential) == credential;
     425             : }
     426             : 
     427     1481289 : bool zmq::msg_t::is_delimiter () const
     428             : {
     429     1481289 :     return u.base.type == type_delimiter;
     430             : }
     431             : 
     432       63537 : bool zmq::msg_t::is_vsm () const
     433             : {
     434       63537 :     return u.base.type == type_vsm;
     435             : }
     436             : 
     437           9 : bool zmq::msg_t::is_cmsg () const
     438             : {
     439           9 :     return u.base.type == type_cmsg;
     440             : }
     441             : 
     442      639175 : bool zmq::msg_t::is_zcmsg() const
     443             : {
     444    11301425 :     return u.base.type == type_zclmsg;
     445             : }
     446             : 
     447          42 : bool zmq::msg_t::is_join() const
     448             : {
     449          42 :     return u.base.type == type_join;
     450             : }
     451             : 
     452           6 : bool zmq::msg_t::is_leave() const
     453             : {
     454           6 :     return u.base.type == type_leave;
     455             : }
     456             : 
     457          30 : void zmq::msg_t::add_refs (int refs_)
     458             : {
     459          30 :     zmq_assert (refs_ >= 0);
     460             : 
     461             :     //  Operation not supported for messages with metadata.
     462          30 :     zmq_assert (u.base.metadata == NULL);
     463             : 
     464             :     //  No copies required.
     465          30 :     if (!refs_)
     466          30 :         return;
     467             : 
     468             :     //  VSMs, CMSGS and delimiters can be copied straight away. The only
     469             :     //  message type that needs special care are long messages.
     470           0 :     if (u.base.type == type_lmsg || is_zcmsg() ) {
     471           0 :         if (u.base.flags & msg_t::shared)
     472           0 :             refcnt()->add (refs_);
     473             :         else {
     474           0 :             refcnt()->set (refs_ + 1);
     475           0 :             u.base.flags |= msg_t::shared;
     476             :         }
     477             :     }
     478             : }
     479             : 
     480           0 : bool zmq::msg_t::rm_refs (int refs_)
     481             : {
     482           0 :     zmq_assert (refs_ >= 0);
     483             : 
     484             :     //  Operation not supported for messages with metadata.
     485           0 :     zmq_assert (u.base.metadata == NULL);
     486             : 
     487             :     //  No copies required.
     488           0 :     if (!refs_)
     489             :         return true;
     490             : 
     491             :     //  If there's only one reference close the message.
     492           0 :     if ( (u.base.type != type_zclmsg && u.base.type != type_lmsg) || !(u.base.flags & msg_t::shared)) {
     493           0 :         close ();
     494           0 :         return false;
     495             :     }
     496             : 
     497             :     //  The only message type that needs special care are long and zcopy messages.
     498           0 :     if (u.base.type == type_lmsg && !u.lmsg.content->refcnt.sub(refs_)) {
     499             :         //  We used "placement new" operator to initialize the reference
     500             :         //  counter so we call the destructor explicitly now.
     501           0 :         u.lmsg.content->refcnt.~atomic_counter_t ();
     502             : 
     503           0 :         if (u.lmsg.content->ffn)
     504           0 :             u.lmsg.content->ffn (u.lmsg.content->data, u.lmsg.content->hint);
     505           0 :         free (u.lmsg.content);
     506             : 
     507           0 :         return false;
     508             :     }
     509             : 
     510           0 :     if (is_zcmsg() && !u.zclmsg.content->refcnt.sub(refs_)) {
     511             :         // storage for rfcnt is provided externally
     512           0 :         if (u.zclmsg.content->ffn) {
     513           0 :             u.zclmsg.content->ffn(u.zclmsg.content->data, u.zclmsg.content->hint);
     514             :         }
     515             : 
     516             :         return false;
     517             :     }
     518             : 
     519             :     return true;
     520             : }
     521             : 
     522          27 : uint32_t zmq::msg_t::get_routing_id ()
     523             : {
     524          27 :     return u.base.routing_id;
     525             : }
     526             : 
     527      600027 : int zmq::msg_t::set_routing_id (uint32_t routing_id_)
     528             : {
     529      600027 :     if (routing_id_) {
     530      600027 :         u.base.routing_id = routing_id_;
     531      600027 :         return 0;
     532             :     }
     533           0 :     errno = EINVAL;
     534           0 :     return -1;
     535             : }
     536             : 
     537           9 : int zmq::msg_t::reset_routing_id ()
     538             : {
     539           9 :     u.base.routing_id = 0;
     540           9 :     return 0;
     541             : }
     542             : 
     543          87 : const char * zmq::msg_t::group ()
     544             : {
     545          87 :     return u.base.group;
     546             : }
     547             : 
     548          33 : int zmq::msg_t::set_group (const char * group_)
     549             : {
     550          33 :     return set_group (group_, strlen (group_));
     551             : }
     552             : 
     553          54 : int zmq::msg_t::set_group (const char * group_, size_t length_)
     554             : {
     555          54 :     if (length_> ZMQ_GROUP_MAX_LENGTH)
     556             :     {
     557           0 :         errno = EINVAL;
     558           0 :         return -1;
     559             :     }
     560             : 
     561          54 :     strncpy (u.base.group, group_, length_);
     562          54 :     u.base.group[length_] = '\0';
     563             : 
     564          54 :     return 0;
     565             : }
     566             : 
     567           0 : zmq::atomic_counter_t *zmq::msg_t::refcnt()
     568             : {
     569           0 :     switch(u.base.type)
     570             :     {
     571             :         case type_lmsg:
     572           0 :             return &u.lmsg.content->refcnt;
     573             :         case type_zclmsg:
     574           0 :             return &u.zclmsg.content->refcnt;
     575             :         default:
     576           0 :             zmq_assert(false);
     577           0 :             return NULL;
     578             :     }
     579             : }

Generated by: LCOV version 1.10