libzmq  master
ZeroMQ C++ Core Engine (LIBZMQ)
norm_engine.hpp
Go to the documentation of this file.
1 
2 #ifndef __ZMQ_NORM_ENGINE_HPP_INCLUDED__
3 #define __ZMQ_NORM_ENGINE_HPP_INCLUDED__
4 
5 #if defined ZMQ_HAVE_NORM
6 
7 #include "io_object.hpp"
8 #include "i_engine.hpp"
9 #include "options.hpp"
10 #include "v2_decoder.hpp"
11 #include "v2_encoder.hpp"
12 
13 #include <normApi.h>
14 
15 namespace zmq
16 {
17  class io_thread_t;
18  class session_base_t;
19 
20  class norm_engine_t : public io_object_t, public i_engine
21  { // i_engine implementation built on the NORM API (normApi.h)
22  public:
23  norm_engine_t (zmq::io_thread_t *parent_, const options_t &options_);
24  ~norm_engine_t ();
25 
26  // create NORM instance, session, etc
27  int init(const char* network_, bool send, bool recv); // send / recv: presumably enable the sender / receiver role (see is_sender, is_receiver) -- confirm in the .cpp
28  void shutdown(); // NOTE(review): presumably the counterpart of init() -- confirm in the .cpp
29 
30  // i_engine interface implementation.
31  // Plug the engine to the session.
32  virtual void plug (zmq::io_thread_t *io_thread_,
33  class session_base_t *session_);
34 
35  // Terminate and deallocate the engine. Note that 'detached'
36  // events are not fired on termination.
37  virtual void terminate ();
38 
39  // This method is called by the session to signalise that more
40  // messages can be written to the pipe.
41  virtual void restart_input ();
42 
43  // This method is called by the session to signalise that there
44  // are messages to send available.
45  virtual void restart_output ();
46 
47  virtual void zap_msg_available () {}; // intentionally empty: this engine takes no action here
48 
49  // i_poll_events interface implementation.
50  // (we only need in_event() for NormEvent notification)
51  // (i.e., don't have any output events or timers (yet))
52  void in_event ();
53 
54  private:
55  void unplug();
56  void send_data();
57  void recv_data(NormObjectHandle stream); // stream: handle of the NORM object to receive from
58 
59  // Capacity, in chars, of the tx_buffer member declared below.
60  enum {BUFFER_SIZE = 2048};
61 
62  // Used to keep track of streams from multiple senders (each object wraps one NORM stream handle)
63  class NormRxStreamState
64  {
65  public:
66  NormRxStreamState(NormObjectHandle normStream,
67  int64_t maxMsgSize);
68  ~NormRxStreamState();
69 
70  NormObjectHandle GetStreamHandle() const
71  {return norm_stream;}
72 
73  bool Init(); // NOTE(review): meaning of the bool result is not visible in this header -- confirm in the .cpp
74 
75  void SetRxReady(bool state)
76  {rx_ready = state;}
77  bool IsRxReady() const
78  {return rx_ready;}
79 
80  void SetSync(bool state)
81  {in_sync = state;}
82  bool InSync() const
83  {return in_sync;}
84 
85  // These are used to feed data to decoder
86  // and its underlying "msg" buffer
87  char* AccessBuffer() // address buffer_count bytes past buffer_ptr, i.e. where the next bytes go
88  {return (char*)(buffer_ptr + buffer_count);}
89  size_t GetBytesNeeded() const // bytes still missing: buffer_size minus buffer_count
90  {return (buffer_size - buffer_count);}
91  void IncrementBufferCount(size_t count) // record that count more bytes were placed in the buffer
92  {buffer_count += count;}
93  msg_t* AccessMsg() // message object currently held by the decoder
94  {return zmq_decoder->msg();}
95  // This invokes the decoder "decode" method
96  // returning 0 if more data is needed,
97  // 1 if the message is complete. If an error
98  // occurs the 'sync' is dropped and the
99  // decoder re-initialized
100  int Decode();
101 
102  class List // list of NormRxStreamState items, tracked through the head and tail pointers below
103  {
104  public:
105  List();
106  ~List();
107 
108  void Append(NormRxStreamState& item);
109  void Remove(NormRxStreamState& item);
110 
111  bool IsEmpty() const // true when head is NULL
112  {return (NULL == head);}
113 
114  void Destroy(); // NOTE(review): presumably deletes the listed items -- confirm in the .cpp
115 
116  class Iterator // walks a List one item at a time via GetNextItem()
117  {
118  public:
119  Iterator(const List& list);
120  NormRxStreamState* GetNextItem(); // NOTE(review): presumably returns NULL once exhausted -- confirm in the .cpp
121  private:
122  NormRxStreamState* next_item;
123  };
124  friend class Iterator;
125 
126  private:
127  NormRxStreamState* head;
128  NormRxStreamState* tail;
129 
130  }; // end class zmq::norm_engine_t::NormRxStreamState::List
131 
132  friend class List;
133 
134  List* AccessList() // returns the list pointer stored in this item
135  {return list;}
136 
137 
138  private:
139  NormObjectHandle norm_stream; // handle returned by GetStreamHandle()
140  int64_t max_msg_size;
141  bool in_sync; // read and written through InSync() / SetSync()
142  bool rx_ready; // read and written through IsRxReady() / SetRxReady()
143  v2_decoder_t* zmq_decoder; // NOTE(review): raw pointer, ownership is not visible in this header -- confirm in the .cpp
144  bool skip_norm_sync;
145  unsigned char* buffer_ptr; // base address used by AccessBuffer()
146  size_t buffer_size; // total bytes wanted, see GetBytesNeeded()
147  size_t buffer_count; // bytes received so far, see IncrementBufferCount()
148 
149  NormRxStreamState* prev; // list links kept inside the item itself; List is a friend
150  NormRxStreamState* next;
151  NormRxStreamState::List* list; // pointer returned by AccessList()
152 
153  }; // end class zmq::norm_engine_t::NormRxStreamState
154 
155  session_base_t* zmq_session;
156  options_t options;
157  NormInstanceHandle norm_instance;
158  handle_t norm_descriptor_handle;
159  NormSessionHandle norm_session;
160  bool is_sender;
161  bool is_receiver;
162  // Sender state
163  msg_t tx_msg;
164  v2_encoder_t zmq_encoder; // for tx messages (we use v2 for now)
165  NormObjectHandle norm_tx_stream;
166  bool tx_first_msg;
167  bool tx_more_bit;
168  bool zmq_output_ready; // zmq has msg(s) to send
169  bool norm_tx_ready; // norm has tx queue vacancy
170  // TBD - maybe don't need buffer if can access zmq message buffer directly?
171  char tx_buffer[BUFFER_SIZE];
172  unsigned int tx_index; // NOTE(review): presumably the current offset into tx_buffer -- confirm in the .cpp
173  unsigned int tx_len; // NOTE(review): presumably the number of valid bytes in tx_buffer -- confirm in the .cpp
174 
175  // Receiver state
176  // Lists of norm rx streams from remote senders
177  bool zmq_input_ready; // zmq ready to receive msg(s)
178  NormRxStreamState::List rx_pending_list; // rx streams waiting for data reception
179  NormRxStreamState::List rx_ready_list; // rx streams ready for NormStreamRead()
180  NormRxStreamState::List msg_ready_list; // rx streams w/ msg ready for push to zmq
181 
182 
183  }; // end class norm_engine_t
184 }
185 
186 #endif // ZMQ_HAVE_NORM
187 
188 #endif // !__ZMQ_NORM_ENGINE_HPP_INCLUDED__
Definition: command.hpp:81
Definition: address.hpp:35