Thread pool for server
[yaz-moved-to-github.git] / src / tpool.c
1 /* This file is part of the YAZ toolkit.
2  * Copyright (C) 1995-2009 Index Data
3  * See the file LICENSE for details.
4  */
5 /**
6  * \file 
7  * \brief thread pool workers
8  */
9
10 #include <assert.h>
11 #include <yaz/nmem.h>
12 #include <yaz/tpool.h>
13 #include <pthread.h>
14
15 struct work_item {
16     void *data;
17     struct work_item *next;
18 };
19
20 struct yaz_tpool_s {
21     NMEM nmem;
22     pthread_t *thread_id;
23     pthread_mutex_t mutex;
24     pthread_cond_t input_data;
25     int stop_flag;
26     size_t no_threads;
27     struct work_item *input_queue;
28     struct work_item *output_queue;
29     struct work_item *free_queue;
30     void (*work_handler)(void *work_data);
31     void (*work_destroy)(void *work_data);
32 };
33
34 static struct work_item *queue_remove_last(struct work_item **q)
35 {
36     struct work_item **work_p = q, *work_this = 0;
37
38     while (*work_p && (*work_p)->next)
39         work_p = &(*work_p)->next;
40     if (*work_p)
41     {
42         work_this = *work_p;
43         *work_p = 0;
44     }
45     return work_this;
46 }
47
48 static void queue_trav(struct work_item *q, void (*f)(void *data))
49 {
50     for (; q; q = q->next)
51         f(q->data);
52 }
53
54 void yaz_tpool_add(yaz_tpool_t p, void *data)
55 {
56     struct work_item *work_p;
57
58     pthread_mutex_lock(&p->mutex);
59
60     if (p->free_queue)
61     {
62         work_p = p->free_queue;
63         p->free_queue = p->free_queue->next;
64     }
65     else
66         work_p = nmem_malloc(p->nmem, sizeof(*work_p));
67
68     work_p->data = data;
69     work_p->next = p->input_queue;
70     p->input_queue = work_p;
71
72     pthread_cond_signal(&p->input_data);
73     pthread_mutex_unlock(&p->mutex);
74 }
75
76 void yaz_tpool_destroy(yaz_tpool_t p)
77 {
78     if (p)
79     {
80         size_t i;
81
82         pthread_mutex_lock(&p->mutex);
83         p->stop_flag = 1;
84         pthread_cond_broadcast(&p->input_data);
85         pthread_mutex_unlock(&p->mutex);
86         
87         for (i = 0; i < p->no_threads; i++)
88             pthread_join(p->thread_id[i], 0);
89         
90         if (p->work_destroy)
91         {
92             queue_trav(p->input_queue, p->work_destroy);
93             queue_trav(p->output_queue, p->work_destroy);
94         }
95         
96         pthread_cond_destroy(&p->input_data);
97         pthread_mutex_destroy(&p->mutex);
98         nmem_destroy(p->nmem);
99     }
100 }
101
102 static void *tpool_thread_handler(void *vp)
103 {
104     yaz_tpool_t p = (yaz_tpool_t) vp;
105     while (1)
106     {
107         struct work_item *work_this = 0;
108         /* wait for some work */
109         pthread_mutex_lock(&p->mutex);
110         while (!p->stop_flag && !p->input_queue)
111             pthread_cond_wait(&p->input_data, &p->mutex);
112         /* see if we were waken up because we're shutting down */
113         if (p->stop_flag)
114             break;
115         /* got something. Take the last one out of input_queue */
116         assert(p->input_queue);
117         work_this = queue_remove_last(&p->input_queue);
118         assert(work_this);
119
120         pthread_mutex_unlock(&p->mutex);
121
122         /* work on this item */
123         p->work_handler(work_this->data);
124     }        
125     pthread_mutex_unlock(&p->mutex);
126     return 0;
127 }
128
129 yaz_tpool_t yaz_tpool_create(void (*work_handler)(void *work_data),
130                              void (*work_destroy)(void *work_data),
131                              size_t no_threads)
132 {
133     NMEM nmem = nmem_create();
134     yaz_tpool_t p = nmem_malloc(nmem, sizeof(*p));
135     size_t i;
136     p->nmem = nmem;
137     p->stop_flag = 0;
138     p->no_threads = no_threads;
139
140     p->input_queue = 0;
141     p->output_queue = 0;
142     p->free_queue = 0;
143
144     p->work_handler = work_handler;
145     p->work_destroy = work_destroy;
146
147     pthread_mutex_init(&p->mutex, 0);
148     pthread_cond_init(&p->input_data, 0);
149
150     p->thread_id = nmem_malloc(p->nmem, sizeof(*p->thread_id) * p->no_threads);
151     for (i = 0; i < p->no_threads; i++)
152         pthread_create (p->thread_id + i, 0, tpool_thread_handler, p);
153     return p;
154 }
155
156 /*
157  * Local variables:
158  * c-basic-offset: 4
159  * c-file-style: "Stroustrup"
160  * indent-tabs-mode: nil
161  * End:
162  * vim: shiftwidth=4 tabstop=8 expandtab
163  */
164