Update to new ID URLs
[yazproxy-moved-to-github.git] / src / msg-thread.cpp
1 /* This file is part of YAZ proxy
2    Copyright (C) 1998-2009 Index Data
3
4 YAZ proxy is free software; you can redistribute it and/or modify it under
5 the terms of the GNU General Public License as published by the Free
6 Software Foundation; either version 2, or (at your option) any later
7 version.
8
9 YAZ proxy is distributed in the hope that it will be useful, but WITHOUT ANY
10 WARRANTY; without even the implied warranty of MERCHANTABILITY or
11 FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
12 for more details.
13
14 You should have received a copy of the GNU General Public License
15 along with this program; if not, write to the Free Software
16 Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA  02110-1301  USA
17 */
18
19 #if YAZ_POSIX_THREADS
20 #include <pthread.h>
21 #endif
22
23 #if HAVE_UNISTD_H
24 #include <unistd.h>
25 #endif
26
27 #include <ctype.h>
28 #include <stdio.h>
29
30 #include <yazpp/socket-observer.h>
31 #include <yaz/log.h>
32
33 #include "msg-thread.h"
34
35 using namespace yazpp_1;
36
37 class Msg_Thread::Private {
38 public:
39     int m_no_threads;
40     Msg_Thread_Queue m_input;
41     Msg_Thread_Queue m_output;
42 #if YAZ_POSIX_THREADS
43     int m_fd[2];
44     yazpp_1::ISocketObservable *m_SocketObservable;
45     pthread_t *m_thread_id;
46     pthread_mutex_t m_mutex_input_data;
47     pthread_cond_t m_cond_input_data;
48     pthread_mutex_t m_mutex_output_data;
49     bool m_stop_flag;
50 #endif
51 };
52
53 IMsg_Thread::~IMsg_Thread()
54 {
55
56 }
57
58 Msg_Thread_Queue::Msg_Thread_Queue()
59 {
60     m_list = 0;
61 }
62
63 int Msg_Thread_Queue::size()
64 {
65     int no = 0;
66     Msg_Thread_Queue_List *l;
67     for (l = m_list; l; l = l->m_next)
68         no++;
69     return no;
70 }
71
72 void Msg_Thread_Queue::enqueue(IMsg_Thread *m)
73 {
74     Msg_Thread_Queue_List *l = new Msg_Thread_Queue_List;
75     l->m_next = m_list;
76     l->m_item = m;
77     m_list = l;
78 }
79
80 IMsg_Thread *Msg_Thread_Queue::dequeue()
81 {
82     Msg_Thread_Queue_List **l = &m_list;
83     if (!*l)
84         return 0;
85     while ((*l)->m_next)
86         l = &(*l)->m_next;
87     IMsg_Thread *m = (*l)->m_item;
88     delete *l;
89     *l = 0;
90     return m;
91 }
92
93 #if YAZ_POSIX_THREADS
94 static void *tfunc(void *p)
95 {
96     Msg_Thread *pt = (Msg_Thread *) p;
97     pt->run(0);
98     return 0;
99 }
100 #endif
101
102 Msg_Thread::Msg_Thread(ISocketObservable *obs, int no_threads)
103 {
104     m_p = new Private;
105
106 #if YAZ_POSIX_THREADS
107     m_p->m_SocketObservable = obs;
108
109     pipe(m_p->m_fd);
110     obs->addObserver(m_p->m_fd[0], this);
111     obs->maskObserver(this, SOCKET_OBSERVE_READ);
112
113     m_p->m_stop_flag = false;
114     pthread_mutex_init(&m_p->m_mutex_input_data, 0);
115     pthread_cond_init(&m_p->m_cond_input_data, 0);
116     pthread_mutex_init(&m_p->m_mutex_output_data, 0);
117
118     m_p->m_no_threads = no_threads;
119     m_p->m_thread_id = new pthread_t[no_threads];
120     int i;
121     for (i = 0; i<m_p->m_no_threads; i++)
122         pthread_create(&m_p->m_thread_id[i], 0, tfunc, this);
123 #endif
124 }
125
126 Msg_Thread::~Msg_Thread()
127 {
128 #if YAZ_POSIX_THREADS
129     pthread_mutex_lock(&m_p->m_mutex_input_data);
130     m_p->m_stop_flag = true;
131     pthread_cond_broadcast(&m_p->m_cond_input_data);
132     pthread_mutex_unlock(&m_p->m_mutex_input_data);
133     
134     int i;
135     for (i = 0; i<m_p->m_no_threads; i++)
136         pthread_join(m_p->m_thread_id[i], 0);
137     delete [] m_p->m_thread_id;
138
139     m_p->m_SocketObservable->deleteObserver(this);
140
141     pthread_cond_destroy(&m_p->m_cond_input_data);
142     pthread_mutex_destroy(&m_p->m_mutex_input_data);
143     pthread_mutex_destroy(&m_p->m_mutex_output_data);
144     close(m_p->m_fd[0]);
145     close(m_p->m_fd[1]);
146 #endif
147
148     delete m_p;
149 }
150
151 void Msg_Thread::socketNotify(int event)
152 {
153 #if YAZ_POSIX_THREADS
154     if (event & SOCKET_OBSERVE_READ)
155     {
156         char buf[2];
157         read(m_p->m_fd[0], buf, 1);
158         pthread_mutex_lock(&m_p->m_mutex_output_data);
159         IMsg_Thread *out = m_p->m_output.dequeue();
160         pthread_mutex_unlock(&m_p->m_mutex_output_data);
161         if (out)
162             out->result();
163     }
164 #endif
165 }
166
167 #if YAZ_POSIX_THREADS
168 void Msg_Thread::run(void *p)
169 {
170     while(1)
171     {
172         pthread_mutex_lock(&m_p->m_mutex_input_data);
173         while (!m_p->m_stop_flag && m_p->m_input.size() == 0)
174             pthread_cond_wait(&m_p->m_cond_input_data, &m_p->m_mutex_input_data);
175         if (m_p->m_stop_flag)
176         {
177             pthread_mutex_unlock(&m_p->m_mutex_input_data);
178             break;
179         }
180         IMsg_Thread *in = m_p->m_input.dequeue();
181         pthread_mutex_unlock(&m_p->m_mutex_input_data);
182
183         IMsg_Thread *out = in->handle();
184         pthread_mutex_lock(&m_p->m_mutex_output_data);
185         m_p->m_output.enqueue(out);
186         
187         write(m_p->m_fd[1], "", 1);
188         pthread_mutex_unlock(&m_p->m_mutex_output_data);
189     }
190 }
191 #endif
192
193 void Msg_Thread::put(IMsg_Thread *m)
194 {
195 #if YAZ_POSIX_THREADS
196     pthread_mutex_lock(&m_p->m_mutex_input_data);
197     m_p->m_input.enqueue(m);
198     pthread_cond_signal(&m_p->m_cond_input_data);
199     pthread_mutex_unlock(&m_p->m_mutex_input_data);
200 #else
201     IMsg_Thread *out = m->handle();
202     if (out)
203         out->result();
204 #endif
205 }
206 /*
207  * Local variables:
208  * c-basic-offset: 4
209  * c-file-style: "Stroustrup"
210  * indent-tabs-mode: nil
211  * End:
212  * vim: shiftwidth=4 tabstop=8 expandtab
213  */
214