Line data Source code
1 : /*
2 : Copyright (c) 2007-2016 Contributors as noted in the AUTHORS file
3 :
4 : This file is part of libzmq, the ZeroMQ core engine in C++.
5 :
6 : libzmq is free software; you can redistribute it and/or modify it under
7 : the terms of the GNU Lesser General Public License (LGPL) as published
8 : by the Free Software Foundation; either version 3 of the License, or
9 : (at your option) any later version.
10 :
11 : As a special exception, the Contributors give you permission to link
12 : this library with independent modules to produce an executable,
13 : regardless of the license terms of these independent modules, and to
14 : copy and distribute the resulting executable under terms of your choice,
15 : provided that you also meet, for each linked independent module, the
16 : terms and conditions of the license of that module. An independent
17 : module is a module which is not derived from or based on this library.
18 : If you modify this library, you must extend this exception to your
19 : version of the library.
20 :
21 : libzmq is distributed in the hope that it will be useful, but WITHOUT
22 : ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
23 : FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public
24 : License for more details.
25 :
26 : You should have received a copy of the GNU Lesser General Public License
27 : along with this program. If not, see <http://www.gnu.org/licenses/>.
28 : */
29 :
30 : #include "precompiled.hpp"
31 : #include "rep.hpp"
32 : #include "err.hpp"
33 : #include "msg.hpp"
34 :
35 150 : zmq::rep_t::rep_t (class ctx_t *parent_, uint32_t tid_, int sid_) :
36 : router_t (parent_, tid_, sid_),
37 : sending_reply (false),
38 150 : request_begins (true)
39 : {
40 150 : options.type = ZMQ_REP;
41 150 : }
42 :
43 150 : zmq::rep_t::~rep_t ()
44 : {
45 150 : }
46 :
47 288 : int zmq::rep_t::xsend (msg_t *msg_)
48 : {
49 : // If we are in the middle of receiving a request, we cannot send reply.
50 288 : if (!sending_reply) {
51 0 : errno = EFSM;
52 0 : return -1;
53 : }
54 :
55 288 : bool more = msg_->flags () & msg_t::more ? true : false;
56 :
57 : // Push message to the reply pipe.
58 288 : int rc = router_t::xsend (msg_);
59 288 : if (rc != 0)
60 : return rc;
61 :
62 : // If the reply is complete flip the FSM back to request receiving state.
63 288 : if (!more)
64 138 : sending_reply = false;
65 :
66 : return 0;
67 : }
68 :
69 675 : int zmq::rep_t::xrecv (msg_t *msg_)
70 : {
71 : // If we are in middle of sending a reply, we cannot receive next request.
72 675 : if (sending_reply) {
73 0 : errno = EFSM;
74 0 : return -1;
75 : }
76 :
77 : // First thing to do when receiving a request is to copy all the labels
78 : // to the reply pipe.
79 675 : if (request_begins) {
80 : while (true) {
81 711 : int rc = router_t::xrecv (msg_);
82 711 : if (rc != 0)
83 : return rc;
84 :
85 381 : if ((msg_->flags () & msg_t::more)) {
86 : // Empty message part delimits the traceback stack.
87 381 : bool bottom = (msg_->size () == 0);
88 :
89 : // Push it to the reply pipe.
90 381 : rc = router_t::xsend (msg_);
91 381 : errno_assert (rc == 0);
92 :
93 381 : if (bottom)
94 : break;
95 : }
96 : else {
97 : // If the traceback stack is malformed, discard anything
98 : // already sent to pipe (we're at end of invalid message).
99 0 : rc = router_t::rollback ();
100 0 : errno_assert (rc == 0);
101 : }
102 : }
103 174 : request_begins = false;
104 : }
105 :
106 : // Get next message part to return to the user.
107 345 : int rc = router_t::xrecv (msg_);
108 345 : if (rc != 0)
109 : return rc;
110 :
111 : // If whole request is read, flip the FSM to reply-sending state.
112 345 : if (!(msg_->flags () & msg_t::more)) {
113 174 : sending_reply = true;
114 174 : request_begins = true;
115 : }
116 :
117 : return 0;
118 : }
119 :
120 54 : bool zmq::rep_t::xhas_in ()
121 : {
122 54 : if (sending_reply)
123 : return false;
124 :
125 54 : return router_t::xhas_in ();
126 : }
127 :
128 54 : bool zmq::rep_t::xhas_out ()
129 : {
130 54 : if (!sending_reply)
131 : return false;
132 :
133 0 : return router_t::xhas_out ();
134 : }
|