libzmq  master
ZeroMQ C++ Core Engine (LIBZMQ)
inproc_thr.cpp
Go to the documentation of this file.
1 /*
2  Copyright (c) 2007-2012 iMatix Corporation
3  Copyright (c) 2009-2011 250bpm s.r.o.
4  Copyright (c) 2007-2011 Other contributors as noted in the AUTHORS file
5 
6  This file is part of libzmq, the ZeroMQ core engine in C++.
7 
8  libzmq is free software; you can redistribute it and/or modify it under
9  the terms of the GNU Lesser General Public License (LGPL) as published
10  by the Free Software Foundation; either version 3 of the License, or
11  (at your option) any later version.
12 
13  As a special exception, the Contributors give you permission to link
14  this library with independent modules to produce an executable,
15  regardless of the license terms of these independent modules, and to
16  copy and distribute the resulting executable under terms of your choice,
17  provided that you also meet, for each linked independent module, the
18  terms and conditions of the license of that module. An independent
19  module is a module which is not derived from or based on this library.
20  If you modify this library, you must extend this exception to your
21  version of the library.
22 
23  libzmq is distributed in the hope that it will be useful, but WITHOUT
24  ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
25  FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public
26  License for more details.
27 
28  You should have received a copy of the GNU Lesser General Public License
29  along with this program. If not, see <http://www.gnu.org/licenses/>.
30 */
31 
32 #include "../include/zmq.h"
33 
34 #include <stdio.h>
35 #include <stdlib.h>
36 #include <string.h>
37 
38 #include "platform.hpp"
39 
40 #if defined ZMQ_HAVE_WINDOWS
41 #include <windows.h>
42 #include <process.h>
43 #else
44 #include <pthread.h>
45 #endif
46 
47 static int message_count;    /* number of messages the worker sends and main receives; set from argv [2] in main */
48 static size_t message_size;  /* size in bytes of every message; set from argv [1] in main */
49 
50 #if defined ZMQ_HAVE_WINDOWS
51 static unsigned int __stdcall worker (void *ctx_)
52 #else
53 static void *worker (void *ctx_)
54 #endif
55 {
56  void *s;
57  int rc;
58  int i;
59  zmq_msg_t msg;
60 
61  s = zmq_socket (ctx_, ZMQ_PUSH);
62  if (!s) {
63  printf ("error in zmq_socket: %s\n", zmq_strerror (errno));
64  exit (1);
65  }
66 
67  rc = zmq_connect (s, "inproc://thr_test");
68  if (rc != 0) {
69  printf ("error in zmq_connect: %s\n", zmq_strerror (errno));
70  exit (1);
71  }
72 
73  for (i = 0; i != message_count; i++) {
74 
75  rc = zmq_msg_init_size (&msg, message_size);
76  if (rc != 0) {
77  printf ("error in zmq_msg_init_size: %s\n", zmq_strerror (errno));
78  exit (1);
79  }
80 #if defined ZMQ_MAKE_VALGRIND_HAPPY
81  memset (zmq_msg_data (&msg), 0, message_size);
82 #endif
83 
84  rc = zmq_sendmsg (s, &msg, 0);
85  if (rc < 0) {
86  printf ("error in zmq_sendmsg: %s\n", zmq_strerror (errno));
87  exit (1);
88  }
89  rc = zmq_msg_close (&msg);
90  if (rc != 0) {
91  printf ("error in zmq_msg_close: %s\n", zmq_strerror (errno));
92  exit (1);
93  }
94  }
95 
96  rc = zmq_close (s);
97  if (rc != 0) {
98  printf ("error in zmq_close: %s\n", zmq_strerror (errno));
99  exit (1);
100  }
101 
102 #if defined ZMQ_HAVE_WINDOWS
103  return 0;
104 #else
105  return NULL;
106 #endif
107 }
108 
109 int main (int argc, char *argv [])
110 {
111 #if defined ZMQ_HAVE_WINDOWS
112  HANDLE local_thread;
113 #else
114  pthread_t local_thread;
115 #endif
116  void *ctx;
117  void *s;
118  int rc;
119  int i;
120  zmq_msg_t msg;
121  void *watch;
122  unsigned long elapsed;
123  unsigned long throughput;
124  double megabits;
125 
126  if (argc != 3) {
127  printf ("usage: inproc_thr <message-size> <message-count>\n");
128  return 1;
129  }
130 
131  message_size = atoi (argv [1]);
132  message_count = atoi (argv [2]);
133 
134  ctx = zmq_init (1);
135  if (!ctx) {
136  printf ("error in zmq_init: %s\n", zmq_strerror (errno));
137  return -1;
138  }
139 
140  s = zmq_socket (ctx, ZMQ_PULL);
141  if (!s) {
142  printf ("error in zmq_socket: %s\n", zmq_strerror (errno));
143  return -1;
144  }
145 
146  rc = zmq_bind (s, "inproc://thr_test");
147  if (rc != 0) {
148  printf ("error in zmq_bind: %s\n", zmq_strerror (errno));
149  return -1;
150  }
151 
152 #if defined ZMQ_HAVE_WINDOWS
153  local_thread = (HANDLE) _beginthreadex (NULL, 0,
154  worker, ctx, 0 , NULL);
155  if (local_thread == 0) {
156  printf ("error in _beginthreadex\n");
157  return -1;
158  }
159 #else
160  rc = pthread_create (&local_thread, NULL, worker, ctx);
161  if (rc != 0) {
162  printf ("error in pthread_create: %s\n", zmq_strerror (rc));
163  return -1;
164  }
165 #endif
166 
167  rc = zmq_msg_init (&msg);
168  if (rc != 0) {
169  printf ("error in zmq_msg_init: %s\n", zmq_strerror (errno));
170  return -1;
171  }
172 
173  printf ("message size: %d [B]\n", (int) message_size);
174  printf ("message count: %d\n", (int) message_count);
175 
176  rc = zmq_recvmsg (s, &msg, 0);
177  if (rc < 0) {
178  printf ("error in zmq_recvmsg: %s\n", zmq_strerror (errno));
179  return -1;
180  }
181  if (zmq_msg_size (&msg) != message_size) {
182  printf ("message of incorrect size received\n");
183  return -1;
184  }
185 
186  watch = zmq_stopwatch_start ();
187 
188  for (i = 0; i != message_count - 1; i++) {
189  rc = zmq_recvmsg (s, &msg, 0);
190  if (rc < 0) {
191  printf ("error in zmq_recvmsg: %s\n", zmq_strerror (errno));
192  return -1;
193  }
194  if (zmq_msg_size (&msg) != message_size) {
195  printf ("message of incorrect size received\n");
196  return -1;
197  }
198  }
199 
200  elapsed = zmq_stopwatch_stop (watch);
201  if (elapsed == 0)
202  elapsed = 1;
203 
204  rc = zmq_msg_close (&msg);
205  if (rc != 0) {
206  printf ("error in zmq_msg_close: %s\n", zmq_strerror (errno));
207  return -1;
208  }
209 
210 #if defined ZMQ_HAVE_WINDOWS
211  DWORD rc2 = WaitForSingleObject (local_thread, INFINITE);
212  if (rc2 == WAIT_FAILED) {
213  printf ("error in WaitForSingleObject\n");
214  return -1;
215  }
216  BOOL rc3 = CloseHandle (local_thread);
217  if (rc3 == 0) {
218  printf ("error in CloseHandle\n");
219  return -1;
220  }
221 #else
222  rc = pthread_join (local_thread, NULL);
223  if (rc != 0) {
224  printf ("error in pthread_join: %s\n", zmq_strerror (rc));
225  return -1;
226  }
227 #endif
228 
229  rc = zmq_close (s);
230  if (rc != 0) {
231  printf ("error in zmq_close: %s\n", zmq_strerror (errno));
232  return -1;
233  }
234 
235  rc = zmq_ctx_term (ctx);
236  if (rc != 0) {
237  printf ("error in zmq_ctx_term: %s\n", zmq_strerror (errno));
238  return -1;
239  }
240 
241  throughput = (unsigned long)
242  ((double) message_count / (double) elapsed * 1000000);
243  megabits = (double) (throughput * message_size * 8) / 1000000;
244 
245  printf ("mean throughput: %d [msg/s]\n", (int) throughput);
246  printf ("mean throughput: %.3f [Mb/s]\n", (double) megabits);
247 
248  return 0;
249 }
250 
static void * worker(void *ctx_)
Definition: inproc_thr.cpp:53
ZMQ_EXPORT void * zmq_init(int io_threads)
Definition: zmq.cpp:220
ZMQ_EXPORT int zmq_recvmsg(void *s, zmq_msg_t *msg, int flags)
Definition: zmq.cpp:501
ZMQ_EXPORT unsigned long zmq_stopwatch_stop(void *watch_)
Definition: zmq_utils.cpp:70
ZMQ_EXPORT void * zmq_msg_data(zmq_msg_t *msg)
Definition: zmq.cpp:666
ZMQ_EXPORT void * zmq_socket(void *, int type)
Definition: zmq.cpp:244
#define ZMQ_PUSH
Definition: zmq.h:254
int main(int argc, char *argv[])
Definition: inproc_thr.cpp:109
ZMQ_EXPORT int zmq_connect(void *s, const char *addr)
Definition: zmq.cpp:332
ZMQ_EXPORT int zmq_close(void *s)
Definition: zmq.cpp:255
ZMQ_EXPORT int zmq_sendmsg(void *s, zmq_msg_t *msg, int flags)
Definition: zmq.cpp:382
ZMQ_EXPORT const char * zmq_strerror(int errnum)
Definition: zmq.cpp:102
Definition: zmq.h:221
ZMQ_EXPORT int zmq_bind(void *s, const char *addr)
Definition: zmq.cpp:321
ZMQ_EXPORT int zmq_ctx_term(void *context)
Definition: zmq.cpp:162
ZMQ_EXPORT void * zmq_stopwatch_start(void)
Definition: zmq_utils.cpp:62
ZMQ_EXPORT int zmq_msg_close(zmq_msg_t *msg)
Definition: zmq.cpp:651
ZMQ_EXPORT int zmq_msg_init_size(zmq_msg_t *msg, size_t size)
Definition: zmq.cpp:618
ZMQ_EXPORT size_t zmq_msg_size(zmq_msg_t *msg)
Definition: zmq.cpp:671
ZMQ_EXPORT int zmq_msg_init(zmq_msg_t *msg)
Definition: zmq.cpp:613
#define ZMQ_PULL
Definition: zmq.h:253
static size_t message_size
Definition: inproc_thr.cpp:48
static int message_count
Definition: inproc_thr.cpp:47