Object
This class provides a way to synchronize communication between threads.
Example:
require 'thread'

queue = Queue.new

# Producer: pushes the integers 0..4 onto the queue, pausing a random
# amount of time before each push.
producer = Thread.new do
  5.times do |i|
    sleep rand(i) # simulate expense
    queue << i
    puts "#{i} produced"
  end
end

# Consumer: pops five values off the queue; pop suspends the thread while
# the queue is empty.
consumer = Thread.new do
  5.times do |i|
    value = queue.pop
    sleep rand(i/2) # simulate expense
    puts "consumed #{value}"
  end
end

consumer.join
Removes all objects from the queue.
/*
 * Removes every value from the queue.
 *
 * The value list is emptied while the queue mutex is held; the
 * space_available condition variable is then signalled before the mutex
 * is released.  Returns the receiver.
 */
static VALUE
rb_queue_clear(VALUE self)
{
    Queue *q;

    Data_Get_Struct(self, Queue, q);

    lock_mutex(&q->mutex);
    clear_list(&q->values);
    signal_condvar(&q->space_available);
    unlock_mutex(&q->mutex);

    return self;
}
Returns true if the queue is empty.
/*
 * Reports whether the queue currently holds no values.
 *
 * Returns Qtrue when the value list's size is zero, Qfalse otherwise.
 * The size is read while the queue mutex is held.
 */
static VALUE
rb_queue_empty_p(VALUE self)
{
    Queue *q;
    VALUE is_empty = Qfalse;

    Data_Get_Struct(self, Queue, q);

    lock_mutex(&q->mutex);
    if (q->values.size == 0) {
        is_empty = Qtrue;
    }
    unlock_mutex(&q->mutex);

    return is_empty;
}
Returns the length of the queue.
/*
 * Returns the number of values currently stored in the queue as a Ruby
 * number.  The size is read, and converted, while the queue mutex is held.
 */
static VALUE
rb_queue_length(VALUE self)
{
    Queue *q;
    VALUE count;

    Data_Get_Struct(self, Queue, q);

    lock_mutex(&q->mutex);
    count = ULONG2NUM(q->values.size);
    unlock_mutex(&q->mutex);

    return count;
}
/*
 * Serialises the queue for Marshal.
 *
 * Builds an array from the value list, prepends the queue's capacity as
 * the first element, and returns the result of rb_marshal_dump on that
 * array.
 *
 * NOTE(review): unlike the other accessors, this function does not take
 * the queue mutex while reading the queue -- confirm that is intended.
 */
static VALUE
rb_queue_marshal_dump(VALUE self)
{
    Queue *q;
    VALUE dump;

    Data_Get_Struct(self, Queue, q);

    dump = array_from_list(&q->values);
    /* The capacity travels as element 0, ahead of the queued values. */
    rb_ary_unshift(dump, ULONG2NUM(q->capacity));

    return rb_marshal_dump(dump, Qnil);
}
/*
 * Restores a queue from marshalled data.
 *
 * data is passed to rb_marshal_load and must yield an Array whose first
 * element is the capacity; the remaining elements are pushed onto the
 * value list.
 *
 * Raises TypeError when the loaded object is not an Array, and
 * ArgumentError when the Array is empty (no capacity element).
 * Returns the receiver.
 */
static VALUE
rb_queue_marshal_load(VALUE self, VALUE data)
{
    Queue *q;
    VALUE payload;

    Data_Get_Struct(self, Queue, q);

    payload = rb_marshal_load(data);
    if (TYPE(payload) != T_ARRAY) {
        rb_raise(rb_eTypeError, "expected Array of queue data");
    }
    if (RARRAY(payload)->len < 1) {
        rb_raise(rb_eArgError, "missing capacity value");
    }

    /* Shifting removes the capacity, leaving only the queued values. */
    q->capacity = NUM2ULONG(rb_ary_shift(payload));
    push_multiple_list(&q->values,
                       RARRAY(payload)->ptr,
                       (unsigned)RARRAY(payload)->len);

    return self;
}
Returns the number of threads waiting on the queue.
/*
 * Returns the number of threads waiting on the queue: the sum of the
 * waiter-list sizes of the value_available and space_available condition
 * variables, read while the queue mutex is held.
 */
static VALUE
rb_queue_num_waiting(VALUE self)
{
    Queue *q;
    VALUE waiters;

    Data_Get_Struct(self, Queue, q);

    lock_mutex(&q->mutex);
    waiters = ULONG2NUM(q->value_available.waiting.size +
                        q->space_available.waiting.size);
    unlock_mutex(&q->mutex);

    return waiters;
}
call_seq: pop(non_block=false)
Retrieves data from the queue. If the queue is empty, the calling thread is suspended until data is pushed onto the queue. If non_block is true, the thread isn’t suspended; instead, an exception is raised when the queue is empty.
/*
 * Removes and returns the value at the head of the queue.
 *
 * Takes zero or one argument.  With no argument, or a false one, the
 * call waits on value_available until the value list has an entry.  With
 * a true argument and an empty queue, the mutex is released and a
 * thread error ("queue empty") is raised instead of waiting.
 *
 * After removing a value, space_available is signalled when the queue
 * has a non-zero capacity and is now below it.
 *
 * Raises ArgumentError when called with more than one argument.
 */
static VALUE
rb_queue_pop(int argc, VALUE *argv, VALUE self)
{
    Queue *q;
    int blocking;
    VALUE popped;

    Data_Get_Struct(self, Queue, q);

    switch (argc) {
    case 0:
        blocking = 1;
        break;
    case 1:
        /* The argument is "non_block", so blocking is its negation. */
        blocking = !RTEST(argv[0]);
        break;
    default:
        rb_raise(rb_eArgError, "wrong number of arguments (%d for 1)", argc);
    }

    lock_mutex(&q->mutex);

    if (!blocking && !q->values.entries) {
        /* Release the mutex before raising so it is not left held. */
        unlock_mutex(&q->mutex);
        rb_raise(private_eThreadError, "queue empty");
    }

    /* Re-check after every wake-up: the list may still be empty. */
    while (!q->values.entries) {
        wait_condvar(&q->value_available, &q->mutex);
    }

    popped = shift_list(&q->values);

    if (q->capacity) {
        if (q->values.size < q->capacity) {
            signal_condvar(&q->space_available);
        }
    }

    unlock_mutex(&q->mutex);

    return popped;
}
Pushes obj to the queue.
/*
 * Appends value to the queue and returns the receiver.
 *
 * A capacity of zero is treated as "no limit".  Otherwise, while the
 * queue holds at least capacity values, the call waits on
 * space_available.  After the value is appended, value_available is
 * signalled.
 */
static VALUE
rb_queue_push(VALUE self, VALUE value)
{
    Queue *q;

    Data_Get_Struct(self, Queue, q);

    lock_mutex(&q->mutex);

    /* Re-check after every wake-up: the queue may still be full. */
    while (q->capacity != 0 && q->capacity <= q->values.size) {
        wait_condvar(&q->space_available, &q->mutex);
    }

    push_list(&q->values, value);
    signal_condvar(&q->value_available);

    unlock_mutex(&q->mutex);

    return self;
}
Generated with the Darkfish Rdoc Generator 2.