libzmq  master
ZeroMQ C++ Core Engine (LIBZMQ)
vmci_connecter.cpp
Go to the documentation of this file.
1 /*
2  Copyright (c) 2007-2016 Contributors as noted in the AUTHORS file
3 
4  This file is part of libzmq, the ZeroMQ core engine in C++.
5 
6  libzmq is free software; you can redistribute it and/or modify it under
7  the terms of the GNU Lesser General Public License (LGPL) as published
8  by the Free Software Foundation; either version 3 of the License, or
9  (at your option) any later version.
10 
11  As a special exception, the Contributors give you permission to link
12  this library with independent modules to produce an executable,
13  regardless of the license terms of these independent modules, and to
14  copy and distribute the resulting executable under terms of your choice,
15  provided that you also meet, for each linked independent module, the
16  terms and conditions of the license of that module. An independent
17  module is a module which is not derived from or based on this library.
18  If you modify this library, you must extend this exception to your
19  version of the library.
20 
21  libzmq is distributed in the hope that it will be useful, but WITHOUT
22  ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
23  FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public
24  License for more details.
25 
26  You should have received a copy of the GNU Lesser General Public License
27  along with this program. If not, see <http://www.gnu.org/licenses/>.
28 */
29 
30 #include "vmci_connecter.hpp"
31 
32 #if defined ZMQ_HAVE_VMCI
33 
34 #include <new>
35 
36 #include "stream_engine.hpp"
37 #include "io_thread.hpp"
38 #include "platform.hpp"
39 #include "random.hpp"
40 #include "err.hpp"
41 #include "ip.hpp"
42 #include "address.hpp"
43 #include "session_base.hpp"
44 #include "vmci_address.hpp"
45 #include "vmci.hpp"
46 
47 zmq::vmci_connecter_t::vmci_connecter_t (class io_thread_t *io_thread_,
48  class session_base_t *session_, const options_t &options_,
49  const address_t *addr_, bool delayed_start_) :
50  own_t (io_thread_, options_),
51  io_object_t (io_thread_),
52  addr (addr_),
53  s (retired_fd),
54  handle_valid (false),
55  delayed_start (delayed_start_),
56  timer_started (false),
57  session (session_),
58  current_reconnect_ivl(options.reconnect_ivl)
59 {
60  zmq_assert (addr);
61  zmq_assert (addr->protocol == "vmci");
62  addr->to_string (endpoint);
63  socket = session-> get_socket();
64 }
65 
66 zmq::vmci_connecter_t::~vmci_connecter_t ()
67 {
68  zmq_assert (!timer_started);
69  zmq_assert (!handle_valid);
70  zmq_assert (s == retired_fd);
71 }
72 
73 void zmq::vmci_connecter_t::process_plug ()
74 {
75  if (delayed_start)
76  add_reconnect_timer ();
77  else
78  start_connecting ();
79 }
80 
81 void zmq::vmci_connecter_t::process_term (int linger_)
82 {
83  if (timer_started) {
84  cancel_timer (reconnect_timer_id);
85  timer_started = false;
86  }
87 
88  if (handle_valid) {
89  rm_fd (handle);
90  handle_valid = false;
91  }
92 
93  if (s != retired_fd)
94  close ();
95 
96  own_t::process_term (linger_);
97 }
98 
99 void zmq::vmci_connecter_t::in_event ()
100 {
101  // We are not polling for incoming data, so we are actually called
102  // because of error here. However, we can get error on out event as well
103  // on some platforms, so we'll simply handle both events in the same way.
104  out_event ();
105 }
106 
107 void zmq::vmci_connecter_t::out_event ()
108 {
109  fd_t fd = connect ();
110  rm_fd (handle);
111  handle_valid = false;
112 
113  // Handle the error condition by attempt to reconnect.
114  if (fd == retired_fd) {
115  close ();
116  add_reconnect_timer();
117  return;
118  }
119 
120  tune_vmci_buffer_size (this->get_ctx (), fd, options.vmci_buffer_size,
121  options.vmci_buffer_min_size, options.vmci_buffer_max_size);
122 
123  if (options.vmci_connect_timeout > 0)
124  {
125 #if defined ZMQ_HAVE_WINDOWS
126  tune_vmci_connect_timeout (this->get_ctx (), fd, options.vmci_connect_timeout);
127 #else
128  struct timeval timeout = {0, options.vmci_connect_timeout * 1000};
129  tune_vmci_connect_timeout (this->get_ctx (), fd, timeout);
130 #endif
131  }
132 
133  // Create the engine object for this connection.
134  stream_engine_t *engine = new (std::nothrow)
135  stream_engine_t (fd, options, endpoint);
136  alloc_assert (engine);
137 
138  // Attach the engine to the corresponding session object.
139  send_attach (session, engine);
140 
141  // Shut the connecter down.
142  terminate ();
143 
144  socket->event_connected (endpoint, fd);
145 }
146 
147 void zmq::vmci_connecter_t::timer_event (int id_)
148 {
149  zmq_assert (id_ == reconnect_timer_id);
150  timer_started = false;
151  start_connecting ();
152 }
153 
154 void zmq::vmci_connecter_t::start_connecting ()
155 {
156  // Open the connecting socket.
157  int rc = open ();
158 
159  // Connect may succeed in synchronous manner.
160  if (rc == 0) {
161  handle = add_fd (s);
162  handle_valid = true;
163  out_event ();
164  }
165 
166  // Connection establishment may be delayed. Poll for its completion.
167  else
168  if (rc == -1 && errno == EINPROGRESS) {
169  handle = add_fd (s);
170  handle_valid = true;
171  set_pollout (handle);
172  socket->event_connect_delayed (endpoint, zmq_errno());
173  }
174 
175  // Handle any other error condition by eventual reconnect.
176  else {
177  if (s != retired_fd)
178  close ();
179  add_reconnect_timer ();
180  }
181 }
182 
183 void zmq::vmci_connecter_t::add_reconnect_timer()
184 {
185  int rc_ivl = get_new_reconnect_ivl();
186  add_timer (rc_ivl, reconnect_timer_id);
187  socket->event_connect_retried (endpoint, rc_ivl);
188  timer_started = true;
189 }
190 
191 int zmq::vmci_connecter_t::get_new_reconnect_ivl ()
192 {
193  // The new interval is the current interval + random value.
194  int this_interval = current_reconnect_ivl +
195  (generate_random () % options.reconnect_ivl);
196 
197  // Only change the current reconnect interval if the maximum reconnect
198  // interval was set and if it's larger than the reconnect interval.
199  if (options.reconnect_ivl_max > 0 &&
200  options.reconnect_ivl_max > options.reconnect_ivl) {
201 
202  // Calculate the next interval
203  current_reconnect_ivl = current_reconnect_ivl * 2;
204  if(current_reconnect_ivl >= options.reconnect_ivl_max) {
205  current_reconnect_ivl = options.reconnect_ivl_max;
206  }
207  }
208  return this_interval;
209 }
210 
211 int zmq::vmci_connecter_t::open ()
212 {
213  zmq_assert (s == retired_fd);
214 
215  int family = this->get_ctx ()->get_vmci_socket_family ();
216  if (family == -1)
217  return -1;
218 
219  // Create the socket.
220  s = open_socket (family, SOCK_STREAM, 0);
221 #ifdef ZMQ_HAVE_WINDOWS
222  if (s == INVALID_SOCKET) {
223  errno = wsa_error_to_errno(WSAGetLastError());
224  return -1;
225  }
226 #else
227  if (s == -1)
228  return -1;
229 #endif
230 
231  // Set the non-blocking flag.
232  unblock_socket (s);
233 
234  // Connect to the remote peer.
235  int rc = ::connect (
236  s, addr->resolved.vmci_addr->addr (),
237  addr->resolved.vmci_addr->addrlen ());
238 
239  // Connect was successful immediately.
240  if (rc == 0)
241  return 0;
242 
243  // Translate error codes indicating asynchronous connect has been
244  // launched to a uniform EINPROGRESS.
245 #ifdef ZMQ_HAVE_WINDOWS
246  const int error_code = WSAGetLastError();
247  if (error_code == WSAEINPROGRESS || error_code == WSAEWOULDBLOCK)
248  errno = EINPROGRESS;
249  else
250  errno = wsa_error_to_errno(error_code);
251 #else
252  if (errno == EINTR)
253  errno = EINPROGRESS;
254 #endif
255 
256  // Forward the error.
257  return -1;
258 }
259 
260 void zmq::vmci_connecter_t::close ()
261 {
262  zmq_assert (s != retired_fd);
263 #ifdef ZMQ_HAVE_WINDOWS
264  const int rc = closesocket (s);
265  wsa_assert (rc != SOCKET_ERROR);
266 #else
267  const int rc = ::close (s);
268  errno_assert (rc == 0);
269 #endif
270  socket->event_closed (endpoint, s);
271  s = retired_fd;
272 }
273 
274 zmq::fd_t zmq::vmci_connecter_t::connect ()
275 {
276  // Following code should handle both Berkeley-derived socket
277  // implementations and Solaris.
278  int err = 0;
279 #if defined ZMQ_HAVE_HPUX
280  int len = sizeof (err);
281 #else
282  socklen_t len = sizeof (err);
283 #endif
284  int rc = getsockopt (s, SOL_SOCKET, SO_ERROR, (char*) &err, &len);
285 
286  // Assert if the error was caused by 0MQ bug.
287  // Networking problems are OK. No need to assert.
288 #ifdef ZMQ_HAVE_WINDOWS
289  zmq_assert(rc == 0);
290  if (err != 0) {
291  if (err != WSAECONNREFUSED
292  && err != WSAETIMEDOUT
293  && err != WSAECONNABORTED
294  && err != WSAEHOSTUNREACH
295  && err != WSAENETUNREACH
296  && err != WSAENETDOWN
297  && err != WSAEACCES
298  && err != WSAEINVAL
299  && err != WSAEADDRINUSE
300  && err != WSAECONNRESET)
301  {
302  wsa_assert_no(err);
303  }
304  return retired_fd;
305  }
306 #else
307  // Following code should handle both Berkeley-derived socket
308  // implementations and Solaris.
309  if (rc == -1)
310  err = errno;
311  if (err != 0) {
312  errno = err;
313  errno_assert(
314  errno == ECONNREFUSED ||
315  errno == ECONNRESET ||
316  errno == ETIMEDOUT ||
317  errno == EHOSTUNREACH ||
318  errno == ENETUNREACH ||
319  errno == ENETDOWN ||
320  errno == EINVAL);
321  return retired_fd;
322  }
323 #endif
324 
325  fd_t result = s;
326  s = retired_fd;
327  return result;
328 }
329 
330 #endif
void process_term(int linger_)
Definition: own.cpp:158
int fd_t
Definition: fd.hpp:50
fd_t open_socket(int domain_, int type_, int protocol_)
Definition: ip.cpp:50
void unblock_socket(fd_t s_)
Definition: ip.cpp:84
#define zmq_assert(x)
Definition: err.hpp:119
ZMQ_EXPORT int zmq_errno(void)
Definition: zmq.cpp:107
uint32_t generate_random()
Definition: random.cpp:54
#define EHOSTUNREACH
Definition: zmq.h:160
#define ETIMEDOUT
Definition: zmq.h:157
#define ECONNREFUSED
Definition: zmq.h:130
#define ECONNRESET
Definition: zmq.h:151
#define ENETDOWN
Definition: zmq.h:121
#define EINPROGRESS
Definition: zmq.h:133
#define alloc_assert(x)
Definition: err.hpp:159
#define errno_assert(x)
Definition: err.hpp:129
#define ENETUNREACH
Definition: zmq.h:145