1 /* $Id: sel_thread.c,v 1.3 2007-04-20 11:44:58 adam Exp $
2 Copyright (c) 2006-2007, Index Data.
4 This file is part of Pazpar2.
6 Pazpar2 is free software; you can redistribute it and/or modify it under
7 the terms of the GNU General Public License as published by the Free
8 Software Foundation; either version 2, or (at your option) any later
11 Pazpar2 is distributed in the hope that it will be useful, but WITHOUT ANY
12 WARRANTY; without even the implied warranty of MERCHANTABILITY or
13 FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
16 You should have received a copy of the GNU General Public License
17 along with Pazpar2; see the file LICENSE. If not, write to the
18 Free Software Foundation, 59 Temple Place - Suite 330, Boston, MA
26 #include "sel_thread.h"
/* Link to the next item; the input, output and free queues in this file are
 * all singly linked lists chained through this pointer. */
36 struct work_item *next;
/* Helper used to take the last item out of a queue (callers pass
 * &p->input_queue or &p->output_queue).
 * q is the address of the queue's head pointer.
 * NOTE(review): only the traversal is visible in this excerpt; the unlink
 * and return statements are not shown -- confirm against the full source. */
39 static struct work_item *queue_remove_last(struct work_item **q)
41 struct work_item **work_p = q, *work_this = 0;
/* Advance work_p along the next links until it addresses the pointer to the
 * final item (the one whose next is null); it stays at q when the queue is
 * empty or holds a single item. */
43 while (*work_p && (*work_p)->next)
44 work_p = &(*work_p)->next;
/* Fields of the sel_thread object visible in this excerpt. */
/* Lock held around every queue access visible in this file. */
57 pthread_mutex_t mutex;
/* Signalled by sel_thread_add and broadcast by sel_thread_destroy; the
 * worker thread waits on it in sel_thread_handler. */
58 pthread_cond_t input_data;
/* Items waiting to be processed; sel_thread_add pushes at the head. */
60 struct work_item *input_queue;
/* Items the worker has finished; sel_thread_result takes from here. */
61 struct work_item *output_queue;
/* Recycled work_item nodes, reused by sel_thread_add. */
62 struct work_item *free_queue;
/* Callback run by the worker thread on each item's data pointer. */
63 void (*work_handler)(void *work_data);
/* Worker thread entry point; sel_thread_create passes it to pthread_create.
 * vp is the sel_thread_t given as the thread argument.
 * Visible flow: wait until stop_flag is set or input_queue is non-empty,
 * take the last item out of input_queue, run work_handler on its data with
 * the mutex released, push the item onto output_queue, then write one byte
 * to fd[1].
 * NOTE(review): the enclosing loop, the stop_flag branch and the return
 * statement are not visible in this excerpt. */
66 static void *sel_thread_handler(void *vp)
68 sel_thread_t p = (sel_thread_t) vp;
72 struct work_item *work_this = 0;
73 /* wait for some work */
74 pthread_mutex_lock(&p->mutex);
/* re-test the condition after every wakeup; pthread_cond_wait releases the
 * mutex while blocked and re-acquires it before returning */
75 while (!p->stop_flag && !p->input_queue)
76 pthread_cond_wait(&p->input_data, &p->mutex);
77 /* see if we were woken up because we're shutting down */
80 /* got something. Take the last one out of input_queue */
82 assert(p->input_queue);
83 work_this = queue_remove_last(&p->input_queue);
/* drop the lock so the handler below runs without holding the mutex */
86 pthread_mutex_unlock(&p->mutex);
88 /* work on this item */
89 p->work_handler(work_this->data);
91 /* put it back into output queue */
92 pthread_mutex_lock(&p->mutex);
93 work_this->next = p->output_queue;
94 p->output_queue = work_this;
95 pthread_mutex_unlock(&p->mutex);
97 /* wake up select/poll with a single byte */
/* NOTE(review): the return value of write() is ignored */
98 write(p->fd[1], "", 1);
/* NOTE(review): presumably releases the mutex still held when the loop is
 * left on stop_flag; the exit path is not visible here -- confirm */
100 pthread_mutex_unlock(&p->mutex);
/* Create a sel_thread: allocate the object from a newly created NMEM,
 * store the work handler, initialise the mutex and condition variable, and
 * start a thread running sel_thread_handler with the object as argument.
 * work_handler is the callback the worker invokes on each item's data.
 * NOTE(review): the remainder of the parameter list, the other field
 * initialisations and the return statement are not visible in this
 * excerpt. */
104 sel_thread_t sel_thread_create(void (*work_handler)(void *work_data),
107 NMEM nmem = nmem_create();
/* the object itself is allocated from the NMEM created above */
108 sel_thread_t p = nmem_malloc(nmem, sizeof(*p));
120 p->work_handler = work_handler;
/* null attribute arguments request default mutex/condvar/thread attributes */
123 pthread_mutex_init(&p->mutex, 0);
124 pthread_cond_init(&p->input_data, 0);
/* NOTE(review): the results of the pthread_*_init and pthread_create calls
 * are not checked in the visible code */
125 pthread_create (&p->thread_id, 0, sel_thread_handler, p);
/* Stop the worker thread belonging to p and release p's resources:
 * wake the worker, join it, destroy the condition variable and mutex,
 * then destroy the NMEM. */
129 void sel_thread_destroy(sel_thread_t p)
131 pthread_mutex_lock(&p->mutex);
/* NOTE(review): stop_flag is presumably set on a line omitted from this
 * excerpt (the worker tests it after waking) -- confirm */
/* wake the worker if it is blocked in pthread_cond_wait */
133 pthread_cond_broadcast(&p->input_data);
134 pthread_mutex_unlock(&p->mutex);
/* wait for the worker to finish before destroying what it uses */
136 pthread_join(p->thread_id, 0);
140 pthread_cond_destroy(&p->input_data);
141 pthread_mutex_destroy(&p->mutex);
/* NOTE(review): sel_thread_create allocates p with nmem_malloc; if p->nmem
 * is that same NMEM, p must not be used after this call -- confirm */
142 nmem_destroy(p->nmem);
/* Queue one unit of work for the worker thread.
 * p is the sel_thread; data is the caller's pointer for the work item.
 * Under the mutex: obtain a work_item, push it at the head of input_queue
 * and signal input_data.
 * NOTE(review): the statement storing data into the item is not visible in
 * this excerpt -- confirm against the full source. */
145 void sel_thread_add(sel_thread_t p, void *data)
147 struct work_item *work_p;
149 pthread_mutex_lock(&p->mutex);
/* Two ways of obtaining an item are visible: pop one from free_queue, or
 * allocate a new one from p->nmem. The condition choosing between them is
 * on lines omitted from this excerpt. */
153 work_p = p->free_queue;
154 p->free_queue = p->free_queue->next;
157 work_p = nmem_malloc(p->nmem, sizeof(*work_p));
/* link the item in at the head of input_queue */
160 work_p->next = p->input_queue;
161 p->input_queue = work_p;
/* wake the worker waiting on input_data in sel_thread_handler */
163 pthread_cond_signal(&p->input_data);
164 pthread_mutex_unlock(&p->mutex);
/* Fetch one finished work item from p.
 * Under the mutex: take the last item out of output_queue, move its node to
 * free_queue for reuse, copy out its data pointer and read one byte from
 * fd[0].
 * NOTE(review): the declarations of data and read_buf, any handling of an
 * empty output_queue, and the return statement are not visible in this
 * excerpt -- confirm against the full source. */
167 void *sel_thread_result(sel_thread_t p)
169 struct work_item *work_this = 0;
173 pthread_mutex_lock(&p->mutex);
175 /* got something. Take the last one out of output_queue */
176 work_this = queue_remove_last(&p->output_queue);
179 /* put freed item in free list */
180 work_this->next = p->free_queue;
181 p->free_queue = work_this;
183 data = work_this->data;
/* consume one byte from fd[0]; the worker writes one byte to fd[1] per
 * finished item.
 * NOTE(review): the return value of read() is ignored */
184 read(p->fd[0], read_buf, 1);
186 pthread_mutex_unlock(&p->mutex);
193 * indent-tabs-mode: nil
195 * vim: shiftwidth=4 tabstop=8 expandtab