Add destructor for class (interface) IMsg_Thread.
[yazproxy-moved-to-github.git] / src / msg-thread.cpp
1 /* $Id: msg-thread.cpp,v 1.9 2005-09-26 09:22:59 adam Exp $
2    Copyright (c) 1998-2005, Index Data.
3
4 This file is part of the yaz-proxy.
5
6 YAZ proxy is free software; you can redistribute it and/or modify it under
7 the terms of the GNU General Public License as published by the Free
8 Software Foundation; either version 2, or (at your option) any later
9 version.
10
11 YAZ proxy is distributed in the hope that it will be useful, but WITHOUT ANY
12 WARRANTY; without even the implied warranty of MERCHANTABILITY or
13 FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
14 for more details.
15
16 You should have received a copy of the GNU General Public License
17 along with YAZ proxy; see the file LICENSE.  If not, write to the
18 Free Software Foundation, 59 Temple Place - Suite 330, Boston, MA
19 02111-1307, USA.
20  */
21 #include <pthread.h>
22 #include <unistd.h>
23 #include <ctype.h>
24 #include <stdio.h>
25
26 #include <yaz++/socket-observer.h>
27 #include <yaz/log.h>
28
29 #include "msg-thread.h"
30
31 using namespace yazpp_1;
32
33 IMsg_Thread::~IMsg_Thread()
34 {
35
36 }
37
38 Msg_Thread_Queue::Msg_Thread_Queue()
39 {
40     m_list = 0;
41 }
42
43 int Msg_Thread_Queue::size()
44 {
45     int no = 0;
46     Msg_Thread_Queue_List *l;
47     for (l = m_list; l; l = l->m_next)
48         no++;
49     return no;
50 }
51
52 void Msg_Thread_Queue::enqueue(IMsg_Thread *m)
53 {
54     Msg_Thread_Queue_List *l = new Msg_Thread_Queue_List;
55     l->m_next = m_list;
56     l->m_item = m;
57     m_list = l;
58 }
59
60 IMsg_Thread *Msg_Thread_Queue::dequeue()
61 {
62     Msg_Thread_Queue_List **l = &m_list;
63     if (!*l)
64         return 0;
65     while ((*l)->m_next)
66         l = &(*l)->m_next;
67     IMsg_Thread *m = (*l)->m_item;
68     delete *l;
69     *l = 0;
70     return m;
71 }
72
73 static void *tfunc(void *p)
74 {
75     Msg_Thread *pt = (Msg_Thread *) p;
76     pt->run(0);
77     return 0;
78 }
79
80
81 Msg_Thread::Msg_Thread(ISocketObservable *obs, int no_threads)
82     : m_SocketObservable(obs)
83 {
84     pipe(m_fd);
85     obs->addObserver(m_fd[0], this);
86     obs->maskObserver(this, SOCKET_OBSERVE_READ);
87
88     m_stop_flag = false;
89     pthread_mutex_init(&m_mutex_input_data, 0);
90     pthread_cond_init(&m_cond_input_data, 0);
91     pthread_mutex_init(&m_mutex_output_data, 0);
92
93     m_no_threads = no_threads;
94     m_thread_id = new pthread_t[no_threads];
95     int i;
96     for (i = 0; i<m_no_threads; i++)
97         pthread_create(&m_thread_id[i], 0, tfunc, this);
98 }
99
100 Msg_Thread::~Msg_Thread()
101 {
102     pthread_mutex_lock(&m_mutex_input_data);
103     m_stop_flag = true;
104     pthread_cond_signal(&m_cond_input_data);
105     pthread_mutex_unlock(&m_mutex_input_data);
106     
107     int i;
108     for (i = 0; i<m_no_threads; i++)
109         pthread_join(m_thread_id[i], 0);
110     delete [] m_thread_id;
111
112     m_SocketObservable->deleteObserver(this);
113
114     pthread_cond_destroy(&m_cond_input_data);
115     pthread_mutex_destroy(&m_mutex_input_data);
116     pthread_mutex_destroy(&m_mutex_output_data);
117     close(m_fd[0]);
118     close(m_fd[1]);
119 }
120
121 void Msg_Thread::socketNotify(int event)
122 {
123     if (event & SOCKET_OBSERVE_READ)
124     {
125         char buf[2];
126         read(m_fd[0], buf, 1);
127         pthread_mutex_lock(&m_mutex_output_data);
128         IMsg_Thread *out = m_output.dequeue();
129         pthread_mutex_unlock(&m_mutex_output_data);
130         if (out)
131             out->result();
132     }
133 }
134
135 void Msg_Thread::run(void *p)
136 {
137     while(1)
138     {
139         pthread_mutex_lock(&m_mutex_input_data);
140         while (!m_stop_flag && m_input.size() == 0)
141             pthread_cond_wait(&m_cond_input_data, &m_mutex_input_data);
142         if (m_stop_flag)
143         {
144             pthread_mutex_unlock(&m_mutex_input_data);
145             break;
146         }
147         IMsg_Thread *in = m_input.dequeue();
148         pthread_mutex_unlock(&m_mutex_input_data);
149
150         IMsg_Thread *out = in->handle();
151         pthread_mutex_lock(&m_mutex_output_data);
152         m_output.enqueue(out);
153         
154         write(m_fd[1], "", 1);
155         pthread_mutex_unlock(&m_mutex_output_data);
156     }
157 }
158
159 void Msg_Thread::put(IMsg_Thread *m)
160 {
161     pthread_mutex_lock(&m_mutex_input_data);
162     m_input.enqueue(m);
163     pthread_cond_signal(&m_cond_input_data);
164     pthread_mutex_unlock(&m_mutex_input_data);
165 }
166 /*
167  * Local variables:
168  * c-basic-offset: 4
169  * indent-tabs-mode: nil
170  * End:
171  * vim: shiftwidth=4 tabstop=8 expandtab
172  */
173