Use threading utilities
[pazpar2-moved-to-github.git] / src / sel_thread.c
1 /* This file is part of Pazpar2.
2    Copyright (C) 2006-2010 Index Data
3
4 Pazpar2 is free software; you can redistribute it and/or modify it under
5 the terms of the GNU General Public License as published by the Free
6 Software Foundation; either version 2, or (at your option) any later
7 version.
8
9 Pazpar2 is distributed in the hope that it will be useful, but WITHOUT ANY
10 WARRANTY; without even the implied warranty of MERCHANTABILITY or
11 FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
12 for more details.
13
14 You should have received a copy of the GNU General Public License
15 along with this program; if not, write to the Free Software
16 Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA  02110-1301  USA
17
18 */
19
20 #if HAVE_CONFIG_H
21 #include <config.h>
22 #endif
23
24 #include "sel_thread.h"
25 #include <yaz/log.h>
26 #include <yaz/nmem.h>
27 #include <unistd.h>
28 #include <stdlib.h>
29 #include <yaz/thread_create.h>
30 #include <yaz/mutex.h>
31 #include <assert.h>
32
33 struct work_item {
34     void *data;
35     struct work_item *next;
36 };
37
38 static struct work_item *queue_remove_last(struct work_item **q)
39 {
40     struct work_item **work_p = q, *work_this = 0;
41
42     while (*work_p && (*work_p)->next)
43         work_p = &(*work_p)->next;
44     if (*work_p)
45     {
46         work_this = *work_p;
47         *work_p = 0;
48     }
49     return work_this;
50 }
51
52 static void queue_trav(struct work_item *q, void (*f)(void *data))
53 {
54     for (; q; q = q->next)
55         f(q->data);
56 }
57
58 struct sel_thread {
59     int fd[2];
60     NMEM nmem;
61     yaz_thread_t *thread_id;
62     YAZ_MUTEX mutex;
63     YAZ_COND input_data;
64     int stop_flag;
65     int no_threads;
66     struct work_item *input_queue;
67     struct work_item *output_queue;
68     struct work_item *free_queue;
69     void (*work_handler)(void *work_data);
70     void (*work_destroy)(void *work_data);
71 };
72
73 static int input_queue_length = 0;
74
75 static void *sel_thread_handler(void *vp)
76 {
77     sel_thread_t p = (sel_thread_t) vp;
78
79     while (1)
80     {
81         struct work_item *work_this = 0;
82         /* wait for some work */
83         yaz_mutex_enter(p->mutex);
84         while (!p->stop_flag && !p->input_queue)
85             yaz_cond_wait(p->input_data, p->mutex, 0);
86         /* see if we were waken up because we're shutting down */
87         if (p->stop_flag)
88             break;
89         /* got something. Take the last one out of input_queue */
90
91         assert(p->input_queue);
92         work_this = queue_remove_last(&p->input_queue);
93         input_queue_length--;
94         yaz_log(YLOG_DEBUG, "input queue length after pop: %d", input_queue_length);
95         assert(work_this);
96
97         yaz_mutex_leave(p->mutex);
98
99         /* work on this item */
100         p->work_handler(work_this->data);
101         
102         /* put it back into output queue */
103         yaz_mutex_enter(p->mutex);
104         work_this->next = p->output_queue;
105         p->output_queue = work_this;
106         yaz_mutex_leave(p->mutex);
107
108         /* wake up select/poll with a single byte */
109         (void) write(p->fd[1], "", 1);
110     }        
111     yaz_mutex_leave(p->mutex);
112     return 0;
113 }
114
115 sel_thread_t sel_thread_create(void (*work_handler)(void *work_data),
116                                void (*work_destroy)(void *work_data),
117                                int *read_fd, int no_of_threads)
118 {
119     int i;
120     NMEM nmem = nmem_create();
121     sel_thread_t p = nmem_malloc(nmem, sizeof(*p));
122
123     assert(work_handler);
124     /* work_destroy may be NULL */
125     assert(read_fd);
126     assert(no_of_threads >= 1);
127
128     p->nmem = nmem;
129     if (pipe(p->fd))
130     {
131         nmem_destroy(nmem);
132         return 0;
133     }
134     *read_fd = p->fd[0];
135     p->input_queue = 0;
136     p->output_queue = 0;
137     p->free_queue = 0;
138     p->work_handler = work_handler;
139     p->work_destroy = work_destroy;
140     p->no_threads = 0; /* we if need to destroy */
141     p->stop_flag = 0;
142     p->mutex = 0;
143     yaz_mutex_create(&p->mutex);
144     yaz_cond_create(&p->input_data);
145     if (p->input_data == 0) /* condition variable could not be created? */
146     {
147         sel_thread_destroy(p);
148         return 0;
149     }
150
151     p->no_threads = no_of_threads;
152     p->thread_id = nmem_malloc(nmem, sizeof(*p->thread_id) * p->no_threads);
153     for (i = 0; i < p->no_threads; i++)
154         p->thread_id[i] = yaz_thread_create(sel_thread_handler, p);
155     return p;
156 }
157
158 void sel_thread_destroy(sel_thread_t p)
159 {
160     int i;
161     yaz_mutex_enter(p->mutex);
162     p->stop_flag = 1;
163     yaz_cond_broadcast(p->input_data);
164     yaz_mutex_leave(p->mutex);
165     
166     for (i = 0; i< p->no_threads; i++)
167         yaz_thread_join(&p->thread_id[i], 0);
168
169     if (p->work_destroy)
170     {
171         queue_trav(p->input_queue, p->work_destroy);
172         queue_trav(p->output_queue, p->work_destroy);
173     }
174
175     close(p->fd[0]);
176     close(p->fd[1]);
177     yaz_cond_destroy(&p->input_data);
178     yaz_mutex_destroy(&p->mutex);
179     nmem_destroy(p->nmem);
180 }
181
182 void sel_thread_add(sel_thread_t p, void *data)
183 {
184     struct work_item *work_p;
185
186     yaz_mutex_enter(p->mutex);
187
188     if (p->free_queue)
189     {
190         work_p = p->free_queue;
191         p->free_queue = p->free_queue->next;
192     }
193     else
194         work_p = nmem_malloc(p->nmem, sizeof(*work_p));
195
196     work_p->data = data;
197     work_p->next = p->input_queue;
198     p->input_queue = work_p;
199     input_queue_length++;
200     yaz_log(YLOG_DEBUG, "sel_thread_add: Input queue length after push: %d", input_queue_length);
201     yaz_cond_signal(p->input_data);
202     yaz_mutex_leave(p->mutex);
203 }
204
205 void *sel_thread_result(sel_thread_t p)
206 {
207     struct work_item *work_this = 0;
208     void *data = 0;
209     char read_buf[1];
210
211     yaz_mutex_enter(p->mutex);
212
213     /* got something. Take the last one out of output_queue */
214     work_this = queue_remove_last(&p->output_queue);
215     if (work_this)
216     {
217         /* put freed item in free list */
218         work_this->next = p->free_queue;
219         p->free_queue = work_this;
220         
221         data = work_this->data;
222         (void) read(p->fd[0], read_buf, 1);
223     }
224     yaz_mutex_leave(p->mutex);
225     return data;
226 }
227
228 /*
229  * Local variables:
230  * c-basic-offset: 4
231  * c-file-style: "Stroustrup"
232  * indent-tabs-mode: nil
233  * End:
234  * vim: shiftwidth=4 tabstop=8 expandtab
235  */
236