#include #include #include #include #include #include #include /* strange, but in linux we don't have some useful macros */ #ifdef __linux__ #define TAILQ_EMPTY(head) ((head)->tqh_first == NULL) #define TAILQ_FIRST(head) ((head)->tqh_first) #endif TAILQ_HEAD (queue, entry); struct entry { TAILQ_ENTRY(entry) entries; void *data; }; struct aqueue_free_slots_stack { struct aqueue_free_slots_stack *next; }; struct aqueue { struct aqueue_free_slots_stack *free_stack; struct queue *q; pthread_mutex_t mutex; pthread_cond_t cond; int waiting_threads; int len; int initialised; }; #define error(text) do { \ fprintf(stderr, "%s:%i %s\n", __FILE__, __LINE__, text); \ exit(1); \ } while(0) static int counter, iter, verbose, empty_count; struct aqueue *aqueue; static inline void push_free_slot(struct aqueue_free_slots_stack **stack_p, void *data_p) { struct aqueue_free_slots_stack *data = data_p; data->next = *stack_p; *stack_p = data; } static inline void * pop_free_slot(struct aqueue_free_slots_stack **stack_p) { struct aqueue_free_slots_stack *data; data = *stack_p; if (data) { *stack_p = data->next; data->next = NULL; } return data; } static inline void clear_stack(struct aqueue_free_slots_stack **stack_p) { void *data; while ((data = pop_free_slot(stack_p)) != NULL) free(data); } inline void init_aqueue(struct aqueue *aq) { int status; aq->q = malloc(sizeof(struct queue)); if (aq->q == NULL) error("can't allocate memory"); bzero(aq->q, sizeof(struct queue)); TAILQ_INIT(aq->q); aq->waiting_threads = aq->len = aq->initialised = 0; status = pthread_mutex_init(&aq->mutex, NULL); if (status != 0) error("Init mutex"); status = pthread_cond_init(&aq->cond, NULL); if (status != 0) error("Init condvar"); aq->free_stack = NULL; aq->initialised = 1; } inline int queue_length(struct aqueue *aq) { if (aq == NULL) error("NULL async queue"); if (aq->initialised != 1) error("async queue not initialised"); return aq->len; } void * async_queue_pop(struct aqueue *aq) { int status; struct entry *en; void *d; if (aq == NULL) error("NULL async queue"); if (aq->initialised != 1) error("async queue not initialised"); status = pthread_mutex_lock(&aq->mutex); if (status != 0) error("can't lock mutex"); aq->waiting_threads++; while (TAILQ_EMPTY(aq->q)) { empty_count++; status = pthread_cond_wait(&aq->cond, &aq->mutex); if (status != 0) error("condwait"); } aq->waiting_threads--; en = TAILQ_FIRST(aq->q); TAILQ_REMOVE(aq->q, en, entries); d = en->data; aq->len--; push_free_slot(&aq->free_stack, en); status = pthread_mutex_unlock(&aq->mutex); if (status != 0) error("can't unlock mutex"); return d; } void * async_queue_try_pop(struct aqueue *aq) { int status; struct entry *en; void *d; if (aq == NULL) error("NULL async queue"); if (aq->initialised != 1) error("async queue not initialised"); d = NULL; status = pthread_mutex_lock(&aq->mutex); if (status != 0) error("can't lock mutex"); if (TAILQ_EMPTY(aq->q)) goto end; en = TAILQ_FIRST(aq->q); TAILQ_REMOVE(aq->q, en, entries); d = en->data; aq->len--; push_free_slot(&aq->free_stack, en); end: status = pthread_mutex_unlock(&aq->mutex); if (status != 0) error("can't unlock mutex"); return d; } void async_queue_push(struct aqueue *aq, void *data) { int status; struct entry *en; if (aq == NULL) error("NULL async queue"); if (aq->initialised != 1) error("async queue not initialised"); if (data == NULL) error("NULL data"); status = pthread_mutex_lock(&aq->mutex); if (status != 0) error("can't lock mutex"); en = pop_free_slot(&aq->free_stack); if (!en) { en = malloc(sizeof(struct entry)); if (en == NULL) error("can't allocate memory"); } en->data = data; TAILQ_INSERT_TAIL(aq->q, en, entries); aq->len++; status = pthread_cond_signal(&aq->cond); if (status != 0) error("cond signal"); status = pthread_mutex_unlock(&aq->mutex); if (status != 0) error("can't unlock mutex"); } void * pusher(void __attribute__((unused)) *foo) { int i; fprintf(stderr, "%s started\n", __func__); counter = 9897; i = 0; while(i < iter) { if (verbose) fprintf(stderr, "push thread pushed %i\n", counter); async_queue_push(aqueue, &counter); i++; } return NULL; } void * poper(void __attribute__((unused)) *foo) { int *n, i; fprintf(stderr, "%s started\n", __func__); i = 0; while(i < iter) { n = async_queue_pop(aqueue); i++; if (verbose) fprintf(stderr, "pop thread received %i\n", *n); } return NULL; } void usage() { printf("-n \tnumber of iterations\n"); printf("-v\t\tverbose\n"); exit(0); } int main(int argc, char **argv) { int status, c; pthread_t push_id, pop_id_1; if (argc < 2) usage(); while ((c = getopt(argc, argv, "vn:")) != -1) { switch(c) { case 'n': iter = atoi(optarg); break; case 'v': verbose = 1; break; default: usage(); } } aqueue = malloc(sizeof(struct aqueue)); init_aqueue(aqueue); status = pthread_create(&push_id, NULL, pusher, NULL); if (status != 0) error("can't create pusher thread"); status = pthread_create(&pop_id_1, NULL, poper, NULL); if (status != 0) error("can't create poper thread"); status = pthread_join(pop_id_1, NULL); if (status != 0) error("join"); fprintf(stderr, "sleep on pthread_cond_wait = %i times\n", empty_count); return 0; }