27a32f39f410371f64cf4a2e82425abfbb4dc528
[pazpar2-moved-to-github.git] / src / client.c
1 /* This file is part of Pazpar2.
2    Copyright (C) 2006-2013 Index Data
3
4 Pazpar2 is free software; you can redistribute it and/or modify it under
5 the terms of the GNU General Public License as published by the Free
6 Software Foundation; either version 2, or (at your option) any later
7 version.
8
9 Pazpar2 is distributed in the hope that it will be useful, but WITHOUT ANY
10 WARRANTY; without even the implied warranty of MERCHANTABILITY or
11 FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
12 for more details.
13
14 You should have received a copy of the GNU General Public License
15 along with this program; if not, write to the Free Software
16 Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA  02110-1301  USA
17
18 */
19
20 /** \file client.c
21     \brief Z39.50 client
22 */
23
24 #if HAVE_CONFIG_H
25 #include <config.h>
26 #endif
27 #include <stdlib.h>
28 #include <stdio.h>
29 #include <string.h>
30 #if HAVE_SYS_TIME_H
31 #include <sys/time.h>
32 #endif
33 #if HAVE_UNISTD_H
34 #include <unistd.h>
35 #endif
36 #ifdef WIN32
37 #include <windows.h>
38 #endif
39 #include <signal.h>
40 #include <assert.h>
41
42 #include <yaz/marcdisp.h>
43 #include <yaz/comstack.h>
44 #include <yaz/tcpip.h>
45 #include <yaz/proto.h>
46 #include <yaz/readconf.h>
47 #include <yaz/pquery.h>
48 #include <yaz/otherinfo.h>
49 #include <yaz/yaz-util.h>
50 #include <yaz/nmem.h>
51 #include <yaz/query-charset.h>
52 #include <yaz/querytowrbuf.h>
53 #include <yaz/oid_db.h>
54 #include <yaz/diagbib1.h>
55 #include <yaz/snprintf.h>
56 #include <yaz/rpn2cql.h>
57 #include <yaz/rpn2solr.h>
58 #include <yaz/gettimeofday.h>
59
60 #define USE_TIMING 0
61 #if USE_TIMING
62 #include <yaz/timing.h>
63 #endif
64
65 #include "ppmutex.h"
66 #include "session.h"
67 #include "parameters.h"
68 #include "client.h"
69 #include "connection.h"
70 #include "settings.h"
71 #include "relevance.h"
72 #include "incref.h"
73
74 static YAZ_MUTEX g_mutex = 0;
75 static int no_clients = 0;
76 static int no_clients_total = 0;
77
78 static int client_use(int delta)
79 {
80     int clients;
81     if (!g_mutex)
82         yaz_mutex_create(&g_mutex);
83     yaz_mutex_enter(g_mutex);
84     no_clients += delta;
85     if (delta > 0)
86         no_clients_total += delta;
87     clients = no_clients;
88     yaz_mutex_leave(g_mutex);
89     yaz_log(YLOG_DEBUG, "%s clients=%d", delta == 0 ? "" : (delta > 0 ? "INC" : "DEC"), clients);
90     return clients;
91 }
92
93 int  clients_count(void) {
94     return client_use(0);
95 }
96
97 int  clients_count_total(void) {
98     int total = 0;
99     if (!g_mutex)
100         return 0;
101     yaz_mutex_enter(g_mutex);
102     total = no_clients_total;
103     yaz_mutex_leave(g_mutex);
104     return total;
105 }
106
107
108 /** \brief Represents client state for a connection to one search target */
109 struct client {
110     struct session_database *database;
111     struct connection *connection;
112     struct session *session;
113     char *pquery; // Current search
114     char *cqlquery; // used for SRU targets only
115     char *addinfo; // diagnostic info for most resent error
116     Odr_int hits;
117     int record_offset;
118     int show_stat_no;
119     int filtered; // When using local:, this will count the number of filtered records.
120     int maxrecs;
121     int startrecs;
122     int diagnostic;
123     char *message;
124     int preferred;
125     struct suggestions *suggestions;
126     enum client_state state;
127     struct show_raw *show_raw;
128     ZOOM_resultset resultset;
129     YAZ_MUTEX mutex;
130     int ref_count;
131     char *id;
132     facet_limits_t facet_limits;
133     int same_search;
134     char *sort_strategy;
135     char *sort_criteria;
136 };
137
138 struct suggestions {
139     NMEM nmem;
140     int num;
141     char **misspelled;
142     char **suggest;
143     char *passthrough;
144 };
145
146 struct show_raw {
147     int active; // whether this request has been sent to the server
148     int position;
149     int binary;
150     char *syntax;
151     char *esn;
152     char *nativesyntax;
153     void (*error_handler)(void *data, const char *addinfo);
154     void (*record_handler)(void *data, const char *buf, size_t sz);
155     void *data;
156     struct show_raw *next;
157 };
158
159 static const char *client_states[] = {
160     "Client_Connecting",
161     "Client_Idle",
162     "Client_Working",
163     "Client_Error",
164     "Client_Failed",
165     "Client_Disconnected"
166 };
167
168 const char *client_get_state_str(struct client *cl)
169 {
170     return client_states[cl->state];
171 }
172
173 enum client_state client_get_state(struct client *cl)
174 {
175     return cl->state;
176 }
177
178 void client_set_state_nb(struct client *cl, enum client_state st)
179 {
180     cl->state = st;
181 }
182
183 void client_set_state(struct client *cl, enum client_state st)
184 {
185     int was_active = 0;
186     if (client_is_active(cl))
187         was_active = 1;
188     cl->state = st;
189     /* If client is going from being active to inactive and all clients
190        are now idle we fire a watch for the session . The assumption is
191        that session is not mutex locked if client is already active */
192     if (was_active && !client_is_active(cl) && cl->session)
193     {
194
195         int no_active = session_active_clients(cl->session);
196         yaz_log(YLOG_DEBUG, "%s: releasing watches on zero active: %d",
197                 client_get_id(cl), no_active);
198         if (no_active == 0) {
199             session_alert_watch(cl->session, SESSION_WATCH_SHOW);
200             session_alert_watch(cl->session, SESSION_WATCH_BYTARGET);
201             session_alert_watch(cl->session, SESSION_WATCH_TERMLIST);
202             session_alert_watch(cl->session, SESSION_WATCH_SHOW_PREF);
203         }
204     }
205 }
206
207 static void client_show_raw_error(struct client *cl, const char *addinfo);
208
209 struct connection *client_get_connection(struct client *cl)
210 {
211     return cl->connection;
212 }
213
214 struct session_database *client_get_database(struct client *cl)
215 {
216     return cl->database;
217 }
218
219 struct session *client_get_session(struct client *cl)
220 {
221     return cl->session;
222 }
223
224 static void client_send_raw_present(struct client *cl);
225 static int nativesyntax_to_type(const char *s, char *type, ZOOM_record rec);
226
227 static void client_show_immediate(
228     ZOOM_resultset resultset, struct session_database *sdb, int position,
229     void *data,
230     void (*error_handler)(void *data, const char *addinfo),
231     void (*record_handler)(void *data, const char *buf, size_t sz),
232     int binary,
233     const char *nativesyntax)
234 {
235     ZOOM_record rec = 0;
236     char type[80];
237     const char *buf;
238     int len;
239
240     if (!resultset)
241     {
242         error_handler(data, "no resultset");
243         return;
244     }
245     rec = ZOOM_resultset_record_immediate(resultset, position-1);
246     if (!rec)
247     {
248         error_handler(data, "no record");
249         return;
250     }
251     nativesyntax_to_type(nativesyntax, type, rec);
252     buf = ZOOM_record_get(rec, type, &len);
253     if (!buf)
254     {
255         error_handler(data, "no record");
256         return;
257     }
258     record_handler(data, buf, len);
259 }
260
261
262 int client_show_raw_begin(struct client *cl, int position,
263                           const char *syntax, const char *esn,
264                           void *data,
265                           void (*error_handler)(void *data, const char *addinfo),
266                           void (*record_handler)(void *data, const char *buf,
267                                                  size_t sz),
268                           int binary,
269                           const char *nativesyntax)
270 {
271     if (!nativesyntax)
272     {
273         if (binary)
274             nativesyntax = "raw";
275         else
276         {
277             struct session_database *sdb = client_get_database(cl);
278             nativesyntax = session_setting_oneval(sdb, PZ_NATIVESYNTAX);
279         }
280     }
281
282     if (syntax == 0 && esn == 0)
283         client_show_immediate(cl->resultset, client_get_database(cl),
284                               position, data,
285                               error_handler, record_handler,
286                               binary, nativesyntax);
287     else
288     {
289         struct show_raw *rr, **rrp;
290
291         if (!cl->connection)
292             return -1;
293
294
295         rr = xmalloc(sizeof(*rr));
296         rr->position = position;
297         rr->active = 0;
298         rr->data = data;
299         rr->error_handler = error_handler;
300         rr->record_handler = record_handler;
301         rr->binary = binary;
302         if (syntax)
303             rr->syntax = xstrdup(syntax);
304         else
305             rr->syntax = 0;
306         if (esn)
307             rr->esn = xstrdup(esn);
308         else
309             rr->esn = 0;
310
311         assert(nativesyntax);
312         rr->nativesyntax = xstrdup(nativesyntax);
313
314         rr->next = 0;
315
316         for (rrp = &cl->show_raw; *rrp; rrp = &(*rrp)->next)
317             ;
318         *rrp = rr;
319
320         if (cl->state == Client_Failed)
321         {
322             client_show_raw_error(cl, "client failed");
323         }
324         else if (cl->state == Client_Disconnected)
325         {
326             client_show_raw_error(cl, "client disconnected");
327         }
328         else
329         {
330             client_send_raw_present(cl);
331         }
332     }
333     return 0;
334 }
335
336 static void client_show_raw_delete(struct show_raw *r)
337 {
338     xfree(r->syntax);
339     xfree(r->esn);
340     xfree(r->nativesyntax);
341     xfree(r);
342 }
343
344 void client_show_raw_remove(struct client *cl, void *data)
345 {
346     struct show_raw *rr = data;
347     struct show_raw **rrp = &cl->show_raw;
348     while (*rrp != rr)
349         rrp = &(*rrp)->next;
350     if (*rrp)
351     {
352         *rrp = rr->next;
353         client_show_raw_delete(rr);
354     }
355 }
356
357 static void client_show_raw_dequeue(struct client *cl)
358 {
359     struct show_raw *rr = cl->show_raw;
360
361     cl->show_raw = rr->next;
362     client_show_raw_delete(rr);
363 }
364
365 static void client_show_raw_error(struct client *cl, const char *addinfo)
366 {
367     while (cl->show_raw)
368     {
369         cl->show_raw->error_handler(cl->show_raw->data, addinfo);
370         client_show_raw_dequeue(cl);
371     }
372 }
373
374 static void client_send_raw_present(struct client *cl)
375 {
376     struct session_database *sdb = client_get_database(cl);
377     struct connection *co = client_get_connection(cl);
378     ZOOM_resultset set = cl->resultset;
379
380     int offset = cl->show_raw->position;
381     const char *syntax = 0;
382     const char *elements = 0;
383
384     assert(cl->show_raw);
385     assert(set);
386
387     yaz_log(YLOG_DEBUG, "%s: trying to present %d record(s) from %d",
388             client_get_id(cl), 1, offset);
389
390     if (cl->show_raw->syntax)
391         syntax = cl->show_raw->syntax;
392     else
393         syntax = session_setting_oneval(sdb, PZ_REQUESTSYNTAX);
394     ZOOM_resultset_option_set(set, "preferredRecordSyntax", syntax);
395
396     if (cl->show_raw->esn)
397         elements = cl->show_raw->esn;
398     else
399         elements = session_setting_oneval(sdb, PZ_ELEMENTS);
400     if (elements && *elements)
401         ZOOM_resultset_option_set(set, "elementSetName", elements);
402
403     ZOOM_resultset_records(set, 0, offset-1, 1);
404     cl->show_raw->active = 1;
405
406     connection_continue(co);
407 }
408
409 static int nativesyntax_to_type(const char *s, char *type,
410                                 ZOOM_record rec)
411 {
412     if (s && *s)
413     {
414         if (!strncmp(s, "iso2709", 7))
415         {
416             const char *cp = strchr(s, ';');
417             yaz_snprintf(type, 80, "xml; charset=%s", cp ? cp+1 : "marc-8s");
418         }
419         else if (!strncmp(s, "txml", 4))
420         {
421             const char *cp = strchr(s, ';');
422             yaz_snprintf(type, 80, "txml; charset=%s", cp ? cp+1 : "marc-8s");
423         }
424         else /* pass verbatim to ZOOM - including "xml" */
425             strcpy(type, s);
426         return 0;
427     }
428     else  /* attempt to deduce structure */
429     {
430         const char *syntax = ZOOM_record_get(rec, "syntax", NULL);
431         if (syntax)
432         {
433             if (!strcmp(syntax, "XML"))
434             {
435                 strcpy(type, "xml");
436                 return 0;
437             }
438             else if (!strcmp(syntax, "USmarc") || !strcmp(syntax, "MARC21"))
439             {
440                 strcpy(type, "xml; charset=marc8-s");
441                 return 0;
442             }
443             else return -1;
444         }
445         else return -1;
446     }
447 }
448
449 /**
450  * TODO Consider thread safety!!!
451  *
452  */
453 static void client_report_facets(struct client *cl, ZOOM_resultset rs)
454 {
455     struct session_database *sdb = client_get_database(cl);
456     ZOOM_facet_field *facets = ZOOM_resultset_facets(rs);
457
458     if (sdb && facets)
459     {
460         struct session *se = client_get_session(cl);
461         int facet_num = ZOOM_resultset_facets_size(rs);
462         struct setting *s;
463
464         for (s = sdb->settings[PZ_FACETMAP]; s; s = s->next)
465         {
466             const char *p = strchr(s->name + 3, ':');
467             if (p && p[1] && s->value && s->value[0])
468             {
469                 int facet_idx;
470                 p++; /* p now holds logical facet name */
471                 for (facet_idx = 0; facet_idx < facet_num; facet_idx++)
472                 {
473                     const char *native_name =
474                         ZOOM_facet_field_name(facets[facet_idx]);
475                     if (native_name && !strcmp(s->value, native_name))
476                     {
477                         size_t term_idx;
478                         size_t term_num =
479                             ZOOM_facet_field_term_count(facets[facet_idx]);
480                         for (term_idx = 0; term_idx < term_num; term_idx++ )
481                         {
482                             int freq;
483                             const char *term =
484                                 ZOOM_facet_field_get_term(facets[facet_idx],
485                                                           term_idx, &freq);
486                             if (term)
487                                 add_facet(se, p, term, freq);
488                         }
489                         break;
490                     }
491                 }
492             }
493         }
494     }
495 }
496
497 static void ingest_raw_record(struct client *cl, ZOOM_record rec)
498 {
499     const char *buf;
500     int len;
501     char type[80];
502
503     nativesyntax_to_type(cl->show_raw->nativesyntax, type, rec);
504     buf = ZOOM_record_get(rec, type, &len);
505     cl->show_raw->record_handler(cl->show_raw->data,  buf, len);
506     client_show_raw_dequeue(cl);
507 }
508
509 void client_check_preferred_watch(struct client *cl)
510 {
511     struct session *se = cl->session;
512     yaz_log(YLOG_DEBUG, "client_check_preferred_watch: %s ", client_get_id(cl));
513     if (se)
514     {
515         client_unlock(cl);
516         /* TODO possible threading issue. Session can have been destroyed */
517         if (session_is_preferred_clients_ready(se)) {
518             session_alert_watch(se, SESSION_WATCH_SHOW_PREF);
519         }
520         else
521             yaz_log(YLOG_DEBUG, "client_check_preferred_watch: Still locked on preferred targets.");
522
523         client_lock(cl);
524     }
525     else
526         yaz_log(YLOG_WARN, "client_check_preferred_watch: %s. No session!", client_get_id(cl));
527
528 }
529
530 struct suggestions* client_suggestions_create(const char* suggestions_string);
531 static void client_suggestions_destroy(struct client *cl);
532
533 void client_search_response(struct client *cl)
534 {
535     struct connection *co = cl->connection;
536     ZOOM_connection link = connection_get_link(co);
537     ZOOM_resultset resultset = cl->resultset;
538
539     const char *error, *addinfo = 0;
540
541     if (ZOOM_connection_error(link, &error, &addinfo))
542     {
543         cl->hits = 0;
544         client_set_state(cl, Client_Error);
545         yaz_log(YLOG_WARN, "Search error %s (%s): %s",
546                 error, addinfo, client_get_id(cl));
547     }
548     else
549     {
550         client_report_facets(cl, resultset);
551         cl->record_offset = cl->startrecs;
552         cl->hits = ZOOM_resultset_size(resultset);
553         yaz_log(YLOG_DEBUG, "client_search_response: hits " ODR_INT_PRINTF, cl->hits);
554         if (cl->suggestions)
555             client_suggestions_destroy(cl);
556         cl->suggestions = client_suggestions_create(ZOOM_resultset_option_get(resultset, "suggestions"));
557     }
558 }
559
560 void client_got_records(struct client *cl)
561 {
562     struct session *se = cl->session;
563     if (se)
564     {
565         if (reclist_get_num_records(se->reclist) > 0)
566         {
567             client_unlock(cl);
568             session_alert_watch(se, SESSION_WATCH_SHOW);
569             session_alert_watch(se, SESSION_WATCH_BYTARGET);
570             session_alert_watch(se, SESSION_WATCH_TERMLIST);
571             session_alert_watch(se, SESSION_WATCH_RECORD);
572             client_lock(cl);
573         }
574     }
575 }
576
577 static void client_record_ingest(struct client *cl)
578 {
579     const char *msg, *addinfo;
580     ZOOM_record rec = 0;
581     ZOOM_resultset resultset = cl->resultset;
582     struct session *se = client_get_session(cl);
583
584     if ((rec = ZOOM_resultset_record_immediate(resultset, cl->record_offset)))
585     {
586         int offset = ++cl->record_offset;
587         if (cl->session == 0) {
588             /* no operation */
589         }
590         else if (ZOOM_record_error(rec, &msg, &addinfo, 0))
591         {
592             session_log(se, YLOG_WARN, "Record error %s (%s): %s #%d",
593                         msg, addinfo, client_get_id(cl), offset);
594         }
595         else
596         {
597             struct session_database *sdb = client_get_database(cl);
598             NMEM nmem = nmem_create();
599             const char *xmlrec;
600             char type[80];
601
602             const char *s = session_setting_oneval(sdb, PZ_NATIVESYNTAX);
603             if (nativesyntax_to_type(s, type, rec))
604                 session_log(se, YLOG_WARN, "Failed to determine record type");
605             xmlrec = ZOOM_record_get(rec, type, NULL);
606             if (!xmlrec)
607             {
608                 const char *rec_syn =  ZOOM_record_get(rec, "syntax", NULL);
609                 session_log(se, YLOG_WARN, "ZOOM_record_get failed from %s #%d",
610                             client_get_id(cl), offset);
611                 session_log(se, YLOG_LOG, "pz:nativesyntax=%s . "
612                             "ZOOM record type=%s . Actual record syntax=%s",
613                             s ? s : "null", type,
614                             rec_syn ? rec_syn : "null");
615             }
616             else
617             {
618                 /* OK = 0, -1 = failure, -2 = Filtered */
619                 int rc = ingest_record(cl, xmlrec, cl->record_offset, nmem);
620                 if (rc == -1)
621                 {
622                     const char *rec_syn =  ZOOM_record_get(rec, "syntax", NULL);
623                     session_log(se, YLOG_WARN,
624                                 "Failed to ingest record from %s #%d",
625                                 client_get_id(cl), offset);
626                     session_log(se, YLOG_LOG, "pz:nativesyntax=%s . "
627                                 "ZOOM record type=%s . Actual record syntax=%s",
628                                 s ? s : "null", type,
629                                 rec_syn ? rec_syn : "null");
630                 }
631                 if (rc == -2)
632                     cl->filtered += 1;
633             }
634             nmem_destroy(nmem);
635         }
636     }
637     else
638     {
639         session_log(se, YLOG_WARN, "Got NULL record from %s #%d",
640                     client_get_id(cl), cl->record_offset);
641     }
642 }
643
644 void client_record_response(struct client *cl, int *got_records)
645 {
646     struct connection *co = cl->connection;
647     ZOOM_connection link = connection_get_link(co);
648     ZOOM_resultset resultset = cl->resultset;
649     const char *error, *addinfo;
650
651     if (ZOOM_connection_error(link, &error, &addinfo))
652     {
653         client_set_state(cl, Client_Error);
654         yaz_log(YLOG_WARN, "Search error %s (%s): %s",
655             error, addinfo, client_get_id(cl));
656     }
657     else
658     {
659         if (cl->show_raw && cl->show_raw->active)
660         {
661             ZOOM_record rec = 0;
662             if ((rec = ZOOM_resultset_record_immediate(
663                      resultset, cl->show_raw->position-1)))
664             {
665                 cl->show_raw->active = 0;
666                 ingest_raw_record(cl, rec);
667             }
668             else
669             {
670                 yaz_log(YLOG_WARN, "Expected record, but got NULL, offset=%d",
671                         cl->show_raw->position-1);
672             }
673         }
674         else
675         {
676             client_record_ingest(cl);
677             *got_records = 1;
678         }
679     }
680 }
681
682 int client_reingest(struct client *cl)
683 {
684     int i = cl->startrecs;
685     int to = cl->record_offset;
686     cl->filtered = 0;
687
688     cl->record_offset = i;
689     for (; i < to; i++)
690         client_record_ingest(cl);
691     return 0;
692 }
693
694 static void client_set_facets_request(struct client *cl, ZOOM_connection link)
695 {
696     struct session_database *sdb = client_get_database(cl);
697
698     WRBUF w = wrbuf_alloc();
699
700     struct setting *s;
701
702     for (s = sdb->settings[PZ_FACETMAP]; s; s = s->next)
703     {
704         const char *p = strchr(s->name + 3, ':');
705         if (!p)
706         {
707             yaz_log(YLOG_WARN, "Malformed facetmap name: %s", s->name);
708         }
709         else if (s->value && s->value[0])
710         {
711             wrbuf_puts(w, "@attr 1=");
712             yaz_encode_pqf_term(w, s->value, strlen(s->value));
713             if (s->next)
714                 wrbuf_puts(w, ",");
715         }
716     }
717     yaz_log(YLOG_DEBUG, "using facets str: %s", wrbuf_cstr(w));
718     ZOOM_connection_option_set(link, "facets",
719                                wrbuf_len(w) ? wrbuf_cstr(w) : 0);
720     wrbuf_destroy(w);
721 }
722
723 int client_has_facet(struct client *cl, const char *name)
724 {
725     struct session_database *sdb = client_get_database(cl);
726     struct setting *s;
727
728     for (s = sdb->settings[PZ_FACETMAP]; s; s = s->next)
729     {
730         const char *p = strchr(s->name + 3, ':');
731         if (p && !strcmp(name, p + 1))
732             return 1;
733     }
734     return 0;
735 }
736
737 static const char *get_strategy_plus_sort(struct client *l, const char *field)
738 {
739     struct session_database *sdb = client_get_database(l);
740     struct setting *s;
741
742     const char *strategy_plus_sort = 0;
743
744     for (s = sdb->settings[PZ_SORTMAP]; s; s = s->next)
745     {
746         char *p = strchr(s->name + 3, ':');
747         if (!p)
748         {
749             yaz_log(YLOG_WARN, "Malformed sortmap name: %s", s->name);
750             continue;
751         }
752         p++;
753         if (!strcmp(p, field))
754         {
755             strategy_plus_sort = s->value;
756             break;
757         }
758     }
759     return strategy_plus_sort;
760 }
761
762 void client_update_show_stat(struct client *cl, int cmd)
763 {
764     if (cmd == 0)
765         cl->show_stat_no = 0;
766     else if (cmd == 1)
767         cl->show_stat_no++;
768 }
769
770 int client_fetch_more(struct client *cl)
771 {
772     struct session_database *sdb = client_get_database(cl);
773     const char *str;
774     int extend_recs = 0;
775     int number;
776
777     str = session_setting_oneval(sdb, PZ_EXTENDRECS);
778     if (!str || !*str)
779         return 0;
780
781     extend_recs = atoi(str);
782
783     yaz_log(YLOG_LOG, "cl=%s show_stat_no=%d got=%d",
784             client_get_id(cl), cl->show_stat_no, cl->record_offset);
785     if (cl->show_stat_no < cl->record_offset)
786         return 0;
787     yaz_log(YLOG_LOG, "cl=%s Trying to fetch more", client_get_id(cl));
788
789     if (extend_recs > cl->hits)
790         extend_recs = cl->hits;
791
792     number = extend_recs - cl->record_offset;
793     if (number > 0)
794     {
795         ZOOM_resultset set = cl->resultset;
796         struct connection *co = client_get_connection(cl);
797
798         str = session_setting_oneval(sdb, PZ_REQUESTSYNTAX);
799         ZOOM_resultset_option_set(set, "preferredRecordSyntax", str);
800         str = session_setting_oneval(sdb, PZ_ELEMENTS);
801         if (str && *str)
802             ZOOM_resultset_option_set(set, "elementSetName", str);
803
804         ZOOM_resultset_records(set, 0, cl->record_offset, number);
805         client_set_state(cl, Client_Working);
806         connection_continue(co);
807         return 1;
808     }
809     else
810     {
811         yaz_log(YLOG_LOG, "cl=%s. OK no more in total set", client_get_id(cl));
812     }
813     return 0;
814 }
815
816 int client_parse_init(struct client *cl, int same_search)
817 {
818     cl->same_search = same_search;
819     return 0;
820 }
821
822 /*
823  * TODO consider how to extend the range
824  * */
825 int client_parse_range(struct client *cl, const char *startrecs, const char *maxrecs)
826 {
827     if (maxrecs && atoi(maxrecs) != cl->maxrecs)
828     {
829         cl->same_search = 0;
830         cl->maxrecs = atoi(maxrecs);
831     }
832
833     if (startrecs && atoi(startrecs) != cl->startrecs)
834     {
835         cl->same_search = 0;
836         cl->startrecs = atoi(startrecs);
837     }
838
839     return 0;
840 }
841
842 int client_start_search(struct client *cl)
843 {
844     struct session_database *sdb = client_get_database(cl);
845     struct connection *co = 0;
846     ZOOM_connection link = 0;
847     struct session *se = client_get_session(cl);
848     ZOOM_resultset rs;
849     const char *opt_piggyback   = session_setting_oneval(sdb, PZ_PIGGYBACK);
850     const char *opt_queryenc    = session_setting_oneval(sdb, PZ_QUERYENCODING);
851     const char *opt_elements    = session_setting_oneval(sdb, PZ_ELEMENTS);
852     const char *opt_requestsyn  = session_setting_oneval(sdb, PZ_REQUESTSYNTAX);
853     const char *opt_maxrecs     = session_setting_oneval(sdb, PZ_MAXRECS);
854     const char *opt_sru         = session_setting_oneval(sdb, PZ_SRU);
855     const char *opt_sort        = session_setting_oneval(sdb, PZ_SORT);
856     const char *opt_preferred   = session_setting_oneval(sdb, PZ_PREFERRED);
857     const char *extra_args      = session_setting_oneval(sdb, PZ_EXTRA_ARGS);
858     const char *opt_present_chunk = session_setting_oneval(sdb, PZ_PRESENT_CHUNK);
859     ZOOM_query query;
860     char maxrecs_str[24], startrecs_str[24], present_chunk_str[24];
861     struct timeval tval;
862     int present_chunk = 20; // Default chunk size
863     int rc_prep_connection;
864
865
866     yaz_gettimeofday(&tval);
867     tval.tv_sec += 5;
868
869     if (opt_present_chunk && strcmp(opt_present_chunk,"")) {
870         present_chunk = atoi(opt_present_chunk);
871         yaz_log(YLOG_DEBUG, "Present chunk set to %d", present_chunk);
872     }
873     rc_prep_connection =
874         client_prep_connection(cl, se->service->z3950_operation_timeout,
875                                se->service->z3950_session_timeout,
876                                se->service->server->iochan_man,
877                                &tval);
878     /* Nothing has changed and we already have a result */
879     if (cl->same_search == 1 && rc_prep_connection == 2)
880     {
881         session_log(se, YLOG_LOG, "client %s REUSE result", client_get_id(cl));
882         return client_reingest(cl);
883     }
884     else if (!rc_prep_connection)
885     {
886         session_log(se, YLOG_LOG, "client %s FAILED to search: No connection.", client_get_id(cl));
887         return -1;
888     }
889     co = client_get_connection(cl);
890     assert(cl);
891     link = connection_get_link(co);
892     assert(link);
893
894     session_log(se, YLOG_LOG, "client %s NEW search", client_get_id(cl));
895
896     cl->diagnostic = 0;
897     cl->filtered = 0;
898
899     if (extra_args && *extra_args)
900         ZOOM_connection_option_set(link, "extraArgs", extra_args);
901
902     if (opt_preferred) {
903         cl->preferred = atoi(opt_preferred);
904         if (cl->preferred)
905             yaz_log(YLOG_LOG, "Target %s has preferred status: %d",
906                     client_get_id(cl), cl->preferred);
907     }
908
909     if (*opt_piggyback)
910         ZOOM_connection_option_set(link, "piggyback", opt_piggyback);
911     else
912         ZOOM_connection_option_set(link, "piggyback", "1");
913     if (*opt_queryenc)
914         ZOOM_connection_option_set(link, "rpnCharset", opt_queryenc);
915     if (*opt_sru && *opt_elements)
916         ZOOM_connection_option_set(link, "schema", opt_elements);
917     else if (*opt_elements)
918         ZOOM_connection_option_set(link, "elementSetName", opt_elements);
919     if (*opt_requestsyn)
920         ZOOM_connection_option_set(link, "preferredRecordSyntax", opt_requestsyn);
921
922     if (opt_maxrecs && *opt_maxrecs)
923     {
924         cl->maxrecs = atoi(opt_maxrecs);
925     }
926
927     /* convert back to string representation used in ZOOM API */
928     sprintf(maxrecs_str, "%d", cl->maxrecs);
929     ZOOM_connection_option_set(link, "count", maxrecs_str);
930
931     /* A present_chunk less than 1 will disable chunking. */
932     if (present_chunk > 0 && cl->maxrecs > present_chunk) {
933         sprintf(present_chunk_str, "%d", present_chunk);
934         ZOOM_connection_option_set(link, "presentChunk", present_chunk_str);
935         yaz_log(YLOG_DEBUG, "Present chunk set to %s", present_chunk_str);
936     }
937     else {
938         ZOOM_connection_option_set(link, "presentChunk", maxrecs_str);
939         yaz_log(YLOG_DEBUG, "Present chunk set to %s (maxrecs)", maxrecs_str);
940     }
941     sprintf(startrecs_str, "%d", cl->startrecs);
942     ZOOM_connection_option_set(link, "start", startrecs_str);
943
944     /* TODO Verify does it break something for CQL targets(non-SOLR) ? */
945     /* facets definition is in PQF */
946     client_set_facets_request(cl, link);
947
948     query = ZOOM_query_create();
949     if (cl->cqlquery)
950     {
951         yaz_log(YLOG_LOG, "Client %s: Search CQL: %s", client_get_id(cl), cl->cqlquery);
952         ZOOM_query_cql(query, cl->cqlquery);
953         if (*opt_sort)
954             ZOOM_query_sortby(query, opt_sort);
955     }
956     else
957     {
958         yaz_log(YLOG_LOG, "Client %s: Search PQF: %s", client_get_id(cl), cl->pquery);
959
960         ZOOM_query_prefix(query, cl->pquery);
961     }
962     if (cl->sort_strategy && cl->sort_criteria) {
963         yaz_log(YLOG_LOG, "Client %s: Setting ZOOM sort strategy and criteria: %s %s",
964                 client_get_id(cl), cl->sort_strategy, cl->sort_criteria);
965         ZOOM_query_sortby2(query, cl->sort_strategy, cl->sort_criteria);
966     }
967
968     yaz_log(YLOG_DEBUG,"Client %s: Starting search", client_get_id(cl));
969     client_set_state(cl, Client_Working);
970     cl->hits = 0;
971     cl->record_offset = 0;
972     rs = ZOOM_connection_search(link, query);
973     ZOOM_query_destroy(query);
974     ZOOM_resultset_destroy(cl->resultset);
975     cl->resultset = rs;
976     connection_continue(co);
977     return 0;
978 }
979
980 struct client *client_create(const char *id)
981 {
982     struct client *cl = xmalloc(sizeof(*cl));
983     cl->maxrecs = 100;
984     cl->startrecs = 0;
985     cl->pquery = 0;
986     cl->cqlquery = 0;
987     cl->addinfo = 0;
988     cl->message = 0;
989     cl->database = 0;
990     cl->connection = 0;
991     cl->session = 0;
992     cl->hits = 0;
993     cl->record_offset = 0;
994     cl->filtered = 0;
995     cl->diagnostic = 0;
996     cl->state = Client_Disconnected;
997     cl->show_raw = 0;
998     cl->resultset = 0;
999     cl->suggestions = 0;
1000     cl->mutex = 0;
1001     pazpar2_mutex_create(&cl->mutex, "client");
1002     cl->preferred = 0;
1003     cl->ref_count = 1;
1004     cl->facet_limits = 0;
1005     cl->sort_strategy = 0;
1006     cl->sort_criteria = 0;
1007     assert(id);
1008     cl->id = xstrdup(id);
1009     client_use(1);
1010
1011     return cl;
1012 }
1013
1014 void client_lock(struct client *c)
1015 {
1016     yaz_mutex_enter(c->mutex);
1017 }
1018
1019 void client_unlock(struct client *c)
1020 {
1021     yaz_mutex_leave(c->mutex);
1022 }
1023
1024 void client_incref(struct client *c)
1025 {
1026     pazpar2_incref(&c->ref_count, c->mutex);
1027     yaz_log(YLOG_DEBUG, "client_incref c=%p %s cnt=%d",
1028             c, client_get_id(c), c->ref_count);
1029 }
1030
1031 int client_destroy(struct client *c)
1032 {
1033     if (c)
1034     {
1035         yaz_log(YLOG_DEBUG, "client_destroy c=%p %s cnt=%d",
1036                 c, client_get_id(c), c->ref_count);
1037         if (!pazpar2_decref(&c->ref_count, c->mutex))
1038         {
1039             xfree(c->pquery);
1040             c->pquery = 0;
1041             xfree(c->cqlquery);
1042             c->cqlquery = 0;
1043             xfree(c->addinfo);
1044             c->addinfo = 0;
1045             xfree(c->message);
1046             c->message = 0;
1047             xfree(c->id);
1048             xfree(c->sort_strategy);
1049             xfree(c->sort_criteria);
1050             assert(!c->connection);
1051             facet_limits_destroy(c->facet_limits);
1052
1053             if (c->resultset)
1054             {
1055                 ZOOM_resultset_destroy(c->resultset);
1056             }
1057             yaz_mutex_destroy(&c->mutex);
1058             xfree(c);
1059             client_use(-1);
1060             return 1;
1061         }
1062     }
1063     return 0;
1064 }
1065
1066 void client_set_connection(struct client *cl, struct connection *con)
1067 {
1068     if (cl->resultset)
1069         ZOOM_resultset_release(cl->resultset);
1070     if (con)
1071     {
1072         assert(cl->connection == 0);
1073         cl->connection = con;
1074         client_incref(cl);
1075     }
1076     else
1077     {
1078         cl->connection = con;
1079         client_destroy(cl);
1080     }
1081 }
1082
1083 void client_disconnect(struct client *cl)
1084 {
1085     if (cl->state != Client_Idle)
1086         client_set_state(cl, Client_Disconnected);
1087     client_set_connection(cl, 0);
1088 }
1089
1090
1091 // Initialize CCL map for a target
1092 static CCL_bibset prepare_cclmap(struct client *cl, CCL_bibset base_bibset)
1093 {
1094     struct session_database *sdb = client_get_database(cl);
1095     struct setting *s;
1096     CCL_bibset res;
1097
1098     if (!sdb->settings)
1099         return 0;
1100     if (base_bibset)
1101         res = ccl_qual_dup(base_bibset);
1102     else
1103         res = ccl_qual_mk();
1104     for (s = sdb->settings[PZ_CCLMAP]; s; s = s->next)
1105     {
1106         const char *addinfo = 0;
1107         char *p = strchr(s->name + 3, ':');
1108         if (!p)
1109         {
1110             WRBUF w = wrbuf_alloc();
1111             wrbuf_printf(w, "Malformed cclmap. name=%s", s->name);
1112             yaz_log(YLOG_WARN, "%s: %s", client_get_id(cl), wrbuf_cstr(w));
1113             client_set_diagnostic(cl, ZOOM_ERROR_CCL_CONFIG,
1114                                   ZOOM_diag_str(ZOOM_ERROR_CCL_CONFIG),
1115                                   wrbuf_cstr(w));
1116             client_set_state_nb(cl, Client_Error);
1117             ccl_qual_rm(&res);
1118             wrbuf_destroy(w);
1119             return 0;
1120         }
1121         p++;
1122         if (ccl_qual_fitem2(res, s->value, p, &addinfo))
1123         {
1124             WRBUF w = wrbuf_alloc();
1125
1126             wrbuf_printf(w, "Malformed cclmap. name=%s: value=%s (%s)",
1127                          s->name, p, addinfo);
1128             yaz_log(YLOG_WARN, "%s: %s", client_get_id(cl), wrbuf_cstr(w));
1129             client_set_diagnostic(cl, ZOOM_ERROR_CCL_CONFIG,
1130                                   ZOOM_diag_str(ZOOM_ERROR_CCL_CONFIG),
1131                                   wrbuf_cstr(w));
1132             client_set_state_nb(cl, Client_Error);
1133             ccl_qual_rm(&res);
1134             wrbuf_destroy(w);
1135             return 0;
1136         }
1137     }
1138     return res;
1139 }
1140
1141 // returns a xmalloced CQL query corresponding to the pquery in client
1142 static char *make_cqlquery(struct client *cl, Z_RPNQuery *zquery)
1143 {
1144     cql_transform_t cqlt = cql_transform_create();
1145     char *r = 0;
1146     WRBUF wrb = wrbuf_alloc();
1147     int status;
1148
1149     if ((status = cql_transform_rpn2cql_wrbuf(cqlt, wrb, zquery)))
1150     {
1151         yaz_log(YLOG_WARN, "Failed to generate CQL query, code=%d", status);
1152     }
1153     else
1154     {
1155         r = xstrdup(wrbuf_cstr(wrb));
1156     }
1157     wrbuf_destroy(wrb);
1158     cql_transform_close(cqlt);
1159     return r;
1160 }
1161
1162 // returns a xmalloced SOLR query corresponding to the pquery in client
1163 // TODO Could prob. be merge with the similar make_cqlquery
1164 static char *make_solrquery(struct client *cl, Z_RPNQuery *zquery)
1165 {
1166     solr_transform_t sqlt = solr_transform_create();
1167     char *r = 0;
1168     WRBUF wrb = wrbuf_alloc();
1169     int status;
1170
1171     if ((status = solr_transform_rpn2solr_wrbuf(sqlt, wrb, zquery)))
1172     {
1173         yaz_log(YLOG_WARN, "Failed to generate SOLR query, code=%d", status);
1174     }
1175     else
1176     {
1177         r = xstrdup(wrbuf_cstr(wrb));
1178     }
1179     wrbuf_destroy(wrb);
1180     solr_transform_close(sqlt);
1181     return r;
1182 }
1183
1184 const char *client_get_facet_limit_local(struct client *cl,
1185                                          struct session_database *sdb,
1186                                          int *l,
1187                                          NMEM nmem, int *num, char ***values)
1188 {
1189     const char *name = 0;
1190     const char *value = 0;
1191     for (; (name = facet_limits_get(cl->facet_limits, *l, &value)); (*l)++)
1192     {
1193         struct setting *s = 0;
1194
1195         for (s = sdb->settings[PZ_LIMITMAP]; s; s = s->next)
1196         {
1197             const char *p = strchr(s->name + 3, ':');
1198             if (p && !strcmp(p + 1, name) && s->value)
1199             {
1200                 int j, cnum;
1201                 char **cvalues;
1202                 nmem_strsplit_escape2(nmem, ",", s->value, &cvalues,
1203                                       &cnum, 1, '\\', 1);
1204                 for (j = 0; j < cnum; j++)
1205                 {
1206                     const char *cvalue = cvalues[j];
1207                     while (*cvalue == ' ')
1208                         cvalue++;
1209                     if (!strncmp(cvalue, "local:", 6))
1210                     {
1211                         const char *cp = cvalue + 6;
1212                         while (*cp == ' ')
1213                             cp++;
1214                         nmem_strsplit_escape2(nmem, "|", value, values,
1215                                               num, 1, '\\', 1);
1216                         (*l)++;
1217                         return *cp ? cp : name;
1218                     }
1219                 }
1220             }
1221         }
1222     }
1223     return 0;
1224 }
1225
1226 static int apply_limit(struct session_database *sdb,
1227                        facet_limits_t facet_limits,
1228                        WRBUF w_pqf, CCL_bibset ccl_map,
1229                        struct conf_service *service)
1230 {
1231     int ret = 0;
1232     int i = 0;
1233     const char *name;
1234     const char *value;
1235
1236     NMEM nmem_tmp = nmem_create();
1237     for (i = 0; (name = facet_limits_get(facet_limits, i, &value)); i++)
1238     {
1239         struct setting *s = 0;
1240         nmem_reset(nmem_tmp);
1241         /* name="pz:limitmap:author" value="rpn:@attr 1=4|local:other" */
1242         for (s = sdb->settings[PZ_LIMITMAP]; s; s = s->next)
1243         {
1244             const char *p = strchr(s->name + 3, ':');
1245             if (p && !strcmp(p + 1, name) && s->value)
1246             {
1247                 char **values = 0;
1248                 int i, num = 0;
1249                 char **cvalues = 0;
1250                 int j, cnum = 0;
1251                 nmem_strsplit_escape2(nmem_tmp, "|", value, &values,
1252                                       &num, 1, '\\', 1);
1253
1254                 nmem_strsplit_escape2(nmem_tmp, ",", s->value, &cvalues,
1255                                       &cnum, 1, '\\', 1);
1256
1257                 for (j = 0; ret == 0 && j < cnum; j++)
1258                 {
1259                     const char *cvalue = cvalues[j];
1260                     while (*cvalue == ' ')
1261                         cvalue++;
1262                     if (!strncmp(cvalue, "rpn:", 4))
1263                     {
1264                         const char *pqf = cvalue + 4;
1265                         wrbuf_puts(w_pqf, "@and ");
1266                         wrbuf_puts(w_pqf, pqf);
1267                         wrbuf_puts(w_pqf, " ");
1268                         for (i = 0; i < num; i++)
1269                         {
1270                             if (i < num - 1)
1271                                 wrbuf_puts(w_pqf, "@or ");
1272                             yaz_encode_pqf_term(w_pqf, values[i],
1273                                                 strlen(values[i]));
1274                         }
1275                     }
1276                     else if (!strncmp(cvalue, "ccl:", 4))
1277                     {
1278                         const char *ccl = cvalue + 4;
1279                         WRBUF ccl_w = wrbuf_alloc();
1280                         for (i = 0; i < num; i++)
1281                         {
1282                             int cerror, cpos;
1283                             struct ccl_rpn_node *cn;
1284                             wrbuf_rewind(ccl_w);
1285                             wrbuf_puts(ccl_w, ccl);
1286                             wrbuf_puts(ccl_w, "=\"");
1287                             wrbuf_puts(ccl_w, values[i]);
1288                             wrbuf_puts(ccl_w, "\"");
1289
1290                             cn = ccl_find_str(ccl_map, wrbuf_cstr(ccl_w),
1291                                               &cerror, &cpos);
1292                             if (cn)
1293                             {
1294                                 if (i == 0)
1295                                     wrbuf_printf(w_pqf, "@and ");
1296
1297                                 /* or multiple values.. could be bad if last
1298                                    CCL parse fails, but this is unlikely to
1299                                    happen */
1300                                 if (i < num - 1)
1301                                     wrbuf_printf(w_pqf, "@or ");
1302                                 ccl_pquery(w_pqf, cn);
1303                                 ccl_rpn_delete(cn);
1304                             }
1305                         }
1306                         wrbuf_destroy(ccl_w);
1307                     }
1308                     else if (!strncmp(cvalue, "local:", 6)) {
1309                         /* no operation */
1310                     }
1311                     else
1312                     {
1313                         yaz_log(YLOG_WARN, "Target %s: Bad limitmap '%s'",
1314                                 sdb->database->id, cvalue);
1315                         ret = -1; /* bad limitmap */
1316                     }
1317                 }
1318                 break;
1319             }
1320         }
1321         if (!s)
1322         {
1323             int i;
1324             for (i = 0; i < service->num_metadata; i++)
1325             {
1326                 struct conf_metadata *md = service->metadata + i;
1327                 if (!strcmp(md->name, name) && md->limitcluster)
1328                 {
1329                     yaz_log(YLOG_LOG, "limitcluster in use for %s",
1330                             md->name);
1331                     break;
1332                 }
1333             }
1334             if (i == service->num_metadata)
1335             {
1336                 yaz_log(YLOG_WARN, "Target %s: limit %s used, but no limitmap defined",
1337                         (sdb->database ? sdb->database->id : "<no id>"), name);
1338             }
1339         }
1340     }
1341     nmem_destroy(nmem_tmp);
1342     return ret;
1343 }
1344
1345 // Parse the query given the settings specific to this client
1346 // client variable same_search is set as below as well as returned:
1347 // 0 if query is OK but different from before
1348 // 1 if query is OK but same as before
1349 // return -1 on query error
1350 // return -2 on limit error
1351 int client_parse_query(struct client *cl, const char *query,
1352                        facet_limits_t facet_limits)
1353 {
1354     struct session *se = client_get_session(cl);
1355     struct conf_service *service = se->service;
1356     struct session_database *sdb = client_get_database(cl);
1357     struct ccl_rpn_node *cn;
1358     int cerror, cpos;
1359     ODR odr_out;
1360     CCL_bibset ccl_map = prepare_cclmap(cl, service->ccl_bibset);
1361     const char *sru = session_setting_oneval(sdb, PZ_SRU);
1362     const char *pqf_prefix = session_setting_oneval(sdb, PZ_PQF_PREFIX);
1363     const char *pqf_strftime = session_setting_oneval(sdb, PZ_PQF_STRFTIME);
1364     const char *query_syntax = session_setting_oneval(sdb, PZ_QUERY_SYNTAX);
1365     WRBUF w_ccl, w_pqf;
1366     int ret_value = 1;
1367     Z_RPNQuery *zquery;
1368
1369     if (!ccl_map)
1370         return -3;
1371
1372     w_ccl = wrbuf_alloc();
1373     wrbuf_puts(w_ccl, query);
1374
1375     w_pqf = wrbuf_alloc();
1376     if (*pqf_prefix)
1377     {
1378         wrbuf_puts(w_pqf, pqf_prefix);
1379         wrbuf_puts(w_pqf, " ");
1380     }
1381
1382     if (apply_limit(sdb, facet_limits, w_pqf, ccl_map, service))
1383     {
1384         ccl_qual_rm(&ccl_map);
1385         return -2;
1386     }
1387
1388     facet_limits_destroy(cl->facet_limits);
1389     cl->facet_limits = facet_limits_dup(facet_limits);
1390
1391     yaz_log(YLOG_LOG, "Client %s: CCL query: %s limit: %s", client_get_id(cl), wrbuf_cstr(w_ccl), wrbuf_cstr(w_pqf));
1392     cn = ccl_find_str(ccl_map, wrbuf_cstr(w_ccl), &cerror, &cpos);
1393     ccl_qual_rm(&ccl_map);
1394     if (!cn)
1395     {
1396         client_set_state(cl, Client_Error);
1397         session_log(se, YLOG_WARN, "Client %s: Failed to parse CCL query '%s'",
1398                     client_get_id(cl),
1399                     wrbuf_cstr(w_ccl));
1400         wrbuf_destroy(w_ccl);
1401         wrbuf_destroy(w_pqf);
1402         return -1;
1403     }
1404     wrbuf_destroy(w_ccl);
1405
1406     if (!pqf_strftime || !*pqf_strftime)
1407         ccl_pquery(w_pqf, cn);
1408     else
1409     {
1410         time_t cur_time = time(0);
1411         struct tm *tm =  localtime(&cur_time);
1412         char tmp_str[300];
1413         const char *cp = tmp_str;
1414
1415         /* see man strftime(3) for things .. In particular %% gets converted
1416          to %.. And That's our original query .. */
1417         strftime(tmp_str, sizeof(tmp_str)-1, pqf_strftime, tm);
1418         for (; *cp; cp++)
1419         {
1420             if (cp[0] == '%')
1421                 ccl_pquery(w_pqf, cn);
1422             else
1423                 wrbuf_putc(w_pqf, cp[0]);
1424         }
1425     }
1426
1427     /* Compares query and limit with old one. If different we need to research */
1428     if (!cl->pquery || strcmp(cl->pquery, wrbuf_cstr(w_pqf)))
1429     {
1430         if (cl->pquery)
1431             session_log(se, YLOG_LOG, "Client %s: Re-search due query/limit change: %s to %s", 
1432                         client_get_id(cl), cl->pquery, wrbuf_cstr(w_pqf));
1433         xfree(cl->pquery);
1434         cl->pquery = xstrdup(wrbuf_cstr(w_pqf));
1435         // return value is no longer used.
1436         ret_value = 0;
1437         // Need to (re)search
1438         cl->same_search= 0;
1439     }
1440     wrbuf_destroy(w_pqf);
1441
1442     xfree(cl->cqlquery);
1443     cl->cqlquery = 0;
1444
1445     odr_out = odr_createmem(ODR_ENCODE);
1446     zquery = p_query_rpn(odr_out, cl->pquery);
1447     if (!zquery)
1448     {
1449
1450         session_log(se, YLOG_WARN, "Invalid PQF query for Client %s: %s",
1451                     client_get_id(cl), cl->pquery);
1452         ret_value = -1;
1453     }
1454     else
1455     {
1456         session_log(se, YLOG_LOG, "PQF for Client %s: %s",
1457                     client_get_id(cl), cl->pquery);
1458
1459         /* Support for PQF on SRU targets. */
1460         if (strcmp(query_syntax, "pqf") != 0 && *sru)
1461         {
1462             if (!strcmp(sru, "solr"))
1463                 cl->cqlquery = make_solrquery(cl, zquery);
1464             else
1465                 cl->cqlquery = make_cqlquery(cl, zquery);
1466             if (!cl->cqlquery)
1467                 ret_value = -1;
1468             else
1469                 session_log(se, YLOG_LOG, "Client %s native query: %s (%s)",
1470                             client_get_id(cl), cl->cqlquery, sru);
1471         }
1472     }
1473     odr_destroy(odr_out);
1474
1475     /* TODO FIX Not thread safe */
1476     if (!se->relevance)
1477     {
1478         // Initialize relevance structure with query terms
1479         se->relevance = relevance_create_ccl(se->service->charsets, cn,
1480                                              se->service->rank_cluster,
1481                                              se->service->rank_follow,
1482                                              se->service->rank_lead,
1483                                              se->service->rank_length);
1484     }
1485     ccl_rpn_delete(cn);
1486     return ret_value;
1487 }
1488
1489 int client_parse_sort(struct client *cl, struct reclist_sortparms *sp)
1490 {
1491     if (sp)
1492     {
1493         const char *sort_strategy_and_spec =
1494             get_strategy_plus_sort(cl, sp->name);
1495         int increasing = sp->increasing;
1496         if (sort_strategy_and_spec && strlen(sort_strategy_and_spec) < 40)
1497         {
1498             char strategy[50], *p;
1499             strcpy(strategy, sort_strategy_and_spec);
1500             p = strchr(strategy, ':');
1501             if (p)
1502             {
1503                 // Split the string in two
1504                 *p++ = 0;
1505                 while (*p == ' ')
1506                     p++;
1507                 if (increasing)
1508                     strcat(p, " <");
1509                 else
1510                     strcat(p, " >");
1511                 yaz_log(YLOG_LOG, "Client %s: applying sorting %s %s", client_get_id(cl), strategy, p);
1512                 if (!cl->sort_strategy || strcmp(cl->sort_strategy, strategy))
1513                     cl->same_search = 0;
1514                 if (!cl->sort_criteria || strcmp(cl->sort_criteria, p))
1515                     cl->same_search = 0;
1516                 if (cl->same_search == 0) {
1517                     xfree(cl->sort_strategy);
1518                     cl->sort_strategy = xstrdup(strategy);
1519                     xfree(cl->sort_criteria);
1520                     cl->sort_criteria = xstrdup(p);
1521                 }
1522             }
1523             else {
1524                 yaz_log(YLOG_LOG, "Client %s: Invalid sort strategy and spec found %s", client_get_id(cl), sort_strategy_and_spec);
1525                 xfree(cl->sort_strategy);
1526                 cl->sort_strategy  = 0;
1527                 xfree(cl->sort_criteria);
1528                 cl->sort_criteria = 0;
1529             }
1530         } else {
1531             yaz_log(YLOG_DEBUG, "Client %s: No sort strategy and spec found.", client_get_id(cl));
1532             xfree(cl->sort_strategy);
1533             cl->sort_strategy  = 0;
1534             xfree(cl->sort_criteria);
1535             cl->sort_criteria = 0;
1536         }
1537
1538     }
1539     return !cl->same_search;
1540 }
1541
1542 void client_set_session(struct client *cl, struct session *se)
1543 {
1544     cl->session = se;
1545 }
1546
1547 int client_is_active(struct client *cl)
1548 {
1549     if (cl->connection && (cl->state == Client_Connecting ||
1550                            cl->state == Client_Working))
1551         return 1;
1552     return 0;
1553 }
1554
1555 int client_is_active_preferred(struct client *cl)
1556 {
1557     /* only count if this is a preferred target. */
1558     if (!cl->preferred)
1559         return 0;
1560     /* TODO No sure this the condition that Seb wants */
1561     if (cl->connection && (cl->state == Client_Connecting ||
1562                            cl->state == Client_Working))
1563         return 1;
1564     return 0;
1565 }
1566
1567 Odr_int client_get_hits(struct client *cl)
1568 {
1569     return cl->hits;
1570 }
1571
1572 Odr_int client_get_approximation(struct client *cl)
1573 {
1574     if (cl->record_offset > 0) {
1575         Odr_int approx = ((10 * cl->hits * (cl->record_offset - cl->filtered)) / cl->record_offset + 5) /10;
1576         yaz_log(YLOG_DEBUG, "%s: Approx: %lld * %d / %d = %lld ", client_get_id(cl), cl->hits, cl->record_offset - cl->filtered, cl->record_offset, approx);
1577         return approx;
1578     }
1579     return cl->hits;
1580 }
1581
1582 int client_get_num_records(struct client *cl)
1583 {
1584     return cl->record_offset;
1585 }
1586
1587 int client_get_num_records_filtered(struct client *cl)
1588 {
1589     return cl->filtered;
1590 }
1591
1592 void client_set_diagnostic(struct client *cl, int diagnostic,
1593                            const char *message, const char *addinfo)
1594 {
1595     cl->diagnostic = diagnostic;
1596     xfree(cl->message);
1597     cl->message = xstrdup(message);
1598     xfree(cl->addinfo);
1599     cl->addinfo = 0;
1600     if (addinfo)
1601         cl->addinfo = xstrdup(addinfo);
1602 }
1603
1604 int client_get_diagnostic(struct client *cl, const char **message,
1605                           const char **addinfo)
1606 {
1607     if (message)
1608         *message = cl->message;
1609     if (addinfo)
1610         *addinfo = cl->addinfo;
1611     return cl->diagnostic;
1612 }
1613
1614 const char * client_get_suggestions_xml(struct client *cl, WRBUF wrbuf)
1615 {
1616     /* int idx; */
1617     struct suggestions *suggestions = cl->suggestions;
1618
1619     if (!suggestions) {
1620         //yaz_log(YLOG_DEBUG, "No suggestions found");
1621         return "";
1622     }
1623     if (suggestions->passthrough) {
1624         yaz_log(YLOG_DEBUG, "Passthrough Suggestions: \n%s\n", suggestions->passthrough);
1625         return suggestions->passthrough;
1626     }
1627     if (suggestions->num == 0) {
1628         return "";
1629     }
1630     /*
1631     for (idx = 0; idx < suggestions->num; idx++) {
1632         wrbuf_printf(wrbuf, "<suggest term=\"%s\"", suggestions->suggest[idx]);
1633         if (suggestions->misspelled[idx] && suggestions->misspelled[idx]) {
1634             wrbuf_puts(wrbuf, suggestions->misspelled[idx]);
1635             wrbuf_puts(wrbuf, "</suggest>\n");
1636         }
1637         else
1638             wrbuf_puts(wrbuf, "/>\n");
1639     }
1640     */
1641     return wrbuf_cstr(wrbuf);
1642 }
1643
1644
1645 void client_set_database(struct client *cl, struct session_database *db)
1646 {
1647     cl->database = db;
1648 }
1649
1650 const char *client_get_id(struct client *cl)
1651 {
1652     return cl->id;
1653 }
1654
1655 int client_get_maxrecs(struct client *cl)
1656 {
1657     return cl->maxrecs;
1658 }
1659
1660 void client_set_preferred(struct client *cl, int v)
1661 {
1662     cl->preferred = v;
1663 }
1664
1665
1666 struct suggestions* client_suggestions_create(const char* suggestions_string)
1667 {
1668     int i;
1669     NMEM nmem;
1670     struct suggestions *suggestions;
1671     if (suggestions_string == 0 || suggestions_string[0] == 0 )
1672         return 0;
1673     nmem = nmem_create();
1674     suggestions = nmem_malloc(nmem, sizeof(*suggestions));
1675     yaz_log(YLOG_DEBUG, "client target suggestions: %s.", suggestions_string);
1676
1677     suggestions->nmem = nmem;
1678     suggestions->num = 0;
1679     suggestions->misspelled = 0;
1680     suggestions->suggest = 0;
1681     suggestions->passthrough = nmem_strdup_null(nmem, suggestions_string);
1682
1683     if (suggestions_string)
1684         nmem_strsplit_escape2(suggestions->nmem, "\n", suggestions_string, &suggestions->suggest,
1685                               &suggestions->num, 1, '\\', 0);
1686     /* Set up misspelled array */
1687     suggestions->misspelled = (char **) nmem_malloc(nmem, suggestions->num * sizeof(**suggestions->misspelled));
1688     /* replace = with \0 .. for each item */
1689     for (i = 0; i < suggestions->num; i++)
1690     {
1691         char *cp = strchr(suggestions->suggest[i], '=');
1692         if (cp) {
1693             *cp = '\0';
1694             suggestions->misspelled[i] = cp+1;
1695         }
1696     }
1697     return suggestions;
1698 }
1699
1700 static void client_suggestions_destroy(struct client *cl)
1701 {
1702     NMEM nmem = cl->suggestions->nmem;
1703     cl->suggestions = 0;
1704     nmem_destroy(nmem);
1705 }
1706
1707 /*
1708  * Local variables:
1709  * c-basic-offset: 4
1710  * c-file-style: "Stroustrup"
1711  * indent-tabs-mode: nil
1712  * End:
1713  * vim: shiftwidth=4 tabstop=8 expandtab
1714  */
1715