libzmq  master
ZeroMQ C++ Core Engine (LIBZMQ)
tipc_connecter.cpp
Go to the documentation of this file.
1 /*
2  Copyright (c) 2007-2016 Contributors as noted in the AUTHORS file
3 
4  This file is part of libzmq, the ZeroMQ core engine in C++.
5 
6  libzmq is free software; you can redistribute it and/or modify it under
7  the terms of the GNU Lesser General Public License (LGPL) as published
8  by the Free Software Foundation; either version 3 of the License, or
9  (at your option) any later version.
10 
11  As a special exception, the Contributors give you permission to link
12  this library with independent modules to produce an executable,
13  regardless of the license terms of these independent modules, and to
14  copy and distribute the resulting executable under terms of your choice,
15  provided that you also meet, for each linked independent module, the
16  terms and conditions of the license of that module. An independent
17  module is a module which is not derived from or based on this library.
18  If you modify this library, you must extend this exception to your
19  version of the library.
20 
21  libzmq is distributed in the hope that it will be useful, but WITHOUT
22  ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
23  FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public
24  License for more details.
25 
26  You should have received a copy of the GNU Lesser General Public License
27  along with this program. If not, see <http://www.gnu.org/licenses/>.
28 */
29 
30 #include "tipc_connecter.hpp"
31 
32 #if defined ZMQ_HAVE_TIPC
33 
34 #include <new>
35 #include <string>
36 
37 #include "stream_engine.hpp"
38 #include "io_thread.hpp"
39 #include "platform.hpp"
40 #include "random.hpp"
41 #include "err.hpp"
42 #include "ip.hpp"
43 #include "address.hpp"
44 #include "tipc_address.hpp"
45 #include "session_base.hpp"
46 
47 #include <unistd.h>
48 #include <sys/types.h>
49 #include <sys/socket.h>
50 
51 zmq::tipc_connecter_t::tipc_connecter_t (class io_thread_t *io_thread_,
52  class session_base_t *session_, const options_t &options_,
53  const address_t *addr_, bool delayed_start_) :
54  own_t (io_thread_, options_),
55  io_object_t (io_thread_),
56  addr (addr_),
57  s (retired_fd),
58  handle_valid (false),
59  delayed_start (delayed_start_),
60  timer_started (false),
61  session (session_),
62  current_reconnect_ivl(options.reconnect_ivl)
63 {
64  zmq_assert (addr);
65  zmq_assert (addr->protocol == "tipc");
66  addr->to_string (endpoint);
67  socket = session-> get_socket();
68 }
69 
70 zmq::tipc_connecter_t::~tipc_connecter_t ()
71 {
72  zmq_assert (!timer_started);
73  zmq_assert (!handle_valid);
74  zmq_assert (s == retired_fd);
75 }
76 
77 void zmq::tipc_connecter_t::process_plug ()
78 {
79  if (delayed_start)
80  add_reconnect_timer ();
81  else
82  start_connecting ();
83 }
84 
85 void zmq::tipc_connecter_t::process_term (int linger_)
86 {
87  if (timer_started) {
88  cancel_timer (reconnect_timer_id);
89  timer_started = false;
90  }
91 
92  if (handle_valid) {
93  rm_fd (handle);
94  handle_valid = false;
95  }
96 
97  if (s != retired_fd)
98  close ();
99 
100  own_t::process_term (linger_);
101 }
102 
103 void zmq::tipc_connecter_t::in_event ()
104 {
105  // We are not polling for incoming data, so we are actually called
106  // because of error here. However, we can get error on out event as well
107  // on some platforms, so we'll simply handle both events in the same way.
108  out_event ();
109 }
110 
111 void zmq::tipc_connecter_t::out_event ()
112 {
113  fd_t fd = connect ();
114  rm_fd (handle);
115  handle_valid = false;
116 
117  // Handle the error condition by attempt to reconnect.
118  if (fd == retired_fd) {
119  close ();
120  add_reconnect_timer();
121  return;
122  }
123  // Create the engine object for this connection.
124  stream_engine_t *engine = new (std::nothrow) stream_engine_t (fd, options, endpoint);
125  alloc_assert (engine);
126 
127  // Attach the engine to the corresponding session object.
128  send_attach (session, engine);
129 
130  // Shut the connecter down.
131  terminate ();
132 
133  socket->event_connected (endpoint, fd);
134 }
135 
136 void zmq::tipc_connecter_t::timer_event (int id_)
137 {
138  zmq_assert (id_ == reconnect_timer_id);
139  timer_started = false;
140  start_connecting ();
141 }
142 
143 void zmq::tipc_connecter_t::start_connecting ()
144 {
145  // Open the connecting socket.
146  int rc = open ();
147 
148  // Connect may succeed in synchronous manner.
149  if (rc == 0) {
150  handle = add_fd (s);
151  handle_valid = true;
152  out_event ();
153  }
154 
155  // Connection establishment may be delayed. Poll for its completion.
156  else
157  if (rc == -1 && errno == EINPROGRESS) {
158  handle = add_fd (s);
159  handle_valid = true;
160  set_pollout (handle);
161  socket->event_connect_delayed (endpoint, zmq_errno());
162  }
163 
164  // Handle any other error condition by eventual reconnect.
165  else {
166  if (s != retired_fd)
167  close ();
168  add_reconnect_timer ();
169  }
170 }
171 
172 void zmq::tipc_connecter_t::add_reconnect_timer()
173 {
174  int rc_ivl = get_new_reconnect_ivl();
175  add_timer (rc_ivl, reconnect_timer_id);
176  socket->event_connect_retried (endpoint, rc_ivl);
177  timer_started = true;
178 }
179 
180 int zmq::tipc_connecter_t::get_new_reconnect_ivl ()
181 {
182  // The new interval is the current interval + random value.
183  int this_interval = current_reconnect_ivl +
184  (generate_random () % options.reconnect_ivl);
185 
186  // Only change the current reconnect interval if the maximum reconnect
187  // interval was set and if it's larger than the reconnect interval.
188  if (options.reconnect_ivl_max > 0 &&
189  options.reconnect_ivl_max > options.reconnect_ivl) {
190 
191  // Calculate the next interval
192  current_reconnect_ivl = current_reconnect_ivl * 2;
193  if(current_reconnect_ivl >= options.reconnect_ivl_max) {
194  current_reconnect_ivl = options.reconnect_ivl_max;
195  }
196  }
197  return this_interval;
198 }
199 
200 int zmq::tipc_connecter_t::open ()
201 {
202  zmq_assert (s == retired_fd);
203 
204  // Create the socket.
205  s = open_socket (AF_TIPC, SOCK_STREAM, 0);
206  if (s == -1)
207  return -1;
208 
209  // Set the non-blocking flag.
210  unblock_socket (s);
211  // Connect to the remote peer.
212  int rc = ::connect (
213  s, addr->resolved.tipc_addr->addr (),
214  addr->resolved.tipc_addr->addrlen ());
215 
216  // Connect was successful immediately.
217  if (rc == 0)
218  return 0;
219 
220  // Translate other error codes indicating asynchronous connect has been
221  // launched to a uniform EINPROGRESS.
222  if (rc == -1 && errno == EINTR) {
223  errno = EINPROGRESS;
224  return -1;
225  }
226  // Forward the error.
227  return -1;
228 }
229 
230 void zmq::tipc_connecter_t::close ()
231 {
232  zmq_assert (s != retired_fd);
233  int rc = ::close (s);
234  errno_assert (rc == 0);
235  socket->event_closed (endpoint, s);
236  s = retired_fd;
237 }
238 
239 zmq::fd_t zmq::tipc_connecter_t::connect ()
240 {
241  // Following code should handle both Berkeley-derived socket
242  // implementations and Solaris.
243  int err = 0;
244  socklen_t len = sizeof (err);
245 
246  int rc = getsockopt (s, SOL_SOCKET, SO_ERROR, (char*) &err, &len);
247  if (rc == -1)
248  err = errno;
249  if (err != 0) {
250 
251  // Assert if the error was caused by 0MQ bug.
252  // Networking problems are OK. No need to assert.
253  errno = err;
254  errno_assert (errno == ECONNREFUSED || errno == ECONNRESET ||
255  errno == ETIMEDOUT || errno == EHOSTUNREACH ||
256  errno == ENETUNREACH || errno == ENETDOWN);
257 
258  return retired_fd;
259  }
260  fd_t result = s;
261  s = retired_fd;
262  return result;
263 }
264 
265 #endif
266 
void process_term(int linger_)
Definition: own.cpp:158
int fd_t
Definition: fd.hpp:50
fd_t open_socket(int domain_, int type_, int protocol_)
Definition: ip.cpp:50
void unblock_socket(fd_t s_)
Definition: ip.cpp:84
#define zmq_assert(x)
Definition: err.hpp:119
ZMQ_EXPORT int zmq_errno(void)
Definition: zmq.cpp:107
uint32_t generate_random()
Definition: random.cpp:54
#define EHOSTUNREACH
Definition: zmq.h:160
#define ETIMEDOUT
Definition: zmq.h:157
#define ECONNREFUSED
Definition: zmq.h:130
#define ECONNRESET
Definition: zmq.h:151
#define ENETDOWN
Definition: zmq.h:121
#define EINPROGRESS
Definition: zmq.h:133
#define alloc_assert(x)
Definition: err.hpp:159
#define errno_assert(x)
Definition: err.hpp:129
#define ENETUNREACH
Definition: zmq.h:145