Use socket pipe (spipe)
[pazpar2-moved-to-github.git] / src / sel_thread.c
1 /* This file is part of Pazpar2.
2    Copyright (C) 2006-2010 Index Data
3
4 Pazpar2 is free software; you can redistribute it and/or modify it under
5 the terms of the GNU General Public License as published by the Free
6 Software Foundation; either version 2, or (at your option) any later
7 version.
8
9 Pazpar2 is distributed in the hope that it will be useful, but WITHOUT ANY
10 WARRANTY; without even the implied warranty of MERCHANTABILITY or
11 FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
12 for more details.
13
14 You should have received a copy of the GNU General Public License
15 along with this program; if not, write to the Free Software
16 Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA  02110-1301  USA
17
18 */
19
20 #if HAVE_CONFIG_H
21 #include <config.h>
22 #endif
23
24 #include "sel_thread.h"
25 #include <yaz/log.h>
26 #include <yaz/nmem.h>
27 #include <unistd.h>
28 #include <stdlib.h>
29 #include <yaz/thread_create.h>
30 #include <yaz/mutex.h>
31 #include <yaz/spipe.h>
32 #include <assert.h>
33
34 struct work_item {
35     void *data;
36     struct work_item *next;
37 };
38
39 static struct work_item *queue_remove_last(struct work_item **q)
40 {
41     struct work_item **work_p = q, *work_this = 0;
42
43     while (*work_p && (*work_p)->next)
44         work_p = &(*work_p)->next;
45     if (*work_p)
46     {
47         work_this = *work_p;
48         *work_p = 0;
49     }
50     return work_this;
51 }
52
53 static void queue_trav(struct work_item *q, void (*f)(void *data))
54 {
55     for (; q; q = q->next)
56         f(q->data);
57 }
58
59 struct sel_thread {
60     int write_fd;
61     int read_fd;
62     yaz_spipe_t spipe;
63     NMEM nmem;
64     yaz_thread_t *thread_id;
65     YAZ_MUTEX mutex;
66     YAZ_COND input_data;
67     int stop_flag;
68     int no_threads;
69     struct work_item *input_queue;
70     struct work_item *output_queue;
71     struct work_item *free_queue;
72     void (*work_handler)(void *work_data);
73     void (*work_destroy)(void *work_data);
74 };
75
76 static int input_queue_length = 0;
77
78 static void *sel_thread_handler(void *vp)
79 {
80     sel_thread_t p = (sel_thread_t) vp;
81
82     while (1)
83     {
84         struct work_item *work_this = 0;
85         /* wait for some work */
86         yaz_mutex_enter(p->mutex);
87         while (!p->stop_flag && !p->input_queue)
88             yaz_cond_wait(p->input_data, p->mutex, 0);
89         /* see if we were waken up because we're shutting down */
90         if (p->stop_flag)
91             break;
92         /* got something. Take the last one out of input_queue */
93
94         assert(p->input_queue);
95         work_this = queue_remove_last(&p->input_queue);
96         input_queue_length--;
97         yaz_log(YLOG_DEBUG, "input queue length after pop: %d", input_queue_length);
98         assert(work_this);
99
100         yaz_mutex_leave(p->mutex);
101
102         /* work on this item */
103         p->work_handler(work_this->data);
104         
105         /* put it back into output queue */
106         yaz_mutex_enter(p->mutex);
107         work_this->next = p->output_queue;
108         p->output_queue = work_this;
109         yaz_mutex_leave(p->mutex);
110
111         /* wake up select/poll with a single byte */
112 #ifdef WIN32
113         (void) send(p->write_fd, "", 1, 0);
114 #else
115         (void) write(p->write_fd, "", 1);
116 #endif
117     }        
118     yaz_mutex_leave(p->mutex);
119     return 0;
120 }
121
122 sel_thread_t sel_thread_create(void (*work_handler)(void *work_data),
123                                void (*work_destroy)(void *work_data),
124                                int *read_fd, int no_of_threads)
125 {
126     int i;
127     NMEM nmem = nmem_create();
128     sel_thread_t p = nmem_malloc(nmem, sizeof(*p));
129
130     assert(work_handler);
131     /* work_destroy may be NULL */
132     assert(read_fd);
133     assert(no_of_threads >= 1);
134
135     p->nmem = nmem;
136
137 #ifdef WIN32
138     /* use port 12119 temporarily on Windos and hope for the best */
139     p->spipe = yaz_spipe_create(12119, 0);
140 #else
141     p->spipe = yaz_spipe_create(0, 0);
142 #endif
143     if (!p->spipe)
144     {
145         nmem_destroy(nmem);
146         return 0;
147     }    
148
149     *read_fd = p->read_fd = yaz_spipe_get_read_fd(p->spipe);
150     p->write_fd = yaz_spipe_get_write_fd(p->spipe);
151
152     p->input_queue = 0;
153     p->output_queue = 0;
154     p->free_queue = 0;
155     p->work_handler = work_handler;
156     p->work_destroy = work_destroy;
157     p->no_threads = 0; /* we if need to destroy */
158     p->stop_flag = 0;
159     p->mutex = 0;
160     yaz_mutex_create(&p->mutex);
161     yaz_cond_create(&p->input_data);
162     if (p->input_data == 0) /* condition variable could not be created? */
163     {
164         sel_thread_destroy(p);
165         return 0;
166     }
167
168     p->no_threads = no_of_threads;
169     p->thread_id = nmem_malloc(nmem, sizeof(*p->thread_id) * p->no_threads);
170     for (i = 0; i < p->no_threads; i++)
171         p->thread_id[i] = yaz_thread_create(sel_thread_handler, p);
172     return p;
173 }
174
175 void sel_thread_destroy(sel_thread_t p)
176 {
177     int i;
178     yaz_mutex_enter(p->mutex);
179     p->stop_flag = 1;
180     yaz_cond_broadcast(p->input_data);
181     yaz_mutex_leave(p->mutex);
182     
183     for (i = 0; i< p->no_threads; i++)
184         yaz_thread_join(&p->thread_id[i], 0);
185
186     if (p->work_destroy)
187     {
188         queue_trav(p->input_queue, p->work_destroy);
189         queue_trav(p->output_queue, p->work_destroy);
190     }
191
192     yaz_spipe_destroy(p->spipe);
193     yaz_cond_destroy(&p->input_data);
194     yaz_mutex_destroy(&p->mutex);
195     nmem_destroy(p->nmem);
196 }
197
198 void sel_thread_add(sel_thread_t p, void *data)
199 {
200     struct work_item *work_p;
201
202     yaz_mutex_enter(p->mutex);
203
204     if (p->free_queue)
205     {
206         work_p = p->free_queue;
207         p->free_queue = p->free_queue->next;
208     }
209     else
210         work_p = nmem_malloc(p->nmem, sizeof(*work_p));
211
212     work_p->data = data;
213     work_p->next = p->input_queue;
214     p->input_queue = work_p;
215     input_queue_length++;
216     yaz_log(YLOG_DEBUG, "sel_thread_add: Input queue length after push: %d", input_queue_length);
217     yaz_cond_signal(p->input_data);
218     yaz_mutex_leave(p->mutex);
219 }
220
221 void *sel_thread_result(sel_thread_t p)
222 {
223     struct work_item *work_this = 0;
224     void *data = 0;
225     char read_buf[1];
226
227     yaz_mutex_enter(p->mutex);
228
229     /* got something. Take the last one out of output_queue */
230     work_this = queue_remove_last(&p->output_queue);
231     if (work_this)
232     {
233         /* put freed item in free list */
234         work_this->next = p->free_queue;
235         p->free_queue = work_this;
236         
237         data = work_this->data;
238 #ifdef WIN32
239         (void) recv(p->read_fd, read_buf, 1, 0);
240 #else
241         (void) read(p->read_fd, read_buf, 1);
242 #endif
243     }
244     yaz_mutex_leave(p->mutex);
245     return data;
246 }
247
248 /*
249  * Local variables:
250  * c-basic-offset: 4
251  * c-file-style: "Stroustrup"
252  * indent-tabs-mode: nil
253  * End:
254  * vim: shiftwidth=4 tabstop=8 expandtab
255  */
256