b07fe03dfe91db480be850a4719b0d064d99468c
[yazproxy-moved-to-github.git] / src / tstthreads.cpp
1 /* $Id: tstthreads.cpp,v 1.1 2005-05-19 21:29:58 adam Exp $
2    Copyright (c) 1998-2005, Index Data.
3
4 This file is part of the yaz-proxy.
5
6 YAZ proxy is free software; you can redistribute it and/or modify it under
7 the terms of the GNU General Public License as published by the Free
8 Software Foundation; either version 2, or (at your option) any later
9 version.
10
11 YAZ proxy is distributed in the hope that it will be useful, but WITHOUT ANY
12 WARRANTY; without even the implied warranty of MERCHANTABILITY or
13 FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
14 for more details.
15
16 You should have received a copy of the GNU General Public License
17 along with YAZ proxy; see the file LICENSE.  If not, write to the
18 Free Software Foundation, 59 Temple Place - Suite 330, Boston, MA
19 02111-1307, USA.
20  */
21
22 #include <pthread.h>
23 #include <unistd.h>
24 #include <ctype.h>
25
26 #if HAVE_DLFCN_H
27 #include <dlfcn.h>
28 #endif
29
30 #include <yaz++/socket-observer.h>
31 #include <yaz++/socket-manager.h>
32 #include <yaz/log.h>
33 #include "proxyp.h"
34
35 class Proxy_Msg {
36 public:
37     virtual void destroy() = 0;
38     virtual Proxy_Msg *handle() = 0;
39     virtual void result() = 0;
40 };
41
42 class Proxy_Msg_Queue_List {
43     friend class Proxy_Msg_Queue;
44  private:
45     Proxy_Msg *m_item;
46     Proxy_Msg_Queue_List *m_next;
47 };
48
49 class Proxy_Msg_Queue {
50  public:
51     Proxy_Msg_Queue();
52     void enqueue(Proxy_Msg *in);
53     Proxy_Msg *dequeue();
54     int size();
55  private:
56     Proxy_Msg_Queue_List *m_list;
57 };
58
59 Proxy_Msg_Queue::Proxy_Msg_Queue()
60 {
61     m_list = 0;
62 }
63
64 int Proxy_Msg_Queue::size()
65 {
66     int no = 0;
67     Proxy_Msg_Queue_List *l;
68     for (l = m_list; l; l = l->m_next)
69         no++;
70     return no;
71 }
72
73 void Proxy_Msg_Queue::enqueue(Proxy_Msg *m)
74 {
75     Proxy_Msg_Queue_List *l = new Proxy_Msg_Queue_List;
76     l->m_next = m_list;
77     l->m_item = m;
78     m_list = l;
79 }
80
81 Proxy_Msg *Proxy_Msg_Queue::dequeue()
82 {
83     Proxy_Msg_Queue_List **l = &m_list;
84     if (!*l)
85         return 0;
86     while ((*l)->m_next)
87         l = &(*l)->m_next;
88     Proxy_Msg *m = (*l)->m_item;
89     delete *l;
90     *l = 0;
91     return m;
92 }
93
94 class Proxy_Thread : public IYazSocketObserver {
95 public:
96     Proxy_Thread(IYazSocketObservable *obs);
97     virtual ~Proxy_Thread();
98     void socketNotify(int event);
99     void put(Proxy_Msg *m);
100     Proxy_Msg *get();
101     void run(void *p);
102 private:
103     IYazSocketObservable *m_obs;
104     int m_fd[2];
105     pthread_t m_thread_id;
106     Proxy_Msg_Queue m_input;
107     Proxy_Msg_Queue m_output;
108     pthread_mutex_t m_mutex_input_data;
109     pthread_cond_t m_cond_input_data;
110     pthread_mutex_t m_mutex_output_data;
111     pthread_cond_t m_cond_output_data;
112 };
113
114 static void *tfunc(void *p)
115 {
116     Proxy_Thread *pt = (Proxy_Thread *) p;
117     pt->run(0);
118     return 0;
119 }
120
121
122 Proxy_Thread::Proxy_Thread(IYazSocketObservable *obs)
123     : m_obs(obs)
124 {
125     pthread_mutex_init(&m_mutex_input_data, 0);
126     pthread_cond_init(&m_cond_input_data, 0);
127     pthread_mutex_init(&m_mutex_output_data, 0);
128     pthread_cond_init(&m_cond_output_data, 0);
129     m_fd[0] = m_fd[1] = -1;
130     if (pipe(m_fd) != 0)
131         return;
132     m_obs->addObserver(m_fd[0], this);
133     m_obs->timeoutObserver(this, 2000);
134     m_obs->maskObserver(this, YAZ_SOCKET_OBSERVE_READ);
135
136     pthread_create(&m_thread_id, 0, tfunc, this);
137 }
138
139 Proxy_Thread::~Proxy_Thread()
140 {
141
142 }
143
144 void Proxy_Thread::socketNotify(int event)
145 {
146     char buf[2];
147     read(m_fd[0], buf, 1);
148     pthread_mutex_lock(&m_mutex_output_data);
149     Proxy_Msg *out = m_output.dequeue();
150     pthread_mutex_unlock(&m_mutex_output_data);
151     if (out)
152         out->result();
153 }
154
155 void Proxy_Thread::run(void *p)
156 {
157     while(1)
158     {
159         pthread_mutex_lock(&m_mutex_input_data);
160         pthread_cond_wait(&m_cond_input_data, &m_mutex_input_data);
161         while(1)
162         {
163             Proxy_Msg *in = m_input.dequeue();
164             pthread_mutex_unlock(&m_mutex_input_data);
165             if (!in)
166                 break;
167             Proxy_Msg *out = in->handle();
168             pthread_mutex_lock(&m_mutex_output_data);
169             m_output.enqueue(out);
170             pthread_cond_signal(&m_cond_output_data);
171             pthread_mutex_unlock(&m_mutex_output_data);
172             write(m_fd[1], "", 1);
173
174             pthread_mutex_lock(&m_mutex_input_data);
175         }
176     }
177 }
178
179 void Proxy_Thread::put(Proxy_Msg *m)
180 {
181     pthread_mutex_lock(&m_mutex_input_data);
182     m_input.enqueue(m);
183     pthread_cond_signal(&m_cond_input_data);
184     int in_size = m_input.size();
185     pthread_mutex_unlock(&m_mutex_input_data);
186     int out_size = m_output.size();
187     printf("in-size=%d out-size=%d\n", in_size, out_size);
188 }
189
190 class My_Msg : public Proxy_Msg {
191 public:
192     void destroy();
193     Proxy_Msg *handle();
194     void result();
195     int m_val;
196 };
197
198 class My_Thread : public Proxy_Thread {
199 public:
200     My_Thread(IYazSocketObservable *obs);
201 };
202
203 My_Thread::My_Thread(IYazSocketObservable *obs) : Proxy_Thread(obs)
204 {
205 }
206
207 Proxy_Msg *My_Msg::handle()
208 {
209     My_Msg *res = new My_Msg;
210     int sl = rand() % 5;
211
212     res->m_val = m_val;
213     printf("My_Msg::handle val=%d sleep=%d\n", m_val, sl);
214     sleep(sl);
215     return res;
216 }
217
218
219 void My_Msg::result()
220 {
221     printf("My_Msg::result val=%d\n", m_val);
222 }
223
224 void My_Msg::destroy()
225 {
226     delete this;
227 }
228
229 class My_Timer_Thread : public IYazSocketObserver {
230 private:
231     IYazSocketObservable *m_obs;
232     int m_fd[2];
233     My_Thread *m_t;
234 public:
235     My_Timer_Thread(IYazSocketObservable *obs, My_Thread *t);
236     void socketNotify(int event);
237 };
238
239 My_Timer_Thread::My_Timer_Thread(IYazSocketObservable *obs,
240                                  My_Thread *t) : m_obs(obs) 
241 {
242     pipe(m_fd);
243     m_t = t;
244     obs->addObserver(m_fd[0], this);
245     obs->timeoutObserver(this, 2);
246 }
247
248 void My_Timer_Thread::socketNotify(int event)
249 {
250     static int seq = 1;
251     printf("Add %d\n", seq);
252     My_Msg *m = new My_Msg;
253     m->m_val = seq++;
254     m_t->put(m);
255 }
256
257 int main(int argc, char **argv)
258 {
259     Yaz_SocketManager mySocketManager;
260
261     My_Thread m(&mySocketManager);
262     My_Timer_Thread t(&mySocketManager, &m);
263     while (mySocketManager.processEvent() > 0)
264         ;
265     return 0;
266 }