libzmq  master
ZeroMQ C++ Core Engine (LIBZMQ)
norm_engine.cpp
Go to the documentation of this file.
1 
2 #include "platform.hpp"
3 
4 #if defined ZMQ_HAVE_NORM
5 
6 #include "norm_engine.hpp"
7 #include "session_base.hpp"
8 #include "v2_protocol.hpp"
9 
10 zmq::norm_engine_t::norm_engine_t(io_thread_t* parent_,
11  const options_t& options_)
12  : io_object_t(parent_), zmq_session(NULL), options(options_),
13  norm_instance(NORM_INSTANCE_INVALID), norm_session(NORM_SESSION_INVALID),
14  is_sender(false), is_receiver(false),
15  zmq_encoder(0), norm_tx_stream(NORM_OBJECT_INVALID),
16  tx_first_msg(true), tx_more_bit(false),
17  zmq_output_ready(false), norm_tx_ready(false),
18  tx_index(0), tx_len(0),
19  zmq_input_ready(false)
20 {
21  int rc = tx_msg.init();
22  errno_assert(0 == rc);
23 }
24 
25 zmq::norm_engine_t::~norm_engine_t()
26 {
27  shutdown(); // in case it was not already called
28 }
29 
30 
31 int zmq::norm_engine_t::init(const char* network_, bool send, bool recv)
32 {
33  // Parse the "network_" address int "iface", "addr", and "port"
34  // norm endpoint format: [id,][<iface>;]<addr>:<port>
35  // First, look for optional local NormNodeId
36  // (default NORM_NODE_ANY causes NORM to use host IP addr for NormNodeId)
37  NormNodeId localId = NORM_NODE_ANY;
38  const char* ifacePtr = strchr(network_, ',');
39  if (NULL != ifacePtr)
40  {
41  size_t idLen = ifacePtr - network_;
42  if (idLen > 31) idLen = 31;
43  char idText[32];
44  strncpy(idText, network_, idLen);
45  idText[idLen] = '\0';
46  localId = (NormNodeId)atoi(idText);
47  ifacePtr++;
48  }
49  else
50  {
51  ifacePtr = network_;
52  }
53 
54  // Second, look for optional multicast ifaceName
55  char ifaceName[256];
56  const char* addrPtr = strchr(ifacePtr, ';');
57  if (NULL != addrPtr)
58  {
59  size_t ifaceLen = addrPtr - ifacePtr;
60  if (ifaceLen > 255) ifaceLen = 255; // return error instead?
61  strncpy(ifaceName, ifacePtr, ifaceLen);
62  ifaceName[ifaceLen] = '\0';
63  ifacePtr = ifaceName;
64  addrPtr++;
65  }
66  else
67  {
68  addrPtr = ifacePtr;
69  ifacePtr = NULL;
70  }
71 
72  // Finally, parse IP address and port number
73  const char* portPtr = strrchr(addrPtr, ':');
74  if (NULL == portPtr)
75  {
76  errno = EINVAL;
77  return -1;
78  }
79 
80  char addr[256];
81  size_t addrLen = portPtr - addrPtr;
82  if (addrLen > 255) addrLen = 255;
83  strncpy(addr, addrPtr, addrLen);
84  addr[addrLen] = '\0';
85  portPtr++;
86  unsigned short portNumber = atoi(portPtr);
87 
88  if (NORM_INSTANCE_INVALID == norm_instance)
89  {
90  if (NORM_INSTANCE_INVALID == (norm_instance = NormCreateInstance()))
91  {
92  // errno set by whatever caused NormCreateInstance() to fail
93  return -1;
94  }
95  }
96 
97  // TBD - What do we use for our local NormNodeId?
98  // (for now we use automatic, IP addr based assignment or passed in 'id')
99  // a) Use ZMQ Identity somehow?
100  // b) Add function to use iface addr
101  // c) Randomize and implement a NORM session layer
102  // conflict detection/resolution protocol
103 
104  norm_session = NormCreateSession(norm_instance, addr, portNumber, localId);
105  if (NORM_SESSION_INVALID == norm_session)
106  {
107  int savedErrno = errno;
108  NormDestroyInstance(norm_instance);
109  norm_instance = NORM_INSTANCE_INVALID;
110  errno = savedErrno;
111  return -1;
112  }
113  // There's many other useful NORM options that could be applied here
114  if (NormIsUnicastAddress(addr))
115  {
116  NormSetDefaultUnicastNack(norm_session, true);
117  }
118  else
119  {
120  // These only apply for multicast sessions
121  //NormSetTTL(norm_session, options.multicast_hops); // ZMQ default is 1
122  NormSetTTL(norm_session, 255); // since the ZMQ_MULTICAST_HOPS socket option isn't well-supported
123  NormSetRxPortReuse(norm_session, true); // port reuse doesn't work for non-connected unicast
124  NormSetLoopback(norm_session, true); // needed when multicast users on same machine
125  if (NULL != ifacePtr)
126  {
127  // Note a bad interface may not be caught until sender or receiver start
128  // (Since sender/receiver is not yet started, this always succeeds here)
129  NormSetMulticastInterface(norm_session, ifacePtr);
130  }
131  }
132 
133  if (recv)
134  {
135  // The alternative NORM_SYNC_CURRENT here would provide "instant"
136  // receiver sync to the sender's _current_ message transmission.
137  // NORM_SYNC_STREAM tries to get everything the sender has cached/buffered
138  NormSetDefaultSyncPolicy(norm_session, NORM_SYNC_STREAM);
139  if (!NormStartReceiver(norm_session, 2*1024*1024))
140  {
141  // errno set by whatever failed
142  int savedErrno = errno;
143  NormDestroyInstance(norm_instance); // session gets closed, too
144  norm_session = NORM_SESSION_INVALID;
145  norm_instance = NORM_INSTANCE_INVALID;
146  errno = savedErrno;
147  return -1;
148  }
149  is_receiver = true;
150  }
151 
152  if (send)
153  {
154  // Pick a random sender instance id (aka norm sender session id)
155  NormSessionId instanceId = NormGetRandomSessionId();
156  // TBD - provide "options" for some NORM sender parameters
157  if (!NormStartSender(norm_session, instanceId, 2*1024*1024, 1400, 16, 4))
158  {
159  // errno set by whatever failed
160  int savedErrno = errno;
161  NormDestroyInstance(norm_instance); // session gets closed, too
162  norm_session = NORM_SESSION_INVALID;
163  norm_instance = NORM_INSTANCE_INVALID;
164  errno = savedErrno;
165  return -1;
166  }
167  NormSetCongestionControl(norm_session, true);
168  norm_tx_ready = true;
169  is_sender = true;
170  if (NORM_OBJECT_INVALID == (norm_tx_stream = NormStreamOpen(norm_session, 2*1024*1024)))
171  {
172  // errno set by whatever failed
173  int savedErrno = errno;
174  NormDestroyInstance(norm_instance); // session gets closed, too
175  norm_session = NORM_SESSION_INVALID;
176  norm_instance = NORM_INSTANCE_INVALID;
177  errno = savedErrno;
178  return -1;
179  }
180  }
181 
182  //NormSetMessageTrace(norm_session, true);
183  //NormSetDebugLevel(3);
184  //NormOpenDebugLog(norm_instance, "normLog.txt");
185 
186  return 0; // no error
187 } // end zmq::norm_engine_t::init()
188 
189 void zmq::norm_engine_t::shutdown()
190 {
191  // TBD - implement a more graceful shutdown option
192  if (is_receiver)
193  {
194  NormStopReceiver(norm_session);
195 
196  // delete any active NormRxStreamState
197  rx_pending_list.Destroy();
198  rx_ready_list.Destroy();
199  msg_ready_list.Destroy();
200 
201  is_receiver = false;
202  }
203  if (is_sender)
204  {
205  NormStopSender(norm_session);
206  is_sender = false;
207  }
208  if (NORM_SESSION_INVALID != norm_session)
209  {
210  NormDestroySession(norm_session);
211  norm_session = NORM_SESSION_INVALID;
212  }
213  if (NORM_INSTANCE_INVALID != norm_instance)
214  {
215  NormStopInstance(norm_instance);
216  NormDestroyInstance(norm_instance);
217  norm_instance = NORM_INSTANCE_INVALID;
218  }
219 } // end zmq::norm_engine_t::shutdown()
220 
221 void zmq::norm_engine_t::plug (io_thread_t* io_thread_, session_base_t *session_)
222 {
223  // TBD - we may assign the NORM engine to an io_thread in the future???
224  zmq_session = session_;
225  if (is_sender) zmq_output_ready = true;
226  if (is_receiver) zmq_input_ready = true;
227 
228  fd_t normDescriptor = NormGetDescriptor(norm_instance);
229  norm_descriptor_handle = add_fd(normDescriptor);
230  // Set POLLIN for notification of pending NormEvents
231  set_pollin(norm_descriptor_handle);
232 
233  if (is_sender) send_data();
234 
235 } // end zmq::norm_engine_t::init()
236 
237 void zmq::norm_engine_t::unplug()
238 {
239  rm_fd(norm_descriptor_handle);
240 
241  zmq_session = NULL;
242 } // end zmq::norm_engine_t::unplug()
243 
244 void zmq::norm_engine_t::terminate()
245 {
246  unplug();
247  shutdown();
248  delete this;
249 }
250 
251 void zmq::norm_engine_t::restart_output()
252 {
253  // There's new message data available from the session
254  zmq_output_ready = true;
255  if (norm_tx_ready) send_data();
256 
257 } // end zmq::norm_engine_t::restart_output()
258 
259 void zmq::norm_engine_t::send_data()
260 {
261  // Here we write as much as is available or we can
262  while (zmq_output_ready && norm_tx_ready)
263  {
264  if (0 == tx_len)
265  {
266  // Our tx_buffer needs data to send
267  // Get more data from encoder
268  size_t space = BUFFER_SIZE;
269  unsigned char* bufPtr = (unsigned char*)tx_buffer;
270  tx_len = zmq_encoder.encode(&bufPtr, space);
271  if (0 == tx_len)
272  {
273  if (tx_first_msg)
274  {
275  // We don't need to mark eom/flush until a message is sent
276  tx_first_msg = false;
277  }
278  else
279  {
280  // A prior message was completely written to stream, so
281  // mark end-of-message and possibly flush (to force packet transmission,
282  // even if it's not a full segment so message gets delivered quickly)
283  // NormStreamMarkEom(norm_tx_stream); // the flush below marks eom
284  // Note NORM_FLUSH_ACTIVE makes NORM fairly chatty for low duty cycle messaging
285  // but makes sure content is delivered quickly. Positive acknowledgements
286  // with flush override would make NORM more succinct here
287  NormStreamFlush(norm_tx_stream, true, NORM_FLUSH_ACTIVE);
288  }
289  // Need to pull and load a new message to send
290  if (-1 == zmq_session->pull_msg(&tx_msg))
291  {
292  // We need to wait for "restart_output()" to be called by ZMQ
293  zmq_output_ready = false;
294  break;
295  }
296  zmq_encoder.load_msg(&tx_msg);
297  // Should we write message size header for NORM to use? Or expect NORM
298  // receiver to decode ZMQ message framing format(s)?
299  // OK - we need to use a byte to denote when the ZMQ frame is the _first_
300  // frame of a message so it can be decoded properly when a receiver
301  // 'syncs' mid-stream. We key off the the state of the 'more_flag'
302  // I.e.,If more_flag _was_ false previously, this is the first
303  // frame of a ZMQ message.
304  if (tx_more_bit)
305  tx_buffer[0] = (char)0xff; // this is not first frame of message
306  else
307  tx_buffer[0] = 0x00; // this is first frame of message
308  tx_more_bit = (0 != (tx_msg.flags() & msg_t::more));
309  // Go ahead an get a first chunk of the message
310  bufPtr++;
311  space--;
312  tx_len = 1 + zmq_encoder.encode(&bufPtr, space);
313  tx_index = 0;
314  }
315  }
316  // Do we have data in our tx_buffer pending
317  if (tx_index < tx_len)
318  {
319  // We have data in our tx_buffer to send, so write it to the stream
320  tx_index += NormStreamWrite(norm_tx_stream, tx_buffer + tx_index, tx_len - tx_index);
321  if (tx_index < tx_len)
322  {
323  // NORM stream buffer full, wait for NORM_TX_QUEUE_VACANCY
324  norm_tx_ready = false;
325  break;
326  }
327  tx_len = 0; // all buffered data was written
328  }
329  } // end while (zmq_output_ready && norm_tx_ready)
330 } // end zmq::norm_engine_t::send_data()
331 
332 void zmq::norm_engine_t::in_event()
333 {
334  // This means a NormEvent is pending, so call NormGetNextEvent() and handle
335  NormEvent event;
336  if (!NormGetNextEvent(norm_instance, &event))
337  {
338  // NORM has died before we unplugged?!
339  zmq_assert(false);
340  return;
341  }
342 
343  switch(event.type)
344  {
345  case NORM_TX_QUEUE_VACANCY:
346  case NORM_TX_QUEUE_EMPTY:
347  if (!norm_tx_ready)
348  {
349  norm_tx_ready = true;
350  send_data();
351  }
352  break;
353 
354  case NORM_RX_OBJECT_NEW:
355  //break;
356  case NORM_RX_OBJECT_UPDATED:
357  recv_data(event.object);
358  break;
359 
360  case NORM_RX_OBJECT_ABORTED:
361  {
362  NormRxStreamState* rxState = (NormRxStreamState*)NormObjectGetUserData(event.object);
363  if (NULL != rxState)
364  {
365  // Remove the state from the list it's in
366  // This is now unnecessary since deletion takes care of list removal
367  // but in the interest of being clear ...
368  NormRxStreamState::List* list = rxState->AccessList();
369  if (NULL != list) list->Remove(*rxState);
370  }
371  delete rxState;
372  break;
373  }
374  case NORM_REMOTE_SENDER_INACTIVE:
375  // Here we free resources used for this formerly active sender.
376  // Note w/ NORM_SYNC_STREAM, if sender reactivates, we may
377  // get some messages delivered twice. NORM_SYNC_CURRENT would
378  // mitigate that but might miss data at startup. Always tradeoffs.
379  // Instead of immediately deleting, we could instead initiate a
380  // user configurable timeout here to wait some amount of time
381  // after this event to declare the remote sender truly dead
382  // and delete its state???
383  NormNodeDelete(event.sender);
384  break;
385 
386  default:
387  // We ignore some NORM events
388  break;
389  }
390 } // zmq::norm_engine_t::in_event()
391 
392 void zmq::norm_engine_t::restart_input()
393 {
394  // TBD - should we check/assert that zmq_input_ready was false???
395  zmq_input_ready = true;
396  // Process any pending received messages
397  if (!msg_ready_list.IsEmpty())
398  recv_data(NORM_OBJECT_INVALID);
399 
400 } // end zmq::norm_engine_t::restart_input()
401 
402 void zmq::norm_engine_t::recv_data(NormObjectHandle object)
403 {
404  if (NORM_OBJECT_INVALID != object)
405  {
406  // Call result of NORM_RX_OBJECT_UPDATED notification
407  // This is a rx_ready indication for a new or existing rx stream
408  // First, determine if this is a stream we already know
409  zmq_assert(NORM_OBJECT_STREAM == NormObjectGetType(object));
410  // Since there can be multiple senders (publishers), we keep
411  // state for each separate rx stream.
412  NormRxStreamState* rxState = (NormRxStreamState*)NormObjectGetUserData(object);
413  if (NULL == rxState)
414  {
415  // This is a new stream, so create rxState with zmq decoder, etc
416  rxState = new NormRxStreamState(object, options.maxmsgsize);
417  if (!rxState->Init())
418  {
419  errno_assert(false);
420  delete rxState;
421  return;
422  }
423  NormObjectSetUserData(object, rxState);
424  }
425  else if (!rxState->IsRxReady())
426  {
427  // Existing non-ready stream, so remove from pending
428  // list to be promoted to rx_ready_list ...
429  rx_pending_list.Remove(*rxState);
430  }
431  if (!rxState->IsRxReady())
432  {
433  // TBD - prepend up front for immediate service?
434  rxState->SetRxReady(true);
435  rx_ready_list.Append(*rxState);
436  }
437  }
438  // This loop repeats until we've read all data available from "rx ready" inbound streams
439  // and pushed any accumulated messages we can up to the zmq session.
440  while (!rx_ready_list.IsEmpty() || (zmq_input_ready && !msg_ready_list.IsEmpty()))
441  {
442  // Iterate through our rx_ready streams, reading data into the decoder
443  // (This services incoming "rx ready" streams in a round-robin fashion)
444  NormRxStreamState::List::Iterator iterator(rx_ready_list);
445  NormRxStreamState* rxState;
446  while (NULL != (rxState = iterator.GetNextItem()))
447  {
448  switch(rxState->Decode())
449  {
450  case 1: // msg completed
451  // Complete message decoded, move this stream to msg_ready_list
452  // to push the message up to the session below. Note the stream
453  // will be returned to the "rx_ready_list" after that's done
454  rx_ready_list.Remove(*rxState);
455  msg_ready_list.Append(*rxState);
456  continue;
457 
458  case -1: // decoding error (shouldn't happen w/ NORM, but ...)
459  // We need to re-sync this stream (decoder buffer was reset)
460  rxState->SetSync(false);
461  break;
462 
463  default: // 0 - need more data
464  break;
465  }
466  // Get more data from this stream
467  NormObjectHandle stream = rxState->GetStreamHandle();
468  // First, make sure we're in sync ...
469  while (!rxState->InSync())
470  {
471  // seek NORM message start
472  if (!NormStreamSeekMsgStart(stream))
473  {
474  // Need to wait for more data
475  break;
476  }
477  // read message 'flag' byte to see if this it's a 'final' frame
478  char syncFlag;
479  unsigned int numBytes = 1;
480  if (!NormStreamRead(stream, &syncFlag, &numBytes))
481  {
482  // broken stream (shouldn't happen after seek msg start?)
483  zmq_assert(false);
484  continue;
485  }
486  if (0 == numBytes)
487  {
488  // This probably shouldn't happen either since we found msg start
489  // Need to wait for more data
490  break;
491  }
492  if (0 == syncFlag) rxState->SetSync(true);
493  // else keep seeking ...
494  } // end while(!rxState->InSync())
495  if (!rxState->InSync())
496  {
497  // Need more data for this stream, so remove from "rx ready"
498  // list and iterate to next "rx ready" stream
499  rxState->SetRxReady(false);
500  // Move from rx_ready_list to rx_pending_list
501  rx_ready_list.Remove(*rxState);
502  rx_pending_list.Append(*rxState);
503  continue;
504  }
505  // Now we're actually ready to read data from the NORM stream to the zmq_decoder
506  // the underlying zmq_decoder->get_buffer() call sets how much is needed.
507  unsigned int numBytes = rxState->GetBytesNeeded();
508  if (!NormStreamRead(stream, rxState->AccessBuffer(), &numBytes))
509  {
510  // broken NORM stream, so re-sync
511  rxState->Init(); // TBD - check result
512  // This will retry syncing, and getting data from this stream
513  // since we don't increment the "it" iterator
514  continue;
515  }
516  rxState->IncrementBufferCount(numBytes);
517  if (0 == numBytes)
518  {
519  // All the data available has been read
520  // Need to wait for NORM_RX_OBJECT_UPDATED for this stream
521  rxState->SetRxReady(false);
522  // Move from rx_ready_list to rx_pending_list
523  rx_ready_list.Remove(*rxState);
524  rx_pending_list.Append(*rxState);
525  }
526  } // end while(NULL != (rxState = iterator.GetNextItem()))
527 
528  if (zmq_input_ready)
529  {
530  // At this point, we've made a pass through the "rx_ready" stream list
531  // Now make a pass through the "msg_pending" list (if the zmq session
532  // ready for more input). This may possibly return streams back to
533  // the "rx ready" stream list after their pending message is handled
534  NormRxStreamState::List::Iterator iterator(msg_ready_list);
535  NormRxStreamState* rxState;
536  while (NULL != (rxState = iterator.GetNextItem()))
537  {
538  msg_t* msg = rxState->AccessMsg();
539  int rc = zmq_session->push_msg(msg);
540  if (-1 == rc)
541  {
542  if (EAGAIN == errno)
543  {
544  // need to wait until session calls "restart_input()"
545  zmq_input_ready = false;
546  break;
547  }
548  else
549  {
550  // session rejected message?
551  // TBD - handle this better
552  zmq_assert(false);
553  }
554  }
555  // else message was accepted.
556  msg_ready_list.Remove(*rxState);
557  if (rxState->IsRxReady()) // Move back to "rx_ready" list to read more data
558  rx_ready_list.Append(*rxState);
559  else // Move back to "rx_pending" list until NORM_RX_OBJECT_UPDATED
560  msg_ready_list.Append(*rxState);
561  } // end while(NULL != (rxState = iterator.GetNextItem()))
562  } // end if (zmq_input_ready)
563  } // end while ((!rx_ready_list.empty() || (zmq_input_ready && !msg_ready_list.empty()))
564 
565  // Alert zmq of the messages we have pushed up
566  zmq_session->flush();
567 
568 } // end zmq::norm_engine_t::recv_data()
569 
570 zmq::norm_engine_t::NormRxStreamState::NormRxStreamState(NormObjectHandle normStream,
571  int64_t maxMsgSize)
572  : norm_stream(normStream), max_msg_size(maxMsgSize),
573  in_sync(false), rx_ready(false), zmq_decoder(NULL), skip_norm_sync(false),
574  buffer_ptr(NULL), buffer_size(0), buffer_count(0),
575  prev(NULL), next(NULL), list(NULL)
576 {
577 }
578 
579 zmq::norm_engine_t::NormRxStreamState::~NormRxStreamState()
580 {
581  if (NULL != zmq_decoder)
582  {
583  delete zmq_decoder;
584  zmq_decoder = NULL;
585  }
586  if (NULL != list)
587  {
588  list->Remove(*this);
589  list = NULL;
590  }
591 }
592 
593 bool zmq::norm_engine_t::NormRxStreamState::Init()
594 {
595  in_sync = false;
596  skip_norm_sync = false;
597  if (NULL != zmq_decoder) delete zmq_decoder;
598  // Note "in_batch_size" comes from config.h
599  zmq_decoder = new (std::nothrow) v2_decoder_t (in_batch_size, max_msg_size);
600  alloc_assert (zmq_decoder);
601  if (NULL != zmq_decoder)
602  {
603  buffer_count = 0;
604  buffer_size = 0;
605  zmq_decoder->get_buffer(&buffer_ptr, &buffer_size);
606  return true;
607  }
608  else
609  {
610  return false;
611  }
612 } // end zmq::norm_engine_t::NormRxStreamState::Init()
613 
614 // This decodes any pending data sitting in our stream decoder buffer
615 // It returns 1 upon message completion, -1 on error, 1 on msg completion
616 int zmq::norm_engine_t::NormRxStreamState::Decode()
617 {
618  // If we have pending bytes to decode, process those first
619  while (buffer_count > 0)
620  {
621  // There's pending data for the decoder to decode
622  size_t processed = 0;
623 
624  // This a bit of a kludgy approach used to weed
625  // out the NORM ZMQ message transport "syncFlag" byte
626  // from the ZMQ message stream being decoded (but it works!)
627  if (skip_norm_sync)
628  {
629  buffer_ptr++;
630  buffer_count--;
631  skip_norm_sync = false;
632  }
633 
634  int rc = zmq_decoder->decode(buffer_ptr, buffer_count, processed);
635  buffer_ptr += processed;
636  buffer_count -= processed;
637  switch (rc)
638  {
639  case 1:
640  // msg completed
641  if (0 == buffer_count)
642  {
643  buffer_size = 0;
644  zmq_decoder->get_buffer(&buffer_ptr, &buffer_size);
645  }
646  skip_norm_sync = true;
647  return 1;
648  case -1:
649  // decoder error (reset decoder and state variables)
650  in_sync = false;
651  skip_norm_sync = false; // will get consumed by norm sync check
652  Init();
653  break;
654 
655  case 0:
656  // need more data, keep decoding until buffer exhausted
657  break;
658  }
659  }
660  // Reset buffer pointer/count for next read
661  buffer_count = 0;
662  buffer_size = 0;
663  zmq_decoder->get_buffer(&buffer_ptr, &buffer_size);
664  return 0; // need more data
665 
666 } // end zmq::norm_engine_t::NormRxStreamState::Decode()
667 
668 zmq::norm_engine_t::NormRxStreamState::List::List()
669  : head(NULL), tail(NULL)
670 {
671 }
672 
673 zmq::norm_engine_t::NormRxStreamState::List::~List()
674 {
675  Destroy();
676 }
677 
678 void zmq::norm_engine_t::NormRxStreamState::List::Destroy()
679 {
680  NormRxStreamState* item = head;
681  while (NULL != item)
682  {
683  Remove(*item);
684  delete item;
685  item = head;
686  }
687 } // end zmq::norm_engine_t::NormRxStreamState::List::Destroy()
688 
689 void zmq::norm_engine_t::NormRxStreamState::List::Append(NormRxStreamState& item)
690 {
691  item.prev = tail;
692  if (NULL != tail)
693  tail->next = &item;
694  else
695  head = &item;
696  item.next = NULL;
697  tail = &item;
698  item.list = this;
699 } // end zmq::norm_engine_t::NormRxStreamState::List::Append()
700 
701 void zmq::norm_engine_t::NormRxStreamState::List::Remove(NormRxStreamState& item)
702 {
703  if (NULL != item.prev)
704  item.prev->next = item.next;
705  else
706  head = item.next;
707  if (NULL != item.next)
708  item.next ->prev = item.prev;
709  else
710  tail = item.prev;
711  item.prev = item.next = NULL;
712  item.list = NULL;
713 } // end zmq::norm_engine_t::NormRxStreamState::List::Remove()
714 
715 zmq::norm_engine_t::NormRxStreamState::List::Iterator::Iterator(const List& list)
716  : next_item(list.head)
717 {
718 }
719 
720 zmq::norm_engine_t::NormRxStreamState* zmq::norm_engine_t::NormRxStreamState::List::Iterator::GetNextItem()
721 {
722  NormRxStreamState* nextItem = next_item;
723  if (NULL != nextItem) next_item = nextItem->next;
724  return nextItem;
725 } // end zmq::norm_engine_t::NormRxStreamState::List::Iterator::GetNextItem()
726 
727 
728 #endif // ZMQ_HAVE_NORM
Definition: command.hpp:81
int fd_t
Definition: fd.hpp:50
#define zmq_assert(x)
Definition: err.hpp:119
#define alloc_assert(x)
Definition: err.hpp:159
#define errno_assert(x)
Definition: err.hpp:129