Implementation of a block on preferred targets
[pazpar2-moved-to-github.git] / src / client.c
1 /* This file is part of Pazpar2.
2    Copyright (C) 2006-2010 Index Data
3
4 Pazpar2 is free software; you can redistribute it and/or modify it under
5 the terms of the GNU General Public License as published by the Free
6 Software Foundation; either version 2, or (at your option) any later
7 version.
8
9 Pazpar2 is distributed in the hope that it will be useful, but WITHOUT ANY
10 WARRANTY; without even the implied warranty of MERCHANTABILITY or
11 FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
12 for more details.
13
14 You should have received a copy of the GNU General Public License
15 along with this program; if not, write to the Free Software
16 Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA  02110-1301  USA
17
18 */
19
20 /** \file client.c
21     \brief Z39.50 client 
22 */
23
24 #if HAVE_CONFIG_H
25 #include <config.h>
26 #endif
27 #include <stdlib.h>
28 #include <stdio.h>
29 #include <string.h>
30 #if HAVE_SYS_TIME_H
31 #include <sys/time.h>
32 #endif
33 #if HAVE_UNISTD_H
34 #include <unistd.h>
35 #endif
36 #include <signal.h>
37 #include <assert.h>
38
39 #include <yaz/marcdisp.h>
40 #include <yaz/comstack.h>
41 #include <yaz/tcpip.h>
42 #include <yaz/proto.h>
43 #include <yaz/readconf.h>
44 #include <yaz/pquery.h>
45 #include <yaz/otherinfo.h>
46 #include <yaz/yaz-util.h>
47 #include <yaz/nmem.h>
48 #include <yaz/query-charset.h>
49 #include <yaz/querytowrbuf.h>
50 #include <yaz/oid_db.h>
51 #include <yaz/diagbib1.h>
52 #include <yaz/snprintf.h>
53 #include <yaz/rpn2cql.h>
54 #include <yaz/rpn2solr.h>
55
56 #define USE_TIMING 0
57 #if USE_TIMING
58 #include <yaz/timing.h>
59 #endif
60
61 #include "ppmutex.h"
62 #include "session.h"
63 #include "parameters.h"
64 #include "client.h"
65 #include "connection.h"
66 #include "settings.h"
67 #include "relevance.h"
68 #include "incref.h"
69
70 /* client counting (1) , disable client counting (0) */
71 #if 1
72 static YAZ_MUTEX g_mutex = 0;
73 static int no_clients = 0;
74
75 static void client_use(int delta)
76 {
77     if (!g_mutex)
78         yaz_mutex_create(&g_mutex);
79     yaz_mutex_enter(g_mutex);
80     no_clients += delta;
81     yaz_mutex_leave(g_mutex);
82     yaz_log(YLOG_DEBUG, "%s clients=%d", delta > 0 ? "INC" : "DEC", no_clients);
83 }
84 #else
85 #define client_use(x)
86 #endif
87
88 /** \brief Represents client state for a connection to one search target */
89 struct client {
90     struct session_database *database;
91     struct connection *connection;
92     struct session *session;
93     char *pquery; // Current search
94     char *cqlquery; // used for SRU targets only
95     Odr_int hits;
96     int record_offset;
97     int maxrecs;
98     int startrecs;
99     int diagnostic;
100     int preferred;
101     enum client_state state;
102     struct show_raw *show_raw;
103     ZOOM_resultset resultset;
104     YAZ_MUTEX mutex;
105     int ref_count;
106 };
107
108 struct show_raw {
109     int active; // whether this request has been sent to the server
110     int position;
111     int binary;
112     char *syntax;
113     char *esn;
114     void (*error_handler)(void *data, const char *addinfo);
115     void (*record_handler)(void *data, const char *buf, size_t sz);
116     void *data;
117     struct show_raw *next;
118 };
119
120 static const char *client_states[] = {
121     "Client_Connecting",
122     "Client_Idle",
123     "Client_Working",
124     "Client_Error",
125     "Client_Failed",
126     "Client_Disconnected"
127 };
128
129 const char *client_get_state_str(struct client *cl)
130 {
131     return client_states[cl->state];
132 }
133
134 enum client_state client_get_state(struct client *cl)
135 {
136     return cl->state;
137 }
138
139 void client_set_state(struct client *cl, enum client_state st)
140 {
141     int was_active = 0;
142     if (client_is_active(cl))
143         was_active = 1;
144     cl->state = st;
145     /* If client is going from being active to inactive and all clients
146        are now idle we fire a watch for the session . The assumption is
147        that session is not mutex locked if client is already active */
148     if (was_active && !client_is_active(cl) && cl->session)
149     {
150         int no_active = session_active_clients(cl->session);
151         if (no_active == 0) {
152             session_alert_watch(cl->session, SESSION_WATCH_SHOW);
153             session_alert_watch(cl->session, SESSION_WATCH_SHOW_PREF);
154         }
155     }
156 }
157
158 static void client_show_raw_error(struct client *cl, const char *addinfo);
159
160 struct connection *client_get_connection(struct client *cl)
161 {
162     return cl->connection;
163 }
164
165 struct session_database *client_get_database(struct client *cl)
166 {
167     return cl->database;
168 }
169
170 struct session *client_get_session(struct client *cl)
171 {
172     return cl->session;
173 }
174
175 const char *client_get_pquery(struct client *cl)
176 {
177     return cl->pquery;
178 }
179
180 static void client_send_raw_present(struct client *cl);
181 static int nativesyntax_to_type(struct session_database *sdb, char *type,
182                                 ZOOM_record rec);
183
184 static void client_show_immediate(
185     ZOOM_resultset resultset, struct session_database *sdb, int position,
186     void *data,
187     void (*error_handler)(void *data, const char *addinfo),
188     void (*record_handler)(void *data, const char *buf, size_t sz),
189     int binary)
190 {
191     ZOOM_record rec = 0;
192     char type[80];
193     const char *buf;
194     int len;
195
196     if (!resultset)
197     {
198         error_handler(data, "no resultset");
199         return;
200     }
201     rec = ZOOM_resultset_record(resultset, position-1);
202     if (!rec)
203     {
204         error_handler(data, "no record");
205         return;
206     }
207     if (binary)
208         strcpy(type, "raw");
209     else
210         nativesyntax_to_type(sdb, type, rec);
211     buf = ZOOM_record_get(rec, type, &len);
212     if (!buf)
213     {
214         error_handler(data, "no record");
215         return;
216     }
217     record_handler(data, buf, len);
218 }
219
220
221 int client_show_raw_begin(struct client *cl, int position,
222                           const char *syntax, const char *esn,
223                           void *data,
224                           void (*error_handler)(void *data, const char *addinfo),
225                           void (*record_handler)(void *data, const char *buf,
226                                                  size_t sz),
227                           int binary)
228 {
229     if (syntax == 0 && esn == 0)
230         client_show_immediate(cl->resultset, client_get_database(cl),
231                               position, data,
232                               error_handler, record_handler,
233                               binary);
234     else
235     {
236         struct show_raw *rr, **rrp;
237
238         if (!cl->connection)
239             return -1;
240     
241
242         rr = xmalloc(sizeof(*rr));
243         rr->position = position;
244         rr->active = 0;
245         rr->data = data;
246         rr->error_handler = error_handler;
247         rr->record_handler = record_handler;
248         rr->binary = binary;
249         if (syntax)
250             rr->syntax = xstrdup(syntax);
251         else
252             rr->syntax = 0;
253         if (esn)
254             rr->esn = xstrdup(esn);
255         else
256             rr->esn = 0;
257         rr->next = 0;
258         
259         for (rrp = &cl->show_raw; *rrp; rrp = &(*rrp)->next)
260             ;
261         *rrp = rr;
262         
263         if (cl->state == Client_Failed)
264         {
265             client_show_raw_error(cl, "client failed");
266         }
267         else if (cl->state == Client_Disconnected)
268         {
269             client_show_raw_error(cl, "client disconnected");
270         }
271         else
272         {
273             client_send_raw_present(cl);
274         }
275     }
276     return 0;
277 }
278
279 static void client_show_raw_delete(struct show_raw *r)
280 {
281     xfree(r->syntax);
282     xfree(r->esn);
283     xfree(r);
284 }
285
286 void client_show_raw_remove(struct client *cl, void *data)
287 {
288     struct show_raw *rr = data;
289     struct show_raw **rrp = &cl->show_raw;
290     while (*rrp != rr)
291         rrp = &(*rrp)->next;
292     if (*rrp)
293     {
294         *rrp = rr->next;
295         client_show_raw_delete(rr);
296     }
297 }
298
299 void client_show_raw_dequeue(struct client *cl)
300 {
301     struct show_raw *rr = cl->show_raw;
302
303     cl->show_raw = rr->next;
304     client_show_raw_delete(rr);
305 }
306
307 static void client_show_raw_error(struct client *cl, const char *addinfo)
308 {
309     while (cl->show_raw)
310     {
311         cl->show_raw->error_handler(cl->show_raw->data, addinfo);
312         client_show_raw_dequeue(cl);
313     }
314 }
315
316 static void client_send_raw_present(struct client *cl)
317 {
318     struct session_database *sdb = client_get_database(cl);
319     struct connection *co = client_get_connection(cl);
320     ZOOM_resultset set = cl->resultset;
321
322     int offset = cl->show_raw->position;
323     const char *syntax = 0;
324     const char *elements = 0;
325
326     assert(cl->show_raw);
327     assert(set);
328
329     yaz_log(YLOG_DEBUG, "%s: trying to present %d record(s) from %d",
330             client_get_url(cl), 1, offset);
331
332     if (cl->show_raw->syntax)
333         syntax = cl->show_raw->syntax;
334     else
335         syntax = session_setting_oneval(sdb, PZ_REQUESTSYNTAX);
336     ZOOM_resultset_option_set(set, "preferredRecordSyntax", syntax);
337
338     if (cl->show_raw->esn)
339         elements = cl->show_raw->esn;
340     else
341         elements = session_setting_oneval(sdb, PZ_ELEMENTS);
342     if (elements && *elements)
343         ZOOM_resultset_option_set(set, "elementSetName", elements);
344
345     ZOOM_resultset_records(set, 0, offset-1, 1);
346     cl->show_raw->active = 1;
347
348     connection_continue(co);
349 }
350
351 static int nativesyntax_to_type(struct session_database *sdb, char *type,
352                                 ZOOM_record rec)
353 {
354     const char *s = session_setting_oneval(sdb, PZ_NATIVESYNTAX);
355
356     if (s && *s)
357     {
358         if (!strncmp(s, "iso2709", 7))
359         {
360             const char *cp = strchr(s, ';');
361             yaz_snprintf(type, 80, "xml; charset=%s", cp ? cp+1 : "marc-8s");
362         }
363         else if (!strncmp(s, "xml", 3))
364         {
365             strcpy(type, "xml");
366         }
367         else if (!strncmp(s, "txml", 4))
368         {
369             const char *cp = strchr(s, ';');
370             yaz_snprintf(type, 80, "txml; charset=%s", cp ? cp+1 : "marc-8s");
371         }
372         else
373             return -1;
374         return 0;
375     }
376     else  /* attempt to deduce structure */
377     {
378         const char *syntax = ZOOM_record_get(rec, "syntax", NULL);
379         if (syntax)
380         {
381             if (!strcmp(syntax, "XML"))
382             {
383                 strcpy(type, "xml");
384                 return 0;
385             }
386             else if (!strcmp(syntax, "TXML"))
387                 {
388                     strcpy(type, "txml");
389                     return 0;
390                 }
391             else if (!strcmp(syntax, "USmarc") || !strcmp(syntax, "MARC21"))
392             {
393                 strcpy(type, "xml; charset=marc8-s");
394                 return 0;
395             }
396             else return -1;
397         }
398         else return -1;
399     }
400 }
401
402 /**
403  * TODO Consider thread safety!!!
404  *
405  */
406 int client_report_facets(struct client *cl, ZOOM_resultset rs) {
407     int facet_idx;
408     ZOOM_facet_field *facets = ZOOM_resultset_facets(rs);
409     int facet_num;
410     struct session *se = client_get_session(cl);
411     facet_num = ZOOM_resultset_facets_size(rs);
412     yaz_log(YLOG_DEBUG, "client_report_facets: %d", facet_num);
413
414     for (facet_idx = 0; facet_idx < facet_num; facet_idx++) {
415         const char *name = ZOOM_facet_field_name(facets[facet_idx]);
416         size_t term_idx;
417         size_t term_num = ZOOM_facet_field_term_count(facets[facet_idx]);
418         for (term_idx = 0; term_idx < term_num; term_idx++ ) {
419             int freq;
420             const char *term = ZOOM_facet_field_get_term(facets[facet_idx], term_idx, &freq);
421             if (term)
422                 add_facet(se, name, term, freq);
423         }
424     }
425
426     return 0;
427 }
428
429 static void ingest_raw_record(struct client *cl, ZOOM_record rec)
430 {
431     const char *buf;
432     int len;
433     char type[80];
434
435     if (cl->show_raw->binary)
436         strcpy(type, "raw");
437     else
438     {
439         struct session_database *sdb = client_get_database(cl);
440         nativesyntax_to_type(sdb, type, rec);
441     }
442
443     buf = ZOOM_record_get(rec, type, &len);
444     cl->show_raw->record_handler(cl->show_raw->data,  buf, len);
445     client_show_raw_dequeue(cl);
446 }
447
448 static void client_check_preferred_watch(struct client *cl)
449 {
450     struct session *se = cl->session;
451     if (se)
452     {
453         client_unlock(cl);
454         if (session_preferred_clients_ready(se))
455             session_alert_watch(se, SESSION_WATCH_SHOW_PREF);
456         client_lock(cl);
457     }
458 }
459
460 void client_search_response(struct client *cl)
461 {
462     struct connection *co = cl->connection;
463     struct session *se = cl->session;
464     ZOOM_connection link = connection_get_link(co);
465     ZOOM_resultset resultset = cl->resultset;
466
467     const char *error, *addinfo = 0;
468     
469     if (ZOOM_connection_error(link, &error, &addinfo))
470     {
471         cl->hits = 0;
472         client_set_state(cl, Client_Error);
473         yaz_log(YLOG_WARN, "Search error %s (%s): %s",
474                 error, addinfo, client_get_url(cl));
475     }
476     else
477     {
478         client_report_facets(cl, resultset);
479         cl->record_offset = cl->startrecs;
480         cl->hits = ZOOM_resultset_size(resultset);
481         if (se)
482             se->total_hits += cl->hits;
483         if (cl->preferred)
484             client_check_preferred_watch(cl);
485     }
486 }
487
488 void client_got_records(struct client *cl)
489 {
490     struct session *se = cl->session;
491     if (se)
492     {
493         client_unlock(cl);
494         session_alert_watch(se, SESSION_WATCH_SHOW);
495         session_alert_watch(se, SESSION_WATCH_RECORD);
496         client_lock(cl);
497     }
498 }
499
500 void client_record_response(struct client *cl)
501 {
502     struct connection *co = cl->connection;
503     ZOOM_connection link = connection_get_link(co);
504     ZOOM_resultset resultset = cl->resultset;
505     const char *error, *addinfo;
506
507     if (ZOOM_connection_error(link, &error, &addinfo))
508     {
509         client_set_state(cl, Client_Error);
510         yaz_log(YLOG_WARN, "Search error %s (%s): %s",
511             error, addinfo, client_get_url(cl));
512     }
513     else
514     {
515         ZOOM_record rec = 0;
516         const char *msg, *addinfo;
517         
518         if (cl->show_raw && cl->show_raw->active)
519         {
520             if ((rec = ZOOM_resultset_record(resultset,
521                                              cl->show_raw->position-1)))
522             {
523                 cl->show_raw->active = 0;
524                 ingest_raw_record(cl, rec);
525             }
526             else
527             {
528                 yaz_log(YLOG_WARN, "Expected record, but got NULL, offset=%d",
529                         cl->show_raw->position-1);
530             }
531         }
532         else
533         {
534             int offset = cl->record_offset;
535             if ((rec = ZOOM_resultset_record(resultset, offset)))
536             {
537                 cl->record_offset++;
538                 if (cl->session == 0)
539                     ;
540                 else if (ZOOM_record_error(rec, &msg, &addinfo, 0))
541                 {
542                     yaz_log(YLOG_WARN, "Record error %s (%s): %s (rec #%d)",
543                             msg, addinfo, client_get_url(cl),
544                             cl->record_offset);
545                 }
546                 else
547                 {
548                     struct session_database *sdb = client_get_database(cl);
549                     NMEM nmem = nmem_create();
550                     const char *xmlrec;
551                     char type[80];
552
553                     if (nativesyntax_to_type(sdb, type, rec))
554                         yaz_log(YLOG_WARN, "Failed to determine record type");
555                     xmlrec = ZOOM_record_get(rec, type, NULL);
556                     if (!xmlrec)
557                         yaz_log(YLOG_WARN, "ZOOM_record_get failed from %s",
558                                 client_get_url(cl));
559                     else
560                     {
561                         if (ingest_record(cl, xmlrec, cl->record_offset, nmem))
562                             yaz_log(YLOG_WARN, "Failed to ingest from %s",
563                                     client_get_url(cl));
564                     }
565                     nmem_destroy(nmem);
566                 }
567             }
568             else
569             {
570                 yaz_log(YLOG_WARN, "Expected record, but got NULL, offset=%d",
571                         offset);
572             }
573         }
574     }
575 }
576
577 static int client_set_facets_request(struct client *cl, ZOOM_connection link)
578 {
579     struct session_database *sdb = client_get_database(cl);
580     const char *opt_facet_term_sort  = session_setting_oneval(sdb, PZ_TERMLIST_TERM_SORT);
581     const char *opt_facet_term_count = session_setting_oneval(sdb, PZ_TERMLIST_TERM_COUNT);
582     /* Disable when no count is set */
583     /* TODO Verify: Do we need to reset the  ZOOM facets if a ZOOM Connection is being reused??? */
584     if (opt_facet_term_count && *opt_facet_term_count)
585     {
586         int index = 0;
587         struct session *session = client_get_session(cl);
588         struct conf_service *service = session->service;
589         int num = service->num_metadata;
590         WRBUF wrbuf = wrbuf_alloc();
591         yaz_log(YLOG_DEBUG, "Facet settings, sort: %s count: %s",
592                 opt_facet_term_sort, opt_facet_term_count);
593         for (index = 0; index < num; index++)
594         {
595             struct conf_metadata *conf_meta = &service->metadata[index];
596             if (conf_meta->termlist)
597             {
598                 if (wrbuf_len(wrbuf))
599                     wrbuf_puts(wrbuf, ", ");
600                 wrbuf_printf(wrbuf, "@attr 1=%s", conf_meta->name);
601                 
602                 if (opt_facet_term_sort && *opt_facet_term_sort)
603                     wrbuf_printf(wrbuf, " @attr 2=%s", opt_facet_term_sort);
604                 wrbuf_printf(wrbuf, " @attr 3=%s", opt_facet_term_count);
605             }
606         }
607         if (wrbuf_len(wrbuf))
608         {
609             yaz_log(YLOG_LOG, "Setting ZOOM facets option: %s", wrbuf_cstr(wrbuf));
610             ZOOM_connection_option_set(link, "facets", wrbuf_cstr(wrbuf));
611             return 1;
612         }
613     }
614     return 0;
615 }
616
617 int client_has_facet(struct client *cl, const char *name) {
618     ZOOM_facet_field facet_field;
619     if (!cl || !cl->resultset || !name) {
620         yaz_log(YLOG_DEBUG, "client has facet: Missing %p %p %s", cl, (cl ? cl->resultset: 0), name);
621         return 0;
622     }
623     facet_field = ZOOM_resultset_get_facet_field(cl->resultset, name);
624     if (facet_field) {
625         yaz_log(YLOG_DEBUG, "client: has facets for %s", name);
626         return 1;
627     }
628     yaz_log(YLOG_DEBUG, "client: No facets for %s", name);
629     return 0;
630 }
631
632
633 void client_start_search(struct client *cl)
634 {
635     struct session_database *sdb = client_get_database(cl);
636     struct connection *co = client_get_connection(cl);
637     ZOOM_connection link = connection_get_link(co);
638     ZOOM_resultset rs;
639     char *databaseName = sdb->database->databases[0];
640     const char *opt_piggyback   = session_setting_oneval(sdb, PZ_PIGGYBACK);
641     const char *opt_queryenc    = session_setting_oneval(sdb, PZ_QUERYENCODING);
642     const char *opt_elements    = session_setting_oneval(sdb, PZ_ELEMENTS);
643     const char *opt_requestsyn  = session_setting_oneval(sdb, PZ_REQUESTSYNTAX);
644     const char *opt_maxrecs     = session_setting_oneval(sdb, PZ_MAXRECS);
645     const char *opt_sru         = session_setting_oneval(sdb, PZ_SRU);
646     const char *opt_sort        = session_setting_oneval(sdb, PZ_SORT);
647     const char *opt_preferred   = session_setting_oneval(sdb, PZ_PREFERRED);
648     char maxrecs_str[24], startrecs_str[24];
649
650     assert(link);
651
652     cl->hits = -1;
653     cl->record_offset = 0;
654     cl->diagnostic = 0;
655
656     if (opt_preferred) {
657         cl->preferred = atoi(opt_preferred);
658         yaz_log(YLOG_LOG, "Target %s has preferred: %d", sdb->database->url, cl->preferred);
659     }
660     client_set_state(cl, Client_Working);
661
662     if (*opt_piggyback)
663         ZOOM_connection_option_set(link, "piggyback", opt_piggyback);
664     else
665         ZOOM_connection_option_set(link, "piggyback", "1");
666     if (*opt_queryenc)
667         ZOOM_connection_option_set(link, "rpnCharset", opt_queryenc);
668     if (*opt_sru && *opt_elements)
669         ZOOM_connection_option_set(link, "schema", opt_elements);
670     else if (*opt_elements)
671         ZOOM_connection_option_set(link, "elementSetName", opt_elements);
672     if (*opt_requestsyn)
673         ZOOM_connection_option_set(link, "preferredRecordSyntax", opt_requestsyn);
674
675     if (!*opt_maxrecs)
676     {
677         sprintf(maxrecs_str, "%d", cl->maxrecs);
678         opt_maxrecs = maxrecs_str;
679     }
680     ZOOM_connection_option_set(link, "count", opt_maxrecs);
681
682
683     if (atoi(opt_maxrecs) > 20)
684         ZOOM_connection_option_set(link, "presentChunk", "20");
685     else
686         ZOOM_connection_option_set(link, "presentChunk", opt_maxrecs);
687
688     sprintf(startrecs_str, "%d", cl->startrecs);
689     ZOOM_connection_option_set(link, "start", startrecs_str);
690
691     if (databaseName)
692         ZOOM_connection_option_set(link, "databaseName", databaseName);
693
694     /* TODO Verify does it break something for CQL targets(non-SOLR) ? */
695     /* facets definition is in PQF */
696     client_set_facets_request(cl, link);
697
698     if (cl->cqlquery)
699     {
700         ZOOM_query q = ZOOM_query_create();
701         yaz_log(YLOG_LOG, "Search %s CQL: %s", sdb->database->url, cl->cqlquery);
702         ZOOM_query_cql(q, cl->cqlquery);
703         if (*opt_sort)
704             ZOOM_query_sortby(q, opt_sort);
705         rs = ZOOM_connection_search(link, q);
706         ZOOM_query_destroy(q);
707     }
708     else
709     {
710         yaz_log(YLOG_LOG, "Search %s PQF: %s", sdb->database->url, cl->pquery);
711         rs = ZOOM_connection_search_pqf(link, cl->pquery);
712     }
713     ZOOM_resultset_destroy(cl->resultset);
714     cl->resultset = rs;
715     connection_continue(co);
716 }
717
718 struct client *client_create(void)
719 {
720     struct client *r = xmalloc(sizeof(*r));
721     r->maxrecs = 100;
722     r->startrecs = 0;
723     r->pquery = 0;
724     r->cqlquery = 0;
725     r->database = 0;
726     r->connection = 0;
727     r->session = 0;
728     r->hits = 0;
729     r->record_offset = 0;
730     r->diagnostic = 0;
731     r->state = Client_Disconnected;
732     r->show_raw = 0;
733     r->resultset = 0;
734     r->mutex = 0;
735     pazpar2_mutex_create(&r->mutex, "client");
736     r->preferred = 0;
737     r->ref_count = 1;
738     client_use(1);
739     
740     return r;
741 }
742
743 void client_lock(struct client *c)
744 {
745     yaz_mutex_enter(c->mutex);
746 }
747
748 void client_unlock(struct client *c)
749 {
750     yaz_mutex_leave(c->mutex);
751 }
752
753 void client_incref(struct client *c)
754 {
755     pazpar2_incref(&c->ref_count, c->mutex);
756     yaz_log(YLOG_DEBUG, "client_incref c=%p %s cnt=%d",
757             c, client_get_url(c), c->ref_count);
758 }
759
760 int client_destroy(struct client *c)
761 {
762     if (c)
763     {
764         yaz_log(YLOG_DEBUG, "client_destroy c=%p %s cnt=%d",
765                 c, client_get_url(c), c->ref_count);
766         if (!pazpar2_decref(&c->ref_count, c->mutex))
767         {
768             xfree(c->pquery);
769             c->pquery = 0;
770             xfree(c->cqlquery);
771             c->cqlquery = 0;
772             assert(!c->connection);
773
774             if (c->resultset)
775             {
776                 ZOOM_resultset_destroy(c->resultset);
777             }
778             yaz_mutex_destroy(&c->mutex);
779             xfree(c);
780             client_use(-1);
781             return 1;
782         }
783     }
784     return 0;
785 }
786
787 void client_set_connection(struct client *cl, struct connection *con)
788 {
789     if (cl->resultset)
790         ZOOM_resultset_release(cl->resultset);
791     if (con)
792     {
793         assert(cl->connection == 0);
794         cl->connection = con;
795         client_incref(cl);
796     }
797     else
798     {
799         cl->connection = con;
800         client_destroy(cl);
801     }
802 }
803
804 void client_disconnect(struct client *cl)
805 {
806     if (cl->state != Client_Idle)
807         client_set_state(cl, Client_Disconnected);
808     client_set_connection(cl, 0);
809 }
810
811 // Extract terms from query into null-terminated termlist
812 static void extract_terms(NMEM nmem, struct ccl_rpn_node *query, char **termlist)
813 {
814     int num = 0;
815
816     pull_terms(nmem, query, termlist, &num);
817     termlist[num] = 0;
818 }
819
820 // Initialize CCL map for a target
821 static CCL_bibset prepare_cclmap(struct client *cl)
822 {
823     struct session_database *sdb = client_get_database(cl);
824     struct setting *s;
825     CCL_bibset res;
826
827     if (!sdb->settings)
828         return 0;
829     res = ccl_qual_mk();
830     for (s = sdb->settings[PZ_CCLMAP]; s; s = s->next)
831     {
832         char *p = strchr(s->name + 3, ':');
833         if (!p)
834         {
835             yaz_log(YLOG_WARN, "Malformed cclmap name: %s", s->name);
836             ccl_qual_rm(&res);
837             return 0;
838         }
839         p++;
840         ccl_qual_fitem(res, s->value, p);
841     }
842     return res;
843 }
844
845 // returns a xmalloced CQL query corresponding to the pquery in client
846 static char *make_cqlquery(struct client *cl)
847 {
848     cql_transform_t cqlt = cql_transform_create();
849     Z_RPNQuery *zquery;
850     char *r;
851     WRBUF wrb = wrbuf_alloc();
852     int status;
853     ODR odr_out = odr_createmem(ODR_ENCODE);
854
855     zquery = p_query_rpn(odr_out, cl->pquery);
856     yaz_log(YLOG_LOG, "PQF: %s", cl->pquery);
857     if ((status = cql_transform_rpn2cql_wrbuf(cqlt, wrb, zquery)))
858     {
859         yaz_log(YLOG_WARN, "Failed to generate CQL query, code=%d", status);
860         r = 0;
861     }
862     else
863     {
864         r = xstrdup(wrbuf_cstr(wrb));
865     }     
866     wrbuf_destroy(wrb);
867     odr_destroy(odr_out);
868     cql_transform_close(cqlt);
869     return r;
870 }
871
872 // returns a xmalloced SOLR query corresponding to the pquery in client
873 // TODO Could prob. be merge with the similar make_cqlquery
874 static char *make_solrquery(struct client *cl)
875 {
876     solr_transform_t sqlt = solr_transform_create();
877     Z_RPNQuery *zquery;
878     char *r;
879     WRBUF wrb = wrbuf_alloc();
880     int status;
881     ODR odr_out = odr_createmem(ODR_ENCODE);
882
883     zquery = p_query_rpn(odr_out, cl->pquery);
884     yaz_log(YLOG_LOG, "PQF: %s", cl->pquery);
885     if ((status = solr_transform_rpn2solr_wrbuf(sqlt, wrb, zquery)))
886     {
887         yaz_log(YLOG_WARN, "Failed to generate SOLR query, code=%d", status);
888         r = 0;
889     }
890     else
891     {
892         r = xstrdup(wrbuf_cstr(wrb));
893     }
894     wrbuf_destroy(wrb);
895     odr_destroy(odr_out);
896     solr_transform_close(sqlt);
897     return r;
898 }
899
900 // Parse the query given the settings specific to this client
901 int client_parse_query(struct client *cl, const char *query)
902 {
903     struct session *se = client_get_session(cl);
904     struct session_database *sdb = client_get_database(cl);
905     struct ccl_rpn_node *cn;
906     int cerror, cpos;
907     CCL_bibset ccl_map = prepare_cclmap(cl);
908     const char *sru = session_setting_oneval(sdb, PZ_SRU);
909     const char *pqf_prefix = session_setting_oneval(sdb, PZ_PQF_PREFIX);
910     const char *pqf_strftime = session_setting_oneval(sdb, PZ_PQF_STRFTIME);
911
912     if (!ccl_map)
913         return -1;
914
915     cn = ccl_find_str(ccl_map, query, &cerror, &cpos);
916     ccl_qual_rm(&ccl_map);
917     if (!cn)
918     {
919         client_set_state(cl, Client_Error);
920         yaz_log(YLOG_WARN, "Failed to parse CCL query %s for %s",
921                 query,
922                 client_get_database(cl)->database->url);
923         return -1;
924     }
925     wrbuf_rewind(se->wrbuf);
926     if (*pqf_prefix)
927     {
928         wrbuf_puts(se->wrbuf, pqf_prefix);
929         wrbuf_puts(se->wrbuf, " ");
930     }
931     if (!pqf_strftime || !*pqf_strftime)
932         ccl_pquery(se->wrbuf, cn);
933     else
934     {
935         time_t cur_time = time(0);
936         struct tm *tm =  localtime(&cur_time);
937         char tmp_str[300];
938         const char *cp = tmp_str;
939
940         /* see man strftime(3) for things .. In particular %% gets converted
941          to %.. And That's our original query .. */
942         strftime(tmp_str, sizeof(tmp_str)-1, pqf_strftime, tm);
943         for (; *cp; cp++)
944         {
945             if (cp[0] == '%')
946                 ccl_pquery(se->wrbuf, cn);
947             else
948                 wrbuf_putc(se->wrbuf, cp[0]);
949         }
950     }
951     xfree(cl->pquery);
952     cl->pquery = xstrdup(wrbuf_cstr(se->wrbuf));
953
954     xfree(cl->cqlquery);
955     if (*sru)
956     {
957         if (!strcmp(sru, "solr")) {
958             if (!(cl->cqlquery = make_solrquery(cl)))
959                 return -1;
960         }
961         else {
962             if (!(cl->cqlquery = make_cqlquery(cl)))
963                 return -1;
964         }
965     }
966     else
967         cl->cqlquery = 0;
968
969     /* TODO FIX Not thread safe */
970     if (!se->relevance)
971     {
972         // Initialize relevance structure with query terms
973         char *p[512];
974         extract_terms(se->nmem, cn, p);
975         se->relevance = relevance_create(
976             se->service->relevance_pct,
977             se->nmem, (const char **) p);
978     }
979
980     ccl_rpn_delete(cn);
981     return 0;
982 }
983
984 void client_set_session(struct client *cl, struct session *se)
985 {
986     cl->session = se;
987 }
988
989 int client_is_active(struct client *cl)
990 {
991     if (cl->connection && (cl->state == Client_Connecting ||
992                            cl->state == Client_Working))
993         return 1;
994     return 0;
995 }
996
997 int client_is_active_preferred(struct client *cl)
998 {
999     /* only count if this is a preferred target. */
1000     if (!cl->preferred)
1001         return 0;
1002     /* TODO No sure this the condition that Seb wants */
1003     if (cl->connection && (cl->state == Client_Connecting ||
1004                            cl->state == Client_Working))
1005         return 1;
1006     return 0;
1007 }
1008
1009
1010 Odr_int client_get_hits(struct client *cl)
1011 {
1012     return cl->hits;
1013 }
1014
1015 int client_get_num_records(struct client *cl)
1016 {
1017     return cl->record_offset;
1018 }
1019
1020 void client_set_diagnostic(struct client *cl, int diagnostic)
1021 {
1022     cl->diagnostic = diagnostic;
1023 }
1024
1025 int client_get_diagnostic(struct client *cl)
1026 {
1027     return cl->diagnostic;
1028 }
1029
1030 void client_set_database(struct client *cl, struct session_database *db)
1031 {
1032     cl->database = db;
1033 }
1034
1035 struct host *client_get_host(struct client *cl)
1036 {
1037     return client_get_database(cl)->database->host;
1038 }
1039
1040 const char *client_get_url(struct client *cl)
1041 {
1042     if (cl->database)
1043         return client_get_database(cl)->database->url;
1044     else
1045         return "NOURL";
1046 }
1047
1048 void client_set_maxrecs(struct client *cl, int v)
1049 {
1050     cl->maxrecs = v;
1051 }
1052
1053 void client_set_startrecs(struct client *cl, int v)
1054 {
1055     cl->startrecs = v;
1056 }
1057
1058 void client_set_preferred(struct client *cl, int v)
1059 {
1060     cl->preferred = v;
1061 }
1062
1063
1064 /*
1065  * Local variables:
1066  * c-basic-offset: 4
1067  * c-file-style: "Stroustrup"
1068  * indent-tabs-mode: nil
1069  * End:
1070  * vim: shiftwidth=4 tabstop=8 expandtab
1071  */
1072