c7bfacc5835271aef5282c1cee8b25641b4ec38b
[pazpar2-moved-to-github.git] / src / sel_thread.c
1 /* $Id: sel_thread.c,v 1.4 2007-04-23 08:06:21 adam Exp $
2    Copyright (c) 2006-2007, Index Data.
3
4 This file is part of Pazpar2.
5
6 Pazpar2 is free software; you can redistribute it and/or modify it under
7 the terms of the GNU General Public License as published by the Free
8 Software Foundation; either version 2, or (at your option) any later
9 version.
10
11 Pazpar2 is distributed in the hope that it will be useful, but WITHOUT ANY
12 WARRANTY; without even the implied warranty of MERCHANTABILITY or
13 FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
14 for more details.
15
16 You should have received a copy of the GNU General Public License
17 along with Pazpar2; see the file LICENSE.  If not, write to the
18 Free Software Foundation, 59 Temple Place - Suite 330, Boston, MA
19 02111-1307, USA.
20  */
21
22 #if HAVE_CONFIG_H
23 #include "cconfig.h"
24 #endif
25
26 #include "sel_thread.h"
27 #include <yaz/log.h>
28 #include <yaz/nmem.h>
29 #include <unistd.h>
30 #include <stdlib.h>
31 #include <pthread.h>
32 #include <assert.h>
33
34 struct work_item {
35     void *data;
36     struct work_item *next;
37 };
38
39 static struct work_item *queue_remove_last(struct work_item **q)
40 {
41     struct work_item **work_p = q, *work_this = 0;
42
43     while (*work_p && (*work_p)->next)
44         work_p = &(*work_p)->next;
45     if (*work_p)
46     {
47         work_this = *work_p;
48         *work_p = 0;
49     }
50     return work_this;
51 }
52
53 static void queue_trav(struct work_item *q, void (*f)(void *data))
54 {
55     for (; q; q = q->next)
56         f(q->data);
57 }
58
59 struct sel_thread {
60     int fd[2];
61     NMEM nmem;
62     pthread_t *thread_id;
63     pthread_mutex_t mutex;
64     pthread_cond_t input_data;
65     int stop_flag;
66     int no_threads;
67     struct work_item *input_queue;
68     struct work_item *output_queue;
69     struct work_item *free_queue;
70     void (*work_handler)(void *work_data);
71     void (*work_destroy)(void *work_data);
72 };
73
74 static void *sel_thread_handler(void *vp)
75 {
76     sel_thread_t p = (sel_thread_t) vp;
77
78     while(1)
79     {
80         struct work_item *work_this = 0;
81         /* wait for some work */
82         pthread_mutex_lock(&p->mutex);
83         while (!p->stop_flag && !p->input_queue)
84             pthread_cond_wait(&p->input_data, &p->mutex);
85         /* see if we were waken up because we're shutting down */
86         if (p->stop_flag)
87             break;
88         /* got something. Take the last one out of input_queue */
89
90         assert(p->input_queue);
91         work_this = queue_remove_last(&p->input_queue);
92         assert(work_this);
93
94         pthread_mutex_unlock(&p->mutex);
95
96         /* work on this item */
97         p->work_handler(work_this->data);
98         
99         /* put it back into output queue */
100         pthread_mutex_lock(&p->mutex);
101         work_this->next = p->output_queue;
102         p->output_queue = work_this;
103         pthread_mutex_unlock(&p->mutex);
104
105         /* wake up select/poll with a single byte */
106         write(p->fd[1], "", 1);
107     }        
108     pthread_mutex_unlock(&p->mutex);
109     return 0;
110 }
111
112 sel_thread_t sel_thread_create(void (*work_handler)(void *work_data),
113                                void (*work_destroy)(void *work_data),
114                                int *read_fd, int no_of_threads)
115 {
116     int i;
117     NMEM nmem = nmem_create();
118     sel_thread_t p = nmem_malloc(nmem, sizeof(*p));
119
120     assert(work_handler);
121     /* work_destroy may be NULL */
122     assert(read_fd);
123     assert(no_of_threads >= 1);
124
125     p->nmem = nmem;
126     if (pipe(p->fd))
127     {
128         nmem_destroy(nmem);
129         return 0;
130     }
131     *read_fd = p->fd[0];
132     p->input_queue = 0;
133     p->output_queue = 0;
134     p->free_queue = 0;
135     p->work_handler = work_handler;
136     p->work_destroy = work_destroy;
137
138     p->stop_flag = 0;
139     p->no_threads = no_of_threads;
140     pthread_mutex_init(&p->mutex, 0);
141     pthread_cond_init(&p->input_data, 0);
142
143     p->thread_id = nmem_malloc(nmem, sizeof(*p->thread_id) * p->no_threads);
144     for (i = 0; i < p->no_threads; i++)
145         pthread_create (p->thread_id + i, 0, sel_thread_handler, p);
146     return p;
147 }
148
149 void sel_thread_destroy(sel_thread_t p)
150 {
151     int i;
152     pthread_mutex_lock(&p->mutex);
153     p->stop_flag = 1;
154     pthread_cond_broadcast(&p->input_data);
155     pthread_mutex_unlock(&p->mutex);
156     
157     for (i = 0; i< p->no_threads; i++)
158         pthread_join(p->thread_id[i], 0);
159
160     if (p->work_destroy)
161     {
162         queue_trav(p->input_queue, p->work_destroy);
163         queue_trav(p->output_queue, p->work_destroy);
164     }
165
166     close(p->fd[0]);
167     close(p->fd[1]);
168     pthread_cond_destroy(&p->input_data);
169     pthread_mutex_destroy(&p->mutex);
170     nmem_destroy(p->nmem);
171 }
172
173 void sel_thread_add(sel_thread_t p, void *data)
174 {
175     struct work_item *work_p;
176
177     pthread_mutex_lock(&p->mutex);
178
179     if (p->free_queue)
180     {
181         work_p = p->free_queue;
182         p->free_queue = p->free_queue->next;
183     }
184     else
185         work_p = nmem_malloc(p->nmem, sizeof(*work_p));
186
187     work_p->data = data;
188     work_p->next = p->input_queue;
189     p->input_queue = work_p;
190
191     pthread_cond_signal(&p->input_data);
192     pthread_mutex_unlock(&p->mutex);
193 }
194
195 void *sel_thread_result(sel_thread_t p)
196 {
197     struct work_item *work_this = 0;
198     void *data = 0;
199     char read_buf[1];
200
201     pthread_mutex_lock(&p->mutex);
202
203     /* got something. Take the last one out of output_queue */
204     work_this = queue_remove_last(&p->output_queue);
205     if (work_this)
206     {
207         /* put freed item in free list */
208         work_this->next = p->free_queue;
209         p->free_queue = work_this;
210         
211         data = work_this->data;
212         read(p->fd[0], read_buf, 1);
213     }
214     pthread_mutex_unlock(&p->mutex);
215     return data;
216 }
217
218 /*
219  * Local variables:
220  * c-basic-offset: 4
221  * indent-tabs-mode: nil
222  * End:
223  * vim: shiftwidth=4 tabstop=8 expandtab
224  */