libzmq  master
ZeroMQ C++ Core Engine (LIBZMQ)
ipc_connecter.cpp
Go to the documentation of this file.
1 /*
2  Copyright (c) 2007-2016 Contributors as noted in the AUTHORS file
3 
4  This file is part of libzmq, the ZeroMQ core engine in C++.
5 
6  libzmq is free software; you can redistribute it and/or modify it under
7  the terms of the GNU Lesser General Public License (LGPL) as published
8  by the Free Software Foundation; either version 3 of the License, or
9  (at your option) any later version.
10 
11  As a special exception, the Contributors give you permission to link
12  this library with independent modules to produce an executable,
13  regardless of the license terms of these independent modules, and to
14  copy and distribute the resulting executable under terms of your choice,
15  provided that you also meet, for each linked independent module, the
16  terms and conditions of the license of that module. An independent
17  module is a module which is not derived from or based on this library.
18  If you modify this library, you must extend this exception to your
19  version of the library.
20 
21  libzmq is distributed in the hope that it will be useful, but WITHOUT
22  ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
23  FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public
24  License for more details.
25 
26  You should have received a copy of the GNU Lesser General Public License
27  along with this program. If not, see <http://www.gnu.org/licenses/>.
28 */
29 
30 #include "precompiled.hpp"
31 #include "ipc_connecter.hpp"
32 
33 #if !defined ZMQ_HAVE_WINDOWS && !defined ZMQ_HAVE_OPENVMS
34 
35 #include <new>
36 #include <string>
37 
38 #include "stream_engine.hpp"
39 #include "io_thread.hpp"
40 #include "platform.hpp"
41 #include "random.hpp"
42 #include "err.hpp"
43 #include "ip.hpp"
44 #include "address.hpp"
45 #include "ipc_address.hpp"
46 #include "session_base.hpp"
47 
48 #include <unistd.h>
49 #include <sys/types.h>
50 #include <sys/socket.h>
51 #include <sys/un.h>
52 
54  class session_base_t *session_, const options_t &options_,
55  const address_t *addr_, bool delayed_start_) :
56  own_t (io_thread_, options_),
57  io_object_t (io_thread_),
58  addr (addr_),
59  s (retired_fd),
60  handle_valid (false),
61  delayed_start (delayed_start_),
62  timer_started (false),
63  session (session_),
64  current_reconnect_ivl(options.reconnect_ivl)
65 {
66  zmq_assert (addr);
67  zmq_assert (addr->protocol == "ipc");
69  socket = session-> get_socket();
70 }
71 
73 {
76  zmq_assert (s == retired_fd);
77 }
78 
80 {
81  if (delayed_start)
83  else
85 }
86 
88 {
89  if (timer_started) {
91  timer_started = false;
92  }
93 
94  if (handle_valid) {
95  rm_fd (handle);
96  handle_valid = false;
97  }
98 
99  if (s != retired_fd)
100  close ();
101 
102  own_t::process_term (linger_);
103 }
104 
106 {
107  // We are not polling for incoming data, so we are actually called
108  // because of error here. However, we can get error on out event as well
109  // on some platforms, so we'll simply handle both events in the same way.
110  out_event ();
111 }
112 
114 {
115  fd_t fd = connect ();
116  rm_fd (handle);
117  handle_valid = false;
118 
119  // Handle the error condition by attempt to reconnect.
120  if (fd == retired_fd) {
121  close ();
123  return;
124  }
125  // Create the engine object for this connection.
126  stream_engine_t *engine = new (std::nothrow)
128  alloc_assert (engine);
129 
130  // Attach the engine to the corresponding session object.
131  send_attach (session, engine);
132 
133  // Shut the connecter down.
134  terminate ();
135 
137 }
138 
140 {
142  timer_started = false;
143  start_connecting ();
144 }
145 
147 {
148  // Open the connecting socket.
149  int rc = open ();
150 
151  // Connect may succeed in synchronous manner.
152  if (rc == 0) {
153  handle = add_fd (s);
154  handle_valid = true;
155  out_event ();
156  }
157 
158  // Connection establishment may be delayed. Poll for its completion.
159  else
160  if (rc == -1 && errno == EINPROGRESS) {
161  handle = add_fd (s);
162  handle_valid = true;
165  }
166 
167  // Handle any other error condition by eventual reconnect.
168  else {
169  if (s != retired_fd)
170  close ();
172  }
173 }
174 
176 {
177  int rc_ivl = get_new_reconnect_ivl();
178  add_timer (rc_ivl, reconnect_timer_id);
180  timer_started = true;
181 }
182 
184 {
185  // The new interval is the current interval + random value.
186  int this_interval = current_reconnect_ivl +
188 
189  // Only change the current reconnect interval if the maximum reconnect
190  // interval was set and if it's larger than the reconnect interval.
191  if (options.reconnect_ivl_max > 0 &&
193 
194  // Calculate the next interval
198  }
199  }
200  return this_interval;
201 }
202 
204 {
205  zmq_assert (s == retired_fd);
206 
207  // Create the socket.
208  s = open_socket (AF_UNIX, SOCK_STREAM, 0);
209  if (s == -1)
210  return -1;
211 
212  // Set the non-blocking flag.
213  unblock_socket (s);
214 
215  // Connect to the remote peer.
216  int rc = ::connect (
217  s, addr->resolved.ipc_addr->addr (),
219 
220  // Connect was successful immediately.
221  if (rc == 0)
222  return 0;
223 
224  // Translate other error codes indicating asynchronous connect has been
225  // launched to a uniform EINPROGRESS.
226  if (rc == -1 && errno == EINTR) {
227  errno = EINPROGRESS;
228  return -1;
229  }
230 
231  // Forward the error.
232  return -1;
233 }
234 
236 {
237  zmq_assert (s != retired_fd);
238  int rc = ::close (s);
239  errno_assert (rc == 0);
241  s = retired_fd;
242  return 0;
243 }
244 
246 {
247  // Following code should handle both Berkeley-derived socket
248  // implementations and Solaris.
249  int err = 0;
250 #if defined ZMQ_HAVE_HPUX
251  int len = sizeof (err);
252 #else
253  socklen_t len = sizeof (err);
254 #endif
255  int rc = getsockopt (s, SOL_SOCKET, SO_ERROR, (char*) &err, &len);
256  if (rc == -1) {
257  if (errno == ENOPROTOOPT)
258  errno = 0;
259  err = errno;
260  }
261  if (err != 0) {
262 
263  // Assert if the error was caused by 0MQ bug.
264  // Networking problems are OK. No need to assert.
265  errno = err;
266  errno_assert (errno == ECONNREFUSED || errno == ECONNRESET ||
267  errno == ETIMEDOUT || errno == EHOSTUNREACH ||
268  errno == ENETUNREACH || errno == ENETDOWN);
269 
270  return retired_fd;
271  }
272 
273  fd_t result = s;
274  s = retired_fd;
275  return result;
276 }
277 
278 #endif
279 
void send_attach(zmq::session_base_t *destination_, zmq::i_engine *engine_, bool inc_seqnum_=true)
Definition: object.cpp:225
void process_term(int linger_)
Definition: own.cpp:158
void set_pollout(handle_t handle_)
Definition: io_object.cpp:84
ipc_address_t * ipc_addr
Definition: address.hpp:63
fd_t open_socket(int domain_, int type_, int protocol_)
Definition: ip.cpp:50
void unblock_socket(fd_t s_)
Definition: ip.cpp:84
const sockaddr * addr() const
Definition: ipc_address.cpp:94
#define zmq_assert(x)
Definition: err.hpp:119
void cancel_timer(int id_)
Definition: io_object.cpp:99
ipc_connecter_t(zmq::io_thread_t *io_thread_, zmq::session_base_t *session_, const options_t &options_, const address_t *addr_, bool delayed_start_)
ZMQ_EXPORT int zmq_errno(void)
Definition: zmq.cpp:107
void event_connected(const std::string &addr_, int fd_)
void terminate()
Definition: own.cpp:135
socklen_t addrlen() const
Definition: ipc_address.cpp:99
int reconnect_ivl_max
Definition: options.hpp:118
void timer_event(int id_)
void event_connect_delayed(const std::string &addr_, int err_)
const address_t * addr
uint32_t generate_random()
Definition: random.cpp:54
const std::string protocol
Definition: address.hpp:54
#define EHOSTUNREACH
Definition: zmq.h:160
void process_term(int linger_)
#define ETIMEDOUT
Definition: zmq.h:157
#define ECONNREFUSED
Definition: zmq.h:130
void add_timer(int timout_, int id_)
Definition: io_object.cpp:94
void event_closed(const std::string &addr_, int fd_)
#define ECONNRESET
Definition: zmq.h:151
void event_connect_retried(const std::string &addr_, int interval_)
#define ENETDOWN
Definition: zmq.h:121
#define EINPROGRESS
Definition: zmq.h:133
#define alloc_assert(x)
Definition: err.hpp:159
#define errno_assert(x)
Definition: err.hpp:129
#define ENETUNREACH
Definition: zmq.h:145
handle_t add_fd(fd_t fd_)
Definition: io_object.cpp:64
options_t options
Definition: own.hpp:109
union zmq::address_t::@0 resolved
int to_string(std::string &addr_) const
Definition: address.cpp:95
void rm_fd(handle_t handle_)
Definition: io_object.cpp:69
zmq::session_base_t * session
zmq::socket_base_t * socket