Clients cached within session
[pazpar2-moved-to-github.git] / src / client.c
1 /* This file is part of Pazpar2.
2    Copyright (C) 2006-2011 Index Data
3
4 Pazpar2 is free software; you can redistribute it and/or modify it under
5 the terms of the GNU General Public License as published by the Free
6 Software Foundation; either version 2, or (at your option) any later
7 version.
8
9 Pazpar2 is distributed in the hope that it will be useful, but WITHOUT ANY
10 WARRANTY; without even the implied warranty of MERCHANTABILITY or
11 FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
12 for more details.
13
14 You should have received a copy of the GNU General Public License
15 along with this program; if not, write to the Free Software
16 Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA  02110-1301  USA
17
18 */
19
20 /** \file client.c
21     \brief Z39.50 client 
22 */
23
24 #if HAVE_CONFIG_H
25 #include <config.h>
26 #endif
27 #include <stdlib.h>
28 #include <stdio.h>
29 #include <string.h>
30 #if HAVE_SYS_TIME_H
31 #include <sys/time.h>
32 #endif
33 #if HAVE_UNISTD_H
34 #include <unistd.h>
35 #endif
36 #include <signal.h>
37 #include <assert.h>
38
39 #include <yaz/marcdisp.h>
40 #include <yaz/comstack.h>
41 #include <yaz/tcpip.h>
42 #include <yaz/proto.h>
43 #include <yaz/readconf.h>
44 #include <yaz/pquery.h>
45 #include <yaz/otherinfo.h>
46 #include <yaz/yaz-util.h>
47 #include <yaz/nmem.h>
48 #include <yaz/query-charset.h>
49 #include <yaz/querytowrbuf.h>
50 #include <yaz/oid_db.h>
51 #include <yaz/diagbib1.h>
52 #include <yaz/snprintf.h>
53 #include <yaz/rpn2cql.h>
54 #include <yaz/rpn2solr.h>
55
56 #define USE_TIMING 0
57 #if USE_TIMING
58 #include <yaz/timing.h>
59 #endif
60
61 #include "ppmutex.h"
62 #include "session.h"
63 #include "parameters.h"
64 #include "client.h"
65 #include "connection.h"
66 #include "settings.h"
67 #include "relevance.h"
68 #include "incref.h"
69
70 static YAZ_MUTEX g_mutex = 0;
71 static int no_clients = 0;
72 static int no_clients_total = 0;
73
74 static int client_use(int delta)
75 {
76     int clients;
77     if (!g_mutex)
78         yaz_mutex_create(&g_mutex);
79     yaz_mutex_enter(g_mutex);
80     no_clients += delta;
81     if (delta > 0)
82         no_clients_total += delta;
83     clients = no_clients;
84     yaz_mutex_leave(g_mutex);
85     yaz_log(YLOG_DEBUG, "%s clients=%d", delta == 0 ? "" : (delta > 0 ? "INC" : "DEC"), clients);
86     return clients;
87 }
88
89 int  clients_count(void) {
90     return client_use(0);
91 }
92
93 int  clients_count_total(void) {
94     int total = 0;
95     if (!g_mutex)
96         return 0;
97     yaz_mutex_enter(g_mutex);
98     total = no_clients_total;
99     yaz_mutex_leave(g_mutex);
100     return total;
101 }
102
103
104 /** \brief Represents client state for a connection to one search target */
105 struct client {
106     struct session_database *database;
107     struct connection *connection;
108     struct session *session;
109     char *pquery; // Current search
110     char *cqlquery; // used for SRU targets only
111     Odr_int hits;
112     int record_offset;
113     int maxrecs;
114     int startrecs;
115     int diagnostic;
116     int preferred;
117     struct suggestions *suggestions;
118     enum client_state state;
119     struct show_raw *show_raw;
120     ZOOM_resultset resultset;
121     YAZ_MUTEX mutex;
122     int ref_count;
123     char *id;
124 };
125
126 struct suggestions {
127     NMEM nmem;
128     int num;
129     char **misspelled;
130     char **suggest;
131     char *passthrough;
132 };
133
134 struct show_raw {
135     int active; // whether this request has been sent to the server
136     int position;
137     int binary;
138     char *syntax;
139     char *esn;
140     void (*error_handler)(void *data, const char *addinfo);
141     void (*record_handler)(void *data, const char *buf, size_t sz);
142     void *data;
143     struct show_raw *next;
144 };
145
146 static const char *client_states[] = {
147     "Client_Connecting",
148     "Client_Idle",
149     "Client_Working",
150     "Client_Error",
151     "Client_Failed",
152     "Client_Disconnected"
153 };
154
155 const char *client_get_state_str(struct client *cl)
156 {
157     return client_states[cl->state];
158 }
159
160 enum client_state client_get_state(struct client *cl)
161 {
162     return cl->state;
163 }
164
165 void client_set_state(struct client *cl, enum client_state st)
166 {
167     int was_active = 0;
168     if (client_is_active(cl))
169         was_active = 1;
170     cl->state = st;
171     /* If client is going from being active to inactive and all clients
172        are now idle we fire a watch for the session . The assumption is
173        that session is not mutex locked if client is already active */
174     if (was_active && !client_is_active(cl) && cl->session)
175     {
176
177         int no_active = session_active_clients(cl->session);
178         yaz_log(YLOG_DEBUG, "%s: releasing watches on zero active: %d",
179                 client_get_id(cl), no_active);
180         if (no_active == 0) {
181             session_alert_watch(cl->session, SESSION_WATCH_SHOW);
182             session_alert_watch(cl->session, SESSION_WATCH_SHOW_PREF);
183         }
184     }
185 }
186
187 static void client_show_raw_error(struct client *cl, const char *addinfo);
188
189 struct connection *client_get_connection(struct client *cl)
190 {
191     return cl->connection;
192 }
193
194 struct session_database *client_get_database(struct client *cl)
195 {
196     return cl->database;
197 }
198
199 struct session *client_get_session(struct client *cl)
200 {
201     return cl->session;
202 }
203
204 const char *client_get_pquery(struct client *cl)
205 {
206     return cl->pquery;
207 }
208
209 static void client_send_raw_present(struct client *cl);
210 static int nativesyntax_to_type(struct session_database *sdb, char *type,
211                                 ZOOM_record rec);
212
213 static void client_show_immediate(
214     ZOOM_resultset resultset, struct session_database *sdb, int position,
215     void *data,
216     void (*error_handler)(void *data, const char *addinfo),
217     void (*record_handler)(void *data, const char *buf, size_t sz),
218     int binary)
219 {
220     ZOOM_record rec = 0;
221     char type[80];
222     const char *buf;
223     int len;
224
225     if (!resultset)
226     {
227         error_handler(data, "no resultset");
228         return;
229     }
230     rec = ZOOM_resultset_record(resultset, position-1);
231     if (!rec)
232     {
233         error_handler(data, "no record");
234         return;
235     }
236     if (binary)
237         strcpy(type, "raw");
238     else
239         nativesyntax_to_type(sdb, type, rec);
240     buf = ZOOM_record_get(rec, type, &len);
241     if (!buf)
242     {
243         error_handler(data, "no record");
244         return;
245     }
246     record_handler(data, buf, len);
247 }
248
249
250 int client_show_raw_begin(struct client *cl, int position,
251                           const char *syntax, const char *esn,
252                           void *data,
253                           void (*error_handler)(void *data, const char *addinfo),
254                           void (*record_handler)(void *data, const char *buf,
255                                                  size_t sz),
256                           int binary)
257 {
258     if (syntax == 0 && esn == 0)
259         client_show_immediate(cl->resultset, client_get_database(cl),
260                               position, data,
261                               error_handler, record_handler,
262                               binary);
263     else
264     {
265         struct show_raw *rr, **rrp;
266
267         if (!cl->connection)
268             return -1;
269     
270
271         rr = xmalloc(sizeof(*rr));
272         rr->position = position;
273         rr->active = 0;
274         rr->data = data;
275         rr->error_handler = error_handler;
276         rr->record_handler = record_handler;
277         rr->binary = binary;
278         if (syntax)
279             rr->syntax = xstrdup(syntax);
280         else
281             rr->syntax = 0;
282         if (esn)
283             rr->esn = xstrdup(esn);
284         else
285             rr->esn = 0;
286         rr->next = 0;
287         
288         for (rrp = &cl->show_raw; *rrp; rrp = &(*rrp)->next)
289             ;
290         *rrp = rr;
291         
292         if (cl->state == Client_Failed)
293         {
294             client_show_raw_error(cl, "client failed");
295         }
296         else if (cl->state == Client_Disconnected)
297         {
298             client_show_raw_error(cl, "client disconnected");
299         }
300         else
301         {
302             client_send_raw_present(cl);
303         }
304     }
305     return 0;
306 }
307
308 static void client_show_raw_delete(struct show_raw *r)
309 {
310     xfree(r->syntax);
311     xfree(r->esn);
312     xfree(r);
313 }
314
315 void client_show_raw_remove(struct client *cl, void *data)
316 {
317     struct show_raw *rr = data;
318     struct show_raw **rrp = &cl->show_raw;
319     while (*rrp != rr)
320         rrp = &(*rrp)->next;
321     if (*rrp)
322     {
323         *rrp = rr->next;
324         client_show_raw_delete(rr);
325     }
326 }
327
328 void client_show_raw_dequeue(struct client *cl)
329 {
330     struct show_raw *rr = cl->show_raw;
331
332     cl->show_raw = rr->next;
333     client_show_raw_delete(rr);
334 }
335
336 static void client_show_raw_error(struct client *cl, const char *addinfo)
337 {
338     while (cl->show_raw)
339     {
340         cl->show_raw->error_handler(cl->show_raw->data, addinfo);
341         client_show_raw_dequeue(cl);
342     }
343 }
344
345 static void client_send_raw_present(struct client *cl)
346 {
347     struct session_database *sdb = client_get_database(cl);
348     struct connection *co = client_get_connection(cl);
349     ZOOM_resultset set = cl->resultset;
350
351     int offset = cl->show_raw->position;
352     const char *syntax = 0;
353     const char *elements = 0;
354
355     assert(cl->show_raw);
356     assert(set);
357
358     yaz_log(YLOG_DEBUG, "%s: trying to present %d record(s) from %d",
359             client_get_id(cl), 1, offset);
360
361     if (cl->show_raw->syntax)
362         syntax = cl->show_raw->syntax;
363     else
364         syntax = session_setting_oneval(sdb, PZ_REQUESTSYNTAX);
365     ZOOM_resultset_option_set(set, "preferredRecordSyntax", syntax);
366
367     if (cl->show_raw->esn)
368         elements = cl->show_raw->esn;
369     else
370         elements = session_setting_oneval(sdb, PZ_ELEMENTS);
371     if (elements && *elements)
372         ZOOM_resultset_option_set(set, "elementSetName", elements);
373
374     ZOOM_resultset_records(set, 0, offset-1, 1);
375     cl->show_raw->active = 1;
376
377     connection_continue(co);
378 }
379
380 static int nativesyntax_to_type(struct session_database *sdb, char *type,
381                                 ZOOM_record rec)
382 {
383     const char *s = session_setting_oneval(sdb, PZ_NATIVESYNTAX);
384
385     if (s && *s)
386     {
387         if (!strncmp(s, "iso2709", 7))
388         {
389             const char *cp = strchr(s, ';');
390             yaz_snprintf(type, 80, "xml; charset=%s", cp ? cp+1 : "marc-8s");
391         }
392         else if (!strncmp(s, "xml", 3))
393         {
394             strcpy(type, "xml");
395         }
396         else if (!strncmp(s, "txml", 4))
397         {
398             const char *cp = strchr(s, ';');
399             yaz_snprintf(type, 80, "txml; charset=%s", cp ? cp+1 : "marc-8s");
400         }
401         else
402             return -1;
403         return 0;
404     }
405     else  /* attempt to deduce structure */
406     {
407         const char *syntax = ZOOM_record_get(rec, "syntax", NULL);
408         if (syntax)
409         {
410             if (!strcmp(syntax, "XML"))
411             {
412                 strcpy(type, "xml");
413                 return 0;
414             }
415             else if (!strcmp(syntax, "USmarc") || !strcmp(syntax, "MARC21"))
416             {
417                 strcpy(type, "xml; charset=marc8-s");
418                 return 0;
419             }
420             else return -1;
421         }
422         else return -1;
423     }
424 }
425
426 /**
427  * TODO Consider thread safety!!!
428  *
429  */
430 void client_report_facets(struct client *cl, ZOOM_resultset rs)
431 {
432     struct session_database *sdb = client_get_database(cl);
433     ZOOM_facet_field *facets = ZOOM_resultset_facets(rs);
434
435     if (sdb && facets)
436     {
437         struct session *se = client_get_session(cl);
438         int facet_num = ZOOM_resultset_facets_size(rs);
439         struct setting *s;
440
441         for (s = sdb->settings[PZ_FACETMAP]; s; s = s->next)
442         {
443             const char *p = strchr(s->name + 3, ':');
444             if (p && p[1] && s->value && s->value[0])
445             {
446                 int facet_idx;
447                 p++; /* p now holds logical facet name */
448                 for (facet_idx = 0; facet_idx < facet_num; facet_idx++)
449                 {
450                     const char *native_name =
451                         ZOOM_facet_field_name(facets[facet_idx]);
452                     if (native_name && !strcmp(s->value, native_name))
453                     {
454                         size_t term_idx;
455                         size_t term_num =
456                             ZOOM_facet_field_term_count(facets[facet_idx]);
457                         for (term_idx = 0; term_idx < term_num; term_idx++ )
458                         {
459                             int freq;
460                             const char *term =
461                                 ZOOM_facet_field_get_term(facets[facet_idx],
462                                                           term_idx, &freq);
463                             if (term)
464                                 add_facet(se, p, term, freq);
465                         }
466                         break;
467                     }
468                 }
469             }
470         }
471     }
472 }
473
474 static void ingest_raw_record(struct client *cl, ZOOM_record rec)
475 {
476     const char *buf;
477     int len;
478     char type[80];
479
480     if (cl->show_raw->binary)
481         strcpy(type, "raw");
482     else
483     {
484         struct session_database *sdb = client_get_database(cl);
485         nativesyntax_to_type(sdb, type, rec);
486     }
487
488     buf = ZOOM_record_get(rec, type, &len);
489     cl->show_raw->record_handler(cl->show_raw->data,  buf, len);
490     client_show_raw_dequeue(cl);
491 }
492
493 void client_check_preferred_watch(struct client *cl)
494 {
495     struct session *se = cl->session;
496     yaz_log(YLOG_DEBUG, "client_check_preferred_watch: %s ", client_get_id(cl));
497     if (se)
498     {
499         client_unlock(cl);
500         /* TODO possible threading issue. Session can have been destroyed */
501         if (session_is_preferred_clients_ready(se)) {
502             session_alert_watch(se, SESSION_WATCH_SHOW_PREF);
503         }
504         else
505             yaz_log(YLOG_DEBUG, "client_check_preferred_watch: Still locked on preferred targets.");
506
507         client_lock(cl);
508     }
509     else
510         yaz_log(YLOG_WARN, "client_check_preferred_watch: %s. No session!", client_get_id(cl));
511
512 }
513
514 struct suggestions* client_suggestions_create(const char* suggestions_string);
515 static void client_suggestions_destroy(struct client *cl);
516
517 void client_search_response(struct client *cl)
518 {
519     struct connection *co = cl->connection;
520     ZOOM_connection link = connection_get_link(co);
521     ZOOM_resultset resultset = cl->resultset;
522
523     const char *error, *addinfo = 0;
524     
525     if (ZOOM_connection_error(link, &error, &addinfo))
526     {
527         cl->hits = 0;
528         client_set_state(cl, Client_Error);
529         yaz_log(YLOG_WARN, "Search error %s (%s): %s",
530                 error, addinfo, client_get_id(cl));
531     }
532     else
533     {
534         yaz_log(YLOG_DEBUG, "client_search_response: hits "
535                 ODR_INT_PRINTF, cl->hits);
536         client_report_facets(cl, resultset);
537         cl->record_offset = cl->startrecs;
538         cl->hits = ZOOM_resultset_size(resultset);
539         if (cl->suggestions)
540             client_suggestions_destroy(cl);
541         cl->suggestions = client_suggestions_create(ZOOM_resultset_option_get(resultset, "suggestions"));
542     }
543 }
544
545 void client_got_records(struct client *cl)
546 {
547     struct session *se = cl->session;
548     if (se)
549     {
550         client_unlock(cl);
551         session_alert_watch(se, SESSION_WATCH_SHOW);
552         session_alert_watch(se, SESSION_WATCH_RECORD);
553         client_lock(cl);
554     }
555 }
556
557 static void client_record_ingest(struct client *cl)
558 {
559     const char *msg, *addinfo;
560     ZOOM_record rec = 0;
561     ZOOM_resultset resultset = cl->resultset;
562     int offset = cl->record_offset;
563     if ((rec = ZOOM_resultset_record(resultset, offset)))
564     {
565         cl->record_offset++;
566         if (cl->session == 0)
567             ;
568         else if (ZOOM_record_error(rec, &msg, &addinfo, 0))
569         {
570             yaz_log(YLOG_WARN, "Record error %s (%s): %s (rec #%d)",
571                     msg, addinfo, client_get_id(cl),
572                     cl->record_offset);
573         }
574         else
575         {
576             struct session_database *sdb = client_get_database(cl);
577             NMEM nmem = nmem_create();
578             const char *xmlrec;
579             char type[80];
580             
581             if (nativesyntax_to_type(sdb, type, rec))
582                 yaz_log(YLOG_WARN, "Failed to determine record type");
583             xmlrec = ZOOM_record_get(rec, type, NULL);
584             if (!xmlrec)
585                 yaz_log(YLOG_WARN, "ZOOM_record_get failed from %s",
586                         client_get_id(cl));
587             else
588             {
589                 /* OK = 0, -1 = failure, -2 = Filtered */
590                 if (ingest_record(cl, xmlrec, cl->record_offset, nmem) == -1)
591                     yaz_log(YLOG_WARN, "Failed to ingest from %s", client_get_id(cl));
592             }
593             nmem_destroy(nmem);
594         }
595     }
596     else
597     {
598         yaz_log(YLOG_WARN, "Expected record, but got NULL, offset=%d",
599                 offset);
600     }
601 }
602
603 void client_record_response(struct client *cl)
604 {
605     struct connection *co = cl->connection;
606     ZOOM_connection link = connection_get_link(co);
607     ZOOM_resultset resultset = cl->resultset;
608     const char *error, *addinfo;
609
610     if (ZOOM_connection_error(link, &error, &addinfo))
611     {
612         client_set_state(cl, Client_Error);
613         yaz_log(YLOG_WARN, "Search error %s (%s): %s",
614             error, addinfo, client_get_id(cl));
615     }
616     else
617     {
618         if (cl->show_raw && cl->show_raw->active)
619         {
620             ZOOM_record rec = 0;
621             if ((rec = ZOOM_resultset_record(resultset,
622                                              cl->show_raw->position-1)))
623             {
624                 cl->show_raw->active = 0;
625                 ingest_raw_record(cl, rec);
626             }
627             else
628             {
629                 yaz_log(YLOG_WARN, "Expected record, but got NULL, offset=%d",
630                         cl->show_raw->position-1);
631             }
632         }
633         else
634         {
635             client_record_ingest(cl);
636         }
637     }
638 }
639
640 void client_reingest(struct client *cl)
641 {
642     int i = cl->startrecs;
643     int to = cl->record_offset;
644
645     cl->record_offset = i;
646     for (; i < to; i++)
647         client_record_ingest(cl);
648 }
649
650 static void client_set_facets_request(struct client *cl, ZOOM_connection link)
651 {
652     struct session_database *sdb = client_get_database(cl);
653
654     WRBUF w = wrbuf_alloc();
655     
656     struct setting *s;
657
658     for (s = sdb->settings[PZ_FACETMAP]; s; s = s->next)
659     {
660         const char *p = strchr(s->name + 3, ':');
661         if (!p)
662         {
663             yaz_log(YLOG_WARN, "Malformed facetmap name: %s", s->name);
664         }
665         else if (s->value && s->value[0])
666         {
667             wrbuf_puts(w, "@attr 1=");
668             yaz_encode_pqf_term(w, s->value, strlen(s->value));
669             if (s->next)
670                 wrbuf_puts(w, ",");
671         }
672     }
673     yaz_log(YLOG_LOG, "using facets str: %s", wrbuf_cstr(w));
674     ZOOM_connection_option_set(link, "facets",
675                                wrbuf_len(w) ? wrbuf_cstr(w) : 0);
676     wrbuf_destroy(w);
677 }
678
679 int client_has_facet(struct client *cl, const char *name)
680 {
681     struct session_database *sdb = client_get_database(cl);
682     struct setting *s;
683
684     for (s = sdb->settings[PZ_FACETMAP]; s; s = s->next)
685     {
686         const char *p = strchr(s->name + 3, ':');
687         if (p && !strcmp(name, p + 1))
688             return 1;
689     }
690     return 0;
691 }
692
693 void client_start_search(struct client *cl, const char *sort_strategy_and_spec,
694                          int increasing)
695 {
696     struct session_database *sdb = client_get_database(cl);
697     struct connection *co = client_get_connection(cl);
698     ZOOM_connection link = connection_get_link(co);
699     ZOOM_resultset rs;
700     const char *opt_piggyback   = session_setting_oneval(sdb, PZ_PIGGYBACK);
701     const char *opt_queryenc    = session_setting_oneval(sdb, PZ_QUERYENCODING);
702     const char *opt_elements    = session_setting_oneval(sdb, PZ_ELEMENTS);
703     const char *opt_requestsyn  = session_setting_oneval(sdb, PZ_REQUESTSYNTAX);
704     const char *opt_maxrecs     = session_setting_oneval(sdb, PZ_MAXRECS);
705     const char *opt_sru         = session_setting_oneval(sdb, PZ_SRU);
706     const char *opt_sort        = session_setting_oneval(sdb, PZ_SORT);
707     const char *opt_preferred   = session_setting_oneval(sdb, PZ_PREFERRED);
708     const char *extra_args      = session_setting_oneval(sdb, PZ_EXTRA_ARGS);
709     char maxrecs_str[24], startrecs_str[24];
710     ZOOM_query q;
711
712     assert(link);
713
714     cl->hits = 0;
715     cl->record_offset = 0;
716     cl->diagnostic = 0;
717
718     if (extra_args && *extra_args)
719         ZOOM_connection_option_set(link, "extraArgs", extra_args);
720
721     if (opt_preferred) {
722         cl->preferred = atoi(opt_preferred);
723         if (cl->preferred)
724             yaz_log(YLOG_LOG, "Target %s has preferred status: %d",
725                     client_get_id(cl), cl->preferred);
726     }
727     client_set_state(cl, Client_Working);
728
729     if (*opt_piggyback)
730         ZOOM_connection_option_set(link, "piggyback", opt_piggyback);
731     else
732         ZOOM_connection_option_set(link, "piggyback", "1");
733     if (*opt_queryenc)
734         ZOOM_connection_option_set(link, "rpnCharset", opt_queryenc);
735     if (*opt_sru && *opt_elements)
736         ZOOM_connection_option_set(link, "schema", opt_elements);
737     else if (*opt_elements)
738         ZOOM_connection_option_set(link, "elementSetName", opt_elements);
739     if (*opt_requestsyn)
740         ZOOM_connection_option_set(link, "preferredRecordSyntax", opt_requestsyn);
741
742     if (opt_maxrecs && *opt_maxrecs)
743     {
744         cl->maxrecs = atoi(opt_maxrecs);
745     }
746
747     /* convert back to string representation used in ZOOM API */
748     sprintf(maxrecs_str, "%d", cl->maxrecs);
749     ZOOM_connection_option_set(link, "count", maxrecs_str);
750
751     if (cl->maxrecs > 20)
752         ZOOM_connection_option_set(link, "presentChunk", "20");
753     else
754         ZOOM_connection_option_set(link, "presentChunk", maxrecs_str);
755
756     sprintf(startrecs_str, "%d", cl->startrecs);
757     ZOOM_connection_option_set(link, "start", startrecs_str);
758
759     /* TODO Verify does it break something for CQL targets(non-SOLR) ? */
760     /* facets definition is in PQF */
761     client_set_facets_request(cl, link);
762
763     q = ZOOM_query_create();
764     if (cl->cqlquery)
765     {
766         yaz_log(YLOG_LOG, "Search %s CQL: %s", client_get_id(cl),
767                 cl->cqlquery);
768         ZOOM_query_cql(q, cl->cqlquery);
769         if (*opt_sort)
770             ZOOM_query_sortby(q, opt_sort);
771     }
772     else
773     {
774         yaz_log(YLOG_LOG, "Search %s PQF: %s", client_get_id(cl), cl->pquery);
775         
776         ZOOM_query_prefix(q, cl->pquery);
777     }
778     if (sort_strategy_and_spec &&
779         strlen(sort_strategy_and_spec) < 40 /* spec below */)
780     {
781         char spec[50], *p;
782         strcpy(spec, sort_strategy_and_spec);
783         p = strchr(spec, ':');
784         if (p)
785         {
786             *p++ = '\0'; /* cut the string in two */
787             while (*p == ' ')
788                 p++;
789             if (increasing)
790                 strcat(p, " <");
791             else
792                 strcat(p, " >");
793             yaz_log(YLOG_LOG, "applying %s %s", spec, p);
794             ZOOM_query_sortby2(q, spec, p);
795         }
796     }
797     rs = ZOOM_connection_search(link, q);
798     ZOOM_query_destroy(q);
799     ZOOM_resultset_destroy(cl->resultset);
800     cl->resultset = rs;
801     connection_continue(co);
802 }
803
804 struct client *client_create(const char *id)
805 {
806     struct client *cl = xmalloc(sizeof(*cl));
807     cl->maxrecs = 100;
808     cl->startrecs = 0;
809     cl->pquery = 0;
810     cl->cqlquery = 0;
811     cl->database = 0;
812     cl->connection = 0;
813     cl->session = 0;
814     cl->hits = 0;
815     cl->record_offset = 0;
816     cl->diagnostic = 0;
817     cl->state = Client_Disconnected;
818     cl->show_raw = 0;
819     cl->resultset = 0;
820     cl->suggestions = 0;
821     cl->mutex = 0;
822     pazpar2_mutex_create(&cl->mutex, "client");
823     cl->preferred = 0;
824     cl->ref_count = 1;
825     assert(id);
826     cl->id = xstrdup(id);
827     client_use(1);
828     
829     return cl;
830 }
831
832 void client_lock(struct client *c)
833 {
834     yaz_mutex_enter(c->mutex);
835 }
836
837 void client_unlock(struct client *c)
838 {
839     yaz_mutex_leave(c->mutex);
840 }
841
842 void client_incref(struct client *c)
843 {
844     pazpar2_incref(&c->ref_count, c->mutex);
845     yaz_log(YLOG_DEBUG, "client_incref c=%p %s cnt=%d",
846             c, client_get_id(c), c->ref_count);
847 }
848
849 int client_destroy(struct client *c)
850 {
851     if (c)
852     {
853         yaz_log(YLOG_DEBUG, "client_destroy c=%p %s cnt=%d",
854                 c, client_get_id(c), c->ref_count);
855         if (!pazpar2_decref(&c->ref_count, c->mutex))
856         {
857             xfree(c->pquery);
858             c->pquery = 0;
859             xfree(c->cqlquery);
860             c->cqlquery = 0;
861             xfree(c->id);
862             assert(!c->connection);
863
864             if (c->resultset)
865             {
866                 ZOOM_resultset_destroy(c->resultset);
867             }
868             yaz_mutex_destroy(&c->mutex);
869             xfree(c);
870             client_use(-1);
871             return 1;
872         }
873     }
874     return 0;
875 }
876
877 void client_set_connection(struct client *cl, struct connection *con)
878 {
879     if (cl->resultset)
880         ZOOM_resultset_release(cl->resultset);
881     if (con)
882     {
883         assert(cl->connection == 0);
884         cl->connection = con;
885         client_incref(cl);
886     }
887     else
888     {
889         cl->connection = con;
890         client_destroy(cl);
891     }
892 }
893
894 void client_disconnect(struct client *cl)
895 {
896     if (cl->state != Client_Idle)
897         client_set_state(cl, Client_Disconnected);
898     client_set_connection(cl, 0);
899 }
900
901
902 // Initialize CCL map for a target
903 static CCL_bibset prepare_cclmap(struct client *cl)
904 {
905     struct session_database *sdb = client_get_database(cl);
906     struct setting *s;
907     CCL_bibset res;
908
909     if (!sdb->settings)
910         return 0;
911     res = ccl_qual_mk();
912     for (s = sdb->settings[PZ_CCLMAP]; s; s = s->next)
913     {
914         char *p = strchr(s->name + 3, ':');
915         if (!p)
916         {
917             yaz_log(YLOG_WARN, "Malformed cclmap name: %s", s->name);
918             ccl_qual_rm(&res);
919             return 0;
920         }
921         p++;
922         ccl_qual_fitem(res, s->value, p);
923     }
924     return res;
925 }
926
927 // returns a xmalloced CQL query corresponding to the pquery in client
928 static char *make_cqlquery(struct client *cl)
929 {
930     cql_transform_t cqlt = cql_transform_create();
931     Z_RPNQuery *zquery;
932     char *r;
933     WRBUF wrb = wrbuf_alloc();
934     int status;
935     ODR odr_out = odr_createmem(ODR_ENCODE);
936
937     zquery = p_query_rpn(odr_out, cl->pquery);
938     yaz_log(YLOG_LOG, "PQF: %s", cl->pquery);
939     if ((status = cql_transform_rpn2cql_wrbuf(cqlt, wrb, zquery)))
940     {
941         yaz_log(YLOG_WARN, "Failed to generate CQL query, code=%d", status);
942         r = 0;
943     }
944     else
945     {
946         r = xstrdup(wrbuf_cstr(wrb));
947     }     
948     wrbuf_destroy(wrb);
949     odr_destroy(odr_out);
950     cql_transform_close(cqlt);
951     return r;
952 }
953
954 // returns a xmalloced SOLR query corresponding to the pquery in client
955 // TODO Could prob. be merge with the similar make_cqlquery
956 static char *make_solrquery(struct client *cl)
957 {
958     solr_transform_t sqlt = solr_transform_create();
959     Z_RPNQuery *zquery;
960     char *r;
961     WRBUF wrb = wrbuf_alloc();
962     int status;
963     ODR odr_out = odr_createmem(ODR_ENCODE);
964
965     zquery = p_query_rpn(odr_out, cl->pquery);
966     if (zquery == 0) {
967         yaz_log(YLOG_WARN, "Failed to generate RPN from PQF: %s", cl->pquery);
968         return 0;
969     }
970     yaz_log(YLOG_LOG, "PQF: %s", cl->pquery);
971     if ((status = solr_transform_rpn2solr_wrbuf(sqlt, wrb, zquery)))
972     {
973         yaz_log(YLOG_WARN, "Failed to generate SOLR query from PQF %s, code=%d", cl->pquery, status);
974         r = 0;
975     }
976     else
977     {
978         r = xstrdup(wrbuf_cstr(wrb));
979     }
980     wrbuf_destroy(wrb);
981     odr_destroy(odr_out);
982     solr_transform_close(sqlt);
983     return r;
984 }
985
986 static void apply_limit(struct session_database *sdb,
987                         facet_limits_t facet_limits,
988                         WRBUF w_pqf, WRBUF w_ccl)
989 {
990     int i = 0;
991     const char *name;
992     const char *value;
993     NMEM nmem_tmp = nmem_create();
994     for (i = 0; (name = facet_limits_get(facet_limits, i, &value)); i++)
995     {
996         struct setting *s = 0;
997         
998         for (s = sdb->settings[PZ_LIMITMAP]; s; s = s->next)
999         {
1000             const char *p = strchr(s->name + 3, ':');
1001             if (p && !strcmp(p + 1, name) && s->value)
1002             {
1003                 char **values = 0;
1004                 int i, num = 0;
1005                 nmem_strsplit_escape2(nmem_tmp, "|", value, &values,
1006                                       &num, 1, '\\', 1);
1007
1008                 if (!strncmp(s->value, "rpn:", 4))
1009                 {
1010                     const char *pqf = s->value + 4;
1011
1012                     wrbuf_puts(w_pqf, "@and ");
1013                     wrbuf_puts(w_pqf, pqf);
1014                     wrbuf_puts(w_pqf, " ");
1015                     for (i = 0; i < num; i++)
1016                     {
1017                         if (i < num - 1)
1018                             wrbuf_puts(w_pqf, "@or ");
1019                         yaz_encode_pqf_term(w_pqf, values[i],
1020                                             strlen(values[i]));
1021                     }
1022                 }
1023                 else if (!strncmp(s->value, "ccl:", 4))
1024                 {
1025                     const char *ccl = s->value + 4;
1026
1027                     wrbuf_puts(w_ccl, " and (");
1028
1029                     for (i = 0; i < num; i++)
1030                     {
1031                         if (i)
1032                             wrbuf_puts(w_ccl, " or ");
1033                         wrbuf_puts(w_ccl, ccl);
1034                         wrbuf_puts(w_ccl, "=\"");
1035                         wrbuf_puts(w_ccl, values[i]);
1036                         wrbuf_puts(w_ccl, "\"");
1037                     }
1038                     wrbuf_puts(w_ccl, ")");
1039
1040                 }
1041                 break;
1042             }
1043         }
1044         nmem_reset(nmem_tmp);
1045         if (!s)
1046         {
1047             yaz_log(YLOG_WARN, "Target %s: limit %s used, but no limitmap defined",
1048                     (sdb->database ? sdb->database->id : "<no id>"), name);
1049         }
1050     }
1051     nmem_destroy(nmem_tmp);
1052 }
1053                         
1054 // Parse the query given the settings specific to this client
1055 int client_parse_query(struct client *cl, const char *query,
1056                        facet_limits_t facet_limits,
1057                        const char *startrecs, const char *maxrecs)
1058 {
1059     struct session *se = client_get_session(cl);
1060     struct session_database *sdb = client_get_database(cl);
1061     struct ccl_rpn_node *cn;
1062     int cerror, cpos;
1063     CCL_bibset ccl_map = prepare_cclmap(cl);
1064     const char *sru = session_setting_oneval(sdb, PZ_SRU);
1065     const char *pqf_prefix = session_setting_oneval(sdb, PZ_PQF_PREFIX);
1066     const char *pqf_strftime = session_setting_oneval(sdb, PZ_PQF_STRFTIME);
1067     const char *query_syntax = session_setting_oneval(sdb, PZ_QUERY_SYNTAX);
1068     WRBUF w_ccl, w_pqf;
1069     int ret_value = 1;
1070
1071     if (!ccl_map)
1072         return -1;
1073
1074
1075     if (maxrecs && atoi(maxrecs) != cl->maxrecs)
1076     {
1077         ret_value = 0;
1078         cl->maxrecs = atoi(maxrecs);
1079     }
1080
1081     if (startrecs && atoi(startrecs) != cl->startrecs)
1082     {
1083         ret_value = 0;
1084         cl->startrecs = atoi(startrecs);
1085     }
1086
1087     w_ccl = wrbuf_alloc();
1088     wrbuf_puts(w_ccl, query);
1089
1090     w_pqf = wrbuf_alloc();
1091     if (*pqf_prefix)
1092     {
1093         wrbuf_puts(w_pqf, pqf_prefix);
1094         wrbuf_puts(w_pqf, " ");
1095     }
1096
1097     apply_limit(sdb, facet_limits, w_pqf, w_ccl);
1098
1099     yaz_log(YLOG_LOG, "CCL query: %s", wrbuf_cstr(w_ccl));
1100     cn = ccl_find_str(ccl_map, wrbuf_cstr(w_ccl), &cerror, &cpos);
1101     ccl_qual_rm(&ccl_map);
1102     if (!cn)
1103     {
1104         client_set_state(cl, Client_Error);
1105         session_log(se, YLOG_WARN, "Failed to parse CCL query '%s' for %s",
1106                     wrbuf_cstr(w_ccl),
1107                     client_get_id(cl));
1108         wrbuf_destroy(w_ccl);
1109         wrbuf_destroy(w_pqf);
1110         return -1;
1111     }
1112     wrbuf_destroy(w_ccl);
1113
1114     if (!pqf_strftime || !*pqf_strftime)
1115         ccl_pquery(w_pqf, cn);
1116     else
1117     {
1118         time_t cur_time = time(0);
1119         struct tm *tm =  localtime(&cur_time);
1120         char tmp_str[300];
1121         const char *cp = tmp_str;
1122
1123         /* see man strftime(3) for things .. In particular %% gets converted
1124          to %.. And That's our original query .. */
1125         strftime(tmp_str, sizeof(tmp_str)-1, pqf_strftime, tm);
1126         for (; *cp; cp++)
1127         {
1128             if (cp[0] == '%')
1129                 ccl_pquery(w_pqf, cn);
1130             else
1131                 wrbuf_putc(w_pqf, cp[0]);
1132         }
1133     }
1134     if (!cl->pquery || strcmp(cl->pquery, wrbuf_cstr(w_pqf)))
1135     {
1136         xfree(cl->pquery);
1137         cl->pquery = xstrdup(wrbuf_cstr(w_pqf));
1138         ret_value = 0;
1139     }
1140     wrbuf_destroy(w_pqf);
1141
1142     yaz_log(YLOG_LOG, "PQF query: %s", cl->pquery);
1143
1144     xfree(cl->cqlquery);
1145
1146     /* Support for PQF on SRU targets. */
1147     /* TODO Refactor */
1148     yaz_log(YLOG_DEBUG, "Query syntax: %s", query_syntax);
1149     if (strcmp(query_syntax, "pqf") != 0 && *sru)
1150     {
1151         if (!strcmp(sru, "solr")) {
1152             if (!(cl->cqlquery = make_solrquery(cl)))
1153                 return -1;
1154         }
1155         else {
1156             if (!(cl->cqlquery = make_cqlquery(cl)))
1157                 return -1;
1158         }
1159     }
1160     else
1161         cl->cqlquery = 0;
1162
1163     /* TODO FIX Not thread safe */
1164     if (!se->relevance)
1165     {
1166         // Initialize relevance structure with query terms
1167         se->relevance = relevance_create_ccl(
1168             se->service->charsets, se->nmem, cn);
1169     }
1170
1171     ccl_rpn_delete(cn);
1172     return ret_value;
1173 }
1174
1175 void client_set_session(struct client *cl, struct session *se)
1176 {
1177     cl->session = se;
1178 }
1179
1180 int client_is_active(struct client *cl)
1181 {
1182     if (cl->connection && (cl->state == Client_Connecting ||
1183                            cl->state == Client_Working))
1184         return 1;
1185     return 0;
1186 }
1187
1188 int client_is_active_preferred(struct client *cl)
1189 {
1190     /* only count if this is a preferred target. */
1191     if (!cl->preferred)
1192         return 0;
1193     /* TODO No sure this the condition that Seb wants */
1194     if (cl->connection && (cl->state == Client_Connecting ||
1195                            cl->state == Client_Working))
1196         return 1;
1197     return 0;
1198 }
1199
1200 Odr_int client_get_hits(struct client *cl)
1201 {
1202     return cl->hits;
1203 }
1204
1205 int client_get_num_records(struct client *cl)
1206 {
1207     return cl->record_offset;
1208 }
1209
1210 void client_set_diagnostic(struct client *cl, int diagnostic)
1211 {
1212     cl->diagnostic = diagnostic;
1213 }
1214
1215 int client_get_diagnostic(struct client *cl)
1216 {
1217     return cl->diagnostic;
1218 }
1219
1220 const char * client_get_suggestions_xml(struct client *cl, WRBUF wrbuf)
1221 {
1222     /* int idx; */
1223     struct suggestions *suggestions = cl->suggestions;
1224
1225     if (!suggestions) {
1226         yaz_log(YLOG_DEBUG, "No suggestions found");
1227         return "";
1228     }
1229     if (suggestions->passthrough) {
1230         yaz_log(YLOG_DEBUG, "Passthrough Suggestions: \n%s\n", suggestions->passthrough);
1231         return suggestions->passthrough;
1232     }
1233     if (suggestions->num == 0) {
1234         return "";
1235     }
1236     /*
1237     for (idx = 0; idx < suggestions->num; idx++) {
1238         wrbuf_printf(wrbuf, "<suggest term=\"%s\"", suggestions->suggest[idx]);
1239         if (suggestions->misspelled[idx] && suggestions->misspelled[idx]) {
1240             wrbuf_puts(wrbuf, suggestions->misspelled[idx]);
1241             wrbuf_puts(wrbuf, "</suggest>\n");
1242         }
1243         else
1244             wrbuf_puts(wrbuf, "/>\n");
1245     }
1246     */
1247     return wrbuf_cstr(wrbuf);
1248 }
1249
1250
1251 void client_set_database(struct client *cl, struct session_database *db)
1252 {
1253     cl->database = db;
1254 }
1255
1256 const char *client_get_id(struct client *cl)
1257 {
1258     return cl->id;
1259 }
1260
1261 int client_get_maxrecs(struct client *cl)
1262 {
1263     return cl->maxrecs;
1264 }
1265
1266 void client_set_preferred(struct client *cl, int v)
1267 {
1268     cl->preferred = v;
1269 }
1270
1271
1272 struct suggestions* client_suggestions_create(const char* suggestions_string)
1273 {
1274     int i;
1275     NMEM nmem;
1276     struct suggestions *suggestions;
1277     if (suggestions_string == 0)
1278         return 0;
1279     nmem = nmem_create();
1280     suggestions = nmem_malloc(nmem, sizeof(*suggestions));
1281     yaz_log(YLOG_DEBUG, "client target suggestions: %s", suggestions_string);
1282
1283     suggestions->nmem = nmem;
1284     suggestions->num = 0;
1285     suggestions->misspelled = 0;
1286     suggestions->suggest = 0;
1287     suggestions->passthrough = nmem_strdup_null(nmem, suggestions_string);
1288
1289     if (suggestions_string)
1290         nmem_strsplit_escape2(suggestions->nmem, "\n", suggestions_string, &suggestions->suggest,
1291                               &suggestions->num, 1, '\\', 0);
1292     /* Set up misspelled array */
1293     suggestions->misspelled = (char **) nmem_malloc(nmem, suggestions->num * sizeof(**suggestions->misspelled));
1294     /* replace = with \0 .. for each item */
1295     for (i = 0; i < suggestions->num; i++)
1296     {
1297         char *cp = strchr(suggestions->suggest[i], '=');
1298         if (cp) {
1299             *cp = '\0';
1300             suggestions->misspelled[i] = cp+1;
1301         }
1302     }
1303     return suggestions;
1304 }
1305
1306 static void client_suggestions_destroy(struct client *cl)
1307 {
1308     NMEM nmem = cl->suggestions->nmem;
1309     cl->suggestions = 0;
1310     nmem_destroy(nmem);
1311 }
1312
1313 /*
1314  * Local variables:
1315  * c-basic-offset: 4
1316  * c-file-style: "Stroustrup"
1317  * indent-tabs-mode: nil
1318  * End:
1319  * vim: shiftwidth=4 tabstop=8 expandtab
1320  */
1321