Prevent multiplexing for cookie sessions
[yazpp-moved-to-github.git] / src / yaz-proxy.cpp
1 /*
2  * Copyright (c) 1998-2003, Index Data.
3  * See the file LICENSE for details.
4  * 
5  * $Id: yaz-proxy.cpp,v 1.53 2003-10-08 09:49:05 adam Exp $
6  */
7
8 #include <assert.h>
9 #include <time.h>
10
11 #include <yaz/log.h>
12 #include <yaz/diagbib1.h>
13 #include <yaz++/proxy.h>
14
15 static const char *apdu_name(Z_APDU *apdu)
16 {
17     switch (apdu->which)
18     {
19     case Z_APDU_initRequest:
20         return "initRequest";
21     case Z_APDU_initResponse:
22         return "initResponse";
23     case Z_APDU_searchRequest:
24         return "searchRequest";
25     case Z_APDU_searchResponse:
26         return "searchResponse";
27     case Z_APDU_presentRequest:
28         return "presentRequest";
29     case Z_APDU_presentResponse:
30         return "presentResponse";
31     case Z_APDU_deleteResultSetRequest:
32         return "deleteResultSetRequest";
33     case Z_APDU_deleteResultSetResponse:
34         return "deleteResultSetResponse";
35     case Z_APDU_scanRequest:
36         return "scanRequest";
37     case Z_APDU_scanResponse:
38         return "scanResponse";
39     case Z_APDU_sortRequest:
40         return "sortRequest";
41     case Z_APDU_sortResponse:
42         return "sortResponse";
43     case Z_APDU_extendedServicesRequest:
44         return "extendedServicesRequest";
45     case Z_APDU_extendedServicesResponse:
46         return "extendedServicesResponse";
47     case Z_APDU_close:
48         return "close";
49     }
50     return "other";
51 }
52
53 Yaz_Proxy::Yaz_Proxy(IYaz_PDU_Observable *the_PDU_Observable) :
54     Yaz_Z_Assoc(the_PDU_Observable), m_bw_stat(60), m_pdu_stat(60)
55 {
56     m_PDU_Observable = the_PDU_Observable;
57     m_client = 0;
58     m_parent = 0;
59     m_clientPool = 0;
60     m_seqno = 1;
61     m_keepalive = 0;
62     m_proxyTarget = 0;
63     m_default_target = 0;
64     m_proxy_authentication = 0;
65     m_max_clients = 150;
66     m_seed = time(0);
67     m_client_idletime = 600;
68     m_target_idletime = 600;
69     m_optimize = xstrdup ("1");
70     strcpy(m_session_str, "x");
71     m_session_no=0;
72     m_bytes_sent = m_bytes_recv = 0;
73     m_bw_hold_PDU = 0;
74     m_bw_max = 0;
75     m_pdu_max = 0;
76     m_max_record_retrieve = 0;
77 }
78
79 Yaz_Proxy::~Yaz_Proxy()
80 {
81     yaz_log(LOG_LOG, "%s Closed %d/%d sent/recv bytes total", m_session_str,
82             m_bytes_sent, m_bytes_recv);
83     xfree (m_proxyTarget);
84     xfree (m_default_target);
85     xfree (m_proxy_authentication);
86     xfree (m_optimize);
87 }
88
89 int Yaz_Proxy::set_config(const char *config)
90 {
91     int r = m_config.read_xml(config);
92     return r;
93 }
94
95 void Yaz_Proxy::set_default_target(const char *target)
96 {
97     xfree (m_default_target);
98     m_default_target = 0;
99     if (target)
100         m_default_target = (char *) xstrdup (target);
101 }
102
103 void Yaz_Proxy::set_proxy_authentication (const char *auth)
104 {
105     xfree (m_proxy_authentication);
106     m_proxy_authentication = 0;
107     if (auth)
108         m_proxy_authentication = (char *) xstrdup (auth);
109 }
110
111 IYaz_PDU_Observer *Yaz_Proxy::sessionNotify(IYaz_PDU_Observable
112                                             *the_PDU_Observable, int fd)
113 {
114     Yaz_Proxy *new_proxy = new Yaz_Proxy(the_PDU_Observable);
115     new_proxy->m_parent = this;
116     new_proxy->m_config = m_config;
117     new_proxy->timeout(m_client_idletime);
118     new_proxy->m_target_idletime = m_target_idletime;
119     new_proxy->set_default_target(m_default_target);
120     new_proxy->set_APDU_log(get_APDU_log());
121     new_proxy->set_proxy_authentication(m_proxy_authentication);
122     sprintf(new_proxy->m_session_str, "%ld:%d", (long) time(0), m_session_no);
123     m_session_no++;
124     yaz_log (LOG_LOG, "%s New session %s", new_proxy->m_session_str,
125              the_PDU_Observable->getpeername());
126     return new_proxy;
127 }
128
129 char *Yaz_Proxy::get_cookie(Z_OtherInformation **otherInfo)
130 {
131     int oid[OID_SIZE];
132     Z_OtherInformationUnit *oi;
133     struct oident ent;
134     ent.proto = PROTO_Z3950;
135     ent.oclass = CLASS_USERINFO;
136     ent.value = (oid_value) VAL_COOKIE;
137     assert (oid_ent_to_oid (&ent, oid));
138
139     if (oid_ent_to_oid (&ent, oid) && 
140         (oi = update_otherInformation(otherInfo, 0, oid, 1, 1)) &&
141         oi->which == Z_OtherInfo_characterInfo)
142         return oi->information.characterInfo;
143     return 0;
144 }
145
146 char *Yaz_Proxy::get_proxy(Z_OtherInformation **otherInfo)
147 {
148     int oid[OID_SIZE];
149     Z_OtherInformationUnit *oi;
150     struct oident ent;
151     ent.proto = PROTO_Z3950;
152     ent.oclass = CLASS_USERINFO;
153     ent.value = (oid_value) VAL_PROXY;
154     if (oid_ent_to_oid (&ent, oid) &&
155         (oi = update_otherInformation(otherInfo, 0, oid, 1, 1)) &&
156         oi->which == Z_OtherInfo_characterInfo)
157         return oi->information.characterInfo;
158     return 0;
159 }
160
161 const char *Yaz_Proxy::load_balance(const char **url)
162 {
163     int zurl_in_use[MAX_ZURL_PLEX];
164     Yaz_ProxyClient *c;
165     int i;
166
167     for (i = 0; i<MAX_ZURL_PLEX; i++)
168         zurl_in_use[i] = 0;
169     for (c = m_parent->m_clientPool; c; c = c->m_next)
170     {
171         for (i = 0; url[i]; i++)
172             if (!strcmp(url[i], c->get_hostname()))
173                 zurl_in_use[i]++;
174     }
175     int min = 100000;
176     const char *ret = 0;
177     for (i = 0; url[i]; i++)
178     {
179         yaz_log(LOG_DEBUG, "%s zurl=%s use=%d",
180                 m_session_str, url[i], zurl_in_use[i]);
181         if (min > zurl_in_use[i])
182         {
183             ret = url[i];
184             min = zurl_in_use[i];
185         }
186     }
187     return ret;
188 }
189
190 Yaz_ProxyClient *Yaz_Proxy::get_client(Z_APDU *apdu)
191 {
192     assert (m_parent);
193     Yaz_Proxy *parent = m_parent;
194     Z_OtherInformation **oi;
195     Yaz_ProxyClient *c = m_client;
196     
197     get_otherInfoAPDU(apdu, &oi);
198     char *cookie = get_cookie(oi);
199
200     if (!m_proxyTarget)
201     {
202         const char *url[MAX_ZURL_PLEX];
203         const char *proxy_host = get_proxy(oi);
204         if (proxy_host)
205         {
206             xfree(m_default_target);
207             m_default_target = xstrdup(proxy_host);
208             proxy_host = m_default_target;
209         }
210         
211         int client_idletime = -1;
212         m_config.get_target_info(proxy_host, url, &m_keepalive, &m_bw_max,
213                                  &m_pdu_max, &m_max_record_retrieve,
214                                  &m_target_idletime, &client_idletime,
215                                  &parent->m_max_clients);
216         if (client_idletime != -1)
217         {
218             m_client_idletime = client_idletime;
219             timeout(m_client_idletime);
220         }
221         if (!url[0])
222         {
223             yaz_log(LOG_LOG, "%s No default target", m_session_str);
224             return 0;
225         }
226         // we don't handle multiplexing for cookie session, so we just
227         // pick the first one in this case (anonymous users will be able
228         // to use any backend)
229         if (cookie && *cookie)
230             m_proxyTarget = (char*) xstrdup(url[0]);
231         else
232             m_proxyTarget = (char*) xstrdup(load_balance(url));
233     }
234     if (cookie && *cookie)
235     {
236         Yaz_ProxyClient *cc = 0;
237         
238         for (c = parent->m_clientPool; c; c = c->m_next)
239         {
240             assert (c->m_prev);
241             assert (*c->m_prev == c);
242             if (c->m_cookie && !strcmp(cookie,c->m_cookie) &&
243                 !strcmp(m_proxyTarget, c->get_hostname()))
244             {
245                 cc = c;
246             }
247         }
248         if (cc)
249         {
250             // found it in cache
251             c = cc;
252             // The following handles "cancel"
253             // If connection is busy (waiting for PDU) and
254             // we have an initRequest we can safely do re-open
255             if (c->m_waiting && apdu->which == Z_APDU_initRequest)
256             {
257                 yaz_log (LOG_LOG, "%s REOPEN target=%s", m_session_str,
258                          c->get_hostname());
259                 c->close();
260                 c->client(m_proxyTarget);
261                 c->m_init_flag = 0;
262
263                 c->m_last_ok = 0;
264                 c->m_cache.clear();
265                 c->m_last_resultCount = 0;
266                 c->m_sr_transform = 0;
267                 c->m_waiting = 0;
268                 c->m_resultSetStartPoint = 0;
269                 c->timeout(m_target_idletime); 
270             }
271             c->m_seqno = parent->m_seqno;
272             if (c->m_server && c->m_server != this)
273                 c->m_server->m_client = 0;
274             c->m_server = this;
275             (parent->m_seqno)++;
276             yaz_log (LOG_DEBUG, "get_client 1 %p %p", this, c);
277             return c;
278         }
279     }
280     else if (!c)
281     {
282         Yaz_ProxyClient *cc = 0;
283         
284         for (c = parent->m_clientPool; c; c = c->m_next)
285         {
286             assert (c->m_prev);
287             assert (*c->m_prev == c);
288             if (c->m_server == 0 && c->m_cookie == 0 && 
289                 !strcmp(m_proxyTarget, c->get_hostname()))
290             {
291                 cc = c;
292             }
293         }
294         if (cc)
295         {
296             // found it in cache
297             c = cc;
298
299             yaz_log (LOG_LOG, "%s REUSE %d %d %s",
300                      m_session_str,
301                      c->m_seqno, parent->m_seqno, c->get_hostname());
302
303             c->m_seqno = parent->m_seqno;
304             assert(c->m_server == 0);
305             c->m_server = this;
306             
307             (parent->m_seqno)++;
308             return c;
309         }
310     }
311     if (!m_client)
312     {
313         if (apdu->which != Z_APDU_initRequest)
314         {
315             yaz_log (LOG_LOG, "no first INIT!");
316             return 0;
317         }
318         Z_InitRequest *initRequest = apdu->u.initRequest;
319
320         if (!initRequest->idAuthentication)
321         {
322             if (m_proxy_authentication)
323             {
324                 initRequest->idAuthentication =
325                     (Z_IdAuthentication *)
326                     odr_malloc (odr_encode(),
327                                 sizeof(*initRequest->idAuthentication));
328                 initRequest->idAuthentication->which =
329                     Z_IdAuthentication_open;
330                 initRequest->idAuthentication->u.open =
331                     odr_strdup (odr_encode(), m_proxy_authentication);
332             }
333         }
334
335         // go through list of clients - and find the lowest/oldest one.
336         Yaz_ProxyClient *c_min = 0;
337         int min_seq = -1;
338         int no_of_clients = 0;
339         if (parent->m_clientPool)
340             yaz_log (LOG_DEBUG, "Existing sessions");
341         for (c = parent->m_clientPool; c; c = c->m_next)
342         {
343             yaz_log (LOG_DEBUG, " Session %-3d wait=%d %s cookie=%s", c->m_seqno,
344                                c->m_waiting, c->get_hostname(),
345                                c->m_cookie ? c->m_cookie : "");
346             no_of_clients++;
347             if (min_seq < 0 || c->m_seqno < min_seq)
348             {
349                 min_seq = c->m_seqno;
350                 c_min = c;
351             }
352         }
353         if (no_of_clients >= parent->m_max_clients)
354         {
355             c = c_min;
356             if (c->m_waiting || strcmp(m_proxyTarget, c->get_hostname()))
357             {
358                 yaz_log (LOG_LOG, "%s MAXCLIENTS Destroy %d",
359                          m_session_str, c->m_seqno);
360                 if (c->m_server && c->m_server != this)
361                     delete c->m_server;
362                 c->m_server = 0;
363             }
364             else
365             {
366                 yaz_log (LOG_LOG, "%s MAXCLIENTS Reuse %d %d %s",
367                          m_session_str,
368                          c->m_seqno, parent->m_seqno, c->get_hostname());
369                 xfree (c->m_cookie);
370                 c->m_cookie = 0;
371                 if (cookie)
372                     c->m_cookie = xstrdup(cookie);
373                 c->m_seqno = parent->m_seqno;
374                 if (c->m_server && c->m_server != this)
375                 {
376                     c->m_server->m_client = 0;
377                     delete c->m_server;
378                 }
379                 (parent->m_seqno)++;
380                 return c;
381             }
382         }
383         else
384         {
385             yaz_log (LOG_LOG, "%s NEW %d %s",
386                      m_session_str, parent->m_seqno, m_proxyTarget);
387             c = new Yaz_ProxyClient(m_PDU_Observable->clone());
388             c->m_next = parent->m_clientPool;
389             if (c->m_next)
390                 c->m_next->m_prev = &c->m_next;
391             parent->m_clientPool = c;
392             c->m_prev = &parent->m_clientPool;
393         }
394
395         xfree (c->m_cookie);
396         c->m_cookie = 0;
397         if (cookie)
398             c->m_cookie = xstrdup(cookie);
399
400         c->m_seqno = parent->m_seqno;
401         c->client(m_proxyTarget);
402         c->m_init_flag = 0;
403         c->m_last_resultCount = 0;
404         c->m_last_ok = 0;
405         c->m_cache.clear();
406         c->m_sr_transform = 0;
407         c->m_waiting = 0;
408         c->m_resultSetStartPoint = 0;
409         c->timeout(30);
410
411         (parent->m_seqno)++;
412     }
413     yaz_log (LOG_DEBUG, "get_client 3 %p %p", this, c);
414     return c;
415 }
416
417 void Yaz_Proxy::display_diagrecs(Z_DiagRec **pp, int num)
418 {
419     int i;
420     for (i = 0; i<num; i++)
421     {
422         oident *ent;
423         Z_DefaultDiagFormat *r;
424         Z_DiagRec *p = pp[i];
425         if (p->which != Z_DiagRec_defaultFormat)
426         {
427             yaz_log(LOG_LOG, "%s Error no diagnostics", m_session_str);
428             return;
429         }
430         else
431             r = p->u.defaultFormat;
432         if (!(ent = oid_getentbyoid(r->diagnosticSetId)) ||
433             ent->oclass != CLASS_DIAGSET || ent->value != VAL_BIB1)
434             yaz_log(LOG_LOG, "%s Error unknown diagnostic set", m_session_str);
435         switch (r->which)
436         {
437         case Z_DefaultDiagFormat_v2Addinfo:
438             yaz_log(LOG_LOG, "%s Error %d %s:%s",
439                     m_session_str,
440                     *r->condition, diagbib1_str(*r->condition),
441                     r->u.v2Addinfo);
442             break;
443         case Z_DefaultDiagFormat_v3Addinfo:
444             yaz_log(LOG_LOG, "%s Error %d %s:%s",
445                     m_session_str,
446                     *r->condition, diagbib1_str(*r->condition),
447                     r->u.v3Addinfo);
448             break;
449         }
450     }
451 }
452
453 int Yaz_Proxy::send_to_client(Z_APDU *apdu)
454 {
455     int len = 0;
456     if (apdu->which == Z_APDU_searchResponse)
457     {
458         Z_SearchResponse *sr = apdu->u.searchResponse;
459         Z_Records *p = sr->records;
460         if (p && p->which == Z_Records_NSD)
461         {
462             Z_DiagRec dr, *dr_p = &dr;
463             dr.which = Z_DiagRec_defaultFormat;
464             dr.u.defaultFormat = p->u.nonSurrogateDiagnostic;
465
466             display_diagrecs(&dr_p, 1);
467         }
468         else
469         {
470             if (sr->resultCount)
471                 yaz_log(LOG_LOG, "%s %d hits", m_session_str,
472                         *sr->resultCount);
473         }
474     }
475     else if (apdu->which == Z_APDU_presentResponse)
476     {
477         Z_PresentResponse *sr = apdu->u.presentResponse;
478         Z_Records *p = sr->records;
479         if (p && p->which == Z_Records_NSD)
480         {
481             Z_DiagRec dr, *dr_p = &dr;
482             dr.which = Z_DiagRec_defaultFormat;
483             dr.u.defaultFormat = p->u.nonSurrogateDiagnostic;
484
485             display_diagrecs(&dr_p, 1);
486         }
487     }
488     int r = send_Z_PDU(apdu, &len);
489     yaz_log (LOG_LOG, "%s Sending %s to client %d bytes", m_session_str,
490              apdu_name(apdu), len);
491     m_bytes_sent += len;
492     m_bw_stat.add_bytes(len);
493     return r;
494 }
495
496 int Yaz_ProxyClient::send_to_target(Z_APDU *apdu)
497 {
498     int len = 0;
499     int r = send_Z_PDU(apdu, &len);
500     yaz_log (LOG_LOG, "%s Sending %s to %s %d bytes",
501              get_session_str(),
502              apdu_name(apdu), get_hostname(), len);
503     m_bytes_sent += len;
504     return r;
505 }
506
507 Z_APDU *Yaz_Proxy::result_set_optimize(Z_APDU *apdu)
508 {
509     if (*m_parent->m_optimize == '0')
510         return apdu;      // don't optimize result sets..
511     if (apdu->which == Z_APDU_presentRequest)
512     {
513         Z_PresentRequest *pr = apdu->u.presentRequest;
514         Z_NamePlusRecordList *npr;
515         int toget = *pr->numberOfRecordsRequested;
516         int start = *pr->resultSetStartPoint;
517
518         if (m_client->m_last_resultSetId &&
519             !strcmp(m_client->m_last_resultSetId, pr->resultSetId))
520         {
521             if (m_client->m_cache.lookup (odr_encode(), &npr, start, toget,
522                                           pr->preferredRecordSyntax,
523                                           pr->recordComposition))
524             {
525                 yaz_log (LOG_LOG, "%s Returned cache records for present request", 
526                          m_session_str);
527                 Z_APDU *new_apdu = create_Z_PDU(Z_APDU_presentResponse);
528                 new_apdu->u.presentResponse->referenceId = pr->referenceId;
529                 
530                 new_apdu->u.presentResponse->numberOfRecordsReturned
531                     = odr_intdup(odr_encode(), toget);
532                                                                  
533                 new_apdu->u.presentResponse->records = (Z_Records*)
534                     odr_malloc(odr_encode(), sizeof(Z_Records));
535                 new_apdu->u.presentResponse->records->which = Z_Records_DBOSD;
536                 new_apdu->u.presentResponse->records->u.databaseOrSurDiagnostics = npr;
537                 new_apdu->u.presentResponse->nextResultSetPosition =
538                     odr_intdup(odr_encode(), start+toget);
539
540                 send_to_client(new_apdu);
541                 return 0;
542             }
543         }
544     }
545
546     if (apdu->which != Z_APDU_searchRequest)
547         return apdu;
548     Z_SearchRequest *sr = apdu->u.searchRequest;
549     Yaz_Z_Query *this_query = new Yaz_Z_Query;
550     Yaz_Z_Databases this_databases;
551
552     this_databases.set(sr->num_databaseNames, (const char **)
553                        sr->databaseNames);
554     
555     this_query->set_Z_Query(sr->query);
556
557     char query_str[80];
558     this_query->print(query_str, sizeof(query_str)-1);
559     yaz_log(LOG_LOG, "%s Query %s", m_session_str, query_str);
560
561     if (m_client->m_last_ok && m_client->m_last_query &&
562         m_client->m_last_query->match(this_query) &&
563         !strcmp(m_client->m_last_resultSetId, sr->resultSetName) &&
564         m_client->m_last_databases.match(this_databases))
565     {
566         delete this_query;
567         if (m_client->m_last_resultCount > *sr->smallSetUpperBound &&
568             m_client->m_last_resultCount < *sr->largeSetLowerBound)
569         {
570             Z_NamePlusRecordList *npr;
571             int toget = *sr->mediumSetPresentNumber;
572             Z_RecordComposition *comp = 0;
573
574             if (toget > m_client->m_last_resultCount)
575                 toget = m_client->m_last_resultCount;
576             
577             if (sr->mediumSetElementSetNames)
578             {
579                 comp = (Z_RecordComposition *)
580                     odr_malloc(odr_encode(), sizeof(Z_RecordComposition));
581                 comp->which = Z_RecordComp_simple;
582                 comp->u.simple = sr->mediumSetElementSetNames;
583             }
584  
585             if (m_client->m_cache.lookup (odr_encode(), &npr, 1, toget,
586                                           sr->preferredRecordSyntax, comp))
587             {
588                 yaz_log (LOG_LOG, "%s Returned cache records for medium set",
589                          m_session_str);
590                 Z_APDU *new_apdu = create_Z_PDU(Z_APDU_searchResponse);
591                 new_apdu->u.searchResponse->referenceId = sr->referenceId;
592                 new_apdu->u.searchResponse->resultCount =
593                     &m_client->m_last_resultCount;
594                 
595                 new_apdu->u.searchResponse->numberOfRecordsReturned
596                     = odr_intdup(odr_encode(), toget);
597                                                         
598                 new_apdu->u.searchResponse->presentStatus =
599                     odr_intdup(odr_encode(), Z_PresentStatus_success);
600                 new_apdu->u.searchResponse->records = (Z_Records*)
601                     odr_malloc(odr_encode(), sizeof(Z_Records));
602                 new_apdu->u.searchResponse->records->which = Z_Records_DBOSD;
603                 new_apdu->u.searchResponse->records->u.databaseOrSurDiagnostics = npr;
604                 new_apdu->u.searchResponse->nextResultSetPosition =
605                     odr_intdup(odr_encode(), toget+1);
606                 send_to_client(new_apdu);
607                 return 0;
608             }
609             else
610             {
611                 // medium Set
612                 // send present request (medium size)
613                 yaz_log (LOG_LOG, "%s Optimizing search for medium set",
614                          m_session_str);
615
616                 Z_APDU *new_apdu = create_Z_PDU(Z_APDU_presentRequest);
617                 Z_PresentRequest *pr = new_apdu->u.presentRequest;
618                 pr->referenceId = sr->referenceId;
619                 pr->resultSetId = sr->resultSetName;
620                 pr->preferredRecordSyntax = sr->preferredRecordSyntax;
621                 *pr->numberOfRecordsRequested = toget;
622                 pr->recordComposition = comp;
623                 m_client->m_sr_transform = 1;
624                 return new_apdu;
625             }
626         }
627         else if (m_client->m_last_resultCount >= *sr->largeSetLowerBound ||
628             m_client->m_last_resultCount <= 0)
629         {
630             // large set. Return pseudo-search response immediately
631             yaz_log (LOG_LOG, "%s Optimizing search for large set",
632                      m_session_str);
633             Z_APDU *new_apdu = create_Z_PDU(Z_APDU_searchResponse);
634             new_apdu->u.searchResponse->referenceId = sr->referenceId;
635             new_apdu->u.searchResponse->resultCount =
636                 &m_client->m_last_resultCount;
637             send_to_client(new_apdu);
638             return 0;
639         }
640         else
641         {
642             Z_NamePlusRecordList *npr;
643             int toget = m_client->m_last_resultCount;
644             Z_RecordComposition *comp = 0;
645             // small set
646             // send a present request (small set)
647             
648             if (sr->smallSetElementSetNames)
649             {
650                 comp = (Z_RecordComposition *)
651                     odr_malloc(odr_encode(), sizeof(Z_RecordComposition));
652                 comp->which = Z_RecordComp_simple;
653                 comp->u.simple = sr->smallSetElementSetNames;
654             }
655
656             if (m_client->m_cache.lookup (odr_encode(), &npr, 1, toget,
657                                           sr->preferredRecordSyntax, comp))
658             {
659                 yaz_log (LOG_LOG, "%s Returned cache records for small set",
660                          m_session_str);
661                 Z_APDU *new_apdu = create_Z_PDU(Z_APDU_searchResponse);
662                 new_apdu->u.searchResponse->referenceId = sr->referenceId;
663                 new_apdu->u.searchResponse->resultCount =
664                     &m_client->m_last_resultCount;
665                 
666                 new_apdu->u.searchResponse->numberOfRecordsReturned
667                     = odr_intdup(odr_encode(), toget);
668                                                                  
669                 new_apdu->u.searchResponse->presentStatus =
670                     odr_intdup(odr_encode(), Z_PresentStatus_success);
671                 new_apdu->u.searchResponse->records = (Z_Records*)
672                     odr_malloc(odr_encode(), sizeof(Z_Records));
673                 new_apdu->u.searchResponse->records->which = Z_Records_DBOSD;
674                 new_apdu->u.searchResponse->records->u.databaseOrSurDiagnostics = npr;
675                 new_apdu->u.searchResponse->nextResultSetPosition =
676                     odr_intdup(odr_encode(), toget+1);
677                 send_to_client(new_apdu);
678                 return 0;
679             }
680             else
681             {
682                 yaz_log (LOG_LOG, "%s Optimizing search for small set",
683                          m_session_str);
684                 Z_APDU *new_apdu = create_Z_PDU(Z_APDU_presentRequest);
685                 Z_PresentRequest *pr = new_apdu->u.presentRequest;
686                 pr->referenceId = sr->referenceId;
687                 pr->resultSetId = sr->resultSetName;
688                 pr->preferredRecordSyntax = sr->preferredRecordSyntax;
689                 *pr->numberOfRecordsRequested = toget;
690                 pr->recordComposition = comp;
691                 m_client->m_sr_transform = 1;
692                 return new_apdu;
693             }
694         }
695     }
696     else  // query doesn't match
697     {
698         delete m_client->m_last_query;
699         m_client->m_last_query = this_query;
700         m_client->m_last_ok = 0;
701         m_client->m_cache.clear();
702         m_client->m_resultSetStartPoint = 0;
703
704         xfree (m_client->m_last_resultSetId);
705         m_client->m_last_resultSetId = xstrdup (sr->resultSetName);
706
707         m_client->m_last_databases.set(sr->num_databaseNames,
708                                        (const char **) sr->databaseNames);
709     }
710     return apdu;
711 }
712
713
714 void Yaz_Proxy::recv_Z_PDU(Z_APDU *apdu, int len)
715 {
716     int reduce = 0;
717     m_bytes_recv += len;
718     
719     yaz_log (LOG_LOG, "%s Receiving %s from client %d bytes", m_session_str,
720              apdu_name(apdu), len);
721
722     if (m_bw_hold_PDU)     // double incoming PDU. shutdown now.
723         shutdown();
724
725     m_bw_stat.add_bytes(len);
726     m_pdu_stat.add_bytes(1);
727
728     int bw_total = m_bw_stat.get_total();
729     int pdu_total = m_pdu_stat.get_total();
730
731     yaz_log(LOG_LOG, "%s stat bw=%d pdu=%d limit-bw=%d limit-pdu=%d",
732             m_session_str, bw_total, pdu_total, m_bw_max, m_pdu_max);
733     if (m_bw_max)
734     {
735         if (bw_total > m_bw_max)
736         {
737             reduce = (bw_total/m_bw_max);
738         }
739     }
740     if (m_pdu_max)
741     {
742         if (pdu_total > m_pdu_max)
743         {
744             int nreduce = (60/m_pdu_max);
745             reduce = (reduce > nreduce) ? reduce : nreduce;
746         }
747     }
748     if (reduce)  
749     {
750         yaz_log(LOG_LOG, "%s Limit delay=%d", m_session_str, reduce);
751         m_bw_hold_PDU = apdu;  // save PDU and signal "on hold"
752         timeout(reduce);       // call us reduce seconds later
753     }
754     else
755         recv_Z_PDU_0(apdu);    // all fine. Proceed receive PDU as usual
756 }
757
758 void Yaz_Proxy::handle_max_record_retrieve(Z_APDU *apdu)
759 {
760     if (m_max_record_retrieve)
761     {
762         if (apdu->which == Z_APDU_presentRequest)
763         {
764             Z_PresentRequest *pr = apdu->u.presentRequest;
765             if (pr->numberOfRecordsRequested && 
766                 *pr->numberOfRecordsRequested > m_max_record_retrieve)
767                 *pr->numberOfRecordsRequested = m_max_record_retrieve;
768         }
769     }
770 }
771
772 Z_Records *Yaz_Proxy::create_nonSurrogateDiagnostics(ODR odr,
773                                                      int error,
774                                                      const char *addinfo)
775 {
776     Z_Records *rec = (Z_Records *)
777         odr_malloc (odr, sizeof(*rec));
778     int *err = (int *)
779         odr_malloc (odr, sizeof(*err));
780     Z_DiagRec *drec = (Z_DiagRec *)
781         odr_malloc (odr, sizeof(*drec));
782     Z_DefaultDiagFormat *dr = (Z_DefaultDiagFormat *)
783         odr_malloc (odr, sizeof(*dr));
784     *err = error;
785     rec->which = Z_Records_NSD;
786     rec->u.nonSurrogateDiagnostic = dr;
787     dr->diagnosticSetId =
788         yaz_oidval_to_z3950oid (odr, CLASS_DIAGSET, VAL_BIB1);
789     dr->condition = err;
790     dr->which = Z_DefaultDiagFormat_v2Addinfo;
791     dr->u.v2Addinfo = odr_strdup (odr, addinfo ? addinfo : "");
792     return rec;
793 }
794
795 Z_APDU *Yaz_Proxy::handle_query_validation(Z_APDU *apdu)
796 {
797     if (apdu->which == Z_APDU_searchRequest)
798     {
799         Z_SearchRequest *sr = apdu->u.searchRequest;
800         int err;
801         char *addinfo = 0;
802         err = m_config.check_query(odr_encode(), m_default_target, sr->query,
803                                    &addinfo);
804         if (err)
805         {
806             Z_APDU *new_apdu = create_Z_PDU(Z_APDU_searchResponse);
807
808             new_apdu->u.searchResponse->referenceId = sr->referenceId;
809             new_apdu->u.searchResponse->records =
810                 create_nonSurrogateDiagnostics(odr_encode(), err, addinfo);
811             *new_apdu->u.searchResponse->searchStatus = 0;
812
813             send_to_client(new_apdu);
814
815             return 0;
816         }
817     }
818     return apdu;
819 }
820
821 Z_APDU *Yaz_Proxy::handle_syntax_validation(Z_APDU *apdu)
822 {
823     if (apdu->which == Z_APDU_searchRequest)
824     {
825         Z_SearchRequest *sr = apdu->u.searchRequest;
826         if (*sr->smallSetUpperBound > 0 || *sr->largeSetLowerBound > 1)
827         {
828             int err;
829             char *addinfo = 0;
830             err = m_config.check_syntax(odr_encode(), m_default_target,
831                                         sr->preferredRecordSyntax,
832                                         &addinfo);
833             if (err)
834             {
835                 Z_APDU *new_apdu = create_Z_PDU(Z_APDU_searchResponse);
836                 
837                 new_apdu->u.searchResponse->referenceId = sr->referenceId;
838                 new_apdu->u.searchResponse->records =
839                     create_nonSurrogateDiagnostics(odr_encode(), err, addinfo);
840                 *new_apdu->u.searchResponse->searchStatus = 0;
841                 
842                 send_to_client(new_apdu);
843                 
844                 return 0;
845             }
846         }
847     }
848     else if (apdu->which == Z_APDU_presentRequest)
849     {
850         Z_PresentRequest *pr = apdu->u.presentRequest;
851         int err;
852         char *addinfo = 0;
853         err = m_config.check_syntax(odr_encode(), m_default_target,
854                                     pr->preferredRecordSyntax,
855                                     &addinfo);
856         if (err)
857         {
858             Z_APDU *new_apdu = create_Z_PDU(Z_APDU_presentResponse);
859             
860             new_apdu->u.presentResponse->referenceId = pr->referenceId;
861             new_apdu->u.presentResponse->records =
862                 create_nonSurrogateDiagnostics(odr_encode(), err, addinfo);
863             *new_apdu->u.presentResponse->presentStatus =
864                 Z_PresentStatus_failure;
865             
866             send_to_client(new_apdu);
867             
868             return 0;
869         }
870     }
871     return apdu;
872 }
873
874 void Yaz_Proxy::recv_Z_PDU_0(Z_APDU *apdu)
875 {
876     // Determine our client.
877     m_client = get_client(apdu);
878     if (!m_client)
879     {
880         delete this;
881         return;
882     }
883     m_client->m_server = this;
884
885     if (apdu->which == Z_APDU_initRequest)
886     {
887         if (apdu->u.initRequest->implementationId)
888             yaz_log(LOG_LOG, "%s implementationId: %s",
889                     m_session_str, apdu->u.initRequest->implementationId);
890         if (apdu->u.initRequest->implementationName)
891             yaz_log(LOG_LOG, "%s implementationName: %s",
892                     m_session_str, apdu->u.initRequest->implementationName);
893         if (apdu->u.initRequest->implementationVersion)
894             yaz_log(LOG_LOG, "%s implementationVersion: %s",
895                     m_session_str, apdu->u.initRequest->implementationVersion);
896         if (m_client->m_init_flag)
897         {
898             Z_APDU *apdu = m_client->m_initResponse;
899             apdu->u.initResponse->otherInfo = 0;
900             if (m_client->m_cookie && *m_client->m_cookie)
901                 set_otherInformationString(apdu, VAL_COOKIE, 1,
902                                            m_client->m_cookie);
903             send_to_client(apdu);
904             return;
905         }
906         m_client->m_init_flag = 1;
907     }
908     handle_max_record_retrieve(apdu);
909
910     if (apdu)
911         apdu = handle_syntax_validation(apdu);
912
913     if (apdu)
914         apdu = handle_query_validation(apdu);
915
916     if (apdu)
917         apdu = result_set_optimize(apdu);
918     if (!apdu)
919     {
920         m_client->timeout(m_target_idletime);  // mark it active even 
921         // though we didn't use it
922         return;
923     }
924
925     // delete other info part from PDU before sending to target
926     Z_OtherInformation **oi;
927     get_otherInfoAPDU(apdu, &oi);
928     if (oi)
929         *oi = 0;
930
931     if (apdu->which == Z_APDU_presentRequest &&
932         m_client->m_resultSetStartPoint == 0)
933     {
934         Z_PresentRequest *pr = apdu->u.presentRequest;
935         m_client->m_resultSetStartPoint = *pr->resultSetStartPoint;
936         m_client->m_cache.copy_presentRequest(apdu->u.presentRequest);
937     } else {
938         m_client->m_resultSetStartPoint = 0;
939     }
940     if (m_client->send_to_target(apdu) < 0)
941     {
942         delete m_client;
943         m_client = 0;
944         delete this;
945     }
946     else
947         m_client->m_waiting = 1;
948 }
949
950 void Yaz_Proxy::connectNotify()
951 {
952 }
953
954 void Yaz_Proxy::shutdown()
955 {
956     // only keep if keep_alive flag is set...
957     if (m_keepalive && m_client && m_client->m_waiting == 0)
958     {
959         yaz_log (LOG_LOG, "%s Shutdown (client to proxy) keepalive %s",
960                  m_session_str,
961                  m_client->get_hostname());
962         assert (m_client->m_waiting != 2);
963         // Tell client (if any) that no server connection is there..
964         m_client->m_server = 0;
965     }
966     else if (m_client)
967     {
968         yaz_log (LOG_LOG, "%s Shutdown (client to proxy) close %s",
969                  m_session_str,
970                  m_client->get_hostname());
971         assert (m_client->m_waiting != 2);
972         delete m_client;
973     }
974     else if (!m_parent)
975     {
976         yaz_log (LOG_LOG, "%s shutdown (client to proxy) bad state",
977                  m_session_str);
978         assert (m_parent);
979     }
980     else 
981     {
982         yaz_log (LOG_LOG, "%s Shutdown (client to proxy)",
983                  m_session_str);
984     }
985     delete this;
986 }
987
988 const char *Yaz_ProxyClient::get_session_str() 
989 {
990     if (!m_server)
991         return "0";
992     return m_server->get_session_str();
993 }
994
995 void Yaz_ProxyClient::shutdown()
996 {
997     yaz_log (LOG_LOG, "%s Shutdown (proxy to target) %s", get_session_str(),
998              get_hostname());
999     delete m_server;
1000     delete this;
1001 }
1002
1003 void Yaz_Proxy::failNotify()
1004 {
1005     yaz_log (LOG_LOG, "%s Connection closed by client",
1006              get_session_str());
1007     shutdown();
1008 }
1009
1010 void Yaz_ProxyClient::failNotify()
1011 {
1012     yaz_log (LOG_LOG, "%s Connection closed by target %s", 
1013              get_session_str(), get_hostname());
1014     shutdown();
1015 }
1016
1017 void Yaz_ProxyClient::connectNotify()
1018 {
1019     yaz_log (LOG_LOG, "%s Connection accepted by %s", get_session_str(),
1020              get_hostname());
1021     int to;
1022     if (m_server)
1023         to = m_server->get_target_idletime();
1024     else
1025         to = 600;
1026     timeout(to);
1027 }
1028
1029 IYaz_PDU_Observer *Yaz_ProxyClient::sessionNotify(IYaz_PDU_Observable
1030                                                   *the_PDU_Observable, int fd)
1031 {
1032     return new Yaz_ProxyClient(the_PDU_Observable);
1033 }
1034
1035 Yaz_ProxyClient::~Yaz_ProxyClient()
1036 {
1037     if (m_prev)
1038         *m_prev = m_next;
1039     if (m_next)
1040         m_next->m_prev = m_prev;
1041     m_waiting = 2;     // for debugging purposes only.
1042     odr_destroy(m_init_odr);
1043     delete m_last_query;
1044     xfree (m_last_resultSetId);
1045     xfree (m_cookie);
1046 }
1047
1048 void Yaz_Proxy::timeoutNotify()
1049 {
1050     if (m_bw_hold_PDU)
1051     {
1052         timeout(m_client_idletime);
1053         Z_APDU *apdu = m_bw_hold_PDU;
1054         m_bw_hold_PDU = 0;
1055         recv_Z_PDU_0(apdu);
1056     }
1057     else
1058     {
1059         yaz_log (LOG_LOG, "%s Timeout (client to proxy)", m_session_str);
1060         shutdown();
1061     }
1062 }
1063
1064 void Yaz_ProxyClient::timeoutNotify()
1065 {
1066     yaz_log (LOG_LOG, "%s Timeout (proxy to target) %s", get_session_str(),
1067              get_hostname());
1068     shutdown();
1069 }
1070
1071 Yaz_ProxyClient::Yaz_ProxyClient(IYaz_PDU_Observable *the_PDU_Observable) :
1072     Yaz_Z_Assoc (the_PDU_Observable)
1073 {
1074     m_cookie = 0;
1075     m_next = 0;
1076     m_prev = 0;
1077     m_init_flag = 0;
1078     m_last_query = 0;
1079     m_last_resultSetId = 0;
1080     m_last_resultCount = 0;
1081     m_last_ok = 0;
1082     m_sr_transform = 0;
1083     m_waiting = 0;
1084     m_init_odr = odr_createmem (ODR_DECODE);
1085     m_initResponse = 0;
1086     m_resultSetStartPoint = 0;
1087     m_bytes_sent = m_bytes_recv = 0;
1088 }
1089
1090 const char *Yaz_Proxy::option(const char *name, const char *value)
1091 {
1092     if (!strcmp (name, "optimize")) {
1093         if (value) {
1094             xfree (m_optimize); 
1095             m_optimize = xstrdup (value);
1096         }
1097         return m_optimize;
1098     }
1099     return 0;
1100 }
1101
1102 void Yaz_ProxyClient::recv_Z_PDU(Z_APDU *apdu, int len)
1103 {
1104     m_bytes_recv += len;
1105     m_waiting = 0;
1106     yaz_log (LOG_LOG, "%s Receiving %s from %s %d bytes", get_session_str(),
1107              apdu_name(apdu), get_hostname(), len);
1108     if (apdu->which == Z_APDU_initResponse)
1109     {
1110         NMEM nmem = odr_extract_mem (odr_decode());
1111         odr_reset (m_init_odr);
1112         nmem_transfer (m_init_odr->mem, nmem);
1113         m_initResponse = apdu;
1114
1115         Z_InitResponse *ir = apdu->u.initResponse;
1116         char *im0 = ir->implementationName;
1117         
1118         char *im1 = (char*) 
1119             odr_malloc(m_init_odr, 20 + (im0 ? strlen(im0) : 0));
1120         *im1 = '\0';
1121         if (im0)
1122         {
1123             strcat(im1, im0);
1124             strcat(im1, " ");
1125         }
1126         strcat(im1, "(YAZ Proxy)");
1127         ir->implementationName = im1;
1128
1129         nmem_destroy (nmem);
1130     }
1131     if (apdu->which == Z_APDU_searchResponse)
1132     {
1133         Z_SearchResponse *sr = apdu->u.searchResponse;
1134         m_last_resultCount = *sr->resultCount;
1135         int status = *sr->searchStatus;
1136         if (status && (!sr->records || sr->records->which == Z_Records_DBOSD))
1137         {
1138             m_last_ok = 1;
1139             
1140             if (sr->records && sr->records->which == Z_Records_DBOSD)
1141             {
1142                 m_cache.add(odr_decode(),
1143                             sr->records->u.databaseOrSurDiagnostics, 1,
1144                             *sr->resultCount);
1145             }
1146         }
1147     }
1148     if (apdu->which == Z_APDU_presentResponse)
1149     {
1150         Z_PresentResponse *pr = apdu->u.presentResponse;
1151         if (m_sr_transform)
1152         {
1153             m_sr_transform = 0;
1154             Z_APDU *new_apdu = create_Z_PDU(Z_APDU_searchResponse);
1155             Z_SearchResponse *sr = new_apdu->u.searchResponse;
1156             sr->referenceId = pr->referenceId;
1157             *sr->resultCount = m_last_resultCount;
1158             sr->records = pr->records;
1159             sr->nextResultSetPosition = pr->nextResultSetPosition;
1160             sr->numberOfRecordsReturned = pr->numberOfRecordsReturned;
1161             apdu = new_apdu;
1162         }
1163         if (pr->records && 
1164             pr->records->which == Z_Records_DBOSD && m_resultSetStartPoint)
1165         {
1166             m_cache.add(odr_decode(),
1167                         pr->records->u.databaseOrSurDiagnostics,
1168                         m_resultSetStartPoint, -1);
1169             m_resultSetStartPoint = 0;
1170         }
1171     }
1172     if (m_cookie)
1173         set_otherInformationString (apdu, VAL_COOKIE, 1, m_cookie);
1174     if (m_server)
1175     {
1176         m_server->send_to_client(apdu);
1177     }
1178     if (apdu->which == Z_APDU_close)
1179     {
1180         shutdown();
1181     }
1182 }