Moving ThreadPoolSocketObserver and IThreadPoolMsg to yp2 namespace
[metaproxy-moved-to-github.git] / src / p2_msg.cpp
1 /* $Id: p2_msg.cpp,v 1.4 2005-10-14 10:27:18 adam Exp $
2    Copyright (c) 1998-2005, Index Data.
3
4 This file is part of the yaz-proxy.
5
6 YAZ proxy is free software; you can redistribute it and/or modify it under
7 the terms of the GNU General Public License as published by the Free
8 Software Foundation; either version 2, or (at your option) any later
9 version.
10
11 YAZ proxy is distributed in the hope that it will be useful, but WITHOUT ANY
12 WARRANTY; without even the implied warranty of MERCHANTABILITY or
13 FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
14 for more details.
15
16 You should have received a copy of the GNU General Public License
17 along with YAZ proxy; see the file LICENSE.  If not, write to the
18 Free Software Foundation, 59 Temple Place - Suite 330, Boston, MA
19 02111-1307, USA.
20  */
21
22 #include "config.hpp"
23 #include <yaz/log.h>
24 #include <yaz/diagbib1.h>
25 #include "p2_backend.h"
26 #include "p2_frontend.h"
27 #include "p2_config.h"
28 #include "p2_modules.h"
29
30 using namespace yazpp_1;
31 using namespace std;
32
33 IP2_BackendSet::~IP2_BackendSet()
34 {
35 }
36
37 IP2_Backend::~IP2_Backend()
38 {
39
40 }
41
42 P2_Backend::P2_Backend(P2_ConfigTarget *cfg, IP2_Backend *backend_int)
43 {
44     m_configTarget = new P2_ConfigTarget;
45     *m_configTarget = *cfg;
46     m_busy = false;
47     m_int = backend_int;
48 }
49
50 P2_Backend::~P2_Backend()
51 {
52     delete m_configTarget;
53 }
54
55 P2_BackendResultSet::P2_BackendResultSet()
56 {
57     m_int = 0;
58 }
59
60 P2_BackendResultSet::~P2_BackendResultSet()
61 {
62     delete m_int;
63 }
64
65 P2_Backend *P2_Msg::select_backend(string db,
66                                    Yaz_Z_Query *query,
67                                    P2_BackendResultSet **bset)
68 {
69     P2_Config *cfg = m_server->lockConfig();
70
71     // see if some target has done this query before
72
73     *bset = 0;
74     P2_Backend *backend = 0;
75
76     list<P2_Backend *>::const_iterator it;
77     for (it = m_server->m_backend_list.begin(); 
78          it != m_server->m_backend_list.end(); it++)
79     {
80         if ((*it)->m_busy)
81             continue;
82
83         if (db != (*it)->m_configTarget->m_virt_database)
84             continue;
85         backend = *it;
86
87         if (query)
88         {
89             list<P2_BackendResultSet *>::const_iterator is;
90             for (is  = (*it)->m_resultSets.begin(); 
91                  is != (*it)->m_resultSets.end(); is++)
92             {
93                 if (query->match(&(*is)->m_query))
94                 {
95                     *bset = *is;
96                     break;
97                 }
98             }
99         }
100         if (bset)
101             break;
102     }
103     if (!backend)
104     {
105         P2_ConfigTarget *target_cfg = cfg->find_target(db);
106
107         if (!target_cfg)
108         {
109             yaz_log(YLOG_WARN, "No backend for database %s",
110                     db.c_str());
111         }
112         else
113         {
114             P2_ModuleInterface0 *int0 =
115             reinterpret_cast<P2_ModuleInterface0 *>
116                 (m_server->m_modules->get_interface(target_cfg->m_type.c_str(),
117                                                     0));
118             IP2_Backend *bint = 0;
119
120             if (int0)
121                 bint = int0->create(target_cfg->m_target_address.c_str());
122
123             if (bint)
124                 backend = new P2_Backend(target_cfg, bint);
125
126             if (backend)
127                 m_server->m_backend_list.push_back(backend);
128         }
129     }
130     if (backend)
131         backend->m_busy = true;
132     m_server->unlockConfig();
133     return backend;
134 }
135
136 void P2_FrontResultSet::setQuery(Z_Query *z_query)
137 {
138     m_query.set_Z_Query(z_query);
139 }
140
141 void P2_FrontResultSet::setDatabases(char **db, int num)
142 {
143     m_db_list.clear();
144
145     int i;
146     for (i = 0; i<num; i++)
147         m_db_list.push_back(db[i]);
148 }
149
150 P2_FrontResultSet::P2_FrontResultSet(const char *id)
151 {
152     m_resultSetId = id;
153 }
154
155
156 P2_FrontResultSet::~P2_FrontResultSet()
157 {
158 }
159
160 P2_Msg::P2_Msg(GDU *gdu, P2_Frontend *front, P2_Server *server)
161 {
162     m_front = front;
163     m_server = server;
164     m_output = 0;
165     m_gdu = gdu;
166     m_close_flag = 0;
167 }
168
169 P2_Msg::~P2_Msg()
170 {
171     delete m_output;
172     delete m_gdu;
173 }
174
175 Z_APDU *P2_Msg::frontend_search_resultset(Z_APDU *z_gdu, ODR odr,
176                                           P2_FrontResultSet **rset)
177 {
178     Z_SearchRequest *req = z_gdu->u.searchRequest;
179     list<P2_FrontResultSet *>::iterator it;
180     P2_FrontResultSet *s = 0;
181
182     string id = req->resultSetName;
183     for (it = m_front->m_resultSets.begin(); it != m_front->m_resultSets.end(); it++)
184     {
185         if ((*it)->m_resultSetId == id)
186         {
187             s = *it;
188             break;
189         }
190     }
191     if (s)
192     {
193         // result set already exists
194         *rset = s;
195         if (req->replaceIndicator && *req->replaceIndicator)
196         {  // replace indicator true
197             s->setQuery(req->query);
198             s->setDatabases(req->databaseNames, req->num_databaseNames);
199             return 0;
200         }
201         Z_APDU *apdu = zget_APDU(odr, Z_APDU_searchResponse);
202         Z_Records *rec = (Z_Records *) odr_malloc(odr, sizeof(Z_Records));
203         apdu->u.searchResponse->records = rec;
204         rec->which = Z_Records_NSD;
205         rec->u.nonSurrogateDiagnostic =
206             zget_DefaultDiagFormat(
207                 odr, YAZ_BIB1_RESULT_SET_EXISTS_AND_REPLACE_INDICATOR_OFF,
208                 req->resultSetName);
209         
210         return apdu;
211     }
212     // does not exist 
213     s = new P2_FrontResultSet(req->resultSetName);
214     s->setQuery(req->query);
215     s->setDatabases(req->databaseNames, req->num_databaseNames);
216     m_front->m_resultSets.push_back(s);
217     *rset = s;
218     return 0;
219 }
220
221 Z_APDU *P2_Msg::frontend_search_apdu(Z_APDU *request_apdu, ODR odr)
222 {
223     P2_FrontResultSet *rset;
224     Z_APDU *response_apdu = frontend_search_resultset(request_apdu, odr,
225                                                       &rset);
226     if (response_apdu)
227         return response_apdu;
228
229     // no immediate error (yet) 
230     size_t i;
231     for (i = 0; i<rset->m_db_list.size(); i++)
232     {
233         string db = rset->m_db_list[i];
234         P2_BackendResultSet *bset;
235         P2_Backend *b = select_backend(db, &rset->m_query, &bset);
236         if (!b)
237         {
238             Z_APDU *apdu = zget_APDU(odr, Z_APDU_searchResponse);
239             Z_Records *rec = (Z_Records *) odr_malloc(odr, sizeof(Z_Records));
240             apdu->u.searchResponse->records = rec;
241             rec->which = Z_Records_NSD;
242             rec->u.nonSurrogateDiagnostic =
243                 zget_DefaultDiagFormat(
244                     odr, YAZ_BIB1_DATABASE_UNAVAILABLE, db.c_str());
245             return apdu;
246         }
247         if (!bset)
248         {   // new set 
249             bset = new P2_BackendResultSet();
250
251             bset->m_query.set_Z_Query(request_apdu->u.searchRequest->query);
252             bset->m_db_list.push_back(db);
253
254             b->m_int->search(&bset->m_query, &bset->m_int, &bset->m_hit_count);
255             b->m_resultSets.push_back(bset);
256         }
257         else
258         {
259             bset->m_int->get(1, 1);
260         }
261         response_apdu = zget_APDU(odr, Z_APDU_searchResponse);
262         *response_apdu->u.searchResponse->resultCount = bset->m_hit_count;
263         b->m_busy = false;
264     }
265     if (!response_apdu)
266     {
267         Z_APDU *apdu = zget_APDU(odr, Z_APDU_searchResponse);
268         Z_Records *rec = (Z_Records *) odr_malloc(odr, sizeof(Z_Records));
269         apdu->u.searchResponse->records = rec;
270             rec->which = Z_Records_NSD;
271             rec->u.nonSurrogateDiagnostic =
272                 zget_DefaultDiagFormat(odr, YAZ_BIB1_UNSUPP_SEARCH, 0);
273             return apdu;
274     }
275     return response_apdu;
276 }
277
278 Z_APDU *P2_Msg::frontend_present_resultset(Z_APDU *z_gdu, ODR odr,
279                                            P2_FrontResultSet **rset)
280 {
281     Z_PresentRequest *req = z_gdu->u.presentRequest;
282     list<P2_FrontResultSet *>::iterator it;
283     P2_FrontResultSet *s = 0;
284
285     string id = req->resultSetId;
286     for (it = m_front->m_resultSets.begin(); it != m_front->m_resultSets.end(); it++)
287     {
288         if ((*it)->m_resultSetId == id)
289         {
290             s = *it;
291             break;
292         }
293     }
294     *rset = s;
295     if (s)
296         return 0;  // fine result set exists 
297
298     Z_APDU *apdu = zget_APDU(odr, Z_APDU_presentResponse);
299     
300     Z_Records *rec = (Z_Records *) odr_malloc(odr, sizeof(Z_Records));
301     apdu->u.presentResponse->records = rec;
302     rec->which = Z_Records_NSD;
303     rec->u.nonSurrogateDiagnostic =
304         zget_DefaultDiagFormat(
305             odr,
306             YAZ_BIB1_SPECIFIED_RESULT_SET_DOES_NOT_EXIST,
307             req->resultSetId);
308     return apdu;
309 }
310
311 Z_APDU *P2_Msg::frontend_present_apdu(Z_APDU *request_apdu, ODR odr)
312 {
313     P2_FrontResultSet *rset;
314     Z_APDU *response_apdu = frontend_present_resultset(request_apdu, odr,
315                                                        &rset);
316     if (response_apdu)
317         return response_apdu;
318     return zget_APDU(odr, Z_APDU_presentResponse);
319 }
320     
321 yp2::IThreadPoolMsg *P2_Msg::handle()
322 {
323     ODR odr = odr_createmem(ODR_ENCODE);
324     yaz_log(YLOG_LOG, "P2_Msg:handle begin");
325     Z_GDU *request_gdu = m_gdu->get();
326
327     if (request_gdu->which == Z_GDU_Z3950)
328     {
329         Z_APDU *request_apdu = request_gdu->u.z3950;
330         Z_APDU *response_apdu = 0;
331         switch(request_apdu->which)
332         {
333         case Z_APDU_initRequest:
334             response_apdu = zget_APDU(odr, Z_APDU_initResponse);
335             ODR_MASK_SET(response_apdu->u.initResponse->options, Z_Options_triggerResourceCtrl);
336             ODR_MASK_SET(response_apdu->u.initResponse->options, Z_Options_search);
337             ODR_MASK_SET(response_apdu->u.initResponse->options, Z_Options_present);
338             ODR_MASK_SET(response_apdu->u.initResponse->options, Z_Options_namedResultSets);
339             break;
340         case Z_APDU_searchRequest:
341             response_apdu = frontend_search_apdu(request_apdu, odr);
342             break;
343         case Z_APDU_presentRequest:
344             response_apdu = frontend_present_apdu(request_apdu, odr);
345             break;
346         case Z_APDU_triggerResourceControlRequest:
347             break;
348         default:
349             response_apdu = zget_APDU(odr, Z_APDU_close);
350             m_close_flag = 1;
351             break;
352         }
353         if (response_apdu)
354             m_output = new GDU(response_apdu);
355     }
356     yaz_log(YLOG_LOG, "P2_Msg:handle end");
357     odr_destroy(odr);
358     return this;
359 }
360
361 void P2_Msg::result()
362 {
363     m_front->m_no_requests--;
364     if (!m_front->m_delete_flag)
365     {
366         if (m_output)
367         {
368             int len;
369             m_front->send_GDU(m_output->get(), &len);
370         }
371         if (m_close_flag)
372         {
373             m_front->close();
374             m_front->m_delete_flag = 1;
375         }
376     }
377     if (m_front->m_delete_flag && m_front->m_no_requests == 0)
378         delete m_front;
379     delete this;
380 }
381
382 /*
383  * Local variables:
384  * c-basic-offset: 4
385  * indent-tabs-mode: nil
386  * End:
387  * vim: shiftwidth=4 tabstop=8 expandtab
388  */