Now also release termlist watch
[pazpar2-moved-to-github.git] / src / client.c
1 /* This file is part of Pazpar2.
2    Copyright (C) 2006-2011 Index Data
3
4 Pazpar2 is free software; you can redistribute it and/or modify it under
5 the terms of the GNU General Public License as published by the Free
6 Software Foundation; either version 2, or (at your option) any later
7 version.
8
9 Pazpar2 is distributed in the hope that it will be useful, but WITHOUT ANY
10 WARRANTY; without even the implied warranty of MERCHANTABILITY or
11 FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
12 for more details.
13
14 You should have received a copy of the GNU General Public License
15 along with this program; if not, write to the Free Software
16 Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA  02110-1301  USA
17
18 */
19
20 /** \file client.c
21     \brief Z39.50 client 
22 */
23
24 #if HAVE_CONFIG_H
25 #include <config.h>
26 #endif
27 #include <stdlib.h>
28 #include <stdio.h>
29 #include <string.h>
30 #if HAVE_SYS_TIME_H
31 #include <sys/time.h>
32 #endif
33 #if HAVE_UNISTD_H
34 #include <unistd.h>
35 #endif
36 #include <signal.h>
37 #include <assert.h>
38
39 #include <yaz/marcdisp.h>
40 #include <yaz/comstack.h>
41 #include <yaz/tcpip.h>
42 #include <yaz/proto.h>
43 #include <yaz/readconf.h>
44 #include <yaz/pquery.h>
45 #include <yaz/otherinfo.h>
46 #include <yaz/yaz-util.h>
47 #include <yaz/nmem.h>
48 #include <yaz/query-charset.h>
49 #include <yaz/querytowrbuf.h>
50 #include <yaz/oid_db.h>
51 #include <yaz/diagbib1.h>
52 #include <yaz/snprintf.h>
53 #include <yaz/rpn2cql.h>
54 #include <yaz/rpn2solr.h>
55
56 #define USE_TIMING 0
57 #if USE_TIMING
58 #include <yaz/timing.h>
59 #endif
60
61 #include "ppmutex.h"
62 #include "session.h"
63 #include "parameters.h"
64 #include "client.h"
65 #include "connection.h"
66 #include "settings.h"
67 #include "relevance.h"
68 #include "incref.h"
69
70 static YAZ_MUTEX g_mutex = 0;
71 static int no_clients = 0;
72 static int no_clients_total = 0;
73
74 static int client_use(int delta)
75 {
76     int clients;
77     if (!g_mutex)
78         yaz_mutex_create(&g_mutex);
79     yaz_mutex_enter(g_mutex);
80     no_clients += delta;
81     if (delta > 0)
82         no_clients_total += delta;
83     clients = no_clients;
84     yaz_mutex_leave(g_mutex);
85     yaz_log(YLOG_DEBUG, "%s clients=%d", delta == 0 ? "" : (delta > 0 ? "INC" : "DEC"), clients);
86     return clients;
87 }
88
89 int  clients_count(void) {
90     return client_use(0);
91 }
92
93 int  clients_count_total(void) {
94     int total = 0;
95     if (!g_mutex)
96         return 0;
97     yaz_mutex_enter(g_mutex);
98     total = no_clients_total;
99     yaz_mutex_leave(g_mutex);
100     return total;
101 }
102
103
104 /** \brief Represents client state for a connection to one search target */
105 struct client {
106     struct session_database *database;
107     struct connection *connection;
108     struct session *session;
109     char *pquery; // Current search
110     char *cqlquery; // used for SRU targets only
111     Odr_int hits;
112     int record_offset;
113     int maxrecs;
114     int startrecs;
115     int diagnostic;
116     int preferred;
117     struct suggestions *suggestions;
118     enum client_state state;
119     struct show_raw *show_raw;
120     ZOOM_resultset resultset;
121     YAZ_MUTEX mutex;
122     int ref_count;
123     char *id;
124 };
125
126 struct suggestions {
127     NMEM nmem;
128     int num;
129     char **misspelled;
130     char **suggest;
131     char *passthrough;
132 };
133
134 struct show_raw {
135     int active; // whether this request has been sent to the server
136     int position;
137     int binary;
138     char *syntax;
139     char *esn;
140     void (*error_handler)(void *data, const char *addinfo);
141     void (*record_handler)(void *data, const char *buf, size_t sz);
142     void *data;
143     struct show_raw *next;
144 };
145
146 static const char *client_states[] = {
147     "Client_Connecting",
148     "Client_Idle",
149     "Client_Working",
150     "Client_Error",
151     "Client_Failed",
152     "Client_Disconnected"
153 };
154
155 const char *client_get_state_str(struct client *cl)
156 {
157     return client_states[cl->state];
158 }
159
160 enum client_state client_get_state(struct client *cl)
161 {
162     return cl->state;
163 }
164
165 void client_set_state(struct client *cl, enum client_state st)
166 {
167     int was_active = 0;
168     if (client_is_active(cl))
169         was_active = 1;
170     cl->state = st;
171     /* If client is going from being active to inactive and all clients
172        are now idle we fire a watch for the session . The assumption is
173        that session is not mutex locked if client is already active */
174     if (was_active && !client_is_active(cl) && cl->session)
175     {
176
177         int no_active = session_active_clients(cl->session);
178         yaz_log(YLOG_DEBUG, "%s: releasing watches on zero active: %d",
179                 client_get_id(cl), no_active);
180         if (no_active == 0) {
181             session_alert_watch(cl->session, SESSION_WATCH_SHOW);
182             session_alert_watch(cl->session, SESSION_WATCH_BYTARGET);
183             session_alert_watch(cl->session, SESSION_WATCH_TERMLIST);
184             session_alert_watch(cl->session, SESSION_WATCH_SHOW_PREF);
185         }
186     }
187 }
188
189 static void client_show_raw_error(struct client *cl, const char *addinfo);
190
191 struct connection *client_get_connection(struct client *cl)
192 {
193     return cl->connection;
194 }
195
196 struct session_database *client_get_database(struct client *cl)
197 {
198     return cl->database;
199 }
200
201 struct session *client_get_session(struct client *cl)
202 {
203     return cl->session;
204 }
205
206 const char *client_get_pquery(struct client *cl)
207 {
208     return cl->pquery;
209 }
210
211 static void client_send_raw_present(struct client *cl);
212 static int nativesyntax_to_type(struct session_database *sdb, char *type,
213                                 ZOOM_record rec);
214
215 static void client_show_immediate(
216     ZOOM_resultset resultset, struct session_database *sdb, int position,
217     void *data,
218     void (*error_handler)(void *data, const char *addinfo),
219     void (*record_handler)(void *data, const char *buf, size_t sz),
220     int binary)
221 {
222     ZOOM_record rec = 0;
223     char type[80];
224     const char *buf;
225     int len;
226
227     if (!resultset)
228     {
229         error_handler(data, "no resultset");
230         return;
231     }
232     rec = ZOOM_resultset_record(resultset, position-1);
233     if (!rec)
234     {
235         error_handler(data, "no record");
236         return;
237     }
238     if (binary)
239         strcpy(type, "raw");
240     else
241         nativesyntax_to_type(sdb, type, rec);
242     buf = ZOOM_record_get(rec, type, &len);
243     if (!buf)
244     {
245         error_handler(data, "no record");
246         return;
247     }
248     record_handler(data, buf, len);
249 }
250
251
252 int client_show_raw_begin(struct client *cl, int position,
253                           const char *syntax, const char *esn,
254                           void *data,
255                           void (*error_handler)(void *data, const char *addinfo),
256                           void (*record_handler)(void *data, const char *buf,
257                                                  size_t sz),
258                           int binary)
259 {
260     if (syntax == 0 && esn == 0)
261         client_show_immediate(cl->resultset, client_get_database(cl),
262                               position, data,
263                               error_handler, record_handler,
264                               binary);
265     else
266     {
267         struct show_raw *rr, **rrp;
268
269         if (!cl->connection)
270             return -1;
271     
272
273         rr = xmalloc(sizeof(*rr));
274         rr->position = position;
275         rr->active = 0;
276         rr->data = data;
277         rr->error_handler = error_handler;
278         rr->record_handler = record_handler;
279         rr->binary = binary;
280         if (syntax)
281             rr->syntax = xstrdup(syntax);
282         else
283             rr->syntax = 0;
284         if (esn)
285             rr->esn = xstrdup(esn);
286         else
287             rr->esn = 0;
288         rr->next = 0;
289         
290         for (rrp = &cl->show_raw; *rrp; rrp = &(*rrp)->next)
291             ;
292         *rrp = rr;
293         
294         if (cl->state == Client_Failed)
295         {
296             client_show_raw_error(cl, "client failed");
297         }
298         else if (cl->state == Client_Disconnected)
299         {
300             client_show_raw_error(cl, "client disconnected");
301         }
302         else
303         {
304             client_send_raw_present(cl);
305         }
306     }
307     return 0;
308 }
309
310 static void client_show_raw_delete(struct show_raw *r)
311 {
312     xfree(r->syntax);
313     xfree(r->esn);
314     xfree(r);
315 }
316
317 void client_show_raw_remove(struct client *cl, void *data)
318 {
319     struct show_raw *rr = data;
320     struct show_raw **rrp = &cl->show_raw;
321     while (*rrp != rr)
322         rrp = &(*rrp)->next;
323     if (*rrp)
324     {
325         *rrp = rr->next;
326         client_show_raw_delete(rr);
327     }
328 }
329
330 void client_show_raw_dequeue(struct client *cl)
331 {
332     struct show_raw *rr = cl->show_raw;
333
334     cl->show_raw = rr->next;
335     client_show_raw_delete(rr);
336 }
337
338 static void client_show_raw_error(struct client *cl, const char *addinfo)
339 {
340     while (cl->show_raw)
341     {
342         cl->show_raw->error_handler(cl->show_raw->data, addinfo);
343         client_show_raw_dequeue(cl);
344     }
345 }
346
347 static void client_send_raw_present(struct client *cl)
348 {
349     struct session_database *sdb = client_get_database(cl);
350     struct connection *co = client_get_connection(cl);
351     ZOOM_resultset set = cl->resultset;
352
353     int offset = cl->show_raw->position;
354     const char *syntax = 0;
355     const char *elements = 0;
356
357     assert(cl->show_raw);
358     assert(set);
359
360     yaz_log(YLOG_DEBUG, "%s: trying to present %d record(s) from %d",
361             client_get_id(cl), 1, offset);
362
363     if (cl->show_raw->syntax)
364         syntax = cl->show_raw->syntax;
365     else
366         syntax = session_setting_oneval(sdb, PZ_REQUESTSYNTAX);
367     ZOOM_resultset_option_set(set, "preferredRecordSyntax", syntax);
368
369     if (cl->show_raw->esn)
370         elements = cl->show_raw->esn;
371     else
372         elements = session_setting_oneval(sdb, PZ_ELEMENTS);
373     if (elements && *elements)
374         ZOOM_resultset_option_set(set, "elementSetName", elements);
375
376     ZOOM_resultset_records(set, 0, offset-1, 1);
377     cl->show_raw->active = 1;
378
379     connection_continue(co);
380 }
381
382 static int nativesyntax_to_type(struct session_database *sdb, char *type,
383                                 ZOOM_record rec)
384 {
385     const char *s = session_setting_oneval(sdb, PZ_NATIVESYNTAX);
386
387     if (s && *s)
388     {
389         if (!strncmp(s, "iso2709", 7))
390         {
391             const char *cp = strchr(s, ';');
392             yaz_snprintf(type, 80, "xml; charset=%s", cp ? cp+1 : "marc-8s");
393         }
394         else if (!strncmp(s, "xml", 3))
395         {
396             strcpy(type, "xml");
397         }
398         else if (!strncmp(s, "txml", 4))
399         {
400             const char *cp = strchr(s, ';');
401             yaz_snprintf(type, 80, "txml; charset=%s", cp ? cp+1 : "marc-8s");
402         }
403         else
404             return -1;
405         return 0;
406     }
407     else  /* attempt to deduce structure */
408     {
409         const char *syntax = ZOOM_record_get(rec, "syntax", NULL);
410         if (syntax)
411         {
412             if (!strcmp(syntax, "XML"))
413             {
414                 strcpy(type, "xml");
415                 return 0;
416             }
417             else if (!strcmp(syntax, "USmarc") || !strcmp(syntax, "MARC21"))
418             {
419                 strcpy(type, "xml; charset=marc8-s");
420                 return 0;
421             }
422             else return -1;
423         }
424         else return -1;
425     }
426 }
427
428 /**
429  * TODO Consider thread safety!!!
430  *
431  */
432 void client_report_facets(struct client *cl, ZOOM_resultset rs)
433 {
434     struct session_database *sdb = client_get_database(cl);
435     ZOOM_facet_field *facets = ZOOM_resultset_facets(rs);
436
437     if (sdb && facets)
438     {
439         struct session *se = client_get_session(cl);
440         int facet_num = ZOOM_resultset_facets_size(rs);
441         struct setting *s;
442
443         for (s = sdb->settings[PZ_FACETMAP]; s; s = s->next)
444         {
445             const char *p = strchr(s->name + 3, ':');
446             if (p && p[1] && s->value && s->value[0])
447             {
448                 int facet_idx;
449                 p++; /* p now holds logical facet name */
450                 for (facet_idx = 0; facet_idx < facet_num; facet_idx++)
451                 {
452                     const char *native_name =
453                         ZOOM_facet_field_name(facets[facet_idx]);
454                     if (native_name && !strcmp(s->value, native_name))
455                     {
456                         size_t term_idx;
457                         size_t term_num =
458                             ZOOM_facet_field_term_count(facets[facet_idx]);
459                         for (term_idx = 0; term_idx < term_num; term_idx++ )
460                         {
461                             int freq;
462                             const char *term =
463                                 ZOOM_facet_field_get_term(facets[facet_idx],
464                                                           term_idx, &freq);
465                             if (term)
466                                 add_facet(se, p, term, freq);
467                         }
468                         break;
469                     }
470                 }
471             }
472         }
473     }
474 }
475
476 static void ingest_raw_record(struct client *cl, ZOOM_record rec)
477 {
478     const char *buf;
479     int len;
480     char type[80];
481
482     if (cl->show_raw->binary)
483         strcpy(type, "raw");
484     else
485     {
486         struct session_database *sdb = client_get_database(cl);
487         nativesyntax_to_type(sdb, type, rec);
488     }
489
490     buf = ZOOM_record_get(rec, type, &len);
491     cl->show_raw->record_handler(cl->show_raw->data,  buf, len);
492     client_show_raw_dequeue(cl);
493 }
494
495 void client_check_preferred_watch(struct client *cl)
496 {
497     struct session *se = cl->session;
498     yaz_log(YLOG_DEBUG, "client_check_preferred_watch: %s ", client_get_id(cl));
499     if (se)
500     {
501         client_unlock(cl);
502         /* TODO possible threading issue. Session can have been destroyed */
503         if (session_is_preferred_clients_ready(se)) {
504             session_alert_watch(se, SESSION_WATCH_SHOW_PREF);
505         }
506         else
507             yaz_log(YLOG_DEBUG, "client_check_preferred_watch: Still locked on preferred targets.");
508
509         client_lock(cl);
510     }
511     else
512         yaz_log(YLOG_WARN, "client_check_preferred_watch: %s. No session!", client_get_id(cl));
513
514 }
515
516 struct suggestions* client_suggestions_create(const char* suggestions_string);
517 static void client_suggestions_destroy(struct client *cl);
518
519 void client_search_response(struct client *cl)
520 {
521     struct connection *co = cl->connection;
522     ZOOM_connection link = connection_get_link(co);
523     ZOOM_resultset resultset = cl->resultset;
524
525     const char *error, *addinfo = 0;
526     
527     if (ZOOM_connection_error(link, &error, &addinfo))
528     {
529         cl->hits = 0;
530         client_set_state(cl, Client_Error);
531         yaz_log(YLOG_WARN, "Search error %s (%s): %s",
532                 error, addinfo, client_get_id(cl));
533     }
534     else
535     {
536         yaz_log(YLOG_DEBUG, "client_search_response: hits "
537                 ODR_INT_PRINTF, cl->hits);
538         client_report_facets(cl, resultset);
539         cl->record_offset = cl->startrecs;
540         cl->hits = ZOOM_resultset_size(resultset);
541         if (cl->suggestions)
542             client_suggestions_destroy(cl);
543         cl->suggestions = client_suggestions_create(ZOOM_resultset_option_get(resultset, "suggestions"));
544     }
545 }
546
547 void client_got_records(struct client *cl)
548 {
549     struct session *se = cl->session;
550     if (se)
551     {
552         client_unlock(cl);
553         session_alert_watch(se, SESSION_WATCH_SHOW);
554         session_alert_watch(se, SESSION_WATCH_BYTARGET);
555         session_alert_watch(se, SESSION_WATCH_TERMLIST);
556         session_alert_watch(se, SESSION_WATCH_RECORD);
557         client_lock(cl);
558     }
559 }
560
561 void client_record_response(struct client *cl)
562 {
563     struct connection *co = cl->connection;
564     ZOOM_connection link = connection_get_link(co);
565     ZOOM_resultset resultset = cl->resultset;
566     const char *error, *addinfo;
567
568     if (ZOOM_connection_error(link, &error, &addinfo))
569     {
570         client_set_state(cl, Client_Error);
571         yaz_log(YLOG_WARN, "Search error %s (%s): %s",
572             error, addinfo, client_get_id(cl));
573     }
574     else
575     {
576         ZOOM_record rec = 0;
577         const char *msg, *addinfo;
578         
579         if (cl->show_raw && cl->show_raw->active)
580         {
581             if ((rec = ZOOM_resultset_record(resultset,
582                                              cl->show_raw->position-1)))
583             {
584                 cl->show_raw->active = 0;
585                 ingest_raw_record(cl, rec);
586             }
587             else
588             {
589                 yaz_log(YLOG_WARN, "Expected record, but got NULL, offset=%d",
590                         cl->show_raw->position-1);
591             }
592         }
593         else
594         {
595             int offset = cl->record_offset;
596             if ((rec = ZOOM_resultset_record(resultset, offset)))
597             {
598                 cl->record_offset++;
599                 if (cl->session == 0)
600                     ;
601                 else if (ZOOM_record_error(rec, &msg, &addinfo, 0))
602                 {
603                     yaz_log(YLOG_WARN, "Record error %s (%s): %s (rec #%d)",
604                             msg, addinfo, client_get_id(cl),
605                             cl->record_offset);
606                 }
607                 else
608                 {
609                     struct session_database *sdb = client_get_database(cl);
610                     NMEM nmem = nmem_create();
611                     const char *xmlrec;
612                     char type[80];
613
614                     if (nativesyntax_to_type(sdb, type, rec))
615                         yaz_log(YLOG_WARN, "Failed to determine record type");
616                     xmlrec = ZOOM_record_get(rec, type, NULL);
617                     if (!xmlrec)
618                         yaz_log(YLOG_WARN, "ZOOM_record_get failed from %s",
619                                 client_get_id(cl));
620                     else
621                     {
622                         /* OK = 0, -1 = failure, -2 = Filtered */
623                         if (ingest_record(cl, xmlrec, cl->record_offset, nmem) == -1)
624                             yaz_log(YLOG_WARN, "Failed to ingest from %s", client_get_id(cl));
625                     }
626                     nmem_destroy(nmem);
627                 }
628             }
629             else
630             {
631                 yaz_log(YLOG_WARN, "Expected record, but got NULL, offset=%d",
632                         offset);
633             }
634         }
635     }
636 }
637
638 static void client_set_facets_request(struct client *cl, ZOOM_connection link)
639 {
640     struct session_database *sdb = client_get_database(cl);
641
642     WRBUF w = wrbuf_alloc();
643     
644     struct setting *s;
645
646     for (s = sdb->settings[PZ_FACETMAP]; s; s = s->next)
647     {
648         const char *p = strchr(s->name + 3, ':');
649         if (!p)
650         {
651             yaz_log(YLOG_WARN, "Malformed facetmap name: %s", s->name);
652         }
653         else if (s->value && s->value[0])
654         {
655             wrbuf_puts(w, "@attr 1=");
656             yaz_encode_pqf_term(w, s->value, strlen(s->value));
657             if (s->next)
658                 wrbuf_puts(w, ",");
659         }
660     }
661     yaz_log(YLOG_LOG, "using facets str: %s", wrbuf_cstr(w));
662     ZOOM_connection_option_set(link, "facets",
663                                wrbuf_len(w) ? wrbuf_cstr(w) : 0);
664     wrbuf_destroy(w);
665 }
666
667 int client_has_facet(struct client *cl, const char *name)
668 {
669     struct session_database *sdb = client_get_database(cl);
670     struct setting *s;
671
672     for (s = sdb->settings[PZ_FACETMAP]; s; s = s->next)
673     {
674         const char *p = strchr(s->name + 3, ':');
675         if (p && !strcmp(name, p + 1))
676             return 1;
677     }
678     return 0;
679 }
680
681 void client_start_search(struct client *cl, const char *sort_strategy_and_spec,
682                          int increasing)
683 {
684     struct session_database *sdb = client_get_database(cl);
685     struct connection *co = client_get_connection(cl);
686     ZOOM_connection link = connection_get_link(co);
687     ZOOM_resultset rs;
688     const char *opt_piggyback   = session_setting_oneval(sdb, PZ_PIGGYBACK);
689     const char *opt_queryenc    = session_setting_oneval(sdb, PZ_QUERYENCODING);
690     const char *opt_elements    = session_setting_oneval(sdb, PZ_ELEMENTS);
691     const char *opt_requestsyn  = session_setting_oneval(sdb, PZ_REQUESTSYNTAX);
692     const char *opt_maxrecs     = session_setting_oneval(sdb, PZ_MAXRECS);
693     const char *opt_sru         = session_setting_oneval(sdb, PZ_SRU);
694     const char *opt_sort        = session_setting_oneval(sdb, PZ_SORT);
695     const char *opt_preferred   = session_setting_oneval(sdb, PZ_PREFERRED);
696     const char *extra_args      = session_setting_oneval(sdb, PZ_EXTRA_ARGS);
697     char maxrecs_str[24], startrecs_str[24];
698     ZOOM_query q;
699
700     assert(link);
701
702     cl->record_offset = 0;
703     cl->diagnostic = 0;
704
705     if (extra_args && *extra_args)
706         ZOOM_connection_option_set(link, "extraArgs", extra_args);
707
708     if (opt_preferred) {
709         cl->preferred = atoi(opt_preferred);
710         if (cl->preferred)
711             yaz_log(YLOG_LOG, "Target %s has preferred status: %d",
712                     client_get_id(cl), cl->preferred);
713     }
714     client_set_state(cl, Client_Working);
715
716     if (*opt_piggyback)
717         ZOOM_connection_option_set(link, "piggyback", opt_piggyback);
718     else
719         ZOOM_connection_option_set(link, "piggyback", "1");
720     if (*opt_queryenc)
721         ZOOM_connection_option_set(link, "rpnCharset", opt_queryenc);
722     if (*opt_sru && *opt_elements)
723         ZOOM_connection_option_set(link, "schema", opt_elements);
724     else if (*opt_elements)
725         ZOOM_connection_option_set(link, "elementSetName", opt_elements);
726     if (*opt_requestsyn)
727         ZOOM_connection_option_set(link, "preferredRecordSyntax", opt_requestsyn);
728
729     if (opt_maxrecs && *opt_maxrecs)
730     {
731         cl->maxrecs = atoi(opt_maxrecs);
732     }
733
734     /* convert back to string representation used in ZOOM API */
735     sprintf(maxrecs_str, "%d", cl->maxrecs);
736     ZOOM_connection_option_set(link, "count", maxrecs_str);
737
738     if (cl->maxrecs > 20)
739         ZOOM_connection_option_set(link, "presentChunk", "20");
740     else
741         ZOOM_connection_option_set(link, "presentChunk", maxrecs_str);
742
743     sprintf(startrecs_str, "%d", cl->startrecs);
744     ZOOM_connection_option_set(link, "start", startrecs_str);
745
746     /* TODO Verify does it break something for CQL targets(non-SOLR) ? */
747     /* facets definition is in PQF */
748     client_set_facets_request(cl, link);
749
750     q = ZOOM_query_create();
751     if (cl->cqlquery)
752     {
753         yaz_log(YLOG_LOG, "Search %s CQL: %s", client_get_id(cl),
754                 cl->cqlquery);
755         ZOOM_query_cql(q, cl->cqlquery);
756         if (*opt_sort)
757             ZOOM_query_sortby(q, opt_sort);
758     }
759     else
760     {
761         yaz_log(YLOG_LOG, "Search %s PQF: %s", client_get_id(cl), cl->pquery);
762         
763         ZOOM_query_prefix(q, cl->pquery);
764     }
765     if (sort_strategy_and_spec &&
766         strlen(sort_strategy_and_spec) < 40 /* spec below */)
767     {
768         char spec[50], *p;
769         strcpy(spec, sort_strategy_and_spec);
770         p = strchr(spec, ':');
771         if (p)
772         {
773             *p++ = '\0'; /* cut the string in two */
774             while (*p == ' ')
775                 p++;
776             if (increasing)
777                 strcat(p, " <");
778             else
779                 strcat(p, " >");
780             yaz_log(YLOG_LOG, "applying %s %s", spec, p);
781             ZOOM_query_sortby2(q, spec, p);
782         }
783     }
784     rs = ZOOM_connection_search(link, q);
785     ZOOM_query_destroy(q);
786     ZOOM_resultset_destroy(cl->resultset);
787     cl->resultset = rs;
788     connection_continue(co);
789 }
790
791 struct client *client_create(const char *id)
792 {
793     struct client *cl = xmalloc(sizeof(*cl));
794     cl->maxrecs = 100;
795     cl->startrecs = 0;
796     cl->pquery = 0;
797     cl->cqlquery = 0;
798     cl->database = 0;
799     cl->connection = 0;
800     cl->session = 0;
801     cl->hits = 0;
802     cl->record_offset = 0;
803     cl->diagnostic = 0;
804     cl->state = Client_Disconnected;
805     cl->show_raw = 0;
806     cl->resultset = 0;
807     cl->suggestions = 0;
808     cl->mutex = 0;
809     pazpar2_mutex_create(&cl->mutex, "client");
810     cl->preferred = 0;
811     cl->ref_count = 1;
812     assert(id);
813     cl->id = xstrdup(id);
814     client_use(1);
815     
816     return cl;
817 }
818
819 void client_lock(struct client *c)
820 {
821     yaz_mutex_enter(c->mutex);
822 }
823
824 void client_unlock(struct client *c)
825 {
826     yaz_mutex_leave(c->mutex);
827 }
828
829 void client_incref(struct client *c)
830 {
831     pazpar2_incref(&c->ref_count, c->mutex);
832     yaz_log(YLOG_DEBUG, "client_incref c=%p %s cnt=%d",
833             c, client_get_id(c), c->ref_count);
834 }
835
836 int client_destroy(struct client *c)
837 {
838     if (c)
839     {
840         yaz_log(YLOG_DEBUG, "client_destroy c=%p %s cnt=%d",
841                 c, client_get_id(c), c->ref_count);
842         if (!pazpar2_decref(&c->ref_count, c->mutex))
843         {
844             xfree(c->pquery);
845             c->pquery = 0;
846             xfree(c->cqlquery);
847             c->cqlquery = 0;
848             xfree(c->id);
849             assert(!c->connection);
850
851             if (c->resultset)
852             {
853                 ZOOM_resultset_destroy(c->resultset);
854             }
855             yaz_mutex_destroy(&c->mutex);
856             xfree(c);
857             client_use(-1);
858             return 1;
859         }
860     }
861     return 0;
862 }
863
864 void client_set_connection(struct client *cl, struct connection *con)
865 {
866     if (cl->resultset)
867         ZOOM_resultset_release(cl->resultset);
868     if (con)
869     {
870         assert(cl->connection == 0);
871         cl->connection = con;
872         client_incref(cl);
873     }
874     else
875     {
876         cl->connection = con;
877         client_destroy(cl);
878     }
879 }
880
881 void client_disconnect(struct client *cl)
882 {
883     if (cl->state != Client_Idle)
884         client_set_state(cl, Client_Disconnected);
885     client_set_connection(cl, 0);
886 }
887
888
889 // Initialize CCL map for a target
890 static CCL_bibset prepare_cclmap(struct client *cl)
891 {
892     struct session_database *sdb = client_get_database(cl);
893     struct setting *s;
894     CCL_bibset res;
895
896     if (!sdb->settings)
897         return 0;
898     res = ccl_qual_mk();
899     for (s = sdb->settings[PZ_CCLMAP]; s; s = s->next)
900     {
901         char *p = strchr(s->name + 3, ':');
902         if (!p)
903         {
904             yaz_log(YLOG_WARN, "Malformed cclmap name: %s", s->name);
905             ccl_qual_rm(&res);
906             return 0;
907         }
908         p++;
909         ccl_qual_fitem(res, s->value, p);
910     }
911     return res;
912 }
913
914 // returns a xmalloced CQL query corresponding to the pquery in client
915 static char *make_cqlquery(struct client *cl)
916 {
917     cql_transform_t cqlt = cql_transform_create();
918     Z_RPNQuery *zquery;
919     char *r;
920     WRBUF wrb = wrbuf_alloc();
921     int status;
922     ODR odr_out = odr_createmem(ODR_ENCODE);
923
924     zquery = p_query_rpn(odr_out, cl->pquery);
925     yaz_log(YLOG_LOG, "PQF: %s", cl->pquery);
926     if ((status = cql_transform_rpn2cql_wrbuf(cqlt, wrb, zquery)))
927     {
928         yaz_log(YLOG_WARN, "Failed to generate CQL query, code=%d", status);
929         r = 0;
930     }
931     else
932     {
933         r = xstrdup(wrbuf_cstr(wrb));
934     }     
935     wrbuf_destroy(wrb);
936     odr_destroy(odr_out);
937     cql_transform_close(cqlt);
938     return r;
939 }
940
941 // returns a xmalloced SOLR query corresponding to the pquery in client
942 // TODO Could prob. be merge with the similar make_cqlquery
943 static char *make_solrquery(struct client *cl)
944 {
945     solr_transform_t sqlt = solr_transform_create();
946     Z_RPNQuery *zquery;
947     char *r;
948     WRBUF wrb = wrbuf_alloc();
949     int status;
950     ODR odr_out = odr_createmem(ODR_ENCODE);
951
952     zquery = p_query_rpn(odr_out, cl->pquery);
953     if (zquery == 0) {
954         yaz_log(YLOG_WARN, "Failed to generate RPN from PQF: %s", cl->pquery);
955         return 0;
956     }
957     yaz_log(YLOG_LOG, "PQF: %s", cl->pquery);
958     if ((status = solr_transform_rpn2solr_wrbuf(sqlt, wrb, zquery)))
959     {
960         yaz_log(YLOG_WARN, "Failed to generate SOLR query from PQF %s, code=%d", cl->pquery, status);
961         r = 0;
962     }
963     else
964     {
965         r = xstrdup(wrbuf_cstr(wrb));
966     }
967     wrbuf_destroy(wrb);
968     odr_destroy(odr_out);
969     solr_transform_close(sqlt);
970     return r;
971 }
972
973 static void apply_limit(struct session_database *sdb,
974                         facet_limits_t facet_limits,
975                         WRBUF w_pqf, WRBUF w_ccl)
976 {
977     int i = 0;
978     const char *name;
979     const char *value;
980     NMEM nmem_tmp = nmem_create();
981     for (i = 0; (name = facet_limits_get(facet_limits, i, &value)); i++)
982     {
983         struct setting *s = 0;
984         
985         for (s = sdb->settings[PZ_LIMITMAP]; s; s = s->next)
986         {
987             const char *p = strchr(s->name + 3, ':');
988             if (p && !strcmp(p + 1, name) && s->value)
989             {
990                 char **values = 0;
991                 int i, num = 0;
992                 nmem_strsplit_escape2(nmem_tmp, "|", value, &values,
993                                       &num, 1, '\\', 1);
994
995                 if (!strncmp(s->value, "rpn:", 4))
996                 {
997                     const char *pqf = s->value + 4;
998
999                     wrbuf_puts(w_pqf, "@and ");
1000                     wrbuf_puts(w_pqf, pqf);
1001                     wrbuf_puts(w_pqf, " ");
1002                     for (i = 0; i < num; i++)
1003                     {
1004                         if (i < num - 1)
1005                             wrbuf_puts(w_pqf, "@or ");
1006                         yaz_encode_pqf_term(w_pqf, values[i],
1007                                             strlen(values[i]));
1008                     }
1009                 }
1010                 else if (!strncmp(s->value, "ccl:", 4))
1011                 {
1012                     const char *ccl = s->value + 4;
1013
1014                     wrbuf_puts(w_ccl, " and (");
1015
1016                     for (i = 0; i < num; i++)
1017                     {
1018                         if (i)
1019                             wrbuf_puts(w_ccl, " or ");
1020                         wrbuf_puts(w_ccl, ccl);
1021                         wrbuf_puts(w_ccl, "=\"");
1022                         wrbuf_puts(w_ccl, values[i]);
1023                         wrbuf_puts(w_ccl, "\"");
1024                     }
1025                     wrbuf_puts(w_ccl, ")");
1026
1027                 }
1028                 break;
1029             }
1030         }
1031         nmem_reset(nmem_tmp);
1032         if (!s)
1033         {
1034             yaz_log(YLOG_WARN, "Target %s: limit %s used, but no limitmap defined",
1035                     (sdb->database ? sdb->database->id : "<no id>"), name);
1036         }
1037     }
1038     nmem_destroy(nmem_tmp);
1039 }
1040                         
1041 // Parse the query given the settings specific to this client
1042 int client_parse_query(struct client *cl, const char *query,
1043                        facet_limits_t facet_limits)
1044 {
1045     struct session *se = client_get_session(cl);
1046     struct session_database *sdb = client_get_database(cl);
1047     struct ccl_rpn_node *cn;
1048     int cerror, cpos;
1049     CCL_bibset ccl_map = prepare_cclmap(cl);
1050     const char *sru = session_setting_oneval(sdb, PZ_SRU);
1051     const char *pqf_prefix = session_setting_oneval(sdb, PZ_PQF_PREFIX);
1052     const char *pqf_strftime = session_setting_oneval(sdb, PZ_PQF_STRFTIME);
1053     const char *query_syntax = session_setting_oneval(sdb, PZ_QUERY_SYNTAX);
1054     WRBUF w_ccl, w_pqf;
1055     if (!ccl_map)
1056         return -1;
1057
1058     cl->hits = -1;
1059     w_ccl = wrbuf_alloc();
1060     wrbuf_puts(w_ccl, query);
1061
1062     w_pqf = wrbuf_alloc();
1063     if (*pqf_prefix)
1064     {
1065         wrbuf_puts(w_pqf, pqf_prefix);
1066         wrbuf_puts(w_pqf, " ");
1067     }
1068
1069     apply_limit(sdb, facet_limits, w_pqf, w_ccl);
1070
1071     yaz_log(YLOG_LOG, "CCL query: %s", wrbuf_cstr(w_ccl));
1072     cn = ccl_find_str(ccl_map, wrbuf_cstr(w_ccl), &cerror, &cpos);
1073     ccl_qual_rm(&ccl_map);
1074     if (!cn)
1075     {
1076         client_set_state(cl, Client_Error);
1077         session_log(se, YLOG_WARN, "Failed to parse CCL query '%s' for %s",
1078                     wrbuf_cstr(w_ccl),
1079                     client_get_id(cl));
1080         wrbuf_destroy(w_ccl);
1081         wrbuf_destroy(w_pqf);
1082         return -1;
1083     }
1084     wrbuf_destroy(w_ccl);
1085
1086     if (!pqf_strftime || !*pqf_strftime)
1087         ccl_pquery(w_pqf, cn);
1088     else
1089     {
1090         time_t cur_time = time(0);
1091         struct tm *tm =  localtime(&cur_time);
1092         char tmp_str[300];
1093         const char *cp = tmp_str;
1094
1095         /* see man strftime(3) for things .. In particular %% gets converted
1096          to %.. And That's our original query .. */
1097         strftime(tmp_str, sizeof(tmp_str)-1, pqf_strftime, tm);
1098         for (; *cp; cp++)
1099         {
1100             if (cp[0] == '%')
1101                 ccl_pquery(w_pqf, cn);
1102             else
1103                 wrbuf_putc(w_pqf, cp[0]);
1104         }
1105     }
1106     xfree(cl->pquery);
1107     cl->pquery = xstrdup(wrbuf_cstr(w_pqf));
1108     wrbuf_destroy(w_pqf);
1109
1110     yaz_log(YLOG_LOG, "PQF query: %s", cl->pquery);
1111
1112     xfree(cl->cqlquery);
1113
1114     /* Support for PQF on SRU targets. */
1115     /* TODO Refactor */
1116     yaz_log(YLOG_DEBUG, "Query syntax: %s", query_syntax);
1117     if (strcmp(query_syntax, "pqf") != 0 && *sru)
1118     {
1119         if (!strcmp(sru, "solr")) {
1120             if (!(cl->cqlquery = make_solrquery(cl)))
1121                 return -1;
1122         }
1123         else {
1124             if (!(cl->cqlquery = make_cqlquery(cl)))
1125                 return -1;
1126         }
1127     }
1128     else
1129         cl->cqlquery = 0;
1130
1131     /* TODO FIX Not thread safe */
1132     if (!se->relevance)
1133     {
1134         // Initialize relevance structure with query terms
1135         se->relevance = relevance_create_ccl(
1136             se->service->charsets, se->nmem, cn);
1137     }
1138
1139     ccl_rpn_delete(cn);
1140     return 0;
1141 }
1142
1143 void client_set_session(struct client *cl, struct session *se)
1144 {
1145     cl->session = se;
1146 }
1147
1148 int client_is_active(struct client *cl)
1149 {
1150     if (cl->connection && (cl->state == Client_Connecting ||
1151                            cl->state == Client_Working))
1152         return 1;
1153     return 0;
1154 }
1155
1156 int client_is_active_preferred(struct client *cl)
1157 {
1158     /* only count if this is a preferred target. */
1159     if (!cl->preferred)
1160         return 0;
1161     /* TODO No sure this the condition that Seb wants */
1162     if (cl->connection && (cl->state == Client_Connecting ||
1163                            cl->state == Client_Working))
1164         return 1;
1165     return 0;
1166 }
1167
1168 Odr_int client_get_hits(struct client *cl)
1169 {
1170     return cl->hits;
1171 }
1172
1173 int client_get_num_records(struct client *cl)
1174 {
1175     return cl->record_offset;
1176 }
1177
1178 void client_set_diagnostic(struct client *cl, int diagnostic)
1179 {
1180     cl->diagnostic = diagnostic;
1181 }
1182
1183 int client_get_diagnostic(struct client *cl)
1184 {
1185     return cl->diagnostic;
1186 }
1187
1188 const char * client_get_suggestions_xml(struct client *cl, WRBUF wrbuf)
1189 {
1190     /* int idx; */
1191     struct suggestions *suggestions = cl->suggestions;
1192
1193     if (!suggestions) {
1194         //yaz_log(YLOG_DEBUG, "No suggestions found");
1195         return "";
1196     }
1197     if (suggestions->passthrough) {
1198         yaz_log(YLOG_DEBUG, "Passthrough Suggestions: \n%s\n", suggestions->passthrough);
1199         return suggestions->passthrough;
1200     }
1201     if (suggestions->num == 0) {
1202         return "";
1203     }
1204     /*
1205     for (idx = 0; idx < suggestions->num; idx++) {
1206         wrbuf_printf(wrbuf, "<suggest term=\"%s\"", suggestions->suggest[idx]);
1207         if (suggestions->misspelled[idx] && suggestions->misspelled[idx]) {
1208             wrbuf_puts(wrbuf, suggestions->misspelled[idx]);
1209             wrbuf_puts(wrbuf, "</suggest>\n");
1210         }
1211         else
1212             wrbuf_puts(wrbuf, "/>\n");
1213     }
1214     */
1215     return wrbuf_cstr(wrbuf);
1216 }
1217
1218
1219 void client_set_database(struct client *cl, struct session_database *db)
1220 {
1221     cl->database = db;
1222 }
1223
1224 const char *client_get_id(struct client *cl)
1225 {
1226     return cl->id;
1227 }
1228
1229 void client_set_maxrecs(struct client *cl, int v)
1230 {
1231     cl->maxrecs = v;
1232 }
1233
1234 int client_get_maxrecs(struct client *cl)
1235 {
1236     return cl->maxrecs;
1237 }
1238
1239 void client_set_startrecs(struct client *cl, int v)
1240 {
1241     cl->startrecs = v;
1242 }
1243
1244 void client_set_preferred(struct client *cl, int v)
1245 {
1246     cl->preferred = v;
1247 }
1248
1249
1250 struct suggestions* client_suggestions_create(const char* suggestions_string)
1251 {
1252     int i;
1253     NMEM nmem;
1254     struct suggestions *suggestions;
1255     if (suggestions_string == 0)
1256         return 0;
1257     nmem = nmem_create();
1258     suggestions = nmem_malloc(nmem, sizeof(*suggestions));
1259     yaz_log(YLOG_DEBUG, "client target suggestions: %s", suggestions_string);
1260
1261     suggestions->nmem = nmem;
1262     suggestions->num = 0;
1263     suggestions->misspelled = 0;
1264     suggestions->suggest = 0;
1265     suggestions->passthrough = nmem_strdup_null(nmem, suggestions_string);
1266
1267     if (suggestions_string)
1268         nmem_strsplit_escape2(suggestions->nmem, "\n", suggestions_string, &suggestions->suggest,
1269                               &suggestions->num, 1, '\\', 0);
1270     /* Set up misspelled array */
1271     suggestions->misspelled = (char **) nmem_malloc(nmem, suggestions->num * sizeof(**suggestions->misspelled));
1272     /* replace = with \0 .. for each item */
1273     for (i = 0; i < suggestions->num; i++)
1274     {
1275         char *cp = strchr(suggestions->suggest[i], '=');
1276         if (cp) {
1277             *cp = '\0';
1278             suggestions->misspelled[i] = cp+1;
1279         }
1280     }
1281     return suggestions;
1282 }
1283
1284 static void client_suggestions_destroy(struct client *cl)
1285 {
1286     NMEM nmem = cl->suggestions->nmem;
1287     cl->suggestions = 0;
1288     nmem_destroy(nmem);
1289 }
1290
1291 /*
1292  * Local variables:
1293  * c-basic-offset: 4
1294  * c-file-style: "Stroustrup"
1295  * indent-tabs-mode: nil
1296  * End:
1297  * vim: shiftwidth=4 tabstop=8 expandtab
1298  */
1299