1 /* This file is part of the YAZ toolkit.
2 * Copyright (C) 1995-2009 Index Data
3 * See the file LICENSE for details.
7 * \brief thread pool workers
12 #include <yaz/tpool.h>
17 struct work_item *next;
23 pthread_mutex_t mutex;
24 pthread_cond_t input_data;
27 struct work_item *input_queue;
28 struct work_item *output_queue;
29 struct work_item *free_queue;
30 void (*work_handler)(void *work_data);
31 void (*work_destroy)(void *work_data);
34 static struct work_item *queue_remove_last(struct work_item **q)
36 struct work_item **work_p = q, *work_this = 0;
38 while (*work_p && (*work_p)->next)
39 work_p = &(*work_p)->next;
48 static void queue_trav(struct work_item *q, void (*f)(void *data))
50 for (; q; q = q->next)
54 void yaz_tpool_add(yaz_tpool_t p, void *data)
56 struct work_item *work_p;
58 pthread_mutex_lock(&p->mutex);
62 work_p = p->free_queue;
63 p->free_queue = p->free_queue->next;
66 work_p = nmem_malloc(p->nmem, sizeof(*work_p));
69 work_p->next = p->input_queue;
70 p->input_queue = work_p;
72 pthread_cond_signal(&p->input_data);
73 pthread_mutex_unlock(&p->mutex);
76 void yaz_tpool_destroy(yaz_tpool_t p)
82 pthread_mutex_lock(&p->mutex);
84 pthread_cond_broadcast(&p->input_data);
85 pthread_mutex_unlock(&p->mutex);
87 for (i = 0; i < p->no_threads; i++)
88 pthread_join(p->thread_id[i], 0);
92 queue_trav(p->input_queue, p->work_destroy);
93 queue_trav(p->output_queue, p->work_destroy);
96 pthread_cond_destroy(&p->input_data);
97 pthread_mutex_destroy(&p->mutex);
98 nmem_destroy(p->nmem);
102 static void *tpool_thread_handler(void *vp)
104 yaz_tpool_t p = (yaz_tpool_t) vp;
107 struct work_item *work_this = 0;
108 /* wait for some work */
109 pthread_mutex_lock(&p->mutex);
110 while (!p->stop_flag && !p->input_queue)
111 pthread_cond_wait(&p->input_data, &p->mutex);
112 /* see if we were waken up because we're shutting down */
115 /* got something. Take the last one out of input_queue */
116 assert(p->input_queue);
117 work_this = queue_remove_last(&p->input_queue);
120 pthread_mutex_unlock(&p->mutex);
122 /* work on this item */
123 p->work_handler(work_this->data);
125 pthread_mutex_unlock(&p->mutex);
129 yaz_tpool_t yaz_tpool_create(void (*work_handler)(void *work_data),
130 void (*work_destroy)(void *work_data),
133 NMEM nmem = nmem_create();
134 yaz_tpool_t p = nmem_malloc(nmem, sizeof(*p));
138 p->no_threads = no_threads;
144 p->work_handler = work_handler;
145 p->work_destroy = work_destroy;
147 pthread_mutex_init(&p->mutex, 0);
148 pthread_cond_init(&p->input_data, 0);
150 p->thread_id = nmem_malloc(p->nmem, sizeof(*p->thread_id) * p->no_threads);
151 for (i = 0; i < p->no_threads; i++)
152 pthread_create (p->thread_id + i, 0, tpool_thread_handler, p);
159 * c-file-style: "Stroustrup"
160 * indent-tabs-mode: nil
162 * vim: shiftwidth=4 tabstop=8 expandtab