libzmq  master
ZeroMQ C++ Core Engine (LIBZMQ)
vmci_listener.cpp
Go to the documentation of this file.
1 /*
2  Copyright (c) 2007-2016 Contributors as noted in the AUTHORS file
3 
4  This file is part of libzmq, the ZeroMQ core engine in C++.
5 
6  libzmq is free software; you can redistribute it and/or modify it under
7  the terms of the GNU Lesser General Public License (LGPL) as published
8  by the Free Software Foundation; either version 3 of the License, or
9  (at your option) any later version.
10 
11  As a special exception, the Contributors give you permission to link
12  this library with independent modules to produce an executable,
13  regardless of the license terms of these independent modules, and to
14  copy and distribute the resulting executable under terms of your choice,
15  provided that you also meet, for each linked independent module, the
16  terms and conditions of the license of that module. An independent
17  module is a module which is not derived from or based on this library.
18  If you modify this library, you must extend this exception to your
19  version of the library.
20 
21  libzmq is distributed in the hope that it will be useful, but WITHOUT
22  ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
23  FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public
24  License for more details.
25 
26  You should have received a copy of the GNU Lesser General Public License
27  along with this program. If not, see <http://www.gnu.org/licenses/>.
28 */
29 
30 #include "vmci_listener.hpp"
31 
32 #if defined ZMQ_HAVE_VMCI
33 
34 #include <new>
35 
36 #include "stream_engine.hpp"
37 #include "vmci_address.hpp"
38 #include "io_thread.hpp"
39 #include "session_base.hpp"
40 #include "config.hpp"
41 #include "err.hpp"
42 #include "ip.hpp"
43 #include "socket_base.hpp"
44 #include "vmci.hpp"
45 
46 #if defined ZMQ_HAVE_WINDOWS
47 #include "windows.hpp"
48 #else
49 #include <unistd.h>
50 #include <fcntl.h>
51 #endif
52 
53 zmq::vmci_listener_t::vmci_listener_t (io_thread_t *io_thread_,
54  socket_base_t *socket_, const options_t &options_) :
55  own_t (io_thread_, options_),
56  io_object_t (io_thread_),
57  s (retired_fd),
58  socket (socket_)
59 {
60 }
61 
62 zmq::vmci_listener_t::~vmci_listener_t ()
63 {
64  zmq_assert (s == retired_fd);
65 }
66 
67 void zmq::vmci_listener_t::process_plug ()
68 {
69  // Start polling for incoming connections.
70  handle = add_fd (s);
71  set_pollin (handle);
72 }
73 
74 void zmq::vmci_listener_t::process_term (int linger_)
75 {
76  rm_fd (handle);
77  close ();
78  own_t::process_term (linger_);
79 }
80 
81 void zmq::vmci_listener_t::in_event ()
82 {
83  fd_t fd = accept ();
84 
85  // If connection was reset by the peer in the meantime, just ignore it.
86  if (fd == retired_fd) {
87  socket->event_accept_failed (endpoint, zmq_errno());
88  return;
89  }
90 
91  tune_vmci_buffer_size (this->get_ctx (), fd, options.vmci_buffer_size, options.vmci_buffer_min_size, options.vmci_buffer_max_size);
92 
93  if (options.vmci_connect_timeout > 0)
94  {
95 #if defined ZMQ_HAVE_WINDOWS
96  tune_vmci_connect_timeout (this->get_ctx (), fd, options.vmci_connect_timeout);
97 #else
98  struct timeval timeout = {0, options.vmci_connect_timeout * 1000};
99  tune_vmci_connect_timeout (this->get_ctx (), fd, timeout);
100 #endif
101  }
102 
103  // Create the engine object for this connection.
104  stream_engine_t *engine = new (std::nothrow)
105  stream_engine_t (fd, options, endpoint);
106  alloc_assert (engine);
107 
108  // Choose I/O thread to run connecter in. Given that we are already
109  // running in an I/O thread, there must be at least one available.
110  io_thread_t *io_thread = choose_io_thread (options.affinity);
111  zmq_assert (io_thread);
112 
113  // Create and launch a session object.
114  session_base_t *session = session_base_t::create (io_thread, false, socket,
115  options, NULL);
116  errno_assert (session);
117  session->inc_seqnum ();
118  launch_child (session);
119  send_attach (session, engine, false);
120  socket->event_accepted (endpoint, fd);
121 }
122 
123 int zmq::vmci_listener_t::get_address (std::string &addr_)
124 {
125  struct sockaddr_storage ss;
126 #ifdef ZMQ_HAVE_HPUX
127  int sl = sizeof (ss);
128 #else
129  socklen_t sl = sizeof (ss);
130 #endif
131  int rc = getsockname (s, (sockaddr *) &ss, &sl);
132  if (rc != 0) {
133  addr_.clear ();
134  return rc;
135  }
136 
137  vmci_address_t addr ((struct sockaddr *) &ss, sl, this->get_ctx ());
138  return addr.to_string (addr_);
139 }
140 
141 int zmq::vmci_listener_t::set_address (const char *addr_)
142 {
143  // Create addr on stack for auto-cleanup
144  std::string addr (addr_);
145 
146  // Initialise the address structure.
147  vmci_address_t address(this->get_ctx ());
148  int rc = address.resolve (addr.c_str());
149  if (rc != 0)
150  return -1;
151 
152  // Create a listening socket.
153  s = open_socket (this->get_ctx ()->get_vmci_socket_family (), SOCK_STREAM, 0);
154 #ifdef ZMQ_HAVE_WINDOWS
155  if (s == INVALID_SOCKET) {
156  errno = wsa_error_to_errno(WSAGetLastError());
157  return -1;
158  }
159 #if !defined _WIN32_WCE
160  // On Windows, preventing sockets to be inherited by child processes.
161  BOOL brc = SetHandleInformation((HANDLE)s, HANDLE_FLAG_INHERIT, 0);
162  win_assert(brc);
163 #endif
164 #else
165  if (s == -1)
166  return -1;
167 #endif
168 
169  address.to_string (endpoint);
170 
171  // Bind the socket.
172  rc = bind (s, address.addr (), address.addrlen ());
173 #ifdef ZMQ_HAVE_WINDOWS
174  if (rc == SOCKET_ERROR) {
175  errno = wsa_error_to_errno(WSAGetLastError());
176  goto error;
177  }
178 #else
179  if (rc != 0)
180  goto error;
181 #endif
182 
183  // Listen for incoming connections.
184  rc = listen (s, options.backlog);
185 #ifdef ZMQ_HAVE_WINDOWS
186  if (rc == SOCKET_ERROR) {
187  errno = wsa_error_to_errno(WSAGetLastError());
188  goto error;
189  }
190 #else
191  if (rc != 0)
192  goto error;
193 #endif
194 
195  socket->event_listening (endpoint, s);
196  return 0;
197 
198  error:
199  int err = errno;
200  close ();
201  errno = err;
202  return -1;
203 }
204 
205 void zmq::vmci_listener_t::close ()
206 {
207  zmq_assert (s != retired_fd);
208 #ifdef ZMQ_HAVE_WINDOWS
209  int rc = closesocket (s);
210  wsa_assert (rc != SOCKET_ERROR);
211 #else
212  int rc = ::close (s);
213  errno_assert (rc == 0);
214 #endif
215  socket->event_closed (endpoint, s);
216  s = retired_fd;
217 }
218 
219 zmq::fd_t zmq::vmci_listener_t::accept ()
220 {
221  // Accept one connection and deal with different failure modes.
222  // The situation where connection cannot be accepted due to insufficient
223  // resources is considered valid and treated by ignoring the connection.
224  zmq_assert (s != retired_fd);
225  fd_t sock = ::accept (s, NULL, NULL);
226 
227 #ifdef ZMQ_HAVE_WINDOWS
228  if (sock == INVALID_SOCKET) {
229  wsa_assert(WSAGetLastError() == WSAEWOULDBLOCK ||
230  WSAGetLastError() == WSAECONNRESET ||
231  WSAGetLastError() == WSAEMFILE ||
232  WSAGetLastError() == WSAENOBUFS);
233  return retired_fd;
234  }
235 #if !defined _WIN32_WCE
236  // On Windows, preventing sockets to be inherited by child processes.
237  BOOL brc = SetHandleInformation((HANDLE)sock, HANDLE_FLAG_INHERIT, 0);
238  win_assert(brc);
239 #endif
240 #else
241  if (sock == -1) {
242  errno_assert(errno == EAGAIN || errno == EWOULDBLOCK ||
243  errno == EINTR || errno == ECONNABORTED || errno == EPROTO ||
244  errno == ENOBUFS || errno == ENOMEM || errno == EMFILE ||
245  errno == ENFILE);
246  return retired_fd;
247  }
248 #endif
249 
250  // Race condition can cause socket not to be closed (if fork happens
251  // between accept and this point).
252 #ifdef FD_CLOEXEC
253  int rc = fcntl (sock, F_SETFD, FD_CLOEXEC);
254  errno_assert (rc != -1);
255 #endif
256 
257  return sock;
258 }
259 
260 #endif
static session_base_t * create(zmq::io_thread_t *io_thread_, bool active_, zmq::socket_base_t *socket_, const options_t &options_, address_t *addr_)
void process_term(int linger_)
Definition: own.cpp:158
int fd_t
Definition: fd.hpp:50
fd_t open_socket(int domain_, int type_, int protocol_)
Definition: ip.cpp:50
#define zmq_assert(x)
Definition: err.hpp:119
Definition: command.hpp:84
#define ENOBUFS
Definition: zmq.h:118
ZMQ_EXPORT int zmq_errno(void)
Definition: zmq.cpp:107
#define EPROTO
Definition: err.hpp:58
#define alloc_assert(x)
Definition: err.hpp:159
#define errno_assert(x)
Definition: err.hpp:129
#define ECONNABORTED
Definition: zmq.h:148
const char * address
Definition: test_fork.cpp:32