libzmq  master
ZeroMQ C++ Core Engine (LIBZMQ)
inproc_lat.cpp
Go to the documentation of this file.
1 /*
2  Copyright (c) 2007-2016 Contributors as noted in the AUTHORS file
3 
4  This file is part of libzmq, the ZeroMQ core engine in C++.
5 
6  libzmq is free software; you can redistribute it and/or modify it under
7  the terms of the GNU Lesser General Public License (LGPL) as published
8  by the Free Software Foundation; either version 3 of the License, or
9  (at your option) any later version.
10 
11  As a special exception, the Contributors give you permission to link
12  this library with independent modules to produce an executable,
13  regardless of the license terms of these independent modules, and to
14  copy and distribute the resulting executable under terms of your choice,
15  provided that you also meet, for each linked independent module, the
16  terms and conditions of the license of that module. An independent
17  module is a module which is not derived from or based on this library.
18  If you modify this library, you must extend this exception to your
19  version of the library.
20 
21  libzmq is distributed in the hope that it will be useful, but WITHOUT
22  ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
23  FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public
24  License for more details.
25 
26  You should have received a copy of the GNU Lesser General Public License
27  along with this program. If not, see <http://www.gnu.org/licenses/>.
28 */
29 
30 #include "../include/zmq.h"
31 
#include <errno.h>
#include <limits.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
35 
36 #include "platform.hpp"
37 
38 #if defined ZMQ_HAVE_WINDOWS
39 #include <windows.h>
40 #include <process.h>
41 #else
42 #include <pthread.h>
43 #endif
44 
45 static size_t message_size; //  Size of each test message (printed as bytes); set by main from the first command-line argument.
46 static int roundtrip_count; //  Number of send/receive round trips; set by main from the second argument and read by both main and worker.
47 
48 #if defined ZMQ_HAVE_WINDOWS
49 static unsigned int __stdcall worker (void *ctx_)
50 #else
51 static void *worker (void *ctx_)
52 #endif
53 {
54  void *s;
55  int rc;
56  int i;
57  zmq_msg_t msg;
58 
59  s = zmq_socket (ctx_, ZMQ_REP);
60  if (!s) {
61  printf ("error in zmq_socket: %s\n", zmq_strerror (errno));
62  exit (1);
63  }
64 
65  rc = zmq_connect (s, "inproc://lat_test");
66  if (rc != 0) {
67  printf ("error in zmq_connect: %s\n", zmq_strerror (errno));
68  exit (1);
69  }
70 
71  rc = zmq_msg_init (&msg);
72  if (rc != 0) {
73  printf ("error in zmq_msg_init: %s\n", zmq_strerror (errno));
74  exit (1);
75  }
76 
77  for (i = 0; i != roundtrip_count; i++) {
78  rc = zmq_recvmsg (s, &msg, 0);
79  if (rc < 0) {
80  printf ("error in zmq_recvmsg: %s\n", zmq_strerror (errno));
81  exit (1);
82  }
83  rc = zmq_sendmsg (s, &msg, 0);
84  if (rc < 0) {
85  printf ("error in zmq_sendmsg: %s\n", zmq_strerror (errno));
86  exit (1);
87  }
88  }
89 
90  rc = zmq_msg_close (&msg);
91  if (rc != 0) {
92  printf ("error in zmq_msg_close: %s\n", zmq_strerror (errno));
93  exit (1);
94  }
95 
96  rc = zmq_close (s);
97  if (rc != 0) {
98  printf ("error in zmq_close: %s\n", zmq_strerror (errno));
99  exit (1);
100  }
101 
102 #if defined ZMQ_HAVE_WINDOWS
103  return 0;
104 #else
105  return NULL;
106 #endif
107 }
108 
109 int main (int argc, char *argv [])
110 {
111 #if defined ZMQ_HAVE_WINDOWS
112  HANDLE local_thread;
113 #else
114  pthread_t local_thread;
115 #endif
116  void *ctx;
117  void *s;
118  int rc;
119  int i;
120  zmq_msg_t msg;
121  void *watch;
122  unsigned long elapsed;
123  double latency;
124 
125  if (argc != 3) {
126  printf ("usage: inproc_lat <message-size> <roundtrip-count>\n");
127  return 1;
128  }
129 
130  message_size = atoi (argv [1]);
131  roundtrip_count = atoi (argv [2]);
132 
133  ctx = zmq_init (1);
134  if (!ctx) {
135  printf ("error in zmq_init: %s\n", zmq_strerror (errno));
136  return -1;
137  }
138 
139  s = zmq_socket (ctx, ZMQ_REQ);
140  if (!s) {
141  printf ("error in zmq_socket: %s\n", zmq_strerror (errno));
142  return -1;
143  }
144 
145  rc = zmq_bind (s, "inproc://lat_test");
146  if (rc != 0) {
147  printf ("error in zmq_bind: %s\n", zmq_strerror (errno));
148  return -1;
149  }
150 
151 #if defined ZMQ_HAVE_WINDOWS
152  local_thread = (HANDLE) _beginthreadex (NULL, 0,
153  worker, ctx, 0 , NULL);
154  if (local_thread == 0) {
155  printf ("error in _beginthreadex\n");
156  return -1;
157  }
158 #else
159  rc = pthread_create (&local_thread, NULL, worker, ctx);
160  if (rc != 0) {
161  printf ("error in pthread_create: %s\n", zmq_strerror (rc));
162  return -1;
163  }
164 #endif
165 
166  rc = zmq_msg_init_size (&msg, message_size);
167  if (rc != 0) {
168  printf ("error in zmq_msg_init_size: %s\n", zmq_strerror (errno));
169  return -1;
170  }
171  memset (zmq_msg_data (&msg), 0, message_size);
172 
173  printf ("message size: %d [B]\n", (int) message_size);
174  printf ("roundtrip count: %d\n", (int) roundtrip_count);
175 
176  watch = zmq_stopwatch_start ();
177 
178  for (i = 0; i != roundtrip_count; i++) {
179  rc = zmq_sendmsg (s, &msg, 0);
180  if (rc < 0) {
181  printf ("error in zmq_sendmsg: %s\n", zmq_strerror (errno));
182  return -1;
183  }
184  rc = zmq_recvmsg (s, &msg, 0);
185  if (rc < 0) {
186  printf ("error in zmq_recvmsg: %s\n", zmq_strerror (errno));
187  return -1;
188  }
189  if (zmq_msg_size (&msg) != message_size) {
190  printf ("message of incorrect size received\n");
191  return -1;
192  }
193  }
194 
195  elapsed = zmq_stopwatch_stop (watch);
196 
197  rc = zmq_msg_close (&msg);
198  if (rc != 0) {
199  printf ("error in zmq_msg_close: %s\n", zmq_strerror (errno));
200  return -1;
201  }
202 
203  latency = (double) elapsed / (roundtrip_count * 2);
204 
205 #if defined ZMQ_HAVE_WINDOWS
206  DWORD rc2 = WaitForSingleObject (local_thread, INFINITE);
207  if (rc2 == WAIT_FAILED) {
208  printf ("error in WaitForSingleObject\n");
209  return -1;
210  }
211  BOOL rc3 = CloseHandle (local_thread);
212  if (rc3 == 0) {
213  printf ("error in CloseHandle\n");
214  return -1;
215  }
216 #else
217  rc = pthread_join (local_thread, NULL);
218  if (rc != 0) {
219  printf ("error in pthread_join: %s\n", zmq_strerror (rc));
220  return -1;
221  }
222 #endif
223 
224  printf ("average latency: %.3f [us]\n", (double) latency);
225 
226  rc = zmq_close (s);
227  if (rc != 0) {
228  printf ("error in zmq_close: %s\n", zmq_strerror (errno));
229  return -1;
230  }
231 
232  rc = zmq_ctx_term (ctx);
233  if (rc != 0) {
234  printf ("error in zmq_ctx_term: %s\n", zmq_strerror (errno));
235  return -1;
236  }
237 
238  return 0;
239 }
240 
static void * worker(void *ctx_)
Definition: inproc_lat.cpp:51
static size_t message_size
Definition: inproc_lat.cpp:45
#define ZMQ_REP
Definition: zmq.h:250
ZMQ_EXPORT void * zmq_init(int io_threads)
Definition: zmq.cpp:220
ZMQ_EXPORT int zmq_recvmsg(void *s, zmq_msg_t *msg, int flags)
Definition: zmq.cpp:501
ZMQ_EXPORT unsigned long zmq_stopwatch_stop(void *watch_)
Definition: zmq_utils.cpp:70
ZMQ_EXPORT void * zmq_msg_data(zmq_msg_t *msg)
Definition: zmq.cpp:666
ZMQ_EXPORT void * zmq_socket(void *, int type)
Definition: zmq.cpp:244
#define ZMQ_REQ
Definition: zmq.h:249
ZMQ_EXPORT int zmq_connect(void *s, const char *addr)
Definition: zmq.cpp:332
ZMQ_EXPORT int zmq_close(void *s)
Definition: zmq.cpp:255
ZMQ_EXPORT int zmq_sendmsg(void *s, zmq_msg_t *msg, int flags)
Definition: zmq.cpp:382
ZMQ_EXPORT const char * zmq_strerror(int errnum)
Definition: zmq.cpp:102
zmq_msg_t
Definition: zmq.h:221
ZMQ_EXPORT int zmq_bind(void *s, const char *addr)
Definition: zmq.cpp:321
ZMQ_EXPORT int zmq_ctx_term(void *context)
Definition: zmq.cpp:162
ZMQ_EXPORT void * zmq_stopwatch_start(void)
Definition: zmq_utils.cpp:62
ZMQ_EXPORT int zmq_msg_close(zmq_msg_t *msg)
Definition: zmq.cpp:651
ZMQ_EXPORT int zmq_msg_init_size(zmq_msg_t *msg, size_t size)
Definition: zmq.cpp:618
int main(int argc, char *argv[])
Definition: inproc_lat.cpp:109
ZMQ_EXPORT size_t zmq_msg_size(zmq_msg_t *msg)
Definition: zmq.cpp:671
ZMQ_EXPORT int zmq_msg_init(zmq_msg_t *msg)
Definition: zmq.cpp:613
static int roundtrip_count
Definition: inproc_lat.cpp:46