projects
/
pazpar2-moved-to-github.git
/ blobdiff
commit
grep
author
committer
pickaxe
?
search:
re
summary
|
shortlog
|
log
|
commit
|
commitdiff
|
tree
raw
|
inline
| side by side
Merge branch 'master' of ssh://git.indexdata.com/home/git/pub/pazpar2
[pazpar2-moved-to-github.git]
/
src
/
sel_thread.c
diff --git
a/src/sel_thread.c
b/src/sel_thread.c
index
669c0b3
..
1e13d24
100644
(file)
--- a/
src/sel_thread.c
+++ b/
src/sel_thread.c
@@
-1,5
+1,5
@@
/* This file is part of Pazpar2.
/* This file is part of Pazpar2.
- Copyright (C) 2006-2008 Index Data
+ Copyright (C) 2006-2010 Index Data
Pazpar2 is free software; you can redistribute it and/or modify it under
the terms of the GNU General Public License as published by the Free
Pazpar2 is free software; you can redistribute it and/or modify it under
the terms of the GNU General Public License as published by the Free
@@
-18,7
+18,7
@@
Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
*/
#if HAVE_CONFIG_H
*/
#if HAVE_CONFIG_H
-#include "cconfig.h"
+#include <config.h>
#endif
#include "sel_thread.h"
#endif
#include "sel_thread.h"
@@
-26,7
+26,8
@@
Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
#include <yaz/nmem.h>
#include <unistd.h>
#include <stdlib.h>
#include <yaz/nmem.h>
#include <unistd.h>
#include <stdlib.h>
-#include <pthread.h>
+#include <yaz/thread_create.h>
+#include <yaz/mutex.h>
#include <assert.h>
struct work_item {
#include <assert.h>
struct work_item {
@@
-57,9
+58,9
@@
static void queue_trav(struct work_item *q, void (*f)(void *data))
struct sel_thread {
int fd[2];
NMEM nmem;
struct sel_thread {
int fd[2];
NMEM nmem;
- pthread_t *thread_id;
- pthread_mutex_t mutex;
- pthread_cond_t input_data;
+ yaz_thread_t *thread_id;
+ YAZ_MUTEX mutex;
+ YAZ_COND input_data;
int stop_flag;
int no_threads;
struct work_item *input_queue;
int stop_flag;
int no_threads;
struct work_item *input_queue;
@@
-69,17
+70,19
@@
struct sel_thread {
void (*work_destroy)(void *work_data);
};
void (*work_destroy)(void *work_data);
};
+static int input_queue_length = 0;
+
static void *sel_thread_handler(void *vp)
{
sel_thread_t p = (sel_thread_t) vp;
static void *sel_thread_handler(void *vp)
{
sel_thread_t p = (sel_thread_t) vp;
- while(1)
+ while (1)
{
struct work_item *work_this = 0;
/* wait for some work */
{
struct work_item *work_this = 0;
/* wait for some work */
- pthread_mutex_lock(&p->mutex);
+ yaz_mutex_enter(p->mutex);
while (!p->stop_flag && !p->input_queue)
while (!p->stop_flag && !p->input_queue)
- pthread_cond_wait(&p->input_data, &p->mutex);
+ yaz_cond_wait(p->input_data, p->mutex, 0);
/* see if we were waken up because we're shutting down */
if (p->stop_flag)
break;
/* see if we were waken up because we're shutting down */
if (p->stop_flag)
break;
@@
-87,23
+90,25
@@
static void *sel_thread_handler(void *vp)
assert(p->input_queue);
work_this = queue_remove_last(&p->input_queue);
assert(p->input_queue);
work_this = queue_remove_last(&p->input_queue);
+ input_queue_length--;
+ yaz_log(YLOG_DEBUG, "input queue length after pop: %d", input_queue_length);
assert(work_this);
assert(work_this);
- pthread_mutex_unlock(&p->mutex);
+ yaz_mutex_leave(p->mutex);
/* work on this item */
p->work_handler(work_this->data);
/* put it back into output queue */
/* work on this item */
p->work_handler(work_this->data);
/* put it back into output queue */
- pthread_mutex_lock(&p->mutex);
+ yaz_mutex_enter(p->mutex);
work_this->next = p->output_queue;
p->output_queue = work_this;
work_this->next = p->output_queue;
p->output_queue = work_this;
- pthread_mutex_unlock(&p->mutex);
+ yaz_mutex_leave(p->mutex);
/* wake up select/poll with a single byte */
/* wake up select/poll with a single byte */
- write(p->fd[1], "", 1);
+ (void) write(p->fd[1], "", 1);
}
}
- pthread_mutex_unlock(&p->mutex);
+ yaz_mutex_leave(p->mutex);
return 0;
}
return 0;
}
@@
-132,28
+137,34
@@
sel_thread_t sel_thread_create(void (*work_handler)(void *work_data),
p->free_queue = 0;
p->work_handler = work_handler;
p->work_destroy = work_destroy;
p->free_queue = 0;
p->work_handler = work_handler;
p->work_destroy = work_destroy;
-
+ p->no_threads = 0; /* we if need to destroy */
p->stop_flag = 0;
p->stop_flag = 0;
- p->no_threads = no_of_threads;
- pthread_mutex_init(&p->mutex, 0);
- pthread_cond_init(&p->input_data, 0);
+ p->mutex = 0;
+ yaz_mutex_create(&p->mutex);
+ yaz_cond_create(&p->input_data);
+ if (p->input_data == 0) /* condition variable could not be created? */
+ {
+ sel_thread_destroy(p);
+ return 0;
+ }
+ p->no_threads = no_of_threads;
p->thread_id = nmem_malloc(nmem, sizeof(*p->thread_id) * p->no_threads);
for (i = 0; i < p->no_threads; i++)
p->thread_id = nmem_malloc(nmem, sizeof(*p->thread_id) * p->no_threads);
for (i = 0; i < p->no_threads; i++)
- pthread_create (p->thread_id + i, 0, sel_thread_handler, p);
+ p->thread_id[i] = yaz_thread_create(sel_thread_handler, p);
return p;
}
void sel_thread_destroy(sel_thread_t p)
{
int i;
return p;
}
void sel_thread_destroy(sel_thread_t p)
{
int i;
- pthread_mutex_lock(&p->mutex);
+ yaz_mutex_enter(p->mutex);
p->stop_flag = 1;
p->stop_flag = 1;
- pthread_cond_broadcast(&p->input_data);
- pthread_mutex_unlock(&p->mutex);
+ yaz_cond_broadcast(p->input_data);
+ yaz_mutex_leave(p->mutex);
for (i = 0; i< p->no_threads; i++)
for (i = 0; i< p->no_threads; i++)
- pthread_join(p->thread_id[i], 0);
+ yaz_thread_join(&p->thread_id[i], 0);
if (p->work_destroy)
{
if (p->work_destroy)
{
@@
-163,8
+174,8
@@
void sel_thread_destroy(sel_thread_t p)
close(p->fd[0]);
close(p->fd[1]);
close(p->fd[0]);
close(p->fd[1]);
- pthread_cond_destroy(&p->input_data);
- pthread_mutex_destroy(&p->mutex);
+ yaz_cond_destroy(&p->input_data);
+ yaz_mutex_destroy(&p->mutex);
nmem_destroy(p->nmem);
}
nmem_destroy(p->nmem);
}
@@
-172,7
+183,7
@@
void sel_thread_add(sel_thread_t p, void *data)
{
struct work_item *work_p;
{
struct work_item *work_p;
- pthread_mutex_lock(&p->mutex);
+ yaz_mutex_enter(p->mutex);
if (p->free_queue)
{
if (p->free_queue)
{
@@
-185,9
+196,10
@@
void sel_thread_add(sel_thread_t p, void *data)
work_p->data = data;
work_p->next = p->input_queue;
p->input_queue = work_p;
work_p->data = data;
work_p->next = p->input_queue;
p->input_queue = work_p;
-
- pthread_cond_signal(&p->input_data);
- pthread_mutex_unlock(&p->mutex);
+ input_queue_length++;
+ yaz_log(YLOG_DEBUG, "sel_thread_add: Input queue length after push: %d", input_queue_length);
+ yaz_cond_signal(p->input_data);
+ yaz_mutex_leave(p->mutex);
}
void *sel_thread_result(sel_thread_t p)
}
void *sel_thread_result(sel_thread_t p)
@@
-196,7
+208,7
@@
void *sel_thread_result(sel_thread_t p)
void *data = 0;
char read_buf[1];
void *data = 0;
char read_buf[1];
- pthread_mutex_lock(&p->mutex);
+ yaz_mutex_enter(p->mutex);
/* got something. Take the last one out of output_queue */
work_this = queue_remove_last(&p->output_queue);
/* got something. Take the last one out of output_queue */
work_this = queue_remove_last(&p->output_queue);
@@
-207,16
+219,18
@@
void *sel_thread_result(sel_thread_t p)
p->free_queue = work_this;
data = work_this->data;
p->free_queue = work_this;
data = work_this->data;
- read(p->fd[0], read_buf, 1);
+ (void) read(p->fd[0], read_buf, 1);
}
}
- pthread_mutex_unlock(&p->mutex);
+ yaz_mutex_leave(p->mutex);
return data;
}
/*
* Local variables:
* c-basic-offset: 4
return data;
}
/*
* Local variables:
* c-basic-offset: 4
+ * c-file-style: "Stroustrup"
* indent-tabs-mode: nil
* End:
* vim: shiftwidth=4 tabstop=8 expandtab
*/
* indent-tabs-mode: nil
* End:
* vim: shiftwidth=4 tabstop=8 expandtab
*/
+