3315e2649c25f8b428703a8b260ad6eb5ac7ba4e
[yazproxy-moved-to-github.git] / src / msg-thread.cpp
1 /* $Id: msg-thread.cpp,v 1.1 2005-05-30 20:08:58 adam Exp $
2    Copyright (c) 1998-2005, Index Data.
3
4 This file is part of the yaz-proxy.
5
6 YAZ proxy is free software; you can redistribute it and/or modify it under
7 the terms of the GNU General Public License as published by the Free
8 Software Foundation; either version 2, or (at your option) any later
9 version.
10
11 YAZ proxy is distributed in the hope that it will be useful, but WITHOUT ANY
12 WARRANTY; without even the implied warranty of MERCHANTABILITY or
13 FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
14 for more details.
15
16 You should have received a copy of the GNU General Public License
17 along with YAZ proxy; see the file LICENSE.  If not, write to the
18 Free Software Foundation, 59 Temple Place - Suite 330, Boston, MA
19 02111-1307, USA.
20  */
21 #include <pthread.h>
22 #include <unistd.h>
23 #include <ctype.h>
24
25 #include <yaz++/socket-observer.h>
26 #include <yaz/log.h>
27
28 #include "msg-thread.h"
29
30 Msg_Thread_Queue::Msg_Thread_Queue()
31 {
32     m_list = 0;
33 }
34
35 int Msg_Thread_Queue::size()
36 {
37     int no = 0;
38     Msg_Thread_Queue_List *l;
39     for (l = m_list; l; l = l->m_next)
40         no++;
41     return no;
42 }
43
44 void Msg_Thread_Queue::enqueue(IMsg_Thread *m)
45 {
46     Msg_Thread_Queue_List *l = new Msg_Thread_Queue_List;
47     l->m_next = m_list;
48     l->m_item = m;
49     m_list = l;
50 }
51
52 IMsg_Thread *Msg_Thread_Queue::dequeue()
53 {
54     Msg_Thread_Queue_List **l = &m_list;
55     if (!*l)
56         return 0;
57     while ((*l)->m_next)
58         l = &(*l)->m_next;
59     IMsg_Thread *m = (*l)->m_item;
60     delete *l;
61     *l = 0;
62     return m;
63 }
64
65 static void *tfunc(void *p)
66 {
67     Msg_Thread *pt = (Msg_Thread *) p;
68     pt->run(0);
69     return 0;
70 }
71
72
73 Msg_Thread::Msg_Thread(IYazSocketObservable *obs)
74     : m_SocketObservable(obs)
75 {
76     pipe(m_fd);
77     obs->addObserver(m_fd[0], this);
78     obs->maskObserver(this, YAZ_SOCKET_OBSERVE_READ);
79
80     m_stop_flag = false;
81     pthread_mutex_init(&m_mutex_input_data, 0);
82     pthread_cond_init(&m_cond_input_data, 0);
83     pthread_mutex_init(&m_mutex_output_data, 0);
84     pthread_cond_init(&m_cond_output_data, 0);
85     pthread_create(&m_thread_id, 0, tfunc, this);
86 }
87
88 Msg_Thread::~Msg_Thread()
89 {
90     pthread_mutex_lock(&m_mutex_input_data);
91     m_stop_flag = true;
92     pthread_cond_signal(&m_cond_input_data);
93     pthread_mutex_unlock(&m_mutex_input_data);
94     
95     pthread_join(m_thread_id, 0);
96
97     m_SocketObservable->deleteObserver(this);
98
99     pthread_cond_destroy(&m_cond_input_data);
100     pthread_mutex_destroy(&m_mutex_input_data);
101     pthread_cond_destroy(&m_cond_output_data);
102     pthread_mutex_destroy(&m_mutex_output_data);
103     close(m_fd[0]);
104     close(m_fd[1]);
105 }
106
107 void Msg_Thread::socketNotify(int event)
108 {
109     if (event & YAZ_SOCKET_OBSERVE_READ)
110     {
111         char buf[2];
112         read(m_fd[0], buf, 1);
113         pthread_mutex_lock(&m_mutex_output_data);
114         IMsg_Thread *out = m_output.dequeue();
115         pthread_mutex_unlock(&m_mutex_output_data);
116         if (out)
117             out->result();
118     }
119 }
120
121 void Msg_Thread::run(void *p)
122 {
123     while(1)
124     {
125         pthread_mutex_lock(&m_mutex_input_data);
126         pthread_cond_wait(&m_cond_input_data, &m_mutex_input_data);
127         while (1)
128         {
129             if (m_stop_flag)
130             {
131                 pthread_mutex_unlock(&m_mutex_input_data);
132                 return;
133             }
134             IMsg_Thread *in = m_input.dequeue();
135             pthread_mutex_unlock(&m_mutex_input_data);
136             if (!in)
137                 break;
138             IMsg_Thread *out = in->handle();
139             pthread_mutex_lock(&m_mutex_output_data);
140             m_output.enqueue(out);
141             pthread_cond_signal(&m_cond_output_data);
142             pthread_mutex_unlock(&m_mutex_output_data);
143
144             write(m_fd[1], "", 1);
145
146             pthread_mutex_lock(&m_mutex_input_data);
147         }
148     }
149 }
150
151 void Msg_Thread::put(IMsg_Thread *m)
152 {
153     pthread_mutex_lock(&m_mutex_input_data);
154     m_input.enqueue(m);
155     pthread_cond_signal(&m_cond_input_data);
156     pthread_mutex_unlock(&m_mutex_input_data);
157 }