sel_thread_result reads from pipe
[pazpar2-moved-to-github.git] / src / sel_thread.c
1 /* $Id: sel_thread.c,v 1.2 2007-04-20 10:15:19 adam Exp $
2    Copyright (c) 2006-2007, Index Data.
3
4 This file is part of Pazpar2.
5
6 Pazpar2 is free software; you can redistribute it and/or modify it under
7 the terms of the GNU General Public License as published by the Free
8 Software Foundation; either version 2, or (at your option) any later
9 version.
10
11 Pazpar2 is distributed in the hope that it will be useful, but WITHOUT ANY
12 WARRANTY; without even the implied warranty of MERCHANTABILITY or
13 FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
14 for more details.
15
16 You should have received a copy of the GNU General Public License
17 along with Pazpar2; see the file LICENSE.  If not, write to the
18 Free Software Foundation, 59 Temple Place - Suite 330, Boston, MA
19 02111-1307, USA.
20  */
21
22 #if HAVE_CONFIG_H
23 #include "cconfig.h"
24 #endif
25
26 #include "sel_thread.h"
27 #include <yaz/log.h>
28 #include <yaz/nmem.h>
29 #include <unistd.h>
30 #include <stdlib.h>
31 #include <pthread.h>
32
33 struct work_item {
34     void *data;
35     struct work_item *next;
36 };
37
38 struct sel_thread {
39     int fd[2];
40     NMEM nmem;
41     pthread_t thread_id;
42     pthread_mutex_t mutex;
43     pthread_cond_t input_data;
44     int stop_flag;
45     struct work_item *input_queue;
46     struct work_item *output_queue;
47     struct work_item *free_queue;
48     void (*work_handler)(void *work_data);;
49 };
50
51 static void *sel_thread_handler(void *vp)
52 {
53     sel_thread_t p = (sel_thread_t) vp;
54
55     while(1)
56     {
57         struct work_item **work_p, *work_this;
58         /* wait for some work */
59         pthread_mutex_lock(&p->mutex);
60         while (!p->stop_flag && !p->input_queue)
61             pthread_cond_wait(&p->input_data, &p->mutex);
62         /* see if we were waken up because we're shutting down */
63         if (p->stop_flag)
64             break;
65         /* got something. Take the last one out of input_queue */
66         work_p = &p->input_queue;
67         while ((*work_p)->next)
68             work_p = &(*work_p)->next;
69         work_this = *work_p;
70         *work_p = 0;
71         pthread_mutex_unlock(&p->mutex);
72
73         /* work on this item */
74         p->work_handler(work_this->data);
75
76         /* put it back into output queue */
77         pthread_mutex_lock(&p->mutex);
78         work_this->next = p->output_queue;
79         p->output_queue = work_this;
80         pthread_mutex_unlock(&p->mutex);
81
82         /* wake up select/poll with a single byte */
83         write(p->fd[1], "", 1);
84     }        
85     pthread_mutex_unlock(&p->mutex);
86     return 0;
87 }
88
89 sel_thread_t sel_thread_create(void (*work_handler)(void *work_data),
90                                int *read_fd)
91 {
92     NMEM nmem = nmem_create();
93     sel_thread_t p = nmem_malloc(nmem, sizeof(*p));
94
95     p->nmem = nmem;
96     if (pipe(p->fd))
97     {
98         nmem_destroy(nmem);
99         return 0;
100     }
101     *read_fd = p->fd[0];
102     p->input_queue = 0;
103     p->output_queue = 0;
104     p->free_queue = 0;
105     p->work_handler = work_handler;
106
107     p->stop_flag = 0;
108     pthread_mutex_init(&p->mutex, 0);
109     pthread_cond_init(&p->input_data, 0);
110     pthread_create (&p->thread_id, 0, sel_thread_handler, p);
111     return p;
112 }
113
114 void sel_thread_destroy(sel_thread_t p)
115 {
116     pthread_mutex_lock(&p->mutex);
117
118     p->stop_flag = 1;
119     pthread_cond_broadcast(&p->input_data);
120     pthread_mutex_unlock(&p->mutex);
121     
122     pthread_join(p->thread_id, 0);
123
124     close(p->fd[0]);
125     close(p->fd[1]);
126     pthread_cond_destroy(&p->input_data);
127     pthread_mutex_destroy(&p->mutex);
128     nmem_destroy(p->nmem);
129 }
130
131 void sel_thread_add(sel_thread_t p, void *data)
132 {
133     struct work_item *work_p;
134
135     pthread_mutex_lock(&p->mutex);
136     work_p = p->free_queue;
137     if (!work_p)
138         work_p = nmem_malloc(p->nmem, sizeof(*work_p));
139
140     work_p->data = data;
141     work_p->next = p->input_queue;
142     p->input_queue = work_p;
143     pthread_mutex_unlock(&p->mutex);
144 }
145
146 void *sel_thread_result(sel_thread_t p)
147 {
148     struct work_item **work_p, *work_this;
149     void *data;
150     char read_buf[1];
151
152     /* got something. Take the last one out of output_queue */
153     work_p = &p->output_queue;
154     if (!*work_p)
155         return 0;
156
157     read(p->fd[0], read_buf, 1);
158
159     while ((*work_p)->next)
160         work_p = &(*work_p)->next;
161     work_this = *work_p;
162     *work_p = 0;
163
164     /* put freed item in free list */
165     work_this->next = p->free_queue;
166     p->free_queue = work_this;
167
168     data = work_this->data;
169     pthread_mutex_unlock(&p->mutex);
170
171     return data;
172 }
173
174 /*
175  * Local variables:
176  * c-basic-offset: 4
177  * indent-tabs-mode: nil
178  * End:
179  * vim: shiftwidth=4 tabstop=8 expandtab
180  */