Line data Source code
1 : /*
2 : Copyright (c) 2007-2016 Contributors as noted in the AUTHORS file
3 :
4 : This file is part of libzmq, the ZeroMQ core engine in C++.
5 :
6 : libzmq is free software; you can redistribute it and/or modify it under
7 : the terms of the GNU Lesser General Public License (LGPL) as published
8 : by the Free Software Foundation; either version 3 of the License, or
9 : (at your option) any later version.
10 :
11 : As a special exception, the Contributors give you permission to link
12 : this library with independent modules to produce an executable,
13 : regardless of the license terms of these independent modules, and to
14 : copy and distribute the resulting executable under terms of your choice,
15 : provided that you also meet, for each linked independent module, the
16 : terms and conditions of the license of that module. An independent
17 : module is a module which is not derived from or based on this library.
18 : If you modify this library, you must extend this exception to your
19 : version of the library.
20 :
21 : libzmq is distributed in the hope that it will be useful, but WITHOUT
22 : ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
23 : FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public
24 : License for more details.
25 :
26 : You should have received a copy of the GNU Lesser General Public License
27 : along with this program. If not, see <http://www.gnu.org/licenses/>.
28 : */
29 :
30 : #include "precompiled.hpp"
31 : #include "macros.hpp"
32 : #include "ip.hpp"
33 : #include "tcp.hpp"
34 : #include "err.hpp"
35 : #include "platform.hpp"
36 :
37 : #if defined ZMQ_HAVE_WINDOWS
38 : #include "windows.hpp"
39 : #else
40 : #include <fcntl.h>
41 : #include <sys/types.h>
42 : #include <sys/socket.h>
43 : #include <netinet/in.h>
44 : #include <netinet/tcp.h>
45 : #endif
46 :
47 : #if defined ZMQ_HAVE_OPENVMS
48 : #include <ioctl.h>
49 : #endif
50 :
51 6645 : void zmq::tune_tcp_socket (fd_t s_)
52 : {
53 : // Disable Nagle's algorithm. We are doing data batching on 0MQ level,
54 : // so using Nagle wouldn't improve throughput in anyway, but it would
55 : // hurt latency.
56 6645 : int nodelay = 1;
57 : int rc = setsockopt (s_, IPPROTO_TCP, TCP_NODELAY, (char*) &nodelay,
58 6645 : sizeof (int));
59 : #ifdef ZMQ_HAVE_WINDOWS
60 : wsa_assert (rc != SOCKET_ERROR);
61 : #else
62 6641 : errno_assert (rc == 0);
63 : #endif
64 :
65 : #ifdef ZMQ_HAVE_OPENVMS
66 : // Disable delayed acknowledgements as they hurt latency significantly.
67 : int nodelack = 1;
68 : rc = setsockopt (s_, IPPROTO_TCP, TCP_NODELACK, (char*) &nodelack,
69 : sizeof (int));
70 : errno_assert (rc != SOCKET_ERROR);
71 : #endif
72 6641 : }
73 :
74 0 : void zmq::set_tcp_send_buffer (fd_t sockfd_, int bufsize_)
75 : {
76 : const int rc = setsockopt (sockfd_, SOL_SOCKET, SO_SNDBUF,
77 0 : (char*) &bufsize_, sizeof bufsize_);
78 : #ifdef ZMQ_HAVE_WINDOWS
79 : wsa_assert (rc != SOCKET_ERROR);
80 : #else
81 0 : errno_assert (rc == 0);
82 : #endif
83 0 : }
84 :
85 0 : void zmq::set_tcp_receive_buffer (fd_t sockfd_, int bufsize_)
86 : {
87 : const int rc = setsockopt (sockfd_, SOL_SOCKET, SO_RCVBUF,
88 0 : (char *) &bufsize_, sizeof bufsize_);
89 : #ifdef ZMQ_HAVE_WINDOWS
90 : wsa_assert (rc != SOCKET_ERROR);
91 : #else
92 0 : errno_assert (rc == 0);
93 : #endif
94 0 : }
95 :
96 6644 : void zmq::tune_tcp_keepalives (fd_t s_, int keepalive_, int keepalive_cnt_,
97 : int keepalive_idle_, int keepalive_intvl_)
98 : {
99 : // These options are used only under certain #ifdefs below.
100 : LIBZMQ_UNUSED (keepalive_);
101 : LIBZMQ_UNUSED (keepalive_cnt_);
102 : LIBZMQ_UNUSED (keepalive_idle_);
103 : LIBZMQ_UNUSED (keepalive_intvl_);
104 :
105 : // If none of the #ifdefs apply, then s_ is unused.
106 : LIBZMQ_UNUSED (s_);
107 :
108 : // Tuning TCP keep-alives if platform allows it
109 : // All values = -1 means skip and leave it for OS
110 : #ifdef ZMQ_HAVE_WINDOWS
111 : if (keepalive_ != -1) {
112 : tcp_keepalive keepalive_opts;
113 : keepalive_opts.onoff = keepalive_;
114 : keepalive_opts.keepalivetime = keepalive_idle_ != -1 ?
115 : keepalive_idle_ * 1000 : 7200000;
116 : keepalive_opts.keepaliveinterval = keepalive_intvl_ != -1 ?
117 : keepalive_intvl_ * 1000 : 1000;
118 : DWORD num_bytes_returned;
119 : int rc = WSAIoctl(s_, SIO_KEEPALIVE_VALS, &keepalive_opts,
120 : sizeof(keepalive_opts), NULL, 0, &num_bytes_returned, NULL, NULL);
121 : wsa_assert (rc != SOCKET_ERROR);
122 : }
123 : #else
124 : #ifdef ZMQ_HAVE_SO_KEEPALIVE
125 6644 : if (keepalive_ != -1) {
126 : int rc = setsockopt (s_, SOL_SOCKET, SO_KEEPALIVE,
127 0 : (char*) &keepalive_, sizeof (int));
128 0 : errno_assert (rc == 0);
129 :
130 : #ifdef ZMQ_HAVE_TCP_KEEPCNT
131 0 : if (keepalive_cnt_ != -1) {
132 : int rc = setsockopt (s_, IPPROTO_TCP, TCP_KEEPCNT,
133 0 : &keepalive_cnt_, sizeof (int));
134 0 : errno_assert (rc == 0);
135 : }
136 : #endif // ZMQ_HAVE_TCP_KEEPCNT
137 :
138 : #ifdef ZMQ_HAVE_TCP_KEEPIDLE
139 0 : if (keepalive_idle_ != -1) {
140 : int rc = setsockopt (s_, IPPROTO_TCP, TCP_KEEPIDLE,
141 0 : &keepalive_idle_, sizeof (int));
142 0 : errno_assert (rc == 0);
143 : }
144 : #else // ZMQ_HAVE_TCP_KEEPIDLE
145 : #ifdef ZMQ_HAVE_TCP_KEEPALIVE
146 : if (keepalive_idle_ != -1) {
147 : int rc = setsockopt (s_, IPPROTO_TCP, TCP_KEEPALIVE,
148 : &keepalive_idle_, sizeof (int));
149 : errno_assert (rc == 0);
150 : }
151 : #endif // ZMQ_HAVE_TCP_KEEPALIVE
152 : #endif // ZMQ_HAVE_TCP_KEEPIDLE
153 :
154 : #ifdef ZMQ_HAVE_TCP_KEEPINTVL
155 0 : if (keepalive_intvl_ != -1) {
156 : int rc = setsockopt (s_, IPPROTO_TCP, TCP_KEEPINTVL,
157 0 : &keepalive_intvl_, sizeof (int));
158 0 : errno_assert (rc == 0);
159 : }
160 : #endif // ZMQ_HAVE_TCP_KEEPINTVL
161 : }
162 : #endif // ZMQ_HAVE_SO_KEEPALIVE
163 : #endif // ZMQ_HAVE_WINDOWS
164 6644 : }
165 :
166 6643 : void zmq::tune_tcp_maxrt (fd_t sockfd_, int timeout_)
167 : {
168 6643 : if (timeout_ <= 0)
169 6643 : return;
170 :
171 : #if defined (ZMQ_HAVE_WINDOWS) && defined (TCP_MAXRT)
172 : // msdn says it's supported in >= Vista, >= Windows Server 2003
173 : timeout_ /= 1000; // in seconds
174 : int rc = setsockopt (sockfd_, IPPROTO_TCP, TCP_MAXRT, (char*) &timeout_,
175 : sizeof(timeout_));
176 : wsa_assert (rc != SOCKET_ERROR);
177 : // FIXME: should be ZMQ_HAVE_TCP_USER_TIMEOUT
178 : #elif defined (TCP_USER_TIMEOUT)
179 : int rc = setsockopt (sockfd_, IPPROTO_TCP, TCP_USER_TIMEOUT, &timeout_,
180 0 : sizeof(timeout_));
181 0 : errno_assert (rc == 0);
182 : #endif
183 : }
184 :
185 21922 : int zmq::tcp_write (fd_t s_, const void *data_, size_t size_)
186 : {
187 : #ifdef ZMQ_HAVE_WINDOWS
188 :
189 : int nbytes = send (s_, (char*) data_, (int) size_, 0);
190 :
191 : // If not a single byte can be written to the socket in non-blocking mode
192 : // we'll get an error (this may happen during the speculative write).
193 : const int last_error = WSAGetLastError();
194 : if (nbytes == SOCKET_ERROR && last_error == WSAEWOULDBLOCK)
195 : return 0;
196 :
197 : // Signalise peer failure.
198 : if (nbytes == SOCKET_ERROR && (
199 : last_error == WSAENETDOWN ||
200 : last_error == WSAENETRESET ||
201 : last_error == WSAEHOSTUNREACH ||
202 : last_error == WSAECONNABORTED ||
203 : last_error == WSAETIMEDOUT ||
204 : last_error == WSAECONNRESET
205 : ))
206 : return -1;
207 :
208 : // Circumvent a Windows bug:
209 : // See https://support.microsoft.com/en-us/kb/201213
210 : // See https://zeromq.jira.com/browse/LIBZMQ-195
211 : if (nbytes == SOCKET_ERROR && last_error == WSAENOBUFS)
212 : return 0;
213 :
214 : wsa_assert (nbytes != SOCKET_ERROR);
215 : return nbytes;
216 :
217 : #else
218 21922 : ssize_t nbytes = send (s_, data_, size_, 0);
219 :
220 : // Several errors are OK. When speculative write is being done we may not
221 : // be able to write a single byte from the socket. Also, SIGSTOP issued
222 : // by a debugging tool can result in EINTR error.
223 21929 : if (nbytes == -1 && (errno == EAGAIN || errno == EWOULDBLOCK ||
224 : errno == EINTR))
225 : return 0;
226 :
227 : // Signalise peer failure.
228 21928 : if (nbytes == -1) {
229 12 : errno_assert (errno != EACCES
230 : && errno != EBADF
231 : && errno != EDESTADDRREQ
232 : && errno != EFAULT
233 : && errno != EISCONN
234 : && errno != EMSGSIZE
235 : && errno != ENOMEM
236 : && errno != ENOTSOCK
237 : && errno != EOPNOTSUPP);
238 : return -1;
239 : }
240 :
241 21916 : return static_cast <int> (nbytes);
242 :
243 : #endif
244 : }
245 :
246 31783 : int zmq::tcp_read (fd_t s_, void *data_, size_t size_)
247 : {
248 : #ifdef ZMQ_HAVE_WINDOWS
249 :
250 : const int rc = recv (s_, (char*) data_, (int) size_, 0);
251 :
252 : // If not a single byte can be read from the socket in non-blocking mode
253 : // we'll get an error (this may happen during the speculative read).
254 : if (rc == SOCKET_ERROR) {
255 : const int last_error = WSAGetLastError();
256 : if (last_error == WSAEWOULDBLOCK) {
257 : errno = EAGAIN;
258 : }
259 : else {
260 : wsa_assert (last_error == WSAENETDOWN ||
261 : last_error == WSAENETRESET ||
262 : last_error == WSAECONNABORTED ||
263 : last_error == WSAETIMEDOUT ||
264 : last_error == WSAECONNRESET ||
265 : last_error == WSAECONNREFUSED ||
266 : last_error == WSAENOTCONN);
267 : errno = wsa_error_to_errno (last_error);
268 : }
269 : }
270 :
271 : return rc == SOCKET_ERROR ? -1 : rc;
272 :
273 : #else
274 :
275 31785 : const ssize_t rc = recv (s_, data_, size_, 0);
276 :
277 : // Several errors are OK. When speculative read is being done we may not
278 : // be able to read a single byte from the socket. Also, SIGSTOP issued
279 : // by a debugging tool can result in EINTR error.
280 31785 : if (rc == -1) {
281 12357 : errno_assert (errno != EBADF
282 : && errno != EFAULT
283 : && errno != ENOMEM
284 : && errno != ENOTSOCK);
285 12357 : if (errno == EWOULDBLOCK || errno == EINTR)
286 11868 : errno = EAGAIN;
287 : }
288 :
289 31785 : return static_cast <int> (rc);
290 :
291 : #endif
292 : }
|