Line data Source code
1 : /*
2 : Copyright (c) 2007-2016 Contributors as noted in the AUTHORS file
3 :
4 : This file is part of libzmq, the ZeroMQ core engine in C++.
5 :
6 : libzmq is free software; you can redistribute it and/or modify it under
7 : the terms of the GNU Lesser General Public License (LGPL) as published
8 : by the Free Software Foundation; either version 3 of the License, or
9 : (at your option) any later version.
10 :
11 : As a special exception, the Contributors give you permission to link
12 : this library with independent modules to produce an executable,
13 : regardless of the license terms of these independent modules, and to
14 : copy and distribute the resulting executable under terms of your choice,
15 : provided that you also meet, for each linked independent module, the
16 : terms and conditions of the license of that module. An independent
17 : module is a module which is not derived from or based on this library.
18 : If you modify this library, you must extend this exception to your
19 : version of the library.
20 :
21 : libzmq is distributed in the hope that it will be useful, but WITHOUT
22 : ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
23 : FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public
24 : License for more details.
25 :
26 : You should have received a copy of the GNU Lesser General Public License
27 : along with this program. If not, see <http://www.gnu.org/licenses/>.
28 : */
29 :
30 : #include "precompiled.hpp"
31 : #include "mailbox_safe.hpp"
32 : #include "clock.hpp"
33 : #include "err.hpp"
34 :
// Constructs a mailbox that shares the caller-supplied mutex.
// sync_ is stored as-is (the pointer, not a copy of the mutex); the same
// mutex is later locked in send () and handed to cond_var.wait () in recv ().
35 51 : zmq::mailbox_safe_t::mailbox_safe_t (mutex_t* sync_) :
36 102 : sync (sync_)
37 : {
38 : // Put the pipe into passive state. That way, if the user starts by
39 : // polling on the associated file descriptor, it will get woken up when
40 : // a new command is posted.
// Nothing has been written yet, so this read is required to fail; the
// assertion below enforces that.
41 51 : const bool ok = cpipe.read (NULL);
42 51 : zmq_assert (!ok);
43 51 : }
44 :
// Destroys the mailbox. Commands still queued in cpipe are not retrieved
// or released here (see the TODO below).
45 255 : zmq::mailbox_safe_t::~mailbox_safe_t ()
46 : {
47 : // TODO: Retrieve and deallocate commands inside the cpipe.
48 :
49 : // Work around the problem that other threads might still be in our
50 : // send() method, by waiting on the mutex before disappearing.
// send () holds sync for its whole body, so acquiring and immediately
// releasing it here blocks until a send () already holding it has finished.
51 51 : sync->lock ();
52 51 : sync->unlock ();
53 102 : }
54 :
// Appends signaler to the signalers list. Every signaler in that list has
// its send () called from mailbox_safe_t::send () when the pipe flush
// reports false.
// No duplicate check is made: adding the same pointer twice stores it twice.
// NOTE(review): no locking is done here, while send () iterates the list
// under sync — presumably the caller already holds sync; confirm.
54 54 : void zmq::mailbox_safe_t::add_signaler (signaler_t* signaler)
55 : {
56 54 : signalers.push_back(signaler);
57 54 : }
59 :
60 0 : void zmq::mailbox_safe_t::remove_signaler (signaler_t* signaler)
61 : {
62 0 : std::vector<signaler_t*>::iterator it = signalers.begin();
63 :
64 : // TODO: make a copy of array and signal outside the lock
65 0 : for (; it != signalers.end(); ++it){
66 0 : if (*it == signaler)
67 : break;
68 : }
69 :
70 0 : if (it != signalers.end())
71 0 : signalers.erase(it);
72 0 : }
73 :
// Drops every registered signaler from the signalers list. Only the stored
// pointers are removed; the signaler objects themselves are not touched.
// NOTE(review): no locking is done here — presumably the caller holds sync;
// confirm against callers.
74 51 : void zmq::mailbox_safe_t::clear_signalers ()
75 : {
76 51 : signalers.clear ();
77 51 : }
78 :
79 1571 : void zmq::mailbox_safe_t::send (const command_t &cmd_)
80 : {
81 1571 : sync->lock ();
82 1571 : cpipe.write (cmd_, false);
83 1571 : const bool ok = cpipe.flush ();
84 :
85 1571 : if (!ok) {
86 1485 : cond_var.broadcast ();
87 7671 : for (std::vector<signaler_t*>::iterator it = signalers.begin(); it != signalers.end(); ++it){
88 82 : (*it)->send();
89 : }
90 : }
91 :
92 1571 : sync->unlock ();
93 1571 : }
94 :
95 9598 : int zmq::mailbox_safe_t::recv (command_t *cmd_, int timeout_)
96 : {
97 : // Try to get the command straight away.
98 9598 : if (cpipe.read (cmd_))
99 : return 0;
100 :
101 : // Wait for signal from the command sender.
102 8984 : int rc = cond_var.wait (sync, timeout_);
103 8984 : if (rc == -1) {
104 7507 : errno_assert (errno == EAGAIN || errno == EINTR);
105 : return -1;
106 : }
107 :
108 : // Another thread may already fetch the command
109 1477 : const bool ok = cpipe.read (cmd_);
110 :
111 1477 : if (!ok) {
112 520 : errno = EAGAIN;
113 520 : return -1;
114 : }
115 :
116 : return 0;
117 : }
|