2 #include "platform.hpp" 4 #if defined ZMQ_HAVE_NORM 10 zmq::norm_engine_t::norm_engine_t(io_thread_t* parent_,
11 const options_t& options_)
12 : io_object_t(parent_), zmq_session(NULL), options(options_),
13 norm_instance(NORM_INSTANCE_INVALID), norm_session(NORM_SESSION_INVALID),
14 is_sender(false), is_receiver(false),
15 zmq_encoder(0), norm_tx_stream(NORM_OBJECT_INVALID),
16 tx_first_msg(true), tx_more_bit(false),
17 zmq_output_ready(false), norm_tx_ready(false),
18 tx_index(0), tx_len(0),
19 zmq_input_ready(false)
21 int rc = tx_msg.init();
25 zmq::norm_engine_t::~norm_engine_t()
31 int zmq::norm_engine_t::init(
const char* network_,
bool send,
bool recv)
37 NormNodeId localId = NORM_NODE_ANY;
38 const char* ifacePtr = strchr(network_,
',');
41 size_t idLen = ifacePtr - network_;
42 if (idLen > 31) idLen = 31;
44 strncpy(idText, network_, idLen);
46 localId = (NormNodeId)atoi(idText);
56 const char* addrPtr = strchr(ifacePtr,
';');
59 size_t ifaceLen = addrPtr - ifacePtr;
60 if (ifaceLen > 255) ifaceLen = 255;
61 strncpy(ifaceName, ifacePtr, ifaceLen);
62 ifaceName[ifaceLen] =
'\0';
73 const char* portPtr = strrchr(addrPtr,
':');
81 size_t addrLen = portPtr - addrPtr;
82 if (addrLen > 255) addrLen = 255;
83 strncpy(addr, addrPtr, addrLen);
86 unsigned short portNumber = atoi(portPtr);
88 if (NORM_INSTANCE_INVALID == norm_instance)
90 if (NORM_INSTANCE_INVALID == (norm_instance = NormCreateInstance()))
104 norm_session = NormCreateSession(norm_instance, addr, portNumber, localId);
105 if (NORM_SESSION_INVALID == norm_session)
107 int savedErrno = errno;
108 NormDestroyInstance(norm_instance);
109 norm_instance = NORM_INSTANCE_INVALID;
114 if (NormIsUnicastAddress(addr))
116 NormSetDefaultUnicastNack(norm_session,
true);
122 NormSetTTL(norm_session, 255);
123 NormSetRxPortReuse(norm_session,
true);
124 NormSetLoopback(norm_session,
true);
125 if (NULL != ifacePtr)
129 NormSetMulticastInterface(norm_session, ifacePtr);
138 NormSetDefaultSyncPolicy(norm_session, NORM_SYNC_STREAM);
139 if (!NormStartReceiver(norm_session, 2*1024*1024))
142 int savedErrno = errno;
143 NormDestroyInstance(norm_instance);
144 norm_session = NORM_SESSION_INVALID;
145 norm_instance = NORM_INSTANCE_INVALID;
155 NormSessionId instanceId = NormGetRandomSessionId();
157 if (!NormStartSender(norm_session, instanceId, 2*1024*1024, 1400, 16, 4))
160 int savedErrno = errno;
161 NormDestroyInstance(norm_instance);
162 norm_session = NORM_SESSION_INVALID;
163 norm_instance = NORM_INSTANCE_INVALID;
167 NormSetCongestionControl(norm_session,
true);
168 norm_tx_ready =
true;
170 if (NORM_OBJECT_INVALID == (norm_tx_stream = NormStreamOpen(norm_session, 2*1024*1024)))
173 int savedErrno = errno;
174 NormDestroyInstance(norm_instance);
175 norm_session = NORM_SESSION_INVALID;
176 norm_instance = NORM_INSTANCE_INVALID;
189 void zmq::norm_engine_t::shutdown()
194 NormStopReceiver(norm_session);
197 rx_pending_list.Destroy();
198 rx_ready_list.Destroy();
199 msg_ready_list.Destroy();
205 NormStopSender(norm_session);
208 if (NORM_SESSION_INVALID != norm_session)
210 NormDestroySession(norm_session);
211 norm_session = NORM_SESSION_INVALID;
213 if (NORM_INSTANCE_INVALID != norm_instance)
215 NormStopInstance(norm_instance);
216 NormDestroyInstance(norm_instance);
217 norm_instance = NORM_INSTANCE_INVALID;
224 zmq_session = session_;
225 if (is_sender) zmq_output_ready =
true;
226 if (is_receiver) zmq_input_ready =
true;
228 fd_t normDescriptor = NormGetDescriptor(norm_instance);
229 norm_descriptor_handle = add_fd(normDescriptor);
231 set_pollin(norm_descriptor_handle);
233 if (is_sender) send_data();
237 void zmq::norm_engine_t::unplug()
239 rm_fd(norm_descriptor_handle);
244 void zmq::norm_engine_t::terminate()
251 void zmq::norm_engine_t::restart_output()
254 zmq_output_ready =
true;
255 if (norm_tx_ready) send_data();
259 void zmq::norm_engine_t::send_data()
262 while (zmq_output_ready && norm_tx_ready)
268 size_t space = BUFFER_SIZE;
269 unsigned char* bufPtr = (
unsigned char*)tx_buffer;
270 tx_len = zmq_encoder.encode(&bufPtr, space);
276 tx_first_msg =
false;
287 NormStreamFlush(norm_tx_stream,
true, NORM_FLUSH_ACTIVE);
290 if (-1 == zmq_session->pull_msg(&tx_msg))
293 zmq_output_ready =
false;
296 zmq_encoder.load_msg(&tx_msg);
305 tx_buffer[0] = (char)0xff;
308 tx_more_bit = (0 != (tx_msg.flags() &
msg_t::more));
312 tx_len = 1 + zmq_encoder.encode(&bufPtr, space);
317 if (tx_index < tx_len)
320 tx_index += NormStreamWrite(norm_tx_stream, tx_buffer + tx_index, tx_len - tx_index);
321 if (tx_index < tx_len)
324 norm_tx_ready =
false;
332 void zmq::norm_engine_t::in_event()
336 if (!NormGetNextEvent(norm_instance, &event))
345 case NORM_TX_QUEUE_VACANCY:
346 case NORM_TX_QUEUE_EMPTY:
349 norm_tx_ready =
true;
354 case NORM_RX_OBJECT_NEW:
356 case NORM_RX_OBJECT_UPDATED:
357 recv_data(event.object);
360 case NORM_RX_OBJECT_ABORTED:
362 NormRxStreamState* rxState = (NormRxStreamState*)NormObjectGetUserData(event.object);
368 NormRxStreamState::List* list = rxState->AccessList();
369 if (NULL != list) list->Remove(*rxState);
374 case NORM_REMOTE_SENDER_INACTIVE:
383 NormNodeDelete(event.sender);
392 void zmq::norm_engine_t::restart_input()
395 zmq_input_ready =
true;
397 if (!msg_ready_list.IsEmpty())
398 recv_data(NORM_OBJECT_INVALID);
402 void zmq::norm_engine_t::recv_data(NormObjectHandle
object)
404 if (NORM_OBJECT_INVALID !=
object)
409 zmq_assert(NORM_OBJECT_STREAM == NormObjectGetType(
object));
412 NormRxStreamState* rxState = (NormRxStreamState*)NormObjectGetUserData(
object);
416 rxState =
new NormRxStreamState(
object, options.maxmsgsize);
417 if (!rxState->Init())
423 NormObjectSetUserData(
object, rxState);
425 else if (!rxState->IsRxReady())
429 rx_pending_list.Remove(*rxState);
431 if (!rxState->IsRxReady())
434 rxState->SetRxReady(
true);
435 rx_ready_list.Append(*rxState);
440 while (!rx_ready_list.IsEmpty() || (zmq_input_ready && !msg_ready_list.IsEmpty()))
444 NormRxStreamState::List::Iterator iterator(rx_ready_list);
445 NormRxStreamState* rxState;
446 while (NULL != (rxState = iterator.GetNextItem()))
448 switch(rxState->Decode())
454 rx_ready_list.Remove(*rxState);
455 msg_ready_list.Append(*rxState);
460 rxState->SetSync(
false);
467 NormObjectHandle stream = rxState->GetStreamHandle();
469 while (!rxState->InSync())
472 if (!NormStreamSeekMsgStart(stream))
479 unsigned int numBytes = 1;
480 if (!NormStreamRead(stream, &syncFlag, &numBytes))
492 if (0 == syncFlag) rxState->SetSync(
true);
495 if (!rxState->InSync())
499 rxState->SetRxReady(
false);
501 rx_ready_list.Remove(*rxState);
502 rx_pending_list.Append(*rxState);
507 unsigned int numBytes = rxState->GetBytesNeeded();
508 if (!NormStreamRead(stream, rxState->AccessBuffer(), &numBytes))
516 rxState->IncrementBufferCount(numBytes);
521 rxState->SetRxReady(
false);
523 rx_ready_list.Remove(*rxState);
524 rx_pending_list.Append(*rxState);
534 NormRxStreamState::List::Iterator iterator(msg_ready_list);
535 NormRxStreamState* rxState;
536 while (NULL != (rxState = iterator.GetNextItem()))
538 msg_t* msg = rxState->AccessMsg();
539 int rc = zmq_session->push_msg(msg);
545 zmq_input_ready =
false;
556 msg_ready_list.Remove(*rxState);
557 if (rxState->IsRxReady())
558 rx_ready_list.Append(*rxState);
560 msg_ready_list.Append(*rxState);
566 zmq_session->flush();
570 zmq::norm_engine_t::NormRxStreamState::NormRxStreamState(NormObjectHandle normStream,
572 : norm_stream(normStream), max_msg_size(maxMsgSize),
573 in_sync(false), rx_ready(false), zmq_decoder(NULL), skip_norm_sync(false),
574 buffer_ptr(NULL), buffer_size(0), buffer_count(0),
575 prev(NULL), next(NULL), list(NULL)
579 zmq::norm_engine_t::NormRxStreamState::~NormRxStreamState()
581 if (NULL != zmq_decoder)
593 bool zmq::norm_engine_t::NormRxStreamState::Init()
596 skip_norm_sync =
false;
597 if (NULL != zmq_decoder)
delete zmq_decoder;
599 zmq_decoder =
new (std::nothrow) v2_decoder_t (
in_batch_size, max_msg_size);
601 if (NULL != zmq_decoder)
605 zmq_decoder->get_buffer(&buffer_ptr, &buffer_size);
616 int zmq::norm_engine_t::NormRxStreamState::Decode()
619 while (buffer_count > 0)
622 size_t processed = 0;
631 skip_norm_sync =
false;
634 int rc = zmq_decoder->decode(buffer_ptr, buffer_count, processed);
635 buffer_ptr += processed;
636 buffer_count -= processed;
641 if (0 == buffer_count)
644 zmq_decoder->get_buffer(&buffer_ptr, &buffer_size);
646 skip_norm_sync =
true;
651 skip_norm_sync =
false;
663 zmq_decoder->get_buffer(&buffer_ptr, &buffer_size);
668 zmq::norm_engine_t::NormRxStreamState::List::List()
669 : head(NULL), tail(NULL)
673 zmq::norm_engine_t::NormRxStreamState::List::~List()
678 void zmq::norm_engine_t::NormRxStreamState::List::Destroy()
680 NormRxStreamState* item = head;
689 void zmq::norm_engine_t::NormRxStreamState::List::Append(NormRxStreamState& item)
701 void zmq::norm_engine_t::NormRxStreamState::List::Remove(NormRxStreamState& item)
703 if (NULL != item.prev)
704 item.prev->next = item.next;
707 if (NULL != item.next)
708 item.next ->prev = item.prev;
711 item.prev = item.next = NULL;
715 zmq::norm_engine_t::NormRxStreamState::List::Iterator::Iterator(
const List& list)
716 : next_item(list.head)
720 zmq::norm_engine_t::NormRxStreamState* zmq::norm_engine_t::NormRxStreamState::List::Iterator::GetNextItem()
722 NormRxStreamState* nextItem = next_item;
723 if (NULL != nextItem) next_item = nextItem->next;
728 #endif // ZMQ_HAVE_NORM