libzmq  master
ZeroMQ C++ Core Engine (LIBZMQ)
server.cpp
Go to the documentation of this file.
1 /*
2  Copyright (c) 2007-2016 Contributors as noted in the AUTHORS file
3 
4  This file is part of libzmq, the ZeroMQ core engine in C++.
5 
6  libzmq is free software; you can redistribute it and/or modify it under
7  the terms of the GNU Lesser General Public License (LGPL) as published
8  by the Free Software Foundation; either version 3 of the License, or
9  (at your option) any later version.
10 
11  As a special exception, the Contributors give you permission to link
12  this library with independent modules to produce an executable,
13  regardless of the license terms of these independent modules, and to
14  copy and distribute the resulting executable under terms of your choice,
15  provided that you also meet, for each linked independent module, the
16  terms and conditions of the license of that module. An independent
17  module is a module which is not derived from or based on this library.
18  If you modify this library, you must extend this exception to your
19  version of the library.
20 
21  libzmq is distributed in the hope that it will be useful, but WITHOUT
22  ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
23  FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public
24  License for more details.
25 
26  You should have received a copy of the GNU Lesser General Public License
27  along with this program. If not, see <http://www.gnu.org/licenses/>.
28 */
29 
30 #include "precompiled.hpp"
31 #include "macros.hpp"
32 #include "server.hpp"
33 #include "pipe.hpp"
34 #include "wire.hpp"
35 #include "random.hpp"
36 #include "likely.hpp"
37 #include "err.hpp"
38 
39 zmq::server_t::server_t (class ctx_t *parent_, uint32_t tid_, int sid_) :
40  socket_base_t (parent_, tid_, sid_, true),
41  next_rid (generate_random ())
42 {
44 }
45 
47 {
48  zmq_assert (outpipes.empty ());
49 }
50 
51 void zmq::server_t::xattach_pipe (pipe_t *pipe_, bool subscribe_to_all_)
52 {
53  LIBZMQ_UNUSED (subscribe_to_all_);
54 
55  zmq_assert (pipe_);
56 
57  uint32_t routing_id = next_rid++;
58  if (!routing_id)
59  routing_id = next_rid++; // Never use RID zero
60 
61  pipe_->set_routing_id (routing_id);
62  // Add the record into output pipes lookup table
63  outpipe_t outpipe = {pipe_, true};
64  bool ok = outpipes.insert (outpipes_t::value_type (routing_id, outpipe)).second;
65  zmq_assert (ok);
66 
67  fq.attach (pipe_);
68 }
69 
71 {
72  outpipes_t::iterator it = outpipes.find (pipe_->get_routing_id ());
73  zmq_assert (it != outpipes.end ());
74  outpipes.erase (it);
75  fq.pipe_terminated (pipe_);
76 }
77 
79 {
80  fq.activated (pipe_);
81 }
82 
84 {
85  outpipes_t::iterator it;
86  for (it = outpipes.begin (); it != outpipes.end (); ++it)
87  if (it->second.pipe == pipe_)
88  break;
89 
90  zmq_assert (it != outpipes.end ());
91  zmq_assert (!it->second.active);
92  it->second.active = true;
93 }
94 
96 {
97  // SERVER sockets do not allow multipart data (ZMQ_SNDMORE)
98  if (msg_->flags () & msg_t::more) {
99  errno = EINVAL;
100  return -1;
101  }
102  // Find the pipe associated with the routing stored in the message.
103  uint32_t routing_id = msg_->get_routing_id ();
104  outpipes_t::iterator it = outpipes.find (routing_id);
105 
106  if (it != outpipes.end ()) {
107  if (!it->second.pipe->check_write ()) {
108  it->second.active = false;
109  errno = EAGAIN;
110  return -1;
111  }
112  }
113  else {
114  errno = EHOSTUNREACH;
115  return -1;
116  }
117 
118  // Message might be delivered over inproc, so we reset routing id
119  int rc = msg_->reset_routing_id ();
120  errno_assert (rc == 0);
121 
122  bool ok = it->second.pipe->write (msg_);
123  if (unlikely (!ok)) {
124  // Message failed to send - we must close it ourselves.
125  rc = msg_->close ();
126  errno_assert (rc == 0);
127  }
128  else
129  it->second.pipe->flush ();
130 
131  // Detach the message from the data buffer.
132  rc = msg_->init ();
133  errno_assert (rc == 0);
134 
135  return 0;
136 }
137 
139 {
140  pipe_t *pipe = NULL;
141  int rc = fq.recvpipe (msg_, &pipe);
142 
143  // Drop any messages with more flag
144  while (rc == 0 && msg_->flags () & msg_t::more) {
145 
146  // drop all frames of the current multi-frame message
147  rc = fq.recvpipe (msg_, NULL);
148 
149  while (rc == 0 && msg_->flags () & msg_t::more)
150  rc = fq.recvpipe (msg_, NULL);
151 
152  // get the new message
153  if (rc == 0)
154  rc = fq.recvpipe (msg_, &pipe);
155  }
156 
157  if (rc != 0)
158  return rc;
159 
160  zmq_assert (pipe != NULL);
161 
162  uint32_t routing_id = pipe->get_routing_id ();
163  msg_->set_routing_id (routing_id);
164 
165  return 0;
166 }
167 
169 {
170  return fq.has_in ();
171 }
172 
174 {
175  // In theory, SERVER socket is always ready for writing. Whether actual
176  // attempt to write succeeds depends on which pipe the message is going
177  // to be routed to.
178  return true;
179 }
180 
182 {
183  return fq.get_credential ();
184 }
int close()
Definition: msg.cpp:217
blob_t get_credential() const
Definition: fq.cpp:158
#define zmq_assert(x)
Definition: err.hpp:119
int recvpipe(msg_t *msg_, pipe_t **pipe_)
Definition: fq.cpp:88
void xwrite_activated(zmq::pipe_t *pipe_)
Definition: server.cpp:83
void pipe_terminated(pipe_t *pipe_)
Definition: fq.cpp:56
bool xhas_in()
Definition: server.cpp:168
void set_routing_id(uint32_t routing_id_)
Definition: pipe.cpp:118
outpipes_t outpipes
Definition: server.hpp:84
uint32_t get_routing_id()
Definition: pipe.cpp:123
void xattach_pipe(zmq::pipe_t *pipe_, bool subscribe_to_all_)
Definition: server.cpp:51
void xread_activated(zmq::pipe_t *pipe_)
Definition: server.cpp:78
#define LIBZMQ_UNUSED(object)
Definition: macros.hpp:6
#define unlikely(x)
Definition: likely.hpp:38
int init()
Definition: msg.cpp:82
server_t(zmq::ctx_t *parent_, uint32_t tid_, int sid)
Definition: server.cpp:39
uint32_t generate_random()
Definition: random.cpp:54
#define EHOSTUNREACH
Definition: zmq.h:160
void attach(pipe_t *pipe_)
Definition: fq.cpp:49
bool has_in()
Definition: fq.cpp:134
std::basic_string< unsigned char > blob_t
Definition: blob.hpp:134
int xsend(zmq::msg_t *msg_)
Definition: server.cpp:95
int reset_routing_id()
Definition: msg.cpp:537
#define ZMQ_SERVER
int xrecv(zmq::msg_t *msg_)
Definition: server.cpp:138
void activated(pipe_t *pipe_)
Definition: fq.cpp:76
#define errno_assert(x)
Definition: err.hpp:129
void xpipe_terminated(zmq::pipe_t *pipe_)
Definition: server.cpp:70
int set_routing_id(uint32_t routing_id_)
Definition: msg.cpp:527
blob_t get_credential() const
Definition: server.cpp:181
options_t options
Definition: own.hpp:109
uint32_t next_rid
Definition: server.hpp:88
uint32_t get_routing_id()
Definition: msg.cpp:522
unsigned char flags
Definition: msg.hpp:181
bool xhas_out()
Definition: server.cpp:173