libzmq  master
ZeroMQ C++ Core Engine (LIBZMQ)
tcp.cpp
Go to the documentation of this file.
1 /*
2  Copyright (c) 2007-2016 Contributors as noted in the AUTHORS file
3 
4  This file is part of libzmq, the ZeroMQ core engine in C++.
5 
6  libzmq is free software; you can redistribute it and/or modify it under
7  the terms of the GNU Lesser General Public License (LGPL) as published
8  by the Free Software Foundation; either version 3 of the License, or
9  (at your option) any later version.
10 
11  As a special exception, the Contributors give you permission to link
12  this library with independent modules to produce an executable,
13  regardless of the license terms of these independent modules, and to
14  copy and distribute the resulting executable under terms of your choice,
15  provided that you also meet, for each linked independent module, the
16  terms and conditions of the license of that module. An independent
17  module is a module which is not derived from or based on this library.
18  If you modify this library, you must extend this exception to your
19  version of the library.
20 
21  libzmq is distributed in the hope that it will be useful, but WITHOUT
22  ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
23  FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public
24  License for more details.
25 
26  You should have received a copy of the GNU Lesser General Public License
27  along with this program. If not, see <http://www.gnu.org/licenses/>.
28 */
29 
30 #include "precompiled.hpp"
31 #include "macros.hpp"
32 #include "ip.hpp"
33 #include "tcp.hpp"
34 #include "err.hpp"
35 #include "platform.hpp"
36 
37 #if defined ZMQ_HAVE_WINDOWS
38 #include "windows.hpp"
39 #else
40 #include <fcntl.h>
41 #include <sys/types.h>
42 #include <sys/socket.h>
43 #include <netinet/in.h>
44 #include <netinet/tcp.h>
45 #endif
46 
47 #if defined ZMQ_HAVE_OPENVMS
48 #include <ioctl.h>
49 #endif
50 
52 {
53  // Disable Nagle's algorithm. We are doing data batching on 0MQ level,
54  // so using Nagle wouldn't improve throughput in anyway, but it would
55  // hurt latency.
56  int nodelay = 1;
57  int rc = setsockopt (s_, IPPROTO_TCP, TCP_NODELAY, (char*) &nodelay,
58  sizeof (int));
59 #ifdef ZMQ_HAVE_WINDOWS
60  wsa_assert (rc != SOCKET_ERROR);
61 #else
62  errno_assert (rc == 0);
63 #endif
64 
65 #ifdef ZMQ_HAVE_OPENVMS
66  // Disable delayed acknowledgements as they hurt latency significantly.
67  int nodelack = 1;
68  rc = setsockopt (s_, IPPROTO_TCP, TCP_NODELACK, (char*) &nodelack,
69  sizeof (int));
70  errno_assert (rc != SOCKET_ERROR);
71 #endif
72 }
73 
74 void zmq::set_tcp_send_buffer (fd_t sockfd_, int bufsize_)
75 {
76  const int rc = setsockopt (sockfd_, SOL_SOCKET, SO_SNDBUF,
77  (char*) &bufsize_, sizeof bufsize_);
78 #ifdef ZMQ_HAVE_WINDOWS
79  wsa_assert (rc != SOCKET_ERROR);
80 #else
81  errno_assert (rc == 0);
82 #endif
83 }
84 
85 void zmq::set_tcp_receive_buffer (fd_t sockfd_, int bufsize_)
86 {
87  const int rc = setsockopt (sockfd_, SOL_SOCKET, SO_RCVBUF,
88  (char *) &bufsize_, sizeof bufsize_);
89 #ifdef ZMQ_HAVE_WINDOWS
90  wsa_assert (rc != SOCKET_ERROR);
91 #else
92  errno_assert (rc == 0);
93 #endif
94 }
95 
96 void zmq::tune_tcp_keepalives (fd_t s_, int keepalive_, int keepalive_cnt_,
97  int keepalive_idle_, int keepalive_intvl_)
98 {
99  // These options are used only under certain #ifdefs below.
100  LIBZMQ_UNUSED (keepalive_);
101  LIBZMQ_UNUSED (keepalive_cnt_);
102  LIBZMQ_UNUSED (keepalive_idle_);
103  LIBZMQ_UNUSED (keepalive_intvl_);
104 
105  // If none of the #ifdefs apply, then s_ is unused.
106  LIBZMQ_UNUSED (s_);
107 
108  // Tuning TCP keep-alives if platform allows it
109  // All values = -1 means skip and leave it for OS
110 #ifdef ZMQ_HAVE_WINDOWS
111  if (keepalive_ != -1) {
112  tcp_keepalive keepalive_opts;
113  keepalive_opts.onoff = keepalive_;
114  keepalive_opts.keepalivetime = keepalive_idle_ != -1 ?
115  keepalive_idle_ * 1000 : 7200000;
116  keepalive_opts.keepaliveinterval = keepalive_intvl_ != -1 ?
117  keepalive_intvl_ * 1000 : 1000;
118  DWORD num_bytes_returned;
119  int rc = WSAIoctl(s_, SIO_KEEPALIVE_VALS, &keepalive_opts,
120  sizeof(keepalive_opts), NULL, 0, &num_bytes_returned, NULL, NULL);
121  wsa_assert (rc != SOCKET_ERROR);
122  }
123 #else
124 #ifdef ZMQ_HAVE_SO_KEEPALIVE
125  if (keepalive_ != -1) {
126  int rc = setsockopt (s_, SOL_SOCKET, SO_KEEPALIVE,
127  (char*) &keepalive_, sizeof (int));
128  errno_assert (rc == 0);
129 
130 #ifdef ZMQ_HAVE_TCP_KEEPCNT
131  if (keepalive_cnt_ != -1) {
132  int rc = setsockopt (s_, IPPROTO_TCP, TCP_KEEPCNT,
133  &keepalive_cnt_, sizeof (int));
134  errno_assert (rc == 0);
135  }
136 #endif // ZMQ_HAVE_TCP_KEEPCNT
137 
138 #ifdef ZMQ_HAVE_TCP_KEEPIDLE
139  if (keepalive_idle_ != -1) {
140  int rc = setsockopt (s_, IPPROTO_TCP, TCP_KEEPIDLE,
141  &keepalive_idle_, sizeof (int));
142  errno_assert (rc == 0);
143  }
144 #else // ZMQ_HAVE_TCP_KEEPIDLE
145 #ifdef ZMQ_HAVE_TCP_KEEPALIVE
146  if (keepalive_idle_ != -1) {
147  int rc = setsockopt (s_, IPPROTO_TCP, TCP_KEEPALIVE,
148  &keepalive_idle_, sizeof (int));
149  errno_assert (rc == 0);
150  }
151 #endif // ZMQ_HAVE_TCP_KEEPALIVE
152 #endif // ZMQ_HAVE_TCP_KEEPIDLE
153 
154 #ifdef ZMQ_HAVE_TCP_KEEPINTVL
155  if (keepalive_intvl_ != -1) {
156  int rc = setsockopt (s_, IPPROTO_TCP, TCP_KEEPINTVL,
157  &keepalive_intvl_, sizeof (int));
158  errno_assert (rc == 0);
159  }
160 #endif // ZMQ_HAVE_TCP_KEEPINTVL
161  }
162 #endif // ZMQ_HAVE_SO_KEEPALIVE
163 #endif // ZMQ_HAVE_WINDOWS
164 }
165 
166 void zmq::tune_tcp_maxrt (fd_t sockfd_, int timeout_)
167 {
168  if (timeout_ <= 0)
169  return;
170 
171  LIBZMQ_UNUSED (sockfd_);
172 
173 #if defined (ZMQ_HAVE_WINDOWS) && defined (TCP_MAXRT)
174  // msdn says it's supported in >= Vista, >= Windows Server 2003
175  timeout_ /= 1000; // in seconds
176  int rc = setsockopt (sockfd_, IPPROTO_TCP, TCP_MAXRT, (char*) &timeout_,
177  sizeof(timeout_));
178  wsa_assert (rc != SOCKET_ERROR);
179 // FIXME: should be ZMQ_HAVE_TCP_USER_TIMEOUT
180 #elif defined (TCP_USER_TIMEOUT)
181  int rc = setsockopt (sockfd_, IPPROTO_TCP, TCP_USER_TIMEOUT, &timeout_,
182  sizeof(timeout_));
183  errno_assert (rc == 0);
184 #endif
185 }
186 
187  int zmq::tcp_write (fd_t s_, const void *data_, size_t size_)
188 {
189 #ifdef ZMQ_HAVE_WINDOWS
190 
191  int nbytes = send (s_, (char*) data_, (int) size_, 0);
192 
193  // If not a single byte can be written to the socket in non-blocking mode
194  // we'll get an error (this may happen during the speculative write).
195  const int last_error = WSAGetLastError();
196  if (nbytes == SOCKET_ERROR && last_error == WSAEWOULDBLOCK)
197  return 0;
198 
199  // Signalise peer failure.
200  if (nbytes == SOCKET_ERROR && (
201  last_error == WSAENETDOWN ||
202  last_error == WSAENETRESET ||
203  last_error == WSAEHOSTUNREACH ||
204  last_error == WSAECONNABORTED ||
205  last_error == WSAETIMEDOUT ||
206  last_error == WSAECONNRESET
207  ))
208  return -1;
209 
210  // Circumvent a Windows bug:
211  // See https://support.microsoft.com/en-us/kb/201213
212  // See https://zeromq.jira.com/browse/LIBZMQ-195
213  if (nbytes == SOCKET_ERROR && last_error == WSAENOBUFS)
214  return 0;
215 
216  wsa_assert (nbytes != SOCKET_ERROR);
217  return nbytes;
218 
219 #else
220  ssize_t nbytes = send (s_, data_, size_, 0);
221 
222  // Several errors are OK. When speculative write is being done we may not
223  // be able to write a single byte from the socket. Also, SIGSTOP issued
224  // by a debugging tool can result in EINTR error.
225  if (nbytes == -1 && (errno == EAGAIN || errno == EWOULDBLOCK ||
226  errno == EINTR))
227  return 0;
228 
229  // Signalise peer failure.
230  if (nbytes == -1) {
231  errno_assert (errno != EACCES
232  && errno != EBADF
233  && errno != EDESTADDRREQ
234  && errno != EFAULT
235  && errno != EISCONN
236  && errno != EMSGSIZE
237  && errno != ENOMEM
238  && errno != ENOTSOCK
239  && errno != EOPNOTSUPP);
240  return -1;
241  }
242 
243  return static_cast <int> (nbytes);
244 
245 #endif
246 }
247 
248 int zmq::tcp_read (fd_t s_, void *data_, size_t size_)
249 {
250 #ifdef ZMQ_HAVE_WINDOWS
251 
252  const int rc = recv (s_, (char*) data_, (int) size_, 0);
253 
254  // If not a single byte can be read from the socket in non-blocking mode
255  // we'll get an error (this may happen during the speculative read).
256  if (rc == SOCKET_ERROR) {
257  const int last_error = WSAGetLastError();
258  if (last_error == WSAEWOULDBLOCK) {
259  errno = EAGAIN;
260  }
261  else {
262  wsa_assert (last_error == WSAENETDOWN ||
263  last_error == WSAENETRESET ||
264  last_error == WSAECONNABORTED ||
265  last_error == WSAETIMEDOUT ||
266  last_error == WSAECONNRESET ||
267  last_error == WSAECONNREFUSED ||
268  last_error == WSAENOTCONN);
269  errno = wsa_error_to_errno (last_error);
270  }
271  }
272 
273  return rc == SOCKET_ERROR ? -1 : rc;
274 
275 #else
276 
277  const ssize_t rc = recv (s_, data_, size_, 0);
278 
279  // Several errors are OK. When speculative read is being done we may not
280  // be able to read a single byte from the socket. Also, SIGSTOP issued
281  // by a debugging tool can result in EINTR error.
282  if (rc == -1) {
283  errno_assert (errno != EBADF
284  && errno != EFAULT
285  && errno != ENOMEM
286  && errno != ENOTSOCK);
287  if (errno == EWOULDBLOCK || errno == EINTR)
288  errno = EAGAIN;
289  }
290 
291  return static_cast <int> (rc);
292 
293 #endif
294 }
void tune_tcp_keepalives(fd_t s_, int keepalive_, int keepalive_cnt_, int keepalive_idle_, int keepalive_intvl_)
Definition: tcp.cpp:96
int fd_t
Definition: fd.hpp:50
#define ENOTSOCK
Definition: zmq.h:136
void set_tcp_receive_buffer(fd_t sockfd_, int bufsize_)
Definition: tcp.cpp:85
#define LIBZMQ_UNUSED(object)
Definition: macros.hpp:6
void tune_tcp_maxrt(fd_t sockfd_, int timeout_)
Definition: tcp.cpp:166
void tune_tcp_socket(fd_t s_)
Definition: tcp.cpp:51
#define EMSGSIZE
Definition: zmq.h:139
void set_tcp_send_buffer(fd_t sockfd_, int bufsize_)
Definition: tcp.cpp:74
int tcp_write(fd_t s_, const void *data_, size_t size_)
Definition: tcp.cpp:187
#define errno_assert(x)
Definition: err.hpp:129
int tcp_read(fd_t s_, void *data_, size_t size_)
Definition: tcp.cpp:248