Header include tweaks
[pazpar2-moved-to-github.git] / src / sel_thread.c
1 /* This file is part of Pazpar2.
2    Copyright (C) 2006-2010 Index Data
3
4 Pazpar2 is free software; you can redistribute it and/or modify it under
5 the terms of the GNU General Public License as published by the Free
6 Software Foundation; either version 2, or (at your option) any later
7 version.
8
9 Pazpar2 is distributed in the hope that it will be useful, but WITHOUT ANY
10 WARRANTY; without even the implied warranty of MERCHANTABILITY or
11 FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
12 for more details.
13
14 You should have received a copy of the GNU General Public License
15 along with this program; if not, write to the Free Software
16 Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA  02110-1301  USA
17
18 */
19
20 #if HAVE_CONFIG_H
21 #include <config.h>
22 #endif
23
24 #include "sel_thread.h"
25 #include <yaz/log.h>
26 #include <yaz/nmem.h>
27 #if HAVE_UNISTD_H
28 #include <unistd.h>
29 #endif
30 #ifdef WIN32
31 #include <winsock2.h>
32 #endif
33 #include <stdlib.h>
34 #include <yaz/thread_create.h>
35 #include <yaz/mutex.h>
36 #include <yaz/spipe.h>
37 #include <assert.h>
38
39 struct work_item {
40     void *data;
41     struct work_item *next;
42 };
43
44 static struct work_item *queue_remove_last(struct work_item **q)
45 {
46     struct work_item **work_p = q, *work_this = 0;
47
48     while (*work_p && (*work_p)->next)
49         work_p = &(*work_p)->next;
50     if (*work_p)
51     {
52         work_this = *work_p;
53         *work_p = 0;
54     }
55     return work_this;
56 }
57
58 static void queue_trav(struct work_item *q, void (*f)(void *data))
59 {
60     for (; q; q = q->next)
61         f(q->data);
62 }
63
64 struct sel_thread {
65     int write_fd;
66     int read_fd;
67     yaz_spipe_t spipe;
68     NMEM nmem;
69     yaz_thread_t *thread_id;
70     YAZ_MUTEX mutex;
71     YAZ_COND input_data;
72     int stop_flag;
73     int no_threads;
74     struct work_item *input_queue;
75     struct work_item *output_queue;
76     struct work_item *free_queue;
77     void (*work_handler)(void *work_data);
78     void (*work_destroy)(void *work_data);
79 };
80
81 static int input_queue_length = 0;
82
83 static void *sel_thread_handler(void *vp)
84 {
85     sel_thread_t p = (sel_thread_t) vp;
86
87     while (1)
88     {
89         struct work_item *work_this = 0;
90         /* wait for some work */
91         yaz_mutex_enter(p->mutex);
92         while (!p->stop_flag && !p->input_queue)
93             yaz_cond_wait(p->input_data, p->mutex, 0);
94         /* see if we were waken up because we're shutting down */
95         if (p->stop_flag)
96             break;
97         /* got something. Take the last one out of input_queue */
98
99         assert(p->input_queue);
100         work_this = queue_remove_last(&p->input_queue);
101         input_queue_length--;
102         yaz_log(YLOG_DEBUG, "input queue length after pop: %d", input_queue_length);
103         assert(work_this);
104
105         yaz_mutex_leave(p->mutex);
106
107         /* work on this item */
108         p->work_handler(work_this->data);
109         
110         /* put it back into output queue */
111         yaz_mutex_enter(p->mutex);
112         work_this->next = p->output_queue;
113         p->output_queue = work_this;
114         yaz_mutex_leave(p->mutex);
115
116         /* wake up select/poll with a single byte */
117 #ifdef WIN32
118         (void) send(p->write_fd, "", 1, 0);
119 #else
120         (void) write(p->write_fd, "", 1);
121 #endif
122     }        
123     yaz_mutex_leave(p->mutex);
124     return 0;
125 }
126
127 sel_thread_t sel_thread_create(void (*work_handler)(void *work_data),
128                                void (*work_destroy)(void *work_data),
129                                int *read_fd, int no_of_threads)
130 {
131     int i;
132     NMEM nmem = nmem_create();
133     sel_thread_t p = nmem_malloc(nmem, sizeof(*p));
134
135     assert(work_handler);
136     /* work_destroy may be NULL */
137     assert(read_fd);
138     assert(no_of_threads >= 1);
139
140     p->nmem = nmem;
141
142 #ifdef WIN32
143     /* use port 12119 temporarily on Windos and hope for the best */
144     p->spipe = yaz_spipe_create(12119, 0);
145 #else
146     p->spipe = yaz_spipe_create(0, 0);
147 #endif
148     if (!p->spipe)
149     {
150         nmem_destroy(nmem);
151         return 0;
152     }    
153
154     *read_fd = p->read_fd = yaz_spipe_get_read_fd(p->spipe);
155     p->write_fd = yaz_spipe_get_write_fd(p->spipe);
156
157     p->input_queue = 0;
158     p->output_queue = 0;
159     p->free_queue = 0;
160     p->work_handler = work_handler;
161     p->work_destroy = work_destroy;
162     p->no_threads = 0; /* we if need to destroy */
163     p->stop_flag = 0;
164     p->mutex = 0;
165     yaz_mutex_create(&p->mutex);
166     yaz_cond_create(&p->input_data);
167     if (p->input_data == 0) /* condition variable could not be created? */
168     {
169         sel_thread_destroy(p);
170         return 0;
171     }
172
173     p->no_threads = no_of_threads;
174     p->thread_id = nmem_malloc(nmem, sizeof(*p->thread_id) * p->no_threads);
175     for (i = 0; i < p->no_threads; i++)
176         p->thread_id[i] = yaz_thread_create(sel_thread_handler, p);
177     return p;
178 }
179
180 void sel_thread_destroy(sel_thread_t p)
181 {
182     int i;
183     yaz_mutex_enter(p->mutex);
184     p->stop_flag = 1;
185     yaz_cond_broadcast(p->input_data);
186     yaz_mutex_leave(p->mutex);
187     
188     for (i = 0; i< p->no_threads; i++)
189         yaz_thread_join(&p->thread_id[i], 0);
190
191     if (p->work_destroy)
192     {
193         queue_trav(p->input_queue, p->work_destroy);
194         queue_trav(p->output_queue, p->work_destroy);
195     }
196
197     yaz_spipe_destroy(p->spipe);
198     yaz_cond_destroy(&p->input_data);
199     yaz_mutex_destroy(&p->mutex);
200     nmem_destroy(p->nmem);
201 }
202
203 void sel_thread_add(sel_thread_t p, void *data)
204 {
205     struct work_item *work_p;
206
207     yaz_mutex_enter(p->mutex);
208
209     if (p->free_queue)
210     {
211         work_p = p->free_queue;
212         p->free_queue = p->free_queue->next;
213     }
214     else
215         work_p = nmem_malloc(p->nmem, sizeof(*work_p));
216
217     work_p->data = data;
218     work_p->next = p->input_queue;
219     p->input_queue = work_p;
220     input_queue_length++;
221     yaz_log(YLOG_DEBUG, "sel_thread_add: Input queue length after push: %d", input_queue_length);
222     yaz_cond_signal(p->input_data);
223     yaz_mutex_leave(p->mutex);
224 }
225
226 void *sel_thread_result(sel_thread_t p)
227 {
228     struct work_item *work_this = 0;
229     void *data = 0;
230     char read_buf[1];
231
232     yaz_mutex_enter(p->mutex);
233
234     /* got something. Take the last one out of output_queue */
235     work_this = queue_remove_last(&p->output_queue);
236     if (work_this)
237     {
238         /* put freed item in free list */
239         work_this->next = p->free_queue;
240         p->free_queue = work_this;
241         
242         data = work_this->data;
243 #ifdef WIN32
244         (void) recv(p->read_fd, read_buf, 1, 0);
245 #else
246         (void) read(p->read_fd, read_buf, 1);
247 #endif
248     }
249     yaz_mutex_leave(p->mutex);
250     return data;
251 }
252
253 /*
254  * Local variables:
255  * c-basic-offset: 4
256  * c-file-style: "Stroustrup"
257  * indent-tabs-mode: nil
258  * End:
259  * vim: shiftwidth=4 tabstop=8 expandtab
260  */
261