X-Git-Url: http://git.indexdata.com/?p=yazproxy-moved-to-github.git;a=blobdiff_plain;f=src%2Fmsg-thread.cpp;h=d7e4eb28044d4cb052b156d885bd2c3a0a75ae11;hp=224176faa25d696b4c933ff1e981ebcbd28fc968;hb=4f79f9b5b0095b2f81b1ce583f0f82462f9ee36a;hpb=0eccf4a7c3b7c020da54ef6f60b8994cff45c8fb diff --git a/src/msg-thread.cpp b/src/msg-thread.cpp index 224176f..d7e4eb2 100644 --- a/src/msg-thread.cpp +++ b/src/msg-thread.cpp @@ -1,7 +1,5 @@ -/* $Id: msg-thread.cpp,v 1.10 2005-10-05 12:05:40 adam Exp $ - Copyright (c) 1998-2005, Index Data. - -This file is part of the yaz-proxy. +/* This file is part of YAZ proxy + Copyright (C) 1998-2011 Index Data YAZ proxy is free software; you can redistribute it and/or modify it under the terms of the GNU General Public License as published by the Free @@ -14,22 +12,44 @@ FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details. You should have received a copy of the GNU General Public License -along with YAZ proxy; see the file LICENSE. If not, write to the -Free Software Foundation, 59 Temple Place - Suite 330, Boston, MA -02111-1307, USA. - */ +along with this program; if not, write to the Free Software +Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA +*/ + +#if YAZ_POSIX_THREADS #include +#endif + +#if HAVE_UNISTD_H #include +#endif + #include #include -#include +#include #include #include "msg-thread.h" using namespace yazpp_1; +class Msg_Thread::Private { +public: + int m_no_threads; + Msg_Thread_Queue m_input; + Msg_Thread_Queue m_output; +#if YAZ_POSIX_THREADS + int m_fd[2]; + yazpp_1::ISocketObservable *m_SocketObservable; + pthread_t *m_thread_id; + pthread_mutex_t m_mutex_input_data; + pthread_cond_t m_cond_input_data; + pthread_mutex_t m_mutex_output_data; + bool m_stop_flag; +#endif +}; + IMsg_Thread::~IMsg_Thread() { @@ -70,102 +90,123 @@ IMsg_Thread *Msg_Thread_Queue::dequeue() return m; } +#if YAZ_POSIX_THREADS static void *tfunc(void *p) { Msg_Thread *pt = (Msg_Thread *) p; pt->run(0); return 0; } - +#endif Msg_Thread::Msg_Thread(ISocketObservable *obs, int no_threads) - : m_SocketObservable(obs) { - pipe(m_fd); - obs->addObserver(m_fd[0], this); + m_p = new Private; + +#if YAZ_POSIX_THREADS + m_p->m_SocketObservable = obs; + + pipe(m_p->m_fd); + obs->addObserver(m_p->m_fd[0], this); obs->maskObserver(this, SOCKET_OBSERVE_READ); - m_stop_flag = false; - pthread_mutex_init(&m_mutex_input_data, 0); - pthread_cond_init(&m_cond_input_data, 0); - pthread_mutex_init(&m_mutex_output_data, 0); + m_p->m_stop_flag = false; + pthread_mutex_init(&m_p->m_mutex_input_data, 0); + pthread_cond_init(&m_p->m_cond_input_data, 0); + pthread_mutex_init(&m_p->m_mutex_output_data, 0); - m_no_threads = no_threads; - m_thread_id = new pthread_t[no_threads]; + m_p->m_no_threads = no_threads; + m_p->m_thread_id = new pthread_t[no_threads]; int i; - for (i = 0; im_no_threads; i++) + pthread_create(&m_p->m_thread_id[i], 0, tfunc, this); +#endif } Msg_Thread::~Msg_Thread() { - pthread_mutex_lock(&m_mutex_input_data); - m_stop_flag = true; - pthread_cond_broadcast(&m_cond_input_data); - pthread_mutex_unlock(&m_mutex_input_data); - +#if YAZ_POSIX_THREADS + pthread_mutex_lock(&m_p->m_mutex_input_data); + m_p->m_stop_flag = true; + pthread_cond_broadcast(&m_p->m_cond_input_data); + pthread_mutex_unlock(&m_p->m_mutex_input_data); + int i; - for (i = 0; im_no_threads; i++) + pthread_join(m_p->m_thread_id[i], 0); + delete [] m_p->m_thread_id; + + m_p->m_SocketObservable->deleteObserver(this); - m_SocketObservable->deleteObserver(this); + pthread_cond_destroy(&m_p->m_cond_input_data); + pthread_mutex_destroy(&m_p->m_mutex_input_data); + pthread_mutex_destroy(&m_p->m_mutex_output_data); + close(m_p->m_fd[0]); + close(m_p->m_fd[1]); +#endif - pthread_cond_destroy(&m_cond_input_data); - pthread_mutex_destroy(&m_mutex_input_data); - pthread_mutex_destroy(&m_mutex_output_data); - close(m_fd[0]); - close(m_fd[1]); + delete m_p; } void Msg_Thread::socketNotify(int event) { +#if YAZ_POSIX_THREADS if (event & SOCKET_OBSERVE_READ) { char buf[2]; - read(m_fd[0], buf, 1); - pthread_mutex_lock(&m_mutex_output_data); - IMsg_Thread *out = m_output.dequeue(); - pthread_mutex_unlock(&m_mutex_output_data); + read(m_p->m_fd[0], buf, 1); + pthread_mutex_lock(&m_p->m_mutex_output_data); + IMsg_Thread *out = m_p->m_output.dequeue(); + pthread_mutex_unlock(&m_p->m_mutex_output_data); if (out) out->result(); } +#endif } +#if YAZ_POSIX_THREADS void Msg_Thread::run(void *p) { while(1) { - pthread_mutex_lock(&m_mutex_input_data); - while (!m_stop_flag && m_input.size() == 0) - pthread_cond_wait(&m_cond_input_data, &m_mutex_input_data); - if (m_stop_flag) + pthread_mutex_lock(&m_p->m_mutex_input_data); + while (!m_p->m_stop_flag && m_p->m_input.size() == 0) + pthread_cond_wait(&m_p->m_cond_input_data, &m_p->m_mutex_input_data); + if (m_p->m_stop_flag) { - pthread_mutex_unlock(&m_mutex_input_data); + pthread_mutex_unlock(&m_p->m_mutex_input_data); break; } - IMsg_Thread *in = m_input.dequeue(); - pthread_mutex_unlock(&m_mutex_input_data); + IMsg_Thread *in = m_p->m_input.dequeue(); + pthread_mutex_unlock(&m_p->m_mutex_input_data); IMsg_Thread *out = in->handle(); - pthread_mutex_lock(&m_mutex_output_data); - m_output.enqueue(out); - - write(m_fd[1], "", 1); - pthread_mutex_unlock(&m_mutex_output_data); + pthread_mutex_lock(&m_p->m_mutex_output_data); + m_p->m_output.enqueue(out); + + write(m_p->m_fd[1], "", 1); + pthread_mutex_unlock(&m_p->m_mutex_output_data); } } +#endif void Msg_Thread::put(IMsg_Thread *m) { - pthread_mutex_lock(&m_mutex_input_data); - m_input.enqueue(m); - pthread_cond_signal(&m_cond_input_data); - pthread_mutex_unlock(&m_mutex_input_data); +#if YAZ_POSIX_THREADS + pthread_mutex_lock(&m_p->m_mutex_input_data); + m_p->m_input.enqueue(m); + pthread_cond_signal(&m_p->m_cond_input_data); + pthread_mutex_unlock(&m_p->m_mutex_input_data); +#else + IMsg_Thread *out = m->handle(); + if (out) + out->result(); +#endif } /* * Local variables: * c-basic-offset: 4 + * c-file-style: "Stroustrup" * indent-tabs-mode: nil * End: * vim: shiftwidth=4 tabstop=8 expandtab