libzmq  master
ZeroMQ C++ Core Engine (LIBZMQ)
tipc_listener.cpp
Go to the documentation of this file.
1  /*
2  Copyright (c) 2007-2016 Contributors as noted in the AUTHORS file
3 
4  This file is part of libzmq, the ZeroMQ core engine in C++.
5 
6  libzmq is free software; you can redistribute it and/or modify it under
7  the terms of the GNU Lesser General Public License (LGPL) as published
8  by the Free Software Foundation; either version 3 of the License, or
9  (at your option) any later version.
10 
11  As a special exception, the Contributors give you permission to link
12  this library with independent modules to produce an executable,
13  regardless of the license terms of these independent modules, and to
14  copy and distribute the resulting executable under terms of your choice,
15  provided that you also meet, for each linked independent module, the
16  terms and conditions of the license of that module. An independent
17  module is a module which is not derived from or based on this library.
18  If you modify this library, you must extend this exception to your
19  version of the library.
20 
21  libzmq is distributed in the hope that it will be useful, but WITHOUT
22  ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
23  FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public
24  License for more details.
25 
26  You should have received a copy of the GNU Lesser General Public License
27  along with this program. If not, see <http://www.gnu.org/licenses/>.
28 */
29 
30 #include "tipc_listener.hpp"
31 
32 #if defined ZMQ_HAVE_TIPC
33 
34 #include <new>
35 
36 #include <string.h>
37 
38 #include "stream_engine.hpp"
39 #include "tipc_address.hpp"
40 #include "io_thread.hpp"
41 #include "session_base.hpp"
42 #include "config.hpp"
43 #include "err.hpp"
44 #include "ip.hpp"
45 #include "socket_base.hpp"
46 
47 #include <unistd.h>
48 #include <sys/socket.h>
49 #include <fcntl.h>
50 #include <linux/tipc.h>
51 
52 zmq::tipc_listener_t::tipc_listener_t (io_thread_t *io_thread_,
53  socket_base_t *socket_, const options_t &options_) :
54  own_t (io_thread_, options_),
55  io_object_t (io_thread_),
56  s (retired_fd),
57  socket (socket_)
58 {
59 }
60 
61 zmq::tipc_listener_t::~tipc_listener_t ()
62 {
63  zmq_assert (s == retired_fd);
64 }
65 
66 void zmq::tipc_listener_t::process_plug ()
67 {
68  // Start polling for incoming connections.
69  handle = add_fd (s);
70  set_pollin (handle);
71 }
72 
73 void zmq::tipc_listener_t::process_term (int linger_)
74 {
75  rm_fd (handle);
76  close ();
77  own_t::process_term (linger_);
78 }
79 
80 void zmq::tipc_listener_t::in_event ()
81 {
82  fd_t fd = accept ();
83 
84  // If connection was reset by the peer in the meantime, just ignore it.
85  // TODO: Handle specific errors like ENFILE/EMFILE etc.
86  if (fd == retired_fd) {
87  socket->event_accept_failed (endpoint, zmq_errno());
88  return;
89  }
90 
91  // Create the engine object for this connection.
92  stream_engine_t *engine = new (std::nothrow) stream_engine_t (fd, options, endpoint);
93  alloc_assert (engine);
94 
95  // Choose I/O thread to run connecter in. Given that we are already
96  // running in an I/O thread, there must be at least one available.
97  io_thread_t *io_thread = choose_io_thread (options.affinity);
98  zmq_assert (io_thread);
99 
100  // Create and launch a session object.
101  session_base_t *session = session_base_t::create (io_thread, false, socket,
102  options, NULL);
103  errno_assert (session);
104  session->inc_seqnum ();
105  launch_child (session);
106  send_attach (session, engine, false);
107  socket->event_accepted (endpoint, fd);
108 }
109 
110 int zmq::tipc_listener_t::get_address (std::string &addr_)
111 {
112  struct sockaddr_storage ss;
113  socklen_t sl = sizeof (ss);
114 
115  int rc = getsockname (s, (sockaddr *) &ss, &sl);
116  if (rc != 0) {
117  addr_.clear ();
118  return rc;
119  }
120 
121  tipc_address_t addr ((struct sockaddr *) &ss, sl);
122  return addr.to_string (addr_);
123 }
124 
125 int zmq::tipc_listener_t::set_address (const char *addr_)
126 {
127  //convert str to address struct
128  int rc = address.resolve(addr_);
129  if (rc != 0)
130  return -1;
131  // Create a listening socket.
132  s = open_socket (AF_TIPC, SOCK_STREAM, 0);
133  if (s == -1)
134  return -1;
135 
136  address.to_string (endpoint);
137 
138  // Bind the socket to tipc name.
139  rc = bind (s, address.addr (), address.addrlen ());
140  if (rc != 0)
141  goto error;
142 
143  // Listen for incoming connections.
144  rc = listen (s, options.backlog);
145  if (rc != 0)
146  goto error;
147 
148  socket->event_listening (endpoint, s);
149  return 0;
150 
151 error:
152  int err = errno;
153  close ();
154  errno = err;
155  return -1;
156 }
157 
158 void zmq::tipc_listener_t::close ()
159 {
160  zmq_assert (s != retired_fd);
161  int rc = ::close (s);
162  errno_assert (rc == 0);
163  s = retired_fd;
164  socket->event_closed (endpoint, s);
165 }
166 
167 zmq::fd_t zmq::tipc_listener_t::accept ()
168 {
169  // Accept one connection and deal with different failure modes.
170  // The situation where connection cannot be accepted due to insufficient
171  // resources is considered valid and treated by ignoring the connection.
172  struct sockaddr_storage ss = {};
173  socklen_t ss_len = sizeof(ss);
174 
175  zmq_assert (s != retired_fd);
176  fd_t sock = ::accept (s, (struct sockaddr *) &ss, &ss_len);
177  if (sock == -1) {
178  errno_assert (errno == EAGAIN || errno == EWOULDBLOCK || errno == ENOBUFS ||
179  errno == EINTR || errno == ECONNABORTED || errno == EPROTO || errno == EMFILE ||
180  errno == ENFILE);
181  return retired_fd;
182  }
183  /*FIXME Accept filters?*/
184  return sock;
185 }
186 
187 #endif
188 
static session_base_t * create(zmq::io_thread_t *io_thread_, bool active_, zmq::socket_base_t *socket_, const options_t &options_, address_t *addr_)
void process_term(int linger_)
Definition: own.cpp:158
int fd_t
Definition: fd.hpp:50
fd_t open_socket(int domain_, int type_, int protocol_)
Definition: ip.cpp:50
#define zmq_assert(x)
Definition: err.hpp:119
Definition: command.hpp:84
#define ENOBUFS
Definition: zmq.h:118
ZMQ_EXPORT int zmq_errno(void)
Definition: zmq.cpp:107
#define EPROTO
Definition: err.hpp:58
#define alloc_assert(x)
Definition: err.hpp:159
#define errno_assert(x)
Definition: err.hpp:129
#define ECONNABORTED
Definition: zmq.h:148
const char * address
Definition: test_fork.cpp:32