Line data Source code
1 : /*
2 : Copyright (c) 2007-2016 Contributors as noted in the AUTHORS file
3 :
4 : This file is part of libzmq, the ZeroMQ core engine in C++.
5 :
6 : libzmq is free software; you can redistribute it and/or modify it under
7 : the terms of the GNU Lesser General Public License (LGPL) as published
8 : by the Free Software Foundation; either version 3 of the License, or
9 : (at your option) any later version.
10 :
11 : As a special exception, the Contributors give you permission to link
12 : this library with independent modules to produce an executable,
13 : regardless of the license terms of these independent modules, and to
14 : copy and distribute the resulting executable under terms of your choice,
15 : provided that you also meet, for each linked independent module, the
16 : terms and conditions of the license of that module. An independent
17 : module is a module which is not derived from or based on this library.
18 : If you modify this library, you must extend this exception to your
19 : version of the library.
20 :
21 : libzmq is distributed in the hope that it will be useful, but WITHOUT
22 : ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
23 : FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public
24 : License for more details.
25 :
26 : You should have received a copy of the GNU Lesser General Public License
27 : along with this program. If not, see <http://www.gnu.org/licenses/>.
28 : */
29 :
30 : #include "precompiled.hpp"
31 : #include "mailbox.hpp"
32 : #include "err.hpp"
33 :
34 25190 : zmq::mailbox_t::mailbox_t ()
35 : {
36 : // Get the pipe into passive state. That way, if the users starts by
37 : // polling on the associated file descriptor it will get woken up when
38 : // new command is posted.
39 12595 : const bool ok = cpipe.read (NULL);
40 12595 : zmq_assert (!ok);
41 12595 : active = false;
42 12595 : }
43 :
44 36336 : zmq::mailbox_t::~mailbox_t ()
45 : {
46 : // TODO: Retrieve and deallocate commands inside the cpipe.
47 :
48 : // Work around problem that other threads might still be in our
49 : // send() method, by waiting on the mutex before disappearing.
50 12595 : sync.lock ();
51 12595 : sync.unlock ();
52 23741 : }
53 :
54 1349553 : zmq::fd_t zmq::mailbox_t::get_fd () const
55 : {
56 1349553 : return signaler.get_fd ();
57 : }
58 :
59 133669 : void zmq::mailbox_t::send (const command_t &cmd_)
60 : {
61 133669 : sync.lock ();
62 133933 : cpipe.write (cmd_, false);
63 133923 : const bool ok = cpipe.flush ();
64 133938 : sync.unlock ();
65 133951 : if (!ok)
66 37568 : signaler.send ();
67 133943 : }
68 :
69 4230434 : int zmq::mailbox_t::recv (command_t *cmd_, int timeout_)
70 : {
71 : // Try to get the command straight away.
72 4230434 : if (active) {
73 127289 : if (cpipe.read (cmd_))
74 : return 0;
75 :
76 : // If there are no more commands available, switch into passive state.
77 31160 : active = false;
78 : }
79 :
80 : // Wait for signal from the command sender.
81 4134305 : int rc = signaler.wait (timeout_);
82 4547736 : if (rc == -1) {
83 4516159 : errno_assert (errno == EAGAIN || errno == EINTR);
84 : return -1;
85 : }
86 :
87 : // Receive the signal.
88 31577 : rc = signaler.recv_failable ();
89 31577 : if (rc == -1) {
90 0 : errno_assert (errno == EAGAIN);
91 : return -1;
92 : }
93 :
94 : // Switch into active state.
95 31577 : active = true;
96 :
97 : // Get a command.
98 31577 : const bool ok = cpipe.read (cmd_);
99 31578 : zmq_assert (ok);
100 : return 0;
101 : }
|