libzmq  master
ZeroMQ C++ Core Engine (LIBZMQ)
ipc_listener.cpp
Go to the documentation of this file.
1 /*
2  Copyright (c) 2007-2016 Contributors as noted in the AUTHORS file
3 
4  This file is part of libzmq, the ZeroMQ core engine in C++.
5 
6  libzmq is free software; you can redistribute it and/or modify it under
7  the terms of the GNU Lesser General Public License (LGPL) as published
8  by the Free Software Foundation; either version 3 of the License, or
9  (at your option) any later version.
10 
11  As a special exception, the Contributors give you permission to link
12  this library with independent modules to produce an executable,
13  regardless of the license terms of these independent modules, and to
14  copy and distribute the resulting executable under terms of your choice,
15  provided that you also meet, for each linked independent module, the
16  terms and conditions of the license of that module. An independent
17  module is a module which is not derived from or based on this library.
18  If you modify this library, you must extend this exception to your
19  version of the library.
20 
21  libzmq is distributed in the hope that it will be useful, but WITHOUT
22  ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
23  FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public
24  License for more details.
25 
26  You should have received a copy of the GNU Lesser General Public License
27  along with this program. If not, see <http://www.gnu.org/licenses/>.
28 */
29 
30 #include "precompiled.hpp"
31 #include "ipc_listener.hpp"
32 
33 #if !defined ZMQ_HAVE_WINDOWS && !defined ZMQ_HAVE_OPENVMS
34 
35 #include <new>
36 
37 #include <string.h>
38 
39 #include "stream_engine.hpp"
40 #include "ipc_address.hpp"
41 #include "io_thread.hpp"
42 #include "session_base.hpp"
43 #include "config.hpp"
44 #include "err.hpp"
45 #include "ip.hpp"
46 #include "socket_base.hpp"
47 
48 #include <unistd.h>
49 #include <sys/socket.h>
50 #include <fcntl.h>
51 #include <sys/un.h>
52 #include <sys/stat.h>
53 
54 #ifdef ZMQ_HAVE_LOCAL_PEERCRED
55 # include <sys/types.h>
56 # include <sys/ucred.h>
57 #endif
58 #ifdef ZMQ_HAVE_SO_PEERCRED
59 # include <sys/types.h>
60 # include <pwd.h>
61 # include <grp.h>
62 # if defined ZMQ_HAVE_OPENBSD
63 # define ucred sockpeercred
64 # endif
65 #endif
66 
67 const char *zmq::ipc_listener_t::tmp_env_vars[] = {
68  "TMPDIR",
69  "TEMPDIR",
70  "TMP",
71  0 // Sentinel
72 };
73 
75  std::string& file_)
76 {
77  std::string tmp_path;
78 
79  // If TMPDIR, TEMPDIR, or TMP are available and are directories, create
80  // the socket directory there.
81  const char **tmp_env = tmp_env_vars;
82  while ( tmp_path.empty() && *tmp_env != 0 ) {
83  char *tmpdir = getenv(*tmp_env);
84  struct stat statbuf;
85 
86  // Confirm it is actually a directory before trying to use
87  if ( tmpdir != 0 && ::stat(tmpdir, &statbuf) == 0 && S_ISDIR(statbuf.st_mode) ) {
88  tmp_path.assign(tmpdir);
89  if ( *(tmp_path.rbegin()) != '/' ) {
90  tmp_path.push_back('/');
91  }
92  }
93 
94  // Try the next environment variable
95  ++tmp_env;
96  }
97 
98  // Append a directory name
99  tmp_path.append("tmpXXXXXX");
100 
101  // We need room for tmp_path + trailing NUL
102  std::vector<char> buffer(tmp_path.length()+1);
103  strcpy(buffer.data(), tmp_path.c_str());
104 
105 #ifdef HAVE_MKDTEMP
106  // Create the directory. POSIX requires that mkdtemp() creates the
107  // directory with 0700 permissions, meaning the only possible race
108  // with socket creation could be the same user. However, since
109  // each socket is created in a directory created by mkdtemp(), and
110  // mkdtemp() guarantees a unique directory name, there will be no
111  // collision.
112  if ( mkdtemp(buffer.data()) == 0 ) {
113  return -1;
114  }
115 
116  path_.assign(buffer.data());
117  file_.assign (path_ + "/socket");
118 #else
119  // Silence -Wunused-parameter. #pragma and __attribute__((unused)) are not
120  // very portable unfortunately...
121  (void) path_;
122  int fd = mkstemp (buffer.data());
123  if (fd == -1)
124  return -1;
125  ::close (fd);
126 
127  file_.assign (buffer.data());
128 #endif
129 
130  return 0;
131 }
132 
134  socket_base_t *socket_, const options_t &options_) :
135  own_t (io_thread_, options_),
136  io_object_t (io_thread_),
137  has_file (false),
138  s (retired_fd),
139  socket (socket_)
140 {
141 }
142 
144 {
145  zmq_assert (s == retired_fd);
146 }
147 
149 {
150  // Start polling for incoming connections.
151  handle = add_fd (s);
152  set_pollin (handle);
153 }
154 
156 {
157  rm_fd (handle);
158  close ();
159  own_t::process_term (linger_);
160 }
161 
163 {
164  fd_t fd = accept ();
165 
166  // If connection was reset by the peer in the meantime, just ignore it.
167  // TODO: Handle specific errors like ENFILE/EMFILE etc.
168  if (fd == retired_fd) {
170  return;
171  }
172 
173  // Create the engine object for this connection.
174  stream_engine_t *engine = new (std::nothrow)
176  alloc_assert (engine);
177 
178  // Choose I/O thread to run connecter in. Given that we are already
179  // running in an I/O thread, there must be at least one available.
181  zmq_assert (io_thread);
182 
183  // Create and launch a session object.
184  session_base_t *session = session_base_t::create (io_thread, false, socket,
185  options, NULL);
186  errno_assert (session);
187  session->inc_seqnum ();
188  launch_child (session);
189  send_attach (session, engine, false);
191 }
192 
193 int zmq::ipc_listener_t::get_address (std::string &addr_)
194 {
195  struct sockaddr_storage ss;
196 #ifdef ZMQ_HAVE_HPUX
197  int sl = sizeof (ss);
198 #else
199  socklen_t sl = sizeof (ss);
200 #endif
201  int rc = getsockname (s, (sockaddr *) &ss, &sl);
202  if (rc != 0) {
203  addr_.clear ();
204  return rc;
205  }
206 
207  ipc_address_t addr ((struct sockaddr *) &ss, sl);
208  return addr.to_string (addr_);
209 }
210 
211 int zmq::ipc_listener_t::set_address (const char *addr_)
212 {
213  // Create addr on stack for auto-cleanup
214  std::string addr (addr_);
215 
216  // Allow wildcard file
217  if (options.use_fd == -1 && addr [0] == '*') {
218  if ( create_wildcard_address(tmp_socket_dirname, addr) < 0 ) {
219  return -1;
220  }
221  }
222 
223  // Get rid of the file associated with the UNIX domain socket that
224  // may have been left behind by the previous run of the application.
225  // MUST NOT unlink if the FD is managed by the user, or it will stop
226  // working after the first client connects. The user will take care of
227  // cleaning up the file after the service is stopped.
228  if (options.use_fd == -1) {
229  ::unlink (addr.c_str());
230  }
231  filename.clear ();
232 
233  // Initialise the address structure.
235  int rc = address.resolve (addr.c_str());
236  if (rc != 0) {
237  if ( !tmp_socket_dirname.empty() ) {
238  // We need to preserve errno to return to the user
239  int errno_ = errno;
240  ::rmdir(tmp_socket_dirname.c_str ());
241  tmp_socket_dirname.clear();
242  errno = errno_;
243  }
244  return -1;
245  }
246 
247  address.to_string (endpoint);
248 
249  if (options.use_fd != -1) {
250  s = options.use_fd;
251  } else {
252  // Create a listening socket.
253  s = open_socket (AF_UNIX, SOCK_STREAM, 0);
254  if (s == -1) {
255  if ( !tmp_socket_dirname.empty() ) {
256  // We need to preserve errno to return to the user
257  int errno_ = errno;
258  ::rmdir(tmp_socket_dirname.c_str ());
259  tmp_socket_dirname.clear();
260  errno = errno_;
261  }
262  return -1;
263  }
264 
265  // Bind the socket to the file path.
266  rc = bind (s, address.addr (), address.addrlen ());
267  if (rc != 0)
268  goto error;
269 
270  // Listen for incoming connections.
271  rc = listen (s, options.backlog);
272  if (rc != 0)
273  goto error;
274  }
275 
276  filename.assign (addr.c_str());
277  has_file = true;
278 
280  return 0;
281 
282 error:
283  int err = errno;
284  close ();
285  errno = err;
286  return -1;
287 }
288 
290 {
291  zmq_assert (s != retired_fd);
292  int rc = ::close (s);
293  errno_assert (rc == 0);
294 
295  s = retired_fd;
296 
297  // If there's an underlying UNIX domain socket, get rid of the file it
298  // is associated with.
299  // MUST NOT unlink if the FD is managed by the user, or it will stop
300  // working after the first client connects. The user will take care of
301  // cleaning up the file after the service is stopped.
302  if (has_file && options.use_fd == -1) {
303  rc = 0;
304 
305  if ( !filename.empty () ) {
306  rc = ::unlink(filename.c_str ());
307  }
308 
309  if ( rc == 0 && !tmp_socket_dirname.empty() ) {
310  rc = ::rmdir(tmp_socket_dirname.c_str ());
311  tmp_socket_dirname.clear();
312  }
313 
314  if (rc != 0) {
316  return -1;
317  }
318  }
319 
321  return 0;
322 }
323 
324 #if defined ZMQ_HAVE_SO_PEERCRED
325 
326 bool zmq::ipc_listener_t::filter (fd_t sock)
327 {
328  if (options.ipc_uid_accept_filters.empty () &&
329  options.ipc_pid_accept_filters.empty () &&
330  options.ipc_gid_accept_filters.empty ())
331  return true;
332 
333  struct ucred cred;
334  socklen_t size = sizeof (cred);
335 
336  if (getsockopt (sock, SOL_SOCKET, SO_PEERCRED, &cred, &size))
337  return false;
338  if (options.ipc_uid_accept_filters.find (cred.uid) != options.ipc_uid_accept_filters.end () ||
339  options.ipc_gid_accept_filters.find (cred.gid) != options.ipc_gid_accept_filters.end () ||
340  options.ipc_pid_accept_filters.find (cred.pid) != options.ipc_pid_accept_filters.end ())
341  return true;
342 
343  struct passwd *pw;
344  struct group *gr;
345 
346  if (!(pw = getpwuid (cred.uid)))
347  return false;
348  for (options_t::ipc_gid_accept_filters_t::const_iterator it = options.ipc_gid_accept_filters.begin ();
349  it != options.ipc_gid_accept_filters.end (); it++) {
350  if (!(gr = getgrgid (*it)))
351  continue;
352  for (char **mem = gr->gr_mem; *mem; mem++) {
353  if (!strcmp (*mem, pw->pw_name))
354  return true;
355  }
356  }
357  return false;
358 }
359 
360 #elif defined ZMQ_HAVE_LOCAL_PEERCRED
361 
362 bool zmq::ipc_listener_t::filter (fd_t sock)
363 {
364  if (options.ipc_uid_accept_filters.empty () &&
365  options.ipc_gid_accept_filters.empty ())
366  return true;
367 
368  struct xucred cred;
369  socklen_t size = sizeof (cred);
370 
371  if (getsockopt (sock, 0, LOCAL_PEERCRED, &cred, &size))
372  return false;
373  if (cred.cr_version != XUCRED_VERSION)
374  return false;
375  if (options.ipc_uid_accept_filters.find (cred.cr_uid) != options.ipc_uid_accept_filters.end ())
376  return true;
377  for (int i = 0; i < cred.cr_ngroups; i++) {
378  if (options.ipc_gid_accept_filters.find (cred.cr_groups[i]) != options.ipc_gid_accept_filters.end ())
379  return true;
380  }
381 
382  return false;
383 }
384 
385 #endif
386 
388 {
389  // Accept one connection and deal with different failure modes.
390  // The situation where connection cannot be accepted due to insufficient
391  // resources is considered valid and treated by ignoring the connection.
392  zmq_assert (s != retired_fd);
393  fd_t sock = ::accept (s, NULL, NULL);
394  if (sock == -1) {
395  errno_assert (errno == EAGAIN || errno == EWOULDBLOCK ||
396  errno == EINTR || errno == ECONNABORTED || errno == EPROTO ||
397  errno == ENFILE);
398  return retired_fd;
399  }
400 
401  // Race condition can cause socket not to be closed (if fork happens
402  // between accept and this point).
403 #ifdef FD_CLOEXEC
404  int rc = fcntl (sock, F_SETFD, FD_CLOEXEC);
405  errno_assert (rc != -1);
406 #endif
407 
408  // IPC accept() filters
409 #if defined ZMQ_HAVE_SO_PEERCRED || defined ZMQ_HAVE_LOCAL_PEERCRED
410  if (!filter (sock)) {
411  int rc = ::close (sock);
412  errno_assert (rc == 0);
413  return retired_fd;
414  }
415 #endif
416 
417  return sock;
418 }
419 
420 #endif
#define size
static session_base_t * create(zmq::io_thread_t *io_thread_, bool active_, zmq::socket_base_t *socket_, const options_t &options_, address_t *addr_)
void send_attach(zmq::session_base_t *destination_, zmq::i_engine *engine_, bool inc_seqnum_=true)
Definition: object.cpp:225
void process_term(int linger_)
Definition: own.cpp:158
void event_accept_failed(const std::string &addr_, int err_)
fd_t open_socket(int domain_, int type_, int protocol_)
Definition: ip.cpp:50
int get_address(std::string &addr_)
ipc_listener_t(zmq::io_thread_t *io_thread_, zmq::socket_base_t *socket_, const options_t &options_)
zmq::socket_base_t * socket
const sockaddr * addr() const
Definition: ipc_address.cpp:94
#define zmq_assert(x)
Definition: err.hpp:119
static const char * tmp_env_vars[]
Definition: command.hpp:84
zmq::io_thread_t * choose_io_thread(uint64_t affinity_)
Definition: object.cpp:189
static int create_wildcard_address(std::string &path_, std::string &file_)
void event_listening(const std::string &addr_, int fd_)
int resolve(const char *path_)
Definition: ipc_address.cpp:58
ZMQ_EXPORT int zmq_errno(void)
Definition: zmq.cpp:107
uint64_t affinity
Definition: options.hpp:70
socklen_t addrlen() const
Definition: ipc_address.cpp:99
void inc_seqnum()
Definition: own.cpp:66
void event_close_failed(const std::string &addr_, int fd_)
void set_pollin(handle_t handle_)
Definition: io_object.cpp:74
int set_address(const char *addr_)
int to_string(std::string &addr_)
Definition: ipc_address.cpp:77
#define EPROTO
Definition: err.hpp:58
void event_closed(const std::string &addr_, int fd_)
void launch_child(own_t *object_)
Definition: own.cpp:81
#define alloc_assert(x)
Definition: err.hpp:159
#define errno_assert(x)
Definition: err.hpp:129
#define ECONNABORTED
Definition: zmq.h:148
std::string filename
handle_t add_fd(fd_t fd_)
Definition: io_object.cpp:64
void process_term(int linger_)
options_t options
Definition: own.hpp:109
void event_accepted(const std::string &addr_, int fd_)
std::string tmp_socket_dirname
const char * address
Definition: test_fork.cpp:32
void rm_fd(handle_t handle_)
Definition: io_object.cpp:69