Added work queue length logging
[pazpar2-moved-to-github.git] / src / sel_thread.c
1 /* This file is part of Pazpar2.
2    Copyright (C) 2006-2010 Index Data
3
4 Pazpar2 is free software; you can redistribute it and/or modify it under
5 the terms of the GNU General Public License as published by the Free
6 Software Foundation; either version 2, or (at your option) any later
7 version.
8
9 Pazpar2 is distributed in the hope that it will be useful, but WITHOUT ANY
10 WARRANTY; without even the implied warranty of MERCHANTABILITY or
11 FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
12 for more details.
13
14 You should have received a copy of the GNU General Public License
15 along with this program; if not, write to the Free Software
16 Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA  02110-1301  USA
17
18 */
19
20 #if HAVE_CONFIG_H
21 #include <config.h>
22 #endif
23
24 #include "sel_thread.h"
25 #include <yaz/log.h>
26 #include <yaz/nmem.h>
27 #include <unistd.h>
28 #include <stdlib.h>
29 #include <pthread.h>
30 #include <assert.h>
31
32 struct work_item {
33     void *data;
34     struct work_item *next;
35 };
36
37 static struct work_item *queue_remove_last(struct work_item **q)
38 {
39     struct work_item **work_p = q, *work_this = 0;
40
41     while (*work_p && (*work_p)->next)
42         work_p = &(*work_p)->next;
43     if (*work_p)
44     {
45         work_this = *work_p;
46         *work_p = 0;
47     }
48     return work_this;
49 }
50
51 static void queue_trav(struct work_item *q, void (*f)(void *data))
52 {
53     for (; q; q = q->next)
54         f(q->data);
55 }
56
57 struct sel_thread {
58     int fd[2];
59     NMEM nmem;
60     pthread_t *thread_id;
61     pthread_mutex_t mutex;
62     pthread_cond_t input_data;
63     int stop_flag;
64     int no_threads;
65     struct work_item *input_queue;
66     struct work_item *output_queue;
67     struct work_item *free_queue;
68     void (*work_handler)(void *work_data);
69     void (*work_destroy)(void *work_data);
70 };
71
72 static int input_queue_length = 0;
73
74 static void *sel_thread_handler(void *vp)
75 {
76     sel_thread_t p = (sel_thread_t) vp;
77
78     while (1)
79     {
80         struct work_item *work_this = 0;
81         /* wait for some work */
82         pthread_mutex_lock(&p->mutex);
83         while (!p->stop_flag && !p->input_queue)
84             pthread_cond_wait(&p->input_data, &p->mutex);
85         /* see if we were waken up because we're shutting down */
86         if (p->stop_flag)
87             break;
88         /* got something. Take the last one out of input_queue */
89
90         assert(p->input_queue);
91         work_this = queue_remove_last(&p->input_queue);
92         input_queue_length--;
93         yaz_log(YLOG_DEBUG, "input queue length after pop: %d", input_queue_length);
94         assert(work_this);
95
96         pthread_mutex_unlock(&p->mutex);
97
98         /* work on this item */
99         p->work_handler(work_this->data);
100         
101         /* put it back into output queue */
102         pthread_mutex_lock(&p->mutex);
103         work_this->next = p->output_queue;
104         p->output_queue = work_this;
105         pthread_mutex_unlock(&p->mutex);
106
107         /* wake up select/poll with a single byte */
108         (void) write(p->fd[1], "", 1);
109     }        
110     pthread_mutex_unlock(&p->mutex);
111     return 0;
112 }
113
114 sel_thread_t sel_thread_create(void (*work_handler)(void *work_data),
115                                void (*work_destroy)(void *work_data),
116                                int *read_fd, int no_of_threads)
117 {
118     int i;
119     NMEM nmem = nmem_create();
120     sel_thread_t p = nmem_malloc(nmem, sizeof(*p));
121
122     assert(work_handler);
123     /* work_destroy may be NULL */
124     assert(read_fd);
125     assert(no_of_threads >= 1);
126
127     p->nmem = nmem;
128     if (pipe(p->fd))
129     {
130         nmem_destroy(nmem);
131         return 0;
132     }
133     *read_fd = p->fd[0];
134     p->input_queue = 0;
135     p->output_queue = 0;
136     p->free_queue = 0;
137     p->work_handler = work_handler;
138     p->work_destroy = work_destroy;
139
140     p->stop_flag = 0;
141     p->no_threads = no_of_threads;
142     pthread_mutex_init(&p->mutex, 0);
143     pthread_cond_init(&p->input_data, 0);
144
145     p->thread_id = nmem_malloc(nmem, sizeof(*p->thread_id) * p->no_threads);
146     for (i = 0; i < p->no_threads; i++)
147         pthread_create(p->thread_id + i, 0, sel_thread_handler, p);
148     return p;
149 }
150
151 void sel_thread_destroy(sel_thread_t p)
152 {
153     int i;
154     pthread_mutex_lock(&p->mutex);
155     p->stop_flag = 1;
156     pthread_cond_broadcast(&p->input_data);
157     pthread_mutex_unlock(&p->mutex);
158     
159     for (i = 0; i< p->no_threads; i++)
160         pthread_join(p->thread_id[i], 0);
161
162     if (p->work_destroy)
163     {
164         queue_trav(p->input_queue, p->work_destroy);
165         queue_trav(p->output_queue, p->work_destroy);
166     }
167
168     close(p->fd[0]);
169     close(p->fd[1]);
170     pthread_cond_destroy(&p->input_data);
171     pthread_mutex_destroy(&p->mutex);
172     nmem_destroy(p->nmem);
173 }
174
175 void sel_thread_add(sel_thread_t p, void *data)
176 {
177     struct work_item *work_p;
178
179     pthread_mutex_lock(&p->mutex);
180
181     if (p->free_queue)
182     {
183         work_p = p->free_queue;
184         p->free_queue = p->free_queue->next;
185     }
186     else
187         work_p = nmem_malloc(p->nmem, sizeof(*work_p));
188
189     work_p->data = data;
190     work_p->next = p->input_queue;
191     p->input_queue = work_p;
192     input_queue_length++;
193     yaz_log(YLOG_DEBUG, "sel_thread_add: Input queue length after push: %d", input_queue_length);
194     pthread_cond_signal(&p->input_data);
195     pthread_mutex_unlock(&p->mutex);
196 }
197
198 void *sel_thread_result(sel_thread_t p)
199 {
200     struct work_item *work_this = 0;
201     void *data = 0;
202     char read_buf[1];
203
204     pthread_mutex_lock(&p->mutex);
205
206     /* got something. Take the last one out of output_queue */
207     work_this = queue_remove_last(&p->output_queue);
208     if (work_this)
209     {
210         /* put freed item in free list */
211         work_this->next = p->free_queue;
212         p->free_queue = work_this;
213         
214         data = work_this->data;
215         (void) read(p->fd[0], read_buf, 1);
216     }
217     pthread_mutex_unlock(&p->mutex);
218     return data;
219 }
220
221 /*
222  * Local variables:
223  * c-basic-offset: 4
224  * c-file-style: "Stroustrup"
225  * indent-tabs-mode: nil
226  * End:
227  * vim: shiftwidth=4 tabstop=8 expandtab
228  */
229