Select thread system now passes a simple test using IOCHANSs.
[pazpar2-moved-to-github.git] / src / sel_thread.c
1 /* $Id: sel_thread.c,v 1.3 2007-04-20 11:44:58 adam Exp $
2    Copyright (c) 2006-2007, Index Data.
3
4 This file is part of Pazpar2.
5
6 Pazpar2 is free software; you can redistribute it and/or modify it under
7 the terms of the GNU General Public License as published by the Free
8 Software Foundation; either version 2, or (at your option) any later
9 version.
10
11 Pazpar2 is distributed in the hope that it will be useful, but WITHOUT ANY
12 WARRANTY; without even the implied warranty of MERCHANTABILITY or
13 FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
14 for more details.
15
16 You should have received a copy of the GNU General Public License
17 along with Pazpar2; see the file LICENSE.  If not, write to the
18 Free Software Foundation, 59 Temple Place - Suite 330, Boston, MA
19 02111-1307, USA.
20  */
21
22 #if HAVE_CONFIG_H
23 #include "cconfig.h"
24 #endif
25
26 #include "sel_thread.h"
27 #include <yaz/log.h>
28 #include <yaz/nmem.h>
29 #include <unistd.h>
30 #include <stdlib.h>
31 #include <pthread.h>
32 #include <assert.h>
33
34 struct work_item {
35     void *data;
36     struct work_item *next;
37 };
38
39 static struct work_item *queue_remove_last(struct work_item **q)
40 {
41     struct work_item **work_p = q, *work_this = 0;
42
43     while (*work_p && (*work_p)->next)
44         work_p = &(*work_p)->next;
45     if (*work_p)
46     {
47         work_this = *work_p;
48         *work_p = 0;
49     }
50     return work_this;
51 }
52
53 struct sel_thread {
54     int fd[2];
55     NMEM nmem;
56     pthread_t thread_id;
57     pthread_mutex_t mutex;
58     pthread_cond_t input_data;
59     int stop_flag;
60     struct work_item *input_queue;
61     struct work_item *output_queue;
62     struct work_item *free_queue;
63     void (*work_handler)(void *work_data);;
64 };
65
66 static void *sel_thread_handler(void *vp)
67 {
68     sel_thread_t p = (sel_thread_t) vp;
69
70     while(1)
71     {
72         struct work_item *work_this = 0;
73         /* wait for some work */
74         pthread_mutex_lock(&p->mutex);
75         while (!p->stop_flag && !p->input_queue)
76             pthread_cond_wait(&p->input_data, &p->mutex);
77         /* see if we were waken up because we're shutting down */
78         if (p->stop_flag)
79             break;
80         /* got something. Take the last one out of input_queue */
81
82         assert(p->input_queue);
83         work_this = queue_remove_last(&p->input_queue);
84         assert(work_this);
85
86         pthread_mutex_unlock(&p->mutex);
87
88         /* work on this item */
89         p->work_handler(work_this->data);
90         
91         /* put it back into output queue */
92         pthread_mutex_lock(&p->mutex);
93         work_this->next = p->output_queue;
94         p->output_queue = work_this;
95         pthread_mutex_unlock(&p->mutex);
96
97         /* wake up select/poll with a single byte */
98         write(p->fd[1], "", 1);
99     }        
100     pthread_mutex_unlock(&p->mutex);
101     return 0;
102 }
103
104 sel_thread_t sel_thread_create(void (*work_handler)(void *work_data),
105                                int *read_fd)
106 {
107     NMEM nmem = nmem_create();
108     sel_thread_t p = nmem_malloc(nmem, sizeof(*p));
109
110     p->nmem = nmem;
111     if (pipe(p->fd))
112     {
113         nmem_destroy(nmem);
114         return 0;
115     }
116     *read_fd = p->fd[0];
117     p->input_queue = 0;
118     p->output_queue = 0;
119     p->free_queue = 0;
120     p->work_handler = work_handler;
121
122     p->stop_flag = 0;
123     pthread_mutex_init(&p->mutex, 0);
124     pthread_cond_init(&p->input_data, 0);
125     pthread_create (&p->thread_id, 0, sel_thread_handler, p);
126     return p;
127 }
128
129 void sel_thread_destroy(sel_thread_t p)
130 {
131     pthread_mutex_lock(&p->mutex);
132     p->stop_flag = 1;
133     pthread_cond_broadcast(&p->input_data);
134     pthread_mutex_unlock(&p->mutex);
135     
136     pthread_join(p->thread_id, 0);
137
138     close(p->fd[0]);
139     close(p->fd[1]);
140     pthread_cond_destroy(&p->input_data);
141     pthread_mutex_destroy(&p->mutex);
142     nmem_destroy(p->nmem);
143 }
144
145 void sel_thread_add(sel_thread_t p, void *data)
146 {
147     struct work_item *work_p;
148
149     pthread_mutex_lock(&p->mutex);
150
151     if (p->free_queue)
152     {
153         work_p = p->free_queue;
154         p->free_queue = p->free_queue->next;
155     }
156     else
157         work_p = nmem_malloc(p->nmem, sizeof(*work_p));
158
159     work_p->data = data;
160     work_p->next = p->input_queue;
161     p->input_queue = work_p;
162
163     pthread_cond_signal(&p->input_data);
164     pthread_mutex_unlock(&p->mutex);
165 }
166
167 void *sel_thread_result(sel_thread_t p)
168 {
169     struct work_item *work_this = 0;
170     void *data = 0;
171     char read_buf[1];
172
173     pthread_mutex_lock(&p->mutex);
174
175     /* got something. Take the last one out of output_queue */
176     work_this = queue_remove_last(&p->output_queue);
177     if (work_this)
178     {
179         /* put freed item in free list */
180         work_this->next = p->free_queue;
181         p->free_queue = work_this;
182         
183         data = work_this->data;
184         read(p->fd[0], read_buf, 1);
185     }
186     pthread_mutex_unlock(&p->mutex);
187     return data;
188 }
189
190 /*
191  * Local variables:
192  * c-basic-offset: 4
193  * indent-tabs-mode: nil
194  * End:
195  * vim: shiftwidth=4 tabstop=8 expandtab
196  */