Line data Source code
1 : /*
2 : Copyright (c) 2007-2016 Contributors as noted in the AUTHORS file
3 :
4 : This file is part of libzmq, the ZeroMQ core engine in C++.
5 :
6 : libzmq is free software; you can redistribute it and/or modify it under
7 : the terms of the GNU Lesser General Public License (LGPL) as published
8 : by the Free Software Foundation; either version 3 of the License, or
9 : (at your option) any later version.
10 :
11 : As a special exception, the Contributors give you permission to link
12 : this library with independent modules to produce an executable,
13 : regardless of the license terms of these independent modules, and to
14 : copy and distribute the resulting executable under terms of your choice,
15 : provided that you also meet, for each linked independent module, the
16 : terms and conditions of the license of that module. An independent
17 : module is a module which is not derived from or based on this library.
18 : If you modify this library, you must extend this exception to your
19 : version of the library.
20 :
21 : libzmq is distributed in the hope that it will be useful, but WITHOUT
22 : ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
23 : FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public
24 : License for more details.
25 :
26 : You should have received a copy of the GNU Lesser General Public License
27 : along with this program. If not, see <http://www.gnu.org/licenses/>.
28 : */
29 :
30 : #include "precompiled.hpp"
31 : #include "ipc_connecter.hpp"
32 :
33 : #if !defined ZMQ_HAVE_WINDOWS && !defined ZMQ_HAVE_OPENVMS
34 :
#include <limits>
#include <new>
#include <string>
37 :
38 : #include "stream_engine.hpp"
39 : #include "io_thread.hpp"
40 : #include "platform.hpp"
41 : #include "random.hpp"
42 : #include "err.hpp"
43 : #include "ip.hpp"
44 : #include "address.hpp"
45 : #include "ipc_address.hpp"
46 : #include "session_base.hpp"
47 :
48 : #include <unistd.h>
49 : #include <sys/types.h>
50 : #include <sys/socket.h>
51 : #include <sys/un.h>
52 :
53 65 : zmq::ipc_connecter_t::ipc_connecter_t (class io_thread_t *io_thread_,
54 : class session_base_t *session_, const options_t &options_,
55 : const address_t *addr_, bool delayed_start_) :
56 : own_t (io_thread_, options_),
57 : io_object_t (io_thread_),
58 : addr (addr_),
59 : s (retired_fd),
60 : handle_valid (false),
61 : delayed_start (delayed_start_),
62 : timer_started (false),
63 : session (session_),
64 65 : current_reconnect_ivl(options.reconnect_ivl)
65 : {
66 65 : zmq_assert (addr);
67 130 : zmq_assert (addr->protocol == "ipc");
68 65 : addr->to_string (endpoint);
69 65 : socket = session-> get_socket();
70 65 : }
71 :
72 260 : zmq::ipc_connecter_t::~ipc_connecter_t ()
73 : {
74 65 : zmq_assert (!timer_started);
75 65 : zmq_assert (!handle_valid);
76 65 : zmq_assert (s == retired_fd);
77 130 : }
78 :
79 65 : void zmq::ipc_connecter_t::process_plug ()
80 : {
81 65 : if (delayed_start)
82 2 : add_reconnect_timer ();
83 : else
84 63 : start_connecting ();
85 65 : }
86 :
87 65 : void zmq::ipc_connecter_t::process_term (int linger_)
88 : {
89 65 : if (timer_started) {
90 2 : cancel_timer (reconnect_timer_id);
91 2 : timer_started = false;
92 : }
93 :
94 65 : if (handle_valid) {
95 0 : rm_fd (handle);
96 0 : handle_valid = false;
97 : }
98 :
99 65 : if (s != retired_fd)
100 0 : close ();
101 :
102 65 : own_t::process_term (linger_);
103 65 : }
104 :
105 0 : void zmq::ipc_connecter_t::in_event ()
106 : {
107 : // We are not polling for incoming data, so we are actually called
108 : // because of error here. However, we can get error on out event as well
109 : // on some platforms, so we'll simply handle both events in the same way.
110 0 : out_event ();
111 0 : }
112 :
113 63 : void zmq::ipc_connecter_t::out_event ()
114 : {
115 63 : fd_t fd = connect ();
116 63 : rm_fd (handle);
117 63 : handle_valid = false;
118 :
119 : // Handle the error condition by attempt to reconnect.
120 63 : if (fd == retired_fd) {
121 0 : close ();
122 0 : add_reconnect_timer();
123 63 : return;
124 : }
125 : // Create the engine object for this connection.
126 : stream_engine_t *engine = new (std::nothrow)
127 63 : stream_engine_t (fd, options, endpoint);
128 63 : alloc_assert (engine);
129 :
130 : // Attach the engine to the corresponding session object.
131 63 : send_attach (session, engine);
132 :
133 : // Shut the connecter down.
134 63 : terminate ();
135 :
136 63 : socket->event_connected (endpoint, fd);
137 : }
138 :
139 0 : void zmq::ipc_connecter_t::timer_event (int id_)
140 : {
141 0 : zmq_assert (id_ == reconnect_timer_id);
142 0 : timer_started = false;
143 0 : start_connecting ();
144 0 : }
145 :
146 63 : void zmq::ipc_connecter_t::start_connecting ()
147 : {
148 : // Open the connecting socket.
149 63 : int rc = open ();
150 :
151 : // Connect may succeed in synchronous manner.
152 63 : if (rc == 0) {
153 63 : handle = add_fd (s);
154 63 : handle_valid = true;
155 63 : out_event ();
156 : }
157 :
158 : // Connection establishment may be delayed. Poll for its completion.
159 : else
160 0 : if (rc == -1 && errno == EINPROGRESS) {
161 0 : handle = add_fd (s);
162 0 : handle_valid = true;
163 0 : set_pollout (handle);
164 0 : socket->event_connect_delayed (endpoint, zmq_errno());
165 : }
166 :
167 : // Handle any other error condition by eventual reconnect.
168 : else {
169 0 : if (s != retired_fd)
170 0 : close ();
171 0 : add_reconnect_timer ();
172 : }
173 63 : }
174 :
175 2 : void zmq::ipc_connecter_t::add_reconnect_timer()
176 : {
177 2 : int rc_ivl = get_new_reconnect_ivl();
178 2 : add_timer (rc_ivl, reconnect_timer_id);
179 2 : socket->event_connect_retried (endpoint, rc_ivl);
180 2 : timer_started = true;
181 2 : }
182 :
183 2 : int zmq::ipc_connecter_t::get_new_reconnect_ivl ()
184 : {
185 : // The new interval is the current interval + random value.
186 4 : int this_interval = current_reconnect_ivl +
187 4 : (generate_random () % options.reconnect_ivl);
188 :
189 : // Only change the current reconnect interval if the maximum reconnect
190 : // interval was set and if it's larger than the reconnect interval.
191 2 : if (options.reconnect_ivl_max > 0 &&
192 : options.reconnect_ivl_max > options.reconnect_ivl) {
193 :
194 : // Calculate the next interval
195 0 : current_reconnect_ivl = current_reconnect_ivl * 2;
196 0 : if(current_reconnect_ivl >= options.reconnect_ivl_max) {
197 0 : current_reconnect_ivl = options.reconnect_ivl_max;
198 : }
199 : }
200 2 : return this_interval;
201 : }
202 :
203 63 : int zmq::ipc_connecter_t::open ()
204 : {
205 63 : zmq_assert (s == retired_fd);
206 :
207 : // Create the socket.
208 63 : s = open_socket (AF_UNIX, SOCK_STREAM, 0);
209 63 : if (s == -1)
210 : return -1;
211 :
212 : // Set the non-blocking flag.
213 63 : unblock_socket (s);
214 :
215 : // Connect to the remote peer.
216 : int rc = ::connect (
217 : s, addr->resolved.ipc_addr->addr (),
218 63 : addr->resolved.ipc_addr->addrlen ());
219 :
220 : // Connect was successful immediately.
221 63 : if (rc == 0)
222 : return 0;
223 :
224 : // Translate other error codes indicating asynchronous connect has been
225 : // launched to a uniform EINPROGRESS.
226 0 : if (rc == -1 && errno == EINTR) {
227 0 : errno = EINPROGRESS;
228 0 : return -1;
229 : }
230 :
231 : // Forward the error.
232 : return -1;
233 : }
234 :
235 0 : int zmq::ipc_connecter_t::close ()
236 : {
237 0 : zmq_assert (s != retired_fd);
238 0 : int rc = ::close (s);
239 0 : errno_assert (rc == 0);
240 0 : socket->event_closed (endpoint, s);
241 0 : s = retired_fd;
242 0 : return 0;
243 : }
244 :
245 63 : zmq::fd_t zmq::ipc_connecter_t::connect ()
246 : {
247 : // Following code should handle both Berkeley-derived socket
248 : // implementations and Solaris.
249 63 : int err = 0;
250 : #if defined ZMQ_HAVE_HPUX
251 : int len = sizeof (err);
252 : #else
253 63 : socklen_t len = sizeof (err);
254 : #endif
255 63 : int rc = getsockopt (s, SOL_SOCKET, SO_ERROR, (char*) &err, &len);
256 63 : if (rc == -1) {
257 0 : if (errno == ENOPROTOOPT)
258 0 : errno = 0;
259 0 : err = errno;
260 : }
261 63 : if (err != 0) {
262 :
263 : // Assert if the error was caused by 0MQ bug.
264 : // Networking problems are OK. No need to assert.
265 0 : errno = err;
266 0 : errno_assert (errno == ECONNREFUSED || errno == ECONNRESET ||
267 : errno == ETIMEDOUT || errno == EHOSTUNREACH ||
268 : errno == ENETUNREACH || errno == ENETDOWN);
269 :
270 : return retired_fd;
271 : }
272 :
273 63 : fd_t result = s;
274 63 : s = retired_fd;
275 63 : return result;
276 : }
277 :
278 : #endif
279 :
|