Line data Source code
1 : /*
2 : Copyright (c) 2007-2016 Contributors as noted in the AUTHORS file
3 :
4 : This file is part of libzmq, the ZeroMQ core engine in C++.
5 :
6 : libzmq is free software; you can redistribute it and/or modify it under
7 : the terms of the GNU Lesser General Public License (LGPL) as published
8 : by the Free Software Foundation; either version 3 of the License, or
9 : (at your option) any later version.
10 :
11 : As a special exception, the Contributors give you permission to link
12 : this library with independent modules to produce an executable,
13 : regardless of the license terms of these independent modules, and to
14 : copy and distribute the resulting executable under terms of your choice,
15 : provided that you also meet, for each linked independent module, the
16 : terms and conditions of the license of that module. An independent
17 : module is a module which is not derived from or based on this library.
18 : If you modify this library, you must extend this exception to your
19 : version of the library.
20 :
21 : libzmq is distributed in the hope that it will be useful, but WITHOUT
22 : ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
23 : FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public
24 : License for more details.
25 :
26 : You should have received a copy of the GNU Lesser General Public License
27 : along with this program. If not, see <http://www.gnu.org/licenses/>.
28 : */
29 :
30 : #include "precompiled.hpp"
31 : #include "macros.hpp"
32 : #include "server.hpp"
33 : #include "pipe.hpp"
34 : #include "wire.hpp"
35 : #include "random.hpp"
36 : #include "likely.hpp"
37 : #include "err.hpp"
38 :
39 15 : zmq::server_t::server_t (class ctx_t *parent_, uint32_t tid_, int sid_) :
40 : socket_base_t (parent_, tid_, sid_, true),
41 30 : next_rid (generate_random ())
42 : {
43 15 : options.type = ZMQ_SERVER;
44 15 : }
45 :
46 60 : zmq::server_t::~server_t ()
47 : {
48 30 : zmq_assert (outpipes.empty ());
49 30 : }
50 :
51 15 : void zmq::server_t::xattach_pipe (pipe_t *pipe_, bool subscribe_to_all_)
52 : {
53 : LIBZMQ_UNUSED (subscribe_to_all_);
54 :
55 15 : zmq_assert (pipe_);
56 :
57 15 : uint32_t routing_id = next_rid++;
58 15 : if (!routing_id)
59 0 : routing_id = next_rid++; // Never use RID zero
60 :
61 15 : pipe_->set_routing_id (routing_id);
62 : // Add the record into output pipes lookup table
63 15 : outpipe_t outpipe = {pipe_, true};
64 45 : bool ok = outpipes.insert (outpipes_t::value_type (routing_id, outpipe)).second;
65 15 : zmq_assert (ok);
66 :
67 15 : fq.attach (pipe_);
68 15 : }
69 :
70 15 : void zmq::server_t::xpipe_terminated (pipe_t *pipe_)
71 : {
72 30 : outpipes_t::iterator it = outpipes.find (pipe_->get_routing_id ());
73 30 : zmq_assert (it != outpipes.end ());
74 15 : outpipes.erase (it);
75 15 : fq.pipe_terminated (pipe_);
76 15 : }
77 :
78 129 : void zmq::server_t::xread_activated (pipe_t *pipe_)
79 : {
80 129 : fq.activated (pipe_);
81 129 : }
82 :
83 0 : void zmq::server_t::xwrite_activated (pipe_t *pipe_)
84 : {
85 : outpipes_t::iterator it;
86 0 : for (it = outpipes.begin (); it != outpipes.end (); ++it)
87 0 : if (it->second.pipe == pipe_)
88 : break;
89 :
90 0 : zmq_assert (it != outpipes.end ());
91 0 : zmq_assert (!it->second.active);
92 0 : it->second.active = true;
93 0 : }
94 :
95 18 : int zmq::server_t::xsend (msg_t *msg_)
96 : {
97 : // SERVER sockets do not allow multipart data (ZMQ_SNDMORE)
98 18 : if (msg_->flags () & msg_t::more) {
99 9 : errno = EINVAL;
100 9 : return -1;
101 : }
102 : // Find the pipe associated with the routing stored in the message.
103 9 : uint32_t routing_id = msg_->get_routing_id ();
104 18 : outpipes_t::iterator it = outpipes.find (routing_id);
105 :
106 18 : if (it != outpipes.end ()) {
107 9 : if (!it->second.pipe->check_write ()) {
108 0 : it->second.active = false;
109 0 : errno = EAGAIN;
110 0 : return -1;
111 : }
112 : }
113 : else {
114 0 : errno = EHOSTUNREACH;
115 0 : return -1;
116 : }
117 :
118 : // Message might be delivered over inproc, so we reset routing id
119 9 : int rc = msg_->reset_routing_id ();
120 9 : errno_assert (rc == 0);
121 :
122 9 : bool ok = it->second.pipe->write (msg_);
123 9 : if (unlikely (!ok)) {
124 : // Message failed to send - we must close it ourselves.
125 0 : rc = msg_->close ();
126 0 : errno_assert (rc == 0);
127 : }
128 : else
129 9 : it->second.pipe->flush ();
130 :
131 : // Detach the message from the data buffer.
132 9 : rc = msg_->init ();
133 9 : errno_assert (rc == 0);
134 :
135 : return 0;
136 : }
137 :
138 600241 : int zmq::server_t::xrecv (msg_t *msg_)
139 : {
140 600241 : pipe_t *pipe = NULL;
141 600241 : int rc = fq.recvpipe (msg_, &pipe);
142 :
143 : // Drop any messages with more flag
144 600241 : while (rc == 0 && msg_->flags () & msg_t::more) {
145 :
146 : // drop all frames of the current multi-frame message
147 0 : rc = fq.recvpipe (msg_, NULL);
148 :
149 0 : while (rc == 0 && msg_->flags () & msg_t::more)
150 0 : rc = fq.recvpipe (msg_, NULL);
151 :
152 : // get the new message
153 0 : if (rc == 0)
154 0 : rc = fq.recvpipe (msg_, &pipe);
155 : }
156 :
157 600241 : if (rc != 0)
158 : return rc;
159 :
160 600018 : zmq_assert (pipe != NULL);
161 :
162 600018 : uint32_t routing_id = pipe->get_routing_id ();
163 600018 : msg_->set_routing_id (routing_id);
164 :
165 : return 0;
166 : }
167 :
168 11 : bool zmq::server_t::xhas_in ()
169 : {
170 11 : return fq.has_in ();
171 : }
172 :
173 11 : bool zmq::server_t::xhas_out ()
174 : {
175 : // In theory, SERVER socket is always ready for writing. Whether actual
176 : // attempt to write succeeds depends on which pipe the message is going
177 : // to be routed to.
178 11 : return true;
179 : }
180 :
181 0 : zmq::blob_t zmq::server_t::get_credential () const
182 : {
183 0 : return fq.get_credential ();
184 : }
|