libzmq  master
ZeroMQ C++ Core Engine (LIBZMQ)
pair.cpp
Go to the documentation of this file.
1 /*
2  Copyright (c) 2007-2016 Contributors as noted in the AUTHORS file
3 
4  This file is part of libzmq, the ZeroMQ core engine in C++.
5 
6  libzmq is free software; you can redistribute it and/or modify it under
7  the terms of the GNU Lesser General Public License (LGPL) as published
8  by the Free Software Foundation; either version 3 of the License, or
9  (at your option) any later version.
10 
11  As a special exception, the Contributors give you permission to link
12  this library with independent modules to produce an executable,
13  regardless of the license terms of these independent modules, and to
14  copy and distribute the resulting executable under terms of your choice,
15  provided that you also meet, for each linked independent module, the
16  terms and conditions of the license of that module. An independent
17  module is a module which is not derived from or based on this library.
18  If you modify this library, you must extend this exception to your
19  version of the library.
20 
21  libzmq is distributed in the hope that it will be useful, but WITHOUT
22  ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
23  FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public
24  License for more details.
25 
26  You should have received a copy of the GNU Lesser General Public License
27  along with this program. If not, see <http://www.gnu.org/licenses/>.
28 */
29 
30 #include "precompiled.hpp"
31 #include "macros.hpp"
32 #include "pair.hpp"
33 #include "err.hpp"
34 #include "pipe.hpp"
35 #include "msg.hpp"
36 
37 zmq::pair_t::pair_t (class ctx_t *parent_, uint32_t tid_, int sid_) :
38  socket_base_t (parent_, tid_, sid_),
39  pipe (NULL),
40  last_in (NULL)
41 {
43 }
44 
46 {
47  zmq_assert (!pipe);
48 }
49 
50 void zmq::pair_t::xattach_pipe (pipe_t *pipe_, bool subscribe_to_all_)
51 {
52  LIBZMQ_UNUSED (subscribe_to_all_);
53 
54  zmq_assert (pipe_ != NULL);
55 
56  // ZMQ_PAIR socket can only be connected to a single peer.
57  // The socket rejects any further connection requests.
58  if (pipe == NULL)
59  pipe = pipe_;
60  else
61  pipe_->terminate (false);
62 }
63 
65 {
66  if (pipe_ == pipe) {
67  if (last_in == pipe) {
69  last_in = NULL;
70  }
71  pipe = NULL;
72  }
73 }
74 
76 {
77  // There's just one pipe. No lists of active and inactive pipes.
78  // There's nothing to do here.
79 }
80 
82 {
83  // There's just one pipe. No lists of active and inactive pipes.
84  // There's nothing to do here.
85 }
86 
88 {
89  if (!pipe || !pipe->write (msg_)) {
90  errno = EAGAIN;
91  return -1;
92  }
93 
94  if (!(msg_->flags () & msg_t::more))
95  pipe->flush ();
96 
97  // Detach the original message from the data buffer.
98  int rc = msg_->init ();
99  errno_assert (rc == 0);
100 
101  return 0;
102 }
103 
105 {
106  // Deallocate old content of the message.
107  int rc = msg_->close ();
108  errno_assert (rc == 0);
109 
110  if (!pipe || !pipe->read (msg_)) {
111 
112  // Initialise the output parameter to be a 0-byte message.
113  rc = msg_->init ();
114  errno_assert (rc == 0);
115 
116  errno = EAGAIN;
117  return -1;
118  }
119  last_in = pipe;
120  return 0;
121 }
122 
124 {
125  if (!pipe)
126  return false;
127 
128  return pipe->check_read ();
129 }
130 
132 {
133  if (!pipe)
134  return false;
135 
136  return pipe->check_write ();
137 }
138 
140 {
142 }
bool check_write()
Definition: pipe.cpp:206
bool read(msg_t *msg_)
Definition: pipe.cpp:169
int close()
Definition: msg.cpp:217
bool write(msg_t *msg_)
Definition: pipe.cpp:221
bool xhas_in()
Definition: pair.cpp:123
#define zmq_assert(x)
Definition: err.hpp:119
bool xhas_out()
Definition: pair.cpp:131
zmq::pipe_t * pipe
Definition: pair.hpp:66
int xrecv(zmq::msg_t *msg_)
Definition: pair.cpp:104
blob_t saved_credential
Definition: pair.hpp:70
blob_t get_credential() const
Definition: pipe.cpp:138
~pair_t()
Definition: pair.cpp:45
void xwrite_activated(zmq::pipe_t *pipe_)
Definition: pair.cpp:81
#define LIBZMQ_UNUSED(object)
Definition: macros.hpp:6
int init()
Definition: msg.cpp:82
std::basic_string< unsigned char > blob_t
Definition: blob.hpp:134
int xsend(zmq::msg_t *msg_)
Definition: pair.cpp:87
void flush()
Definition: pipe.cpp:248
void xread_activated(zmq::pipe_t *pipe_)
Definition: pair.cpp:75
void terminate(bool delay_)
Definition: pipe.cpp:385
#define errno_assert(x)
Definition: err.hpp:129
zmq::pipe_t * last_in
Definition: pair.hpp:68
void xattach_pipe(zmq::pipe_t *pipe_, bool subscribe_to_all_)
Definition: pair.cpp:50
#define ZMQ_PAIR
Definition: zmq.h:246
void xpipe_terminated(zmq::pipe_t *pipe_)
Definition: pair.cpp:64
bool check_read()
Definition: pipe.cpp:143
options_t options
Definition: own.hpp:109
blob_t get_credential() const
Definition: pair.cpp:139
unsigned char flags
Definition: msg.hpp:181
pair_t(zmq::ctx_t *parent_, uint32_t tid_, int sid)
Definition: pair.cpp:37