Multiplexing
[yazpp-moved-to-github.git] / src / yaz-proxy.cpp
1 /*
2  * Copyright (c) 1998-2003, Index Data.
3  * See the file LICENSE for details.
4  * 
5  * $Id: yaz-proxy.cpp,v 1.52 2003-10-08 09:32:49 adam Exp $
6  */
7
8 #include <assert.h>
9 #include <time.h>
10
11 #include <yaz/log.h>
12 #include <yaz/diagbib1.h>
13 #include <yaz++/proxy.h>
14
15 static const char *apdu_name(Z_APDU *apdu)
16 {
17     switch (apdu->which)
18     {
19     case Z_APDU_initRequest:
20         return "initRequest";
21     case Z_APDU_initResponse:
22         return "initResponse";
23     case Z_APDU_searchRequest:
24         return "searchRequest";
25     case Z_APDU_searchResponse:
26         return "searchResponse";
27     case Z_APDU_presentRequest:
28         return "presentRequest";
29     case Z_APDU_presentResponse:
30         return "presentResponse";
31     case Z_APDU_deleteResultSetRequest:
32         return "deleteResultSetRequest";
33     case Z_APDU_deleteResultSetResponse:
34         return "deleteResultSetResponse";
35     case Z_APDU_scanRequest:
36         return "scanRequest";
37     case Z_APDU_scanResponse:
38         return "scanResponse";
39     case Z_APDU_sortRequest:
40         return "sortRequest";
41     case Z_APDU_sortResponse:
42         return "sortResponse";
43     case Z_APDU_extendedServicesRequest:
44         return "extendedServicesRequest";
45     case Z_APDU_extendedServicesResponse:
46         return "extendedServicesResponse";
47     case Z_APDU_close:
48         return "close";
49     }
50     return "other";
51 }
52
53 Yaz_Proxy::Yaz_Proxy(IYaz_PDU_Observable *the_PDU_Observable) :
54     Yaz_Z_Assoc(the_PDU_Observable), m_bw_stat(60), m_pdu_stat(60)
55 {
56     m_PDU_Observable = the_PDU_Observable;
57     m_client = 0;
58     m_parent = 0;
59     m_clientPool = 0;
60     m_seqno = 1;
61     m_keepalive = 0;
62     m_proxyTarget = 0;
63     m_default_target = 0;
64     m_proxy_authentication = 0;
65     m_max_clients = 150;
66     m_seed = time(0);
67     m_client_idletime = 600;
68     m_target_idletime = 600;
69     m_optimize = xstrdup ("1");
70     strcpy(m_session_str, "x");
71     m_session_no=0;
72     m_bytes_sent = m_bytes_recv = 0;
73     m_bw_hold_PDU = 0;
74     m_bw_max = 0;
75     m_pdu_max = 0;
76     m_max_record_retrieve = 0;
77 }
78
79 Yaz_Proxy::~Yaz_Proxy()
80 {
81     yaz_log(LOG_LOG, "%s Closed %d/%d sent/recv bytes total", m_session_str,
82             m_bytes_sent, m_bytes_recv);
83     xfree (m_proxyTarget);
84     xfree (m_default_target);
85     xfree (m_proxy_authentication);
86     xfree (m_optimize);
87 }
88
89 int Yaz_Proxy::set_config(const char *config)
90 {
91     int r = m_config.read_xml(config);
92     return r;
93 }
94
95 void Yaz_Proxy::set_default_target(const char *target)
96 {
97     xfree (m_default_target);
98     m_default_target = 0;
99     if (target)
100         m_default_target = (char *) xstrdup (target);
101 }
102
103 void Yaz_Proxy::set_proxy_authentication (const char *auth)
104 {
105     xfree (m_proxy_authentication);
106     m_proxy_authentication = 0;
107     if (auth)
108         m_proxy_authentication = (char *) xstrdup (auth);
109 }
110
111 IYaz_PDU_Observer *Yaz_Proxy::sessionNotify(IYaz_PDU_Observable
112                                             *the_PDU_Observable, int fd)
113 {
114     Yaz_Proxy *new_proxy = new Yaz_Proxy(the_PDU_Observable);
115     new_proxy->m_parent = this;
116     new_proxy->m_config = m_config;
117     new_proxy->timeout(m_client_idletime);
118     new_proxy->m_target_idletime = m_target_idletime;
119     new_proxy->set_default_target(m_default_target);
120     new_proxy->set_APDU_log(get_APDU_log());
121     new_proxy->set_proxy_authentication(m_proxy_authentication);
122     sprintf(new_proxy->m_session_str, "%ld:%d", (long) time(0), m_session_no);
123     m_session_no++;
124     yaz_log (LOG_LOG, "%s New session %s", new_proxy->m_session_str,
125              the_PDU_Observable->getpeername());
126     return new_proxy;
127 }
128
129 char *Yaz_Proxy::get_cookie(Z_OtherInformation **otherInfo)
130 {
131     int oid[OID_SIZE];
132     Z_OtherInformationUnit *oi;
133     struct oident ent;
134     ent.proto = PROTO_Z3950;
135     ent.oclass = CLASS_USERINFO;
136     ent.value = (oid_value) VAL_COOKIE;
137     assert (oid_ent_to_oid (&ent, oid));
138
139     if (oid_ent_to_oid (&ent, oid) && 
140         (oi = update_otherInformation(otherInfo, 0, oid, 1, 1)) &&
141         oi->which == Z_OtherInfo_characterInfo)
142         return oi->information.characterInfo;
143     return 0;
144 }
145
146 char *Yaz_Proxy::get_proxy(Z_OtherInformation **otherInfo)
147 {
148     int oid[OID_SIZE];
149     Z_OtherInformationUnit *oi;
150     struct oident ent;
151     ent.proto = PROTO_Z3950;
152     ent.oclass = CLASS_USERINFO;
153     ent.value = (oid_value) VAL_PROXY;
154     if (oid_ent_to_oid (&ent, oid) &&
155         (oi = update_otherInformation(otherInfo, 0, oid, 1, 1)) &&
156         oi->which == Z_OtherInfo_characterInfo)
157         return oi->information.characterInfo;
158     return 0;
159 }
160
161 const char *Yaz_Proxy::load_balance(const char **url)
162 {
163     int zurl_in_use[MAX_ZURL_PLEX];
164     Yaz_ProxyClient *c;
165     int i;
166
167     for (i = 0; i<MAX_ZURL_PLEX; i++)
168         zurl_in_use[i] = 0;
169     for (c = m_parent->m_clientPool; c; c = c->m_next)
170     {
171         for (i = 0; url[i]; i++)
172             if (!strcmp(url[i], c->get_hostname()))
173                 zurl_in_use[i]++;
174     }
175     int min = 100000;
176     const char *ret = 0;
177     for (i = 0; url[i]; i++)
178     {
179         yaz_log(LOG_LOG, "%s zurl=%s use=%d",
180                 m_session_str, url[i], zurl_in_use[i]);
181         if (min > zurl_in_use[i])
182         {
183             ret = url[i];
184             min = zurl_in_use[i];
185         }
186     }
187     return ret;
188 }
189
190 Yaz_ProxyClient *Yaz_Proxy::get_client(Z_APDU *apdu)
191 {
192     assert (m_parent);
193     Yaz_Proxy *parent = m_parent;
194     Z_OtherInformation **oi;
195     Yaz_ProxyClient *c = m_client;
196     
197     get_otherInfoAPDU(apdu, &oi);
198     char *cookie = get_cookie(oi);
199
200     if (!m_proxyTarget)
201     {
202         const char *url[MAX_ZURL_PLEX];
203         const char *proxy_host = get_proxy(oi);
204         if (proxy_host)
205         {
206             xfree(m_default_target);
207             m_default_target = xstrdup(proxy_host);
208             proxy_host = m_default_target;
209         }
210         
211         int client_idletime = -1;
212         m_config.get_target_info(proxy_host, url, &m_keepalive, &m_bw_max,
213                                  &m_pdu_max, &m_max_record_retrieve,
214                                  &m_target_idletime, &client_idletime,
215                                  &parent->m_max_clients);
216         if (client_idletime != -1)
217         {
218             m_client_idletime = client_idletime;
219             timeout(m_client_idletime);
220         }
221         if (!url[0])
222         {
223             yaz_log(LOG_LOG, "%s No default target", m_session_str);
224             return 0;
225         }
226         m_proxyTarget = (char*) xstrdup(load_balance(url));
227     }
228     if (cookie && *cookie)
229     {
230         Yaz_ProxyClient *cc = 0;
231         
232         for (c = parent->m_clientPool; c; c = c->m_next)
233         {
234             assert (c->m_prev);
235             assert (*c->m_prev == c);
236             if (c->m_cookie && !strcmp(cookie,c->m_cookie) &&
237                 !strcmp(m_proxyTarget, c->get_hostname()))
238             {
239                 cc = c;
240             }
241         }
242         if (cc)
243         {
244             // found it in cache
245             c = cc;
246             // The following handles "cancel"
247             // If connection is busy (waiting for PDU) and
248             // we have an initRequest we can safely do re-open
249             if (c->m_waiting && apdu->which == Z_APDU_initRequest)
250             {
251                 yaz_log (LOG_LOG, "%s REOPEN target=%s", m_session_str,
252                          c->get_hostname());
253                 c->close();
254                 c->client(m_proxyTarget);
255                 c->m_init_flag = 0;
256
257                 c->m_last_ok = 0;
258                 c->m_cache.clear();
259                 c->m_last_resultCount = 0;
260                 c->m_sr_transform = 0;
261                 c->m_waiting = 0;
262                 c->m_resultSetStartPoint = 0;
263                 c->timeout(m_target_idletime); 
264             }
265             c->m_seqno = parent->m_seqno;
266             if (c->m_server && c->m_server != this)
267                 c->m_server->m_client = 0;
268             c->m_server = this;
269             (parent->m_seqno)++;
270             yaz_log (LOG_DEBUG, "get_client 1 %p %p", this, c);
271             return c;
272         }
273     }
274     else if (!c)
275     {
276         Yaz_ProxyClient *cc = 0;
277         
278         for (c = parent->m_clientPool; c; c = c->m_next)
279         {
280             assert (c->m_prev);
281             assert (*c->m_prev == c);
282             if (c->m_server == 0 && c->m_cookie == 0 && 
283                 !strcmp(m_proxyTarget, c->get_hostname()))
284             {
285                 cc = c;
286             }
287         }
288         if (cc)
289         {
290             // found it in cache
291             c = cc;
292
293             yaz_log (LOG_LOG, "%s REUSE %d %d %s",
294                      m_session_str,
295                      c->m_seqno, parent->m_seqno, c->get_hostname());
296
297             c->m_seqno = parent->m_seqno;
298             assert(c->m_server == 0);
299             c->m_server = this;
300             
301             (parent->m_seqno)++;
302             return c;
303         }
304     }
305     if (!m_client)
306     {
307         if (apdu->which != Z_APDU_initRequest)
308         {
309             yaz_log (LOG_LOG, "no first INIT!");
310             return 0;
311         }
312         Z_InitRequest *initRequest = apdu->u.initRequest;
313
314         if (!initRequest->idAuthentication)
315         {
316             if (m_proxy_authentication)
317             {
318                 initRequest->idAuthentication =
319                     (Z_IdAuthentication *)
320                     odr_malloc (odr_encode(),
321                                 sizeof(*initRequest->idAuthentication));
322                 initRequest->idAuthentication->which =
323                     Z_IdAuthentication_open;
324                 initRequest->idAuthentication->u.open =
325                     odr_strdup (odr_encode(), m_proxy_authentication);
326             }
327         }
328
329         // go through list of clients - and find the lowest/oldest one.
330         Yaz_ProxyClient *c_min = 0;
331         int min_seq = -1;
332         int no_of_clients = 0;
333         if (parent->m_clientPool)
334             yaz_log (LOG_LOG, "Existing sessions");
335         for (c = parent->m_clientPool; c; c = c->m_next)
336         {
337             yaz_log (LOG_LOG, " Session %-3d wait=%d %s cookie=%s", c->m_seqno,
338                                c->m_waiting, c->get_hostname(),
339                                c->m_cookie ? c->m_cookie : "");
340             no_of_clients++;
341             if (min_seq < 0 || c->m_seqno < min_seq)
342             {
343                 min_seq = c->m_seqno;
344                 c_min = c;
345             }
346         }
347         if (no_of_clients >= parent->m_max_clients)
348         {
349             c = c_min;
350             if (c->m_waiting || strcmp(m_proxyTarget, c->get_hostname()))
351             {
352                 yaz_log (LOG_LOG, "%s MAXCLIENTS Destroy %d",
353                          m_session_str, c->m_seqno);
354                 if (c->m_server && c->m_server != this)
355                     delete c->m_server;
356                 c->m_server = 0;
357             }
358             else
359             {
360                 yaz_log (LOG_LOG, "%s MAXCLIENTS Reuse %d %d %s",
361                          m_session_str,
362                          c->m_seqno, parent->m_seqno, c->get_hostname());
363                 xfree (c->m_cookie);
364                 c->m_cookie = 0;
365                 if (cookie)
366                     c->m_cookie = xstrdup(cookie);
367                 c->m_seqno = parent->m_seqno;
368                 if (c->m_server && c->m_server != this)
369                 {
370                     c->m_server->m_client = 0;
371                     delete c->m_server;
372                 }
373                 (parent->m_seqno)++;
374                 return c;
375             }
376         }
377         else
378         {
379             yaz_log (LOG_LOG, "%s NEW %d %s",
380                      m_session_str, parent->m_seqno, m_proxyTarget);
381             c = new Yaz_ProxyClient(m_PDU_Observable->clone());
382             c->m_next = parent->m_clientPool;
383             if (c->m_next)
384                 c->m_next->m_prev = &c->m_next;
385             parent->m_clientPool = c;
386             c->m_prev = &parent->m_clientPool;
387         }
388
389         xfree (c->m_cookie);
390         c->m_cookie = 0;
391         if (cookie)
392             c->m_cookie = xstrdup(cookie);
393
394         c->m_seqno = parent->m_seqno;
395         c->client(m_proxyTarget);
396         c->m_init_flag = 0;
397         c->m_last_resultCount = 0;
398         c->m_last_ok = 0;
399         c->m_cache.clear();
400         c->m_sr_transform = 0;
401         c->m_waiting = 0;
402         c->m_resultSetStartPoint = 0;
403         c->timeout(30);
404
405         (parent->m_seqno)++;
406     }
407     yaz_log (LOG_DEBUG, "get_client 3 %p %p", this, c);
408     return c;
409 }
410
411 void Yaz_Proxy::display_diagrecs(Z_DiagRec **pp, int num)
412 {
413     int i;
414     for (i = 0; i<num; i++)
415     {
416         oident *ent;
417         Z_DefaultDiagFormat *r;
418         Z_DiagRec *p = pp[i];
419         if (p->which != Z_DiagRec_defaultFormat)
420         {
421             yaz_log(LOG_LOG, "%s Error no diagnostics", m_session_str);
422             return;
423         }
424         else
425             r = p->u.defaultFormat;
426         if (!(ent = oid_getentbyoid(r->diagnosticSetId)) ||
427             ent->oclass != CLASS_DIAGSET || ent->value != VAL_BIB1)
428             yaz_log(LOG_LOG, "%s Error unknown diagnostic set", m_session_str);
429         switch (r->which)
430         {
431         case Z_DefaultDiagFormat_v2Addinfo:
432             yaz_log(LOG_LOG, "%s Error %d %s:%s",
433                     m_session_str,
434                     *r->condition, diagbib1_str(*r->condition),
435                     r->u.v2Addinfo);
436             break;
437         case Z_DefaultDiagFormat_v3Addinfo:
438             yaz_log(LOG_LOG, "%s Error %d %s:%s",
439                     m_session_str,
440                     *r->condition, diagbib1_str(*r->condition),
441                     r->u.v3Addinfo);
442             break;
443         }
444     }
445 }
446
447 int Yaz_Proxy::send_to_client(Z_APDU *apdu)
448 {
449     int len = 0;
450     if (apdu->which == Z_APDU_searchResponse)
451     {
452         Z_SearchResponse *sr = apdu->u.searchResponse;
453         Z_Records *p = sr->records;
454         if (p && p->which == Z_Records_NSD)
455         {
456             Z_DiagRec dr, *dr_p = &dr;
457             dr.which = Z_DiagRec_defaultFormat;
458             dr.u.defaultFormat = p->u.nonSurrogateDiagnostic;
459
460             display_diagrecs(&dr_p, 1);
461         }
462         else
463         {
464             if (sr->resultCount)
465                 yaz_log(LOG_LOG, "%s %d hits", m_session_str,
466                         *sr->resultCount);
467         }
468     }
469     else if (apdu->which == Z_APDU_presentResponse)
470     {
471         Z_PresentResponse *sr = apdu->u.presentResponse;
472         Z_Records *p = sr->records;
473         if (p && p->which == Z_Records_NSD)
474         {
475             Z_DiagRec dr, *dr_p = &dr;
476             dr.which = Z_DiagRec_defaultFormat;
477             dr.u.defaultFormat = p->u.nonSurrogateDiagnostic;
478
479             display_diagrecs(&dr_p, 1);
480         }
481     }
482     int r = send_Z_PDU(apdu, &len);
483     yaz_log (LOG_LOG, "%s Sending %s to client %d bytes", m_session_str,
484              apdu_name(apdu), len);
485     m_bytes_sent += len;
486     m_bw_stat.add_bytes(len);
487     return r;
488 }
489
490 int Yaz_ProxyClient::send_to_target(Z_APDU *apdu)
491 {
492     int len = 0;
493     int r = send_Z_PDU(apdu, &len);
494     yaz_log (LOG_LOG, "%s Sending %s to %s %d bytes",
495              get_session_str(),
496              apdu_name(apdu), get_hostname(), len);
497     m_bytes_sent += len;
498     return r;
499 }
500
501 Z_APDU *Yaz_Proxy::result_set_optimize(Z_APDU *apdu)
502 {
503     if (*m_parent->m_optimize == '0')
504         return apdu;      // don't optimize result sets..
505     if (apdu->which == Z_APDU_presentRequest)
506     {
507         Z_PresentRequest *pr = apdu->u.presentRequest;
508         Z_NamePlusRecordList *npr;
509         int toget = *pr->numberOfRecordsRequested;
510         int start = *pr->resultSetStartPoint;
511
512         if (m_client->m_last_resultSetId &&
513             !strcmp(m_client->m_last_resultSetId, pr->resultSetId))
514         {
515             if (m_client->m_cache.lookup (odr_encode(), &npr, start, toget,
516                                           pr->preferredRecordSyntax,
517                                           pr->recordComposition))
518             {
519                 yaz_log (LOG_LOG, "%s Returned cache records for present request", 
520                          m_session_str);
521                 Z_APDU *new_apdu = create_Z_PDU(Z_APDU_presentResponse);
522                 new_apdu->u.presentResponse->referenceId = pr->referenceId;
523                 
524                 new_apdu->u.presentResponse->numberOfRecordsReturned
525                     = odr_intdup(odr_encode(), toget);
526                                                                  
527                 new_apdu->u.presentResponse->records = (Z_Records*)
528                     odr_malloc(odr_encode(), sizeof(Z_Records));
529                 new_apdu->u.presentResponse->records->which = Z_Records_DBOSD;
530                 new_apdu->u.presentResponse->records->u.databaseOrSurDiagnostics = npr;
531                 new_apdu->u.presentResponse->nextResultSetPosition =
532                     odr_intdup(odr_encode(), start+toget);
533
534                 send_to_client(new_apdu);
535                 return 0;
536             }
537         }
538     }
539
540     if (apdu->which != Z_APDU_searchRequest)
541         return apdu;
542     Z_SearchRequest *sr = apdu->u.searchRequest;
543     Yaz_Z_Query *this_query = new Yaz_Z_Query;
544     Yaz_Z_Databases this_databases;
545
546     this_databases.set(sr->num_databaseNames, (const char **)
547                        sr->databaseNames);
548     
549     this_query->set_Z_Query(sr->query);
550
551     char query_str[80];
552     this_query->print(query_str, sizeof(query_str)-1);
553     yaz_log(LOG_LOG, "%s Query %s", m_session_str, query_str);
554
555     if (m_client->m_last_ok && m_client->m_last_query &&
556         m_client->m_last_query->match(this_query) &&
557         !strcmp(m_client->m_last_resultSetId, sr->resultSetName) &&
558         m_client->m_last_databases.match(this_databases))
559     {
560         delete this_query;
561         if (m_client->m_last_resultCount > *sr->smallSetUpperBound &&
562             m_client->m_last_resultCount < *sr->largeSetLowerBound)
563         {
564             Z_NamePlusRecordList *npr;
565             int toget = *sr->mediumSetPresentNumber;
566             Z_RecordComposition *comp = 0;
567
568             if (toget > m_client->m_last_resultCount)
569                 toget = m_client->m_last_resultCount;
570             
571             if (sr->mediumSetElementSetNames)
572             {
573                 comp = (Z_RecordComposition *)
574                     odr_malloc(odr_encode(), sizeof(Z_RecordComposition));
575                 comp->which = Z_RecordComp_simple;
576                 comp->u.simple = sr->mediumSetElementSetNames;
577             }
578  
579             if (m_client->m_cache.lookup (odr_encode(), &npr, 1, toget,
580                                           sr->preferredRecordSyntax, comp))
581             {
582                 yaz_log (LOG_LOG, "%s Returned cache records for medium set",
583                          m_session_str);
584                 Z_APDU *new_apdu = create_Z_PDU(Z_APDU_searchResponse);
585                 new_apdu->u.searchResponse->referenceId = sr->referenceId;
586                 new_apdu->u.searchResponse->resultCount =
587                     &m_client->m_last_resultCount;
588                 
589                 new_apdu->u.searchResponse->numberOfRecordsReturned
590                     = odr_intdup(odr_encode(), toget);
591                                                         
592                 new_apdu->u.searchResponse->presentStatus =
593                     odr_intdup(odr_encode(), Z_PresentStatus_success);
594                 new_apdu->u.searchResponse->records = (Z_Records*)
595                     odr_malloc(odr_encode(), sizeof(Z_Records));
596                 new_apdu->u.searchResponse->records->which = Z_Records_DBOSD;
597                 new_apdu->u.searchResponse->records->u.databaseOrSurDiagnostics = npr;
598                 new_apdu->u.searchResponse->nextResultSetPosition =
599                     odr_intdup(odr_encode(), toget+1);
600                 send_to_client(new_apdu);
601                 return 0;
602             }
603             else
604             {
605                 // medium Set
606                 // send present request (medium size)
607                 yaz_log (LOG_LOG, "%s Optimizing search for medium set",
608                          m_session_str);
609
610                 Z_APDU *new_apdu = create_Z_PDU(Z_APDU_presentRequest);
611                 Z_PresentRequest *pr = new_apdu->u.presentRequest;
612                 pr->referenceId = sr->referenceId;
613                 pr->resultSetId = sr->resultSetName;
614                 pr->preferredRecordSyntax = sr->preferredRecordSyntax;
615                 *pr->numberOfRecordsRequested = toget;
616                 pr->recordComposition = comp;
617                 m_client->m_sr_transform = 1;
618                 return new_apdu;
619             }
620         }
621         else if (m_client->m_last_resultCount >= *sr->largeSetLowerBound ||
622             m_client->m_last_resultCount <= 0)
623         {
624             // large set. Return pseudo-search response immediately
625             yaz_log (LOG_LOG, "%s Optimizing search for large set",
626                      m_session_str);
627             Z_APDU *new_apdu = create_Z_PDU(Z_APDU_searchResponse);
628             new_apdu->u.searchResponse->referenceId = sr->referenceId;
629             new_apdu->u.searchResponse->resultCount =
630                 &m_client->m_last_resultCount;
631             send_to_client(new_apdu);
632             return 0;
633         }
634         else
635         {
636             Z_NamePlusRecordList *npr;
637             int toget = m_client->m_last_resultCount;
638             Z_RecordComposition *comp = 0;
639             // small set
640             // send a present request (small set)
641             
642             if (sr->smallSetElementSetNames)
643             {
644                 comp = (Z_RecordComposition *)
645                     odr_malloc(odr_encode(), sizeof(Z_RecordComposition));
646                 comp->which = Z_RecordComp_simple;
647                 comp->u.simple = sr->smallSetElementSetNames;
648             }
649
650             if (m_client->m_cache.lookup (odr_encode(), &npr, 1, toget,
651                                           sr->preferredRecordSyntax, comp))
652             {
653                 yaz_log (LOG_LOG, "%s Returned cache records for small set",
654                          m_session_str);
655                 Z_APDU *new_apdu = create_Z_PDU(Z_APDU_searchResponse);
656                 new_apdu->u.searchResponse->referenceId = sr->referenceId;
657                 new_apdu->u.searchResponse->resultCount =
658                     &m_client->m_last_resultCount;
659                 
660                 new_apdu->u.searchResponse->numberOfRecordsReturned
661                     = odr_intdup(odr_encode(), toget);
662                                                                  
663                 new_apdu->u.searchResponse->presentStatus =
664                     odr_intdup(odr_encode(), Z_PresentStatus_success);
665                 new_apdu->u.searchResponse->records = (Z_Records*)
666                     odr_malloc(odr_encode(), sizeof(Z_Records));
667                 new_apdu->u.searchResponse->records->which = Z_Records_DBOSD;
668                 new_apdu->u.searchResponse->records->u.databaseOrSurDiagnostics = npr;
669                 new_apdu->u.searchResponse->nextResultSetPosition =
670                     odr_intdup(odr_encode(), toget+1);
671                 send_to_client(new_apdu);
672                 return 0;
673             }
674             else
675             {
676                 yaz_log (LOG_LOG, "%s Optimizing search for small set",
677                          m_session_str);
678                 Z_APDU *new_apdu = create_Z_PDU(Z_APDU_presentRequest);
679                 Z_PresentRequest *pr = new_apdu->u.presentRequest;
680                 pr->referenceId = sr->referenceId;
681                 pr->resultSetId = sr->resultSetName;
682                 pr->preferredRecordSyntax = sr->preferredRecordSyntax;
683                 *pr->numberOfRecordsRequested = toget;
684                 pr->recordComposition = comp;
685                 m_client->m_sr_transform = 1;
686                 return new_apdu;
687             }
688         }
689     }
690     else  // query doesn't match
691     {
692         delete m_client->m_last_query;
693         m_client->m_last_query = this_query;
694         m_client->m_last_ok = 0;
695         m_client->m_cache.clear();
696         m_client->m_resultSetStartPoint = 0;
697
698         xfree (m_client->m_last_resultSetId);
699         m_client->m_last_resultSetId = xstrdup (sr->resultSetName);
700
701         m_client->m_last_databases.set(sr->num_databaseNames,
702                                        (const char **) sr->databaseNames);
703     }
704     return apdu;
705 }
706
707
708 void Yaz_Proxy::recv_Z_PDU(Z_APDU *apdu, int len)
709 {
710     int reduce = 0;
711     m_bytes_recv += len;
712     
713     yaz_log (LOG_LOG, "%s Receiving %s from client %d bytes", m_session_str,
714              apdu_name(apdu), len);
715
716     if (m_bw_hold_PDU)     // double incoming PDU. shutdown now.
717         shutdown();
718
719     m_bw_stat.add_bytes(len);
720     m_pdu_stat.add_bytes(1);
721
722     int bw_total = m_bw_stat.get_total();
723     int pdu_total = m_pdu_stat.get_total();
724
725     yaz_log(LOG_LOG, "%s stat bw=%d pdu=%d limit-bw=%d limit-pdu=%d",
726             m_session_str, bw_total, pdu_total, m_bw_max, m_pdu_max);
727     if (m_bw_max)
728     {
729         if (bw_total > m_bw_max)
730         {
731             reduce = (bw_total/m_bw_max);
732         }
733     }
734     if (m_pdu_max)
735     {
736         if (pdu_total > m_pdu_max)
737         {
738             int nreduce = (60/m_pdu_max);
739             reduce = (reduce > nreduce) ? reduce : nreduce;
740         }
741     }
742     if (reduce)  
743     {
744         yaz_log(LOG_LOG, "%s Limit delay=%d", m_session_str, reduce);
745         m_bw_hold_PDU = apdu;  // save PDU and signal "on hold"
746         timeout(reduce);       // call us reduce seconds later
747     }
748     else
749         recv_Z_PDU_0(apdu);    // all fine. Proceed receive PDU as usual
750 }
751
752 void Yaz_Proxy::handle_max_record_retrieve(Z_APDU *apdu)
753 {
754     if (m_max_record_retrieve)
755     {
756         if (apdu->which == Z_APDU_presentRequest)
757         {
758             Z_PresentRequest *pr = apdu->u.presentRequest;
759             if (pr->numberOfRecordsRequested && 
760                 *pr->numberOfRecordsRequested > m_max_record_retrieve)
761                 *pr->numberOfRecordsRequested = m_max_record_retrieve;
762         }
763     }
764 }
765
766 Z_Records *Yaz_Proxy::create_nonSurrogateDiagnostics(ODR odr,
767                                                      int error,
768                                                      const char *addinfo)
769 {
770     Z_Records *rec = (Z_Records *)
771         odr_malloc (odr, sizeof(*rec));
772     int *err = (int *)
773         odr_malloc (odr, sizeof(*err));
774     Z_DiagRec *drec = (Z_DiagRec *)
775         odr_malloc (odr, sizeof(*drec));
776     Z_DefaultDiagFormat *dr = (Z_DefaultDiagFormat *)
777         odr_malloc (odr, sizeof(*dr));
778     *err = error;
779     rec->which = Z_Records_NSD;
780     rec->u.nonSurrogateDiagnostic = dr;
781     dr->diagnosticSetId =
782         yaz_oidval_to_z3950oid (odr, CLASS_DIAGSET, VAL_BIB1);
783     dr->condition = err;
784     dr->which = Z_DefaultDiagFormat_v2Addinfo;
785     dr->u.v2Addinfo = odr_strdup (odr, addinfo ? addinfo : "");
786     return rec;
787 }
788
789 Z_APDU *Yaz_Proxy::handle_query_validation(Z_APDU *apdu)
790 {
791     if (apdu->which == Z_APDU_searchRequest)
792     {
793         Z_SearchRequest *sr = apdu->u.searchRequest;
794         int err;
795         char *addinfo = 0;
796         err = m_config.check_query(odr_encode(), m_default_target, sr->query,
797                                    &addinfo);
798         if (err)
799         {
800             Z_APDU *new_apdu = create_Z_PDU(Z_APDU_searchResponse);
801
802             new_apdu->u.searchResponse->referenceId = sr->referenceId;
803             new_apdu->u.searchResponse->records =
804                 create_nonSurrogateDiagnostics(odr_encode(), err, addinfo);
805             *new_apdu->u.searchResponse->searchStatus = 0;
806
807             send_to_client(new_apdu);
808
809             return 0;
810         }
811     }
812     return apdu;
813 }
814
815 Z_APDU *Yaz_Proxy::handle_syntax_validation(Z_APDU *apdu)
816 {
817     if (apdu->which == Z_APDU_searchRequest)
818     {
819         Z_SearchRequest *sr = apdu->u.searchRequest;
820         if (*sr->smallSetUpperBound > 0 || *sr->largeSetLowerBound > 1)
821         {
822             int err;
823             char *addinfo = 0;
824             err = m_config.check_syntax(odr_encode(), m_default_target,
825                                         sr->preferredRecordSyntax,
826                                         &addinfo);
827             if (err)
828             {
829                 Z_APDU *new_apdu = create_Z_PDU(Z_APDU_searchResponse);
830                 
831                 new_apdu->u.searchResponse->referenceId = sr->referenceId;
832                 new_apdu->u.searchResponse->records =
833                     create_nonSurrogateDiagnostics(odr_encode(), err, addinfo);
834                 *new_apdu->u.searchResponse->searchStatus = 0;
835                 
836                 send_to_client(new_apdu);
837                 
838                 return 0;
839             }
840         }
841     }
842     else if (apdu->which == Z_APDU_presentRequest)
843     {
844         Z_PresentRequest *pr = apdu->u.presentRequest;
845         int err;
846         char *addinfo = 0;
847         err = m_config.check_syntax(odr_encode(), m_default_target,
848                                     pr->preferredRecordSyntax,
849                                     &addinfo);
850         if (err)
851         {
852             Z_APDU *new_apdu = create_Z_PDU(Z_APDU_presentResponse);
853             
854             new_apdu->u.presentResponse->referenceId = pr->referenceId;
855             new_apdu->u.presentResponse->records =
856                 create_nonSurrogateDiagnostics(odr_encode(), err, addinfo);
857             *new_apdu->u.presentResponse->presentStatus =
858                 Z_PresentStatus_failure;
859             
860             send_to_client(new_apdu);
861             
862             return 0;
863         }
864     }
865     return apdu;
866 }
867
868 void Yaz_Proxy::recv_Z_PDU_0(Z_APDU *apdu)
869 {
870     // Determine our client.
871     m_client = get_client(apdu);
872     if (!m_client)
873     {
874         delete this;
875         return;
876     }
877     m_client->m_server = this;
878
879     if (apdu->which == Z_APDU_initRequest)
880     {
881         if (apdu->u.initRequest->implementationId)
882             yaz_log(LOG_LOG, "%s implementationId: %s",
883                     m_session_str, apdu->u.initRequest->implementationId);
884         if (apdu->u.initRequest->implementationName)
885             yaz_log(LOG_LOG, "%s implementationName: %s",
886                     m_session_str, apdu->u.initRequest->implementationName);
887         if (apdu->u.initRequest->implementationVersion)
888             yaz_log(LOG_LOG, "%s implementationVersion: %s",
889                     m_session_str, apdu->u.initRequest->implementationVersion);
890         if (m_client->m_init_flag)
891         {
892             Z_APDU *apdu = m_client->m_initResponse;
893             apdu->u.initResponse->otherInfo = 0;
894             if (m_client->m_cookie && *m_client->m_cookie)
895                 set_otherInformationString(apdu, VAL_COOKIE, 1,
896                                            m_client->m_cookie);
897             send_to_client(apdu);
898             return;
899         }
900         m_client->m_init_flag = 1;
901     }
902     handle_max_record_retrieve(apdu);
903
904     if (apdu)
905         apdu = handle_syntax_validation(apdu);
906
907     if (apdu)
908         apdu = handle_query_validation(apdu);
909
910     if (apdu)
911         apdu = result_set_optimize(apdu);
912     if (!apdu)
913     {
914         m_client->timeout(m_target_idletime);  // mark it active even 
915         // though we didn't use it
916         return;
917     }
918
919     // delete other info part from PDU before sending to target
920     Z_OtherInformation **oi;
921     get_otherInfoAPDU(apdu, &oi);
922     if (oi)
923         *oi = 0;
924
925     if (apdu->which == Z_APDU_presentRequest &&
926         m_client->m_resultSetStartPoint == 0)
927     {
928         Z_PresentRequest *pr = apdu->u.presentRequest;
929         m_client->m_resultSetStartPoint = *pr->resultSetStartPoint;
930         m_client->m_cache.copy_presentRequest(apdu->u.presentRequest);
931     } else {
932         m_client->m_resultSetStartPoint = 0;
933     }
934     if (m_client->send_to_target(apdu) < 0)
935     {
936         delete m_client;
937         m_client = 0;
938         delete this;
939     }
940     else
941         m_client->m_waiting = 1;
942 }
943
944 void Yaz_Proxy::connectNotify()
945 {
946 }
947
948 void Yaz_Proxy::shutdown()
949 {
950     // only keep if keep_alive flag is set...
951     if (m_keepalive && m_client && m_client->m_waiting == 0)
952     {
953         yaz_log (LOG_LOG, "%s Shutdown (client to proxy) keepalive %s",
954                  m_session_str,
955                  m_client->get_hostname());
956         assert (m_client->m_waiting != 2);
957         // Tell client (if any) that no server connection is there..
958         m_client->m_server = 0;
959     }
960     else if (m_client)
961     {
962         yaz_log (LOG_LOG, "%s Shutdown (client to proxy) close %s",
963                  m_session_str,
964                  m_client->get_hostname());
965         assert (m_client->m_waiting != 2);
966         delete m_client;
967     }
968     else if (!m_parent)
969     {
970         yaz_log (LOG_LOG, "%s shutdown (client to proxy) bad state",
971                  m_session_str);
972         assert (m_parent);
973     }
974     else 
975     {
976         yaz_log (LOG_LOG, "%s Shutdown (client to proxy)",
977                  m_session_str);
978     }
979     delete this;
980 }
981
982 const char *Yaz_ProxyClient::get_session_str() 
983 {
984     if (!m_server)
985         return "0";
986     return m_server->get_session_str();
987 }
988
989 void Yaz_ProxyClient::shutdown()
990 {
991     yaz_log (LOG_LOG, "%s Shutdown (proxy to target) %s", get_session_str(),
992              get_hostname());
993     delete m_server;
994     delete this;
995 }
996
997 void Yaz_Proxy::failNotify()
998 {
999     yaz_log (LOG_LOG, "%s Connection closed by client",
1000              get_session_str());
1001     shutdown();
1002 }
1003
1004 void Yaz_ProxyClient::failNotify()
1005 {
1006     yaz_log (LOG_LOG, "%s Connection closed by target %s", 
1007              get_session_str(), get_hostname());
1008     shutdown();
1009 }
1010
1011 void Yaz_ProxyClient::connectNotify()
1012 {
1013     yaz_log (LOG_LOG, "%s Connection accepted by %s", get_session_str(),
1014              get_hostname());
1015     int to;
1016     if (m_server)
1017         to = m_server->get_target_idletime();
1018     else
1019         to = 600;
1020     timeout(to);
1021 }
1022
1023 IYaz_PDU_Observer *Yaz_ProxyClient::sessionNotify(IYaz_PDU_Observable
1024                                                   *the_PDU_Observable, int fd)
1025 {
1026     return new Yaz_ProxyClient(the_PDU_Observable);
1027 }
1028
1029 Yaz_ProxyClient::~Yaz_ProxyClient()
1030 {
1031     if (m_prev)
1032         *m_prev = m_next;
1033     if (m_next)
1034         m_next->m_prev = m_prev;
1035     m_waiting = 2;     // for debugging purposes only.
1036     odr_destroy(m_init_odr);
1037     delete m_last_query;
1038     xfree (m_last_resultSetId);
1039     xfree (m_cookie);
1040 }
1041
1042 void Yaz_Proxy::timeoutNotify()
1043 {
1044     if (m_bw_hold_PDU)
1045     {
1046         timeout(m_client_idletime);
1047         Z_APDU *apdu = m_bw_hold_PDU;
1048         m_bw_hold_PDU = 0;
1049         recv_Z_PDU_0(apdu);
1050     }
1051     else
1052     {
1053         yaz_log (LOG_LOG, "%s Timeout (client to proxy)", m_session_str);
1054         shutdown();
1055     }
1056 }
1057
1058 void Yaz_ProxyClient::timeoutNotify()
1059 {
1060     yaz_log (LOG_LOG, "%s Timeout (proxy to target) %s", get_session_str(),
1061              get_hostname());
1062     shutdown();
1063 }
1064
1065 Yaz_ProxyClient::Yaz_ProxyClient(IYaz_PDU_Observable *the_PDU_Observable) :
1066     Yaz_Z_Assoc (the_PDU_Observable)
1067 {
1068     m_cookie = 0;
1069     m_next = 0;
1070     m_prev = 0;
1071     m_init_flag = 0;
1072     m_last_query = 0;
1073     m_last_resultSetId = 0;
1074     m_last_resultCount = 0;
1075     m_last_ok = 0;
1076     m_sr_transform = 0;
1077     m_waiting = 0;
1078     m_init_odr = odr_createmem (ODR_DECODE);
1079     m_initResponse = 0;
1080     m_resultSetStartPoint = 0;
1081     m_bytes_sent = m_bytes_recv = 0;
1082 }
1083
1084 const char *Yaz_Proxy::option(const char *name, const char *value)
1085 {
1086     if (!strcmp (name, "optimize")) {
1087         if (value) {
1088             xfree (m_optimize); 
1089             m_optimize = xstrdup (value);
1090         }
1091         return m_optimize;
1092     }
1093     return 0;
1094 }
1095
1096 void Yaz_ProxyClient::recv_Z_PDU(Z_APDU *apdu, int len)
1097 {
1098     m_bytes_recv += len;
1099     m_waiting = 0;
1100     yaz_log (LOG_LOG, "%s Receiving %s from %s %d bytes", get_session_str(),
1101              apdu_name(apdu), get_hostname(), len);
1102     if (apdu->which == Z_APDU_initResponse)
1103     {
1104         NMEM nmem = odr_extract_mem (odr_decode());
1105         odr_reset (m_init_odr);
1106         nmem_transfer (m_init_odr->mem, nmem);
1107         m_initResponse = apdu;
1108
1109         Z_InitResponse *ir = apdu->u.initResponse;
1110         char *im0 = ir->implementationName;
1111         
1112         char *im1 = (char*) 
1113             odr_malloc(m_init_odr, 20 + (im0 ? strlen(im0) : 0));
1114         *im1 = '\0';
1115         if (im0)
1116         {
1117             strcat(im1, im0);
1118             strcat(im1, " ");
1119         }
1120         strcat(im1, "(YAZ Proxy)");
1121         ir->implementationName = im1;
1122
1123         nmem_destroy (nmem);
1124     }
1125     if (apdu->which == Z_APDU_searchResponse)
1126     {
1127         Z_SearchResponse *sr = apdu->u.searchResponse;
1128         m_last_resultCount = *sr->resultCount;
1129         int status = *sr->searchStatus;
1130         if (status && (!sr->records || sr->records->which == Z_Records_DBOSD))
1131         {
1132             m_last_ok = 1;
1133             
1134             if (sr->records && sr->records->which == Z_Records_DBOSD)
1135             {
1136                 m_cache.add(odr_decode(),
1137                             sr->records->u.databaseOrSurDiagnostics, 1,
1138                             *sr->resultCount);
1139             }
1140         }
1141     }
1142     if (apdu->which == Z_APDU_presentResponse)
1143     {
1144         Z_PresentResponse *pr = apdu->u.presentResponse;
1145         if (m_sr_transform)
1146         {
1147             m_sr_transform = 0;
1148             Z_APDU *new_apdu = create_Z_PDU(Z_APDU_searchResponse);
1149             Z_SearchResponse *sr = new_apdu->u.searchResponse;
1150             sr->referenceId = pr->referenceId;
1151             *sr->resultCount = m_last_resultCount;
1152             sr->records = pr->records;
1153             sr->nextResultSetPosition = pr->nextResultSetPosition;
1154             sr->numberOfRecordsReturned = pr->numberOfRecordsReturned;
1155             apdu = new_apdu;
1156         }
1157         if (pr->records && 
1158             pr->records->which == Z_Records_DBOSD && m_resultSetStartPoint)
1159         {
1160             m_cache.add(odr_decode(),
1161                         pr->records->u.databaseOrSurDiagnostics,
1162                         m_resultSetStartPoint, -1);
1163             m_resultSetStartPoint = 0;
1164         }
1165     }
1166     if (m_cookie)
1167         set_otherInformationString (apdu, VAL_COOKIE, 1, m_cookie);
1168     if (m_server)
1169     {
1170         m_server->send_to_client(apdu);
1171     }
1172     if (apdu->which == Z_APDU_close)
1173     {
1174         shutdown();
1175     }
1176 }