X-Git-Url: http://git.indexdata.com/?p=yazproxy-moved-to-github.git;a=blobdiff_plain;f=src%2Fmsg-thread.cpp;h=2dfd652f91fcdd79c7e0444a46fe382ef1965fc7;hp=c69ecfd11b3f0c0d2890503d67d85beb9b441a9a;hb=7fa984c171c65d05d34775c4533808793b2109cb;hpb=d7f41f16caf965adb1a5ffcdaa937787dfcc030a diff --git a/src/msg-thread.cpp b/src/msg-thread.cpp index c69ecfd..2dfd652 100644 --- a/src/msg-thread.cpp +++ b/src/msg-thread.cpp @@ -1,7 +1,5 @@ -/* $Id: msg-thread.cpp,v 1.5 2005-06-25 15:58:33 adam Exp $ - Copyright (c) 1998-2005, Index Data. - -This file is part of the yaz-proxy. +/* This file is part of YAZ proxy + Copyright (C) 1998-2009 Index Data YAZ proxy is free software; you can redistribute it and/or modify it under the terms of the GNU General Public License as published by the Free @@ -14,22 +12,49 @@ FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details. You should have received a copy of the GNU General Public License -along with YAZ proxy; see the file LICENSE. If not, write to the -Free Software Foundation, 59 Temple Place - Suite 330, Boston, MA -02111-1307, USA. - */ +along with this program; if not, write to the Free Software +Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA +*/ + +#if YAZ_POSIX_THREADS #include +#endif + +#if HAVE_UNISTD_H #include +#endif + #include #include -#include +#include #include #include "msg-thread.h" using namespace yazpp_1; +class Msg_Thread::Private { +public: + int m_no_threads; + Msg_Thread_Queue m_input; + Msg_Thread_Queue m_output; +#if YAZ_POSIX_THREADS + int m_fd[2]; + yazpp_1::ISocketObservable *m_SocketObservable; + pthread_t *m_thread_id; + pthread_mutex_t m_mutex_input_data; + pthread_cond_t m_cond_input_data; + pthread_mutex_t m_mutex_output_data; + bool m_stop_flag; +#endif +}; + +IMsg_Thread::~IMsg_Thread() +{ + +} + Msg_Thread_Queue::Msg_Thread_Queue() { m_list = 0; @@ -65,102 +90,123 @@ IMsg_Thread *Msg_Thread_Queue::dequeue() return m; } +#if YAZ_POSIX_THREADS static void *tfunc(void *p) { Msg_Thread *pt = (Msg_Thread *) p; pt->run(0); return 0; } +#endif - -Msg_Thread::Msg_Thread(ISocketObservable *obs) - : m_SocketObservable(obs) +Msg_Thread::Msg_Thread(ISocketObservable *obs, int no_threads) { - pipe(m_fd); - obs->addObserver(m_fd[0], this); + m_p = new Private; + +#if YAZ_POSIX_THREADS + m_p->m_SocketObservable = obs; + + pipe(m_p->m_fd); + obs->addObserver(m_p->m_fd[0], this); obs->maskObserver(this, SOCKET_OBSERVE_READ); - m_stop_flag = false; - pthread_mutex_init(&m_mutex_input_data, 0); - pthread_cond_init(&m_cond_input_data, 0); - pthread_mutex_init(&m_mutex_output_data, 0); - pthread_cond_init(&m_cond_output_data, 0); - pthread_create(&m_thread_id, 0, tfunc, this); + m_p->m_stop_flag = false; + pthread_mutex_init(&m_p->m_mutex_input_data, 0); + pthread_cond_init(&m_p->m_cond_input_data, 0); + pthread_mutex_init(&m_p->m_mutex_output_data, 0); + + m_p->m_no_threads = no_threads; + m_p->m_thread_id = new pthread_t[no_threads]; + int i; + for (i = 0; im_no_threads; i++) + pthread_create(&m_p->m_thread_id[i], 0, tfunc, this); +#endif } Msg_Thread::~Msg_Thread() { - pthread_mutex_lock(&m_mutex_input_data); - m_stop_flag = true; - pthread_cond_signal(&m_cond_input_data); - pthread_mutex_unlock(&m_mutex_input_data); +#if YAZ_POSIX_THREADS + pthread_mutex_lock(&m_p->m_mutex_input_data); + m_p->m_stop_flag = true; + pthread_cond_broadcast(&m_p->m_cond_input_data); + pthread_mutex_unlock(&m_p->m_mutex_input_data); - pthread_join(m_thread_id, 0); + int i; + for (i = 0; im_no_threads; i++) + pthread_join(m_p->m_thread_id[i], 0); + delete [] m_p->m_thread_id; + + m_p->m_SocketObservable->deleteObserver(this); - m_SocketObservable->deleteObserver(this); + pthread_cond_destroy(&m_p->m_cond_input_data); + pthread_mutex_destroy(&m_p->m_mutex_input_data); + pthread_mutex_destroy(&m_p->m_mutex_output_data); + close(m_p->m_fd[0]); + close(m_p->m_fd[1]); +#endif - pthread_cond_destroy(&m_cond_input_data); - pthread_mutex_destroy(&m_mutex_input_data); - pthread_cond_destroy(&m_cond_output_data); - pthread_mutex_destroy(&m_mutex_output_data); - close(m_fd[0]); - close(m_fd[1]); + delete m_p; } void Msg_Thread::socketNotify(int event) { +#if YAZ_POSIX_THREADS if (event & SOCKET_OBSERVE_READ) { char buf[2]; - read(m_fd[0], buf, 1); - pthread_mutex_lock(&m_mutex_output_data); - IMsg_Thread *out = m_output.dequeue(); - pthread_mutex_unlock(&m_mutex_output_data); + read(m_p->m_fd[0], buf, 1); + pthread_mutex_lock(&m_p->m_mutex_output_data); + IMsg_Thread *out = m_p->m_output.dequeue(); + pthread_mutex_unlock(&m_p->m_mutex_output_data); if (out) out->result(); } +#endif } +#if YAZ_POSIX_THREADS void Msg_Thread::run(void *p) { while(1) { - pthread_mutex_lock(&m_mutex_input_data); - pthread_cond_wait(&m_cond_input_data, &m_mutex_input_data); - while (1) + pthread_mutex_lock(&m_p->m_mutex_input_data); + while (!m_p->m_stop_flag && m_p->m_input.size() == 0) + pthread_cond_wait(&m_p->m_cond_input_data, &m_p->m_mutex_input_data); + if (m_p->m_stop_flag) { - if (m_stop_flag) - { - pthread_mutex_unlock(&m_mutex_input_data); - return; - } - IMsg_Thread *in = m_input.dequeue(); - pthread_mutex_unlock(&m_mutex_input_data); - if (!in) - break; - IMsg_Thread *out = in->handle(); - pthread_mutex_lock(&m_mutex_output_data); - m_output.enqueue(out); - pthread_cond_signal(&m_cond_output_data); - pthread_mutex_unlock(&m_mutex_output_data); - - write(m_fd[1], "", 1); - - pthread_mutex_lock(&m_mutex_input_data); + pthread_mutex_unlock(&m_p->m_mutex_input_data); + break; } + IMsg_Thread *in = m_p->m_input.dequeue(); + pthread_mutex_unlock(&m_p->m_mutex_input_data); + + IMsg_Thread *out = in->handle(); + pthread_mutex_lock(&m_p->m_mutex_output_data); + m_p->m_output.enqueue(out); + + write(m_p->m_fd[1], "", 1); + pthread_mutex_unlock(&m_p->m_mutex_output_data); } } +#endif void Msg_Thread::put(IMsg_Thread *m) { - pthread_mutex_lock(&m_mutex_input_data); - m_input.enqueue(m); - pthread_cond_signal(&m_cond_input_data); - pthread_mutex_unlock(&m_mutex_input_data); +#if YAZ_POSIX_THREADS + pthread_mutex_lock(&m_p->m_mutex_input_data); + m_p->m_input.enqueue(m); + pthread_cond_signal(&m_p->m_cond_input_data); + pthread_mutex_unlock(&m_p->m_mutex_input_data); +#else + IMsg_Thread *out = m->handle(); + if (out) + out->result(); +#endif } /* * Local variables: * c-basic-offset: 4 + * c-file-style: "Stroustrup" * indent-tabs-mode: nil * End: * vim: shiftwidth=4 tabstop=8 expandtab