Added client_use and client_count. Non-public methods
[pazpar2-moved-to-github.git] / src / client.c
1 /* This file is part of Pazpar2.
2    Copyright (C) 2006-2011 Index Data
3
4 Pazpar2 is free software; you can redistribute it and/or modify it under
5 the terms of the GNU General Public License as published by the Free
6 Software Foundation; either version 2, or (at your option) any later
7 version.
8
9 Pazpar2 is distributed in the hope that it will be useful, but WITHOUT ANY
10 WARRANTY; without even the implied warranty of MERCHANTABILITY or
11 FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
12 for more details.
13
14 You should have received a copy of the GNU General Public License
15 along with this program; if not, write to the Free Software
16 Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA  02110-1301  USA
17
18 */
19
20 /** \file client.c
21     \brief Z39.50 client 
22 */
23
24 #if HAVE_CONFIG_H
25 #include <config.h>
26 #endif
27 #include <stdlib.h>
28 #include <stdio.h>
29 #include <string.h>
30 #if HAVE_SYS_TIME_H
31 #include <sys/time.h>
32 #endif
33 #if HAVE_UNISTD_H
34 #include <unistd.h>
35 #endif
36 #include <signal.h>
37 #include <assert.h>
38
39 #include <yaz/marcdisp.h>
40 #include <yaz/comstack.h>
41 #include <yaz/tcpip.h>
42 #include <yaz/proto.h>
43 #include <yaz/readconf.h>
44 #include <yaz/pquery.h>
45 #include <yaz/otherinfo.h>
46 #include <yaz/yaz-util.h>
47 #include <yaz/nmem.h>
48 #include <yaz/query-charset.h>
49 #include <yaz/querytowrbuf.h>
50 #include <yaz/oid_db.h>
51 #include <yaz/diagbib1.h>
52 #include <yaz/snprintf.h>
53 #include <yaz/rpn2cql.h>
54 #include <yaz/rpn2solr.h>
55
56 #define USE_TIMING 0
57 #if USE_TIMING
58 #include <yaz/timing.h>
59 #endif
60
61 #include "ppmutex.h"
62 #include "session.h"
63 #include "parameters.h"
64 #include "client.h"
65 #include "connection.h"
66 #include "settings.h"
67 #include "relevance.h"
68 #include "incref.h"
69
70 static YAZ_MUTEX g_mutex = 0;
71 static int no_clients = 0;
72
73 static int client_use(int delta)
74 {
75     int clients;
76     if (!g_mutex)
77         yaz_mutex_create(&g_mutex);
78     yaz_mutex_enter(g_mutex);
79     no_clients += delta;
80     clients = no_clients;
81     yaz_mutex_leave(g_mutex);
82     yaz_log(YLOG_DEBUG, "%s clients=%d", delta == 0 ? "" : (delta > 0 ? "INC" : "DEC"), clients);
83     return clients;
84 }
85
86 int  clients_count(void) {
87     return client_use(0);
88 }
89
90
91 /** \brief Represents client state for a connection to one search target */
92 struct client {
93     struct session_database *database;
94     struct connection *connection;
95     struct session *session;
96     char *pquery; // Current search
97     char *cqlquery; // used for SRU targets only
98     Odr_int hits;
99     int record_offset;
100     int maxrecs;
101     int startrecs;
102     int diagnostic;
103     int preferred;
104     enum client_state state;
105     struct show_raw *show_raw;
106     ZOOM_resultset resultset;
107     YAZ_MUTEX mutex;
108     int ref_count;
109     /* copy of database->url */
110     char *url;
111 };
112
113 struct show_raw {
114     int active; // whether this request has been sent to the server
115     int position;
116     int binary;
117     char *syntax;
118     char *esn;
119     void (*error_handler)(void *data, const char *addinfo);
120     void (*record_handler)(void *data, const char *buf, size_t sz);
121     void *data;
122     struct show_raw *next;
123 };
124
125 static const char *client_states[] = {
126     "Client_Connecting",
127     "Client_Idle",
128     "Client_Working",
129     "Client_Error",
130     "Client_Failed",
131     "Client_Disconnected"
132 };
133
134 const char *client_get_state_str(struct client *cl)
135 {
136     return client_states[cl->state];
137 }
138
139 enum client_state client_get_state(struct client *cl)
140 {
141     return cl->state;
142 }
143
144 void client_set_state(struct client *cl, enum client_state st)
145 {
146     int was_active = 0;
147     if (client_is_active(cl))
148         was_active = 1;
149     cl->state = st;
150     /* If client is going from being active to inactive and all clients
151        are now idle we fire a watch for the session . The assumption is
152        that session is not mutex locked if client is already active */
153     if (was_active && !client_is_active(cl) && cl->session)
154     {
155
156         int no_active = session_active_clients(cl->session);
157         yaz_log(YLOG_DEBUG, "%s: releasing watches on zero active: %d", client_get_url(cl), no_active);
158         if (no_active == 0) {
159             session_alert_watch(cl->session, SESSION_WATCH_SHOW);
160             session_alert_watch(cl->session, SESSION_WATCH_SHOW_PREF);
161         }
162     }
163 }
164
165 static void client_show_raw_error(struct client *cl, const char *addinfo);
166
167 struct connection *client_get_connection(struct client *cl)
168 {
169     return cl->connection;
170 }
171
172 struct session_database *client_get_database(struct client *cl)
173 {
174     return cl->database;
175 }
176
177 struct session *client_get_session(struct client *cl)
178 {
179     return cl->session;
180 }
181
182 const char *client_get_pquery(struct client *cl)
183 {
184     return cl->pquery;
185 }
186
187 static void client_send_raw_present(struct client *cl);
188 static int nativesyntax_to_type(struct session_database *sdb, char *type,
189                                 ZOOM_record rec);
190
191 static void client_show_immediate(
192     ZOOM_resultset resultset, struct session_database *sdb, int position,
193     void *data,
194     void (*error_handler)(void *data, const char *addinfo),
195     void (*record_handler)(void *data, const char *buf, size_t sz),
196     int binary)
197 {
198     ZOOM_record rec = 0;
199     char type[80];
200     const char *buf;
201     int len;
202
203     if (!resultset)
204     {
205         error_handler(data, "no resultset");
206         return;
207     }
208     rec = ZOOM_resultset_record(resultset, position-1);
209     if (!rec)
210     {
211         error_handler(data, "no record");
212         return;
213     }
214     if (binary)
215         strcpy(type, "raw");
216     else
217         nativesyntax_to_type(sdb, type, rec);
218     buf = ZOOM_record_get(rec, type, &len);
219     if (!buf)
220     {
221         error_handler(data, "no record");
222         return;
223     }
224     record_handler(data, buf, len);
225 }
226
227
228 int client_show_raw_begin(struct client *cl, int position,
229                           const char *syntax, const char *esn,
230                           void *data,
231                           void (*error_handler)(void *data, const char *addinfo),
232                           void (*record_handler)(void *data, const char *buf,
233                                                  size_t sz),
234                           int binary)
235 {
236     if (syntax == 0 && esn == 0)
237         client_show_immediate(cl->resultset, client_get_database(cl),
238                               position, data,
239                               error_handler, record_handler,
240                               binary);
241     else
242     {
243         struct show_raw *rr, **rrp;
244
245         if (!cl->connection)
246             return -1;
247     
248
249         rr = xmalloc(sizeof(*rr));
250         rr->position = position;
251         rr->active = 0;
252         rr->data = data;
253         rr->error_handler = error_handler;
254         rr->record_handler = record_handler;
255         rr->binary = binary;
256         if (syntax)
257             rr->syntax = xstrdup(syntax);
258         else
259             rr->syntax = 0;
260         if (esn)
261             rr->esn = xstrdup(esn);
262         else
263             rr->esn = 0;
264         rr->next = 0;
265         
266         for (rrp = &cl->show_raw; *rrp; rrp = &(*rrp)->next)
267             ;
268         *rrp = rr;
269         
270         if (cl->state == Client_Failed)
271         {
272             client_show_raw_error(cl, "client failed");
273         }
274         else if (cl->state == Client_Disconnected)
275         {
276             client_show_raw_error(cl, "client disconnected");
277         }
278         else
279         {
280             client_send_raw_present(cl);
281         }
282     }
283     return 0;
284 }
285
286 static void client_show_raw_delete(struct show_raw *r)
287 {
288     xfree(r->syntax);
289     xfree(r->esn);
290     xfree(r);
291 }
292
293 void client_show_raw_remove(struct client *cl, void *data)
294 {
295     struct show_raw *rr = data;
296     struct show_raw **rrp = &cl->show_raw;
297     while (*rrp != rr)
298         rrp = &(*rrp)->next;
299     if (*rrp)
300     {
301         *rrp = rr->next;
302         client_show_raw_delete(rr);
303     }
304 }
305
306 void client_show_raw_dequeue(struct client *cl)
307 {
308     struct show_raw *rr = cl->show_raw;
309
310     cl->show_raw = rr->next;
311     client_show_raw_delete(rr);
312 }
313
314 static void client_show_raw_error(struct client *cl, const char *addinfo)
315 {
316     while (cl->show_raw)
317     {
318         cl->show_raw->error_handler(cl->show_raw->data, addinfo);
319         client_show_raw_dequeue(cl);
320     }
321 }
322
323 static void client_send_raw_present(struct client *cl)
324 {
325     struct session_database *sdb = client_get_database(cl);
326     struct connection *co = client_get_connection(cl);
327     ZOOM_resultset set = cl->resultset;
328
329     int offset = cl->show_raw->position;
330     const char *syntax = 0;
331     const char *elements = 0;
332
333     assert(cl->show_raw);
334     assert(set);
335
336     yaz_log(YLOG_DEBUG, "%s: trying to present %d record(s) from %d",
337             client_get_url(cl), 1, offset);
338
339     if (cl->show_raw->syntax)
340         syntax = cl->show_raw->syntax;
341     else
342         syntax = session_setting_oneval(sdb, PZ_REQUESTSYNTAX);
343     ZOOM_resultset_option_set(set, "preferredRecordSyntax", syntax);
344
345     if (cl->show_raw->esn)
346         elements = cl->show_raw->esn;
347     else
348         elements = session_setting_oneval(sdb, PZ_ELEMENTS);
349     if (elements && *elements)
350         ZOOM_resultset_option_set(set, "elementSetName", elements);
351
352     ZOOM_resultset_records(set, 0, offset-1, 1);
353     cl->show_raw->active = 1;
354
355     connection_continue(co);
356 }
357
358 static int nativesyntax_to_type(struct session_database *sdb, char *type,
359                                 ZOOM_record rec)
360 {
361     const char *s = session_setting_oneval(sdb, PZ_NATIVESYNTAX);
362
363     if (s && *s)
364     {
365         if (!strncmp(s, "iso2709", 7))
366         {
367             const char *cp = strchr(s, ';');
368             yaz_snprintf(type, 80, "xml; charset=%s", cp ? cp+1 : "marc-8s");
369         }
370         else if (!strncmp(s, "xml", 3))
371         {
372             strcpy(type, "xml");
373         }
374         else if (!strncmp(s, "txml", 4))
375         {
376             const char *cp = strchr(s, ';');
377             yaz_snprintf(type, 80, "txml; charset=%s", cp ? cp+1 : "marc-8s");
378         }
379         else
380             return -1;
381         return 0;
382     }
383     else  /* attempt to deduce structure */
384     {
385         const char *syntax = ZOOM_record_get(rec, "syntax", NULL);
386         if (syntax)
387         {
388             if (!strcmp(syntax, "XML"))
389             {
390                 strcpy(type, "xml");
391                 return 0;
392             }
393             else if (!strcmp(syntax, "TXML"))
394                 {
395                     strcpy(type, "txml");
396                     return 0;
397                 }
398             else if (!strcmp(syntax, "USmarc") || !strcmp(syntax, "MARC21"))
399             {
400                 strcpy(type, "xml; charset=marc8-s");
401                 return 0;
402             }
403             else return -1;
404         }
405         else return -1;
406     }
407 }
408
409 /**
410  * TODO Consider thread safety!!!
411  *
412  */
413 int client_report_facets(struct client *cl, ZOOM_resultset rs) {
414     int facet_idx;
415     ZOOM_facet_field *facets = ZOOM_resultset_facets(rs);
416     int facet_num;
417     struct session *se = client_get_session(cl);
418     facet_num = ZOOM_resultset_facets_size(rs);
419     yaz_log(YLOG_DEBUG, "client_report_facets: %d", facet_num);
420
421     for (facet_idx = 0; facet_idx < facet_num; facet_idx++) {
422         const char *name = ZOOM_facet_field_name(facets[facet_idx]);
423         size_t term_idx;
424         size_t term_num = ZOOM_facet_field_term_count(facets[facet_idx]);
425         for (term_idx = 0; term_idx < term_num; term_idx++ ) {
426             int freq;
427             const char *term = ZOOM_facet_field_get_term(facets[facet_idx], term_idx, &freq);
428             if (term)
429                 add_facet(se, name, term, freq);
430         }
431     }
432
433     return 0;
434 }
435
436 static void ingest_raw_record(struct client *cl, ZOOM_record rec)
437 {
438     const char *buf;
439     int len;
440     char type[80];
441
442     if (cl->show_raw->binary)
443         strcpy(type, "raw");
444     else
445     {
446         struct session_database *sdb = client_get_database(cl);
447         nativesyntax_to_type(sdb, type, rec);
448     }
449
450     buf = ZOOM_record_get(rec, type, &len);
451     cl->show_raw->record_handler(cl->show_raw->data,  buf, len);
452     client_show_raw_dequeue(cl);
453 }
454
455 void client_check_preferred_watch(struct client *cl)
456 {
457     struct session *se = cl->session;
458     yaz_log(YLOG_DEBUG, "client_check_preferred_watch: %s ", client_get_url(cl));
459     if (se)
460     {
461         client_unlock(cl);
462         if (session_is_preferred_clients_ready(se)) {
463             session_alert_watch(se, SESSION_WATCH_SHOW_PREF);
464         }
465         else
466             yaz_log(YLOG_DEBUG, "client_check_preferred_watch: Still locked on preferred targets.");
467
468         client_lock(cl);
469     }
470     else
471         yaz_log(YLOG_WARN, "client_check_preferred_watch: %s. No session!", client_get_url(cl));
472
473 }
474
475 void client_search_response(struct client *cl)
476 {
477     struct connection *co = cl->connection;
478     struct session *se = cl->session;
479     ZOOM_connection link = connection_get_link(co);
480     ZOOM_resultset resultset = cl->resultset;
481
482     const char *error, *addinfo = 0;
483     
484     if (ZOOM_connection_error(link, &error, &addinfo))
485     {
486         cl->hits = 0;
487         client_set_state(cl, Client_Error);
488         yaz_log(YLOG_WARN, "Search error %s (%s): %s",
489                 error, addinfo, client_get_url(cl));
490     }
491     else
492     {
493         yaz_log(YLOG_DEBUG, "client_search_response: hits "
494                 ODR_INT_PRINTF, cl->hits);
495         client_report_facets(cl, resultset);
496         cl->record_offset = cl->startrecs;
497         cl->hits = ZOOM_resultset_size(resultset);
498         if (se) {
499             se->total_hits += cl->hits;
500             yaz_log(YLOG_DEBUG, "client_search_response: total hits "
501                     ODR_INT_PRINTF, se->total_hits);
502         }
503     }
504 }
505
506 void client_got_records(struct client *cl)
507 {
508     struct session *se = cl->session;
509     if (se)
510     {
511         client_unlock(cl);
512         session_alert_watch(se, SESSION_WATCH_SHOW);
513         session_alert_watch(se, SESSION_WATCH_RECORD);
514         client_lock(cl);
515     }
516 }
517
518 void client_record_response(struct client *cl)
519 {
520     struct connection *co = cl->connection;
521     ZOOM_connection link = connection_get_link(co);
522     ZOOM_resultset resultset = cl->resultset;
523     const char *error, *addinfo;
524
525     if (ZOOM_connection_error(link, &error, &addinfo))
526     {
527         client_set_state(cl, Client_Error);
528         yaz_log(YLOG_WARN, "Search error %s (%s): %s",
529             error, addinfo, client_get_url(cl));
530     }
531     else
532     {
533         ZOOM_record rec = 0;
534         const char *msg, *addinfo;
535         
536         if (cl->show_raw && cl->show_raw->active)
537         {
538             if ((rec = ZOOM_resultset_record(resultset,
539                                              cl->show_raw->position-1)))
540             {
541                 cl->show_raw->active = 0;
542                 ingest_raw_record(cl, rec);
543             }
544             else
545             {
546                 yaz_log(YLOG_WARN, "Expected record, but got NULL, offset=%d",
547                         cl->show_raw->position-1);
548             }
549         }
550         else
551         {
552             int offset = cl->record_offset;
553             if ((rec = ZOOM_resultset_record(resultset, offset)))
554             {
555                 cl->record_offset++;
556                 if (cl->session == 0)
557                     ;
558                 else if (ZOOM_record_error(rec, &msg, &addinfo, 0))
559                 {
560                     yaz_log(YLOG_WARN, "Record error %s (%s): %s (rec #%d)",
561                             msg, addinfo, client_get_url(cl),
562                             cl->record_offset);
563                 }
564                 else
565                 {
566                     struct session_database *sdb = client_get_database(cl);
567                     NMEM nmem = nmem_create();
568                     const char *xmlrec;
569                     char type[80];
570
571                     if (nativesyntax_to_type(sdb, type, rec))
572                         yaz_log(YLOG_WARN, "Failed to determine record type");
573                     xmlrec = ZOOM_record_get(rec, type, NULL);
574                     if (!xmlrec)
575                         yaz_log(YLOG_WARN, "ZOOM_record_get failed from %s",
576                                 client_get_url(cl));
577                     else
578                     {
579                         if (ingest_record(cl, xmlrec, cl->record_offset, nmem))
580                             yaz_log(YLOG_WARN, "Failed to ingest from %s",
581                                     client_get_url(cl));
582                     }
583                     nmem_destroy(nmem);
584                 }
585             }
586             else
587             {
588                 yaz_log(YLOG_WARN, "Expected record, but got NULL, offset=%d",
589                         offset);
590             }
591         }
592     }
593 }
594
595 static int client_set_facets_request(struct client *cl, ZOOM_connection link)
596 {
597     struct session_database *sdb = client_get_database(cl);
598     const char *opt_facet_term_sort  = session_setting_oneval(sdb, PZ_TERMLIST_TERM_SORT);
599     const char *opt_facet_term_count = session_setting_oneval(sdb, PZ_TERMLIST_TERM_COUNT);
600
601     /* Future record filtering on target */
602     /* const char *opt_facet_record_filter = session_setting_oneval(sdb, PZ_RECORDFILTER); */
603
604     /* Disable when no count is set */
605     /* TODO Verify: Do we need to reset the  ZOOM facets if a ZOOM Connection is being reused??? */
606     if (opt_facet_term_count && *opt_facet_term_count)
607     {
608         int index = 0;
609         struct session *session = client_get_session(cl);
610         struct conf_service *service = session->service;
611         int num = service->num_metadata;
612         WRBUF wrbuf = wrbuf_alloc();
613         yaz_log(YLOG_DEBUG, "Facet settings, sort: %s count: %s",
614                 opt_facet_term_sort, opt_facet_term_count);
615         for (index = 0; index < num; index++)
616         {
617             struct conf_metadata *conf_meta = &service->metadata[index];
618             if (conf_meta->termlist)
619             {
620                 if (wrbuf_len(wrbuf))
621                     wrbuf_puts(wrbuf, ", ");
622                 wrbuf_printf(wrbuf, "@attr 1=%s", conf_meta->name);
623                 
624                 if (opt_facet_term_sort && *opt_facet_term_sort)
625                     wrbuf_printf(wrbuf, " @attr 2=%s", opt_facet_term_sort);
626                 wrbuf_printf(wrbuf, " @attr 3=%s", opt_facet_term_count);
627             }
628         }
629         if (wrbuf_len(wrbuf))
630         {
631             yaz_log(YLOG_LOG, "Setting ZOOM facets option: %s", wrbuf_cstr(wrbuf));
632             ZOOM_connection_option_set(link, "facets", wrbuf_cstr(wrbuf));
633             return 1;
634         }
635     }
636     return 0;
637 }
638
639 int client_has_facet(struct client *cl, const char *name) {
640     ZOOM_facet_field facet_field;
641     if (!cl || !cl->resultset || !name) {
642         yaz_log(YLOG_DEBUG, "client has facet: Missing %p %p %s", cl, (cl ? cl->resultset: 0), name);
643         return 0;
644     }
645     facet_field = ZOOM_resultset_get_facet_field(cl->resultset, name);
646     if (facet_field) {
647         yaz_log(YLOG_DEBUG, "client: has facets for %s", name);
648         return 1;
649     }
650     yaz_log(YLOG_DEBUG, "client: No facets for %s", name);
651     return 0;
652 }
653
654
655 void client_start_search(struct client *cl)
656 {
657     struct session_database *sdb = client_get_database(cl);
658     struct connection *co = client_get_connection(cl);
659     ZOOM_connection link = connection_get_link(co);
660     ZOOM_resultset rs;
661     char *databaseName = sdb->database->databases[0];
662     const char *opt_piggyback   = session_setting_oneval(sdb, PZ_PIGGYBACK);
663     const char *opt_queryenc    = session_setting_oneval(sdb, PZ_QUERYENCODING);
664     const char *opt_elements    = session_setting_oneval(sdb, PZ_ELEMENTS);
665     const char *opt_requestsyn  = session_setting_oneval(sdb, PZ_REQUESTSYNTAX);
666     const char *opt_maxrecs     = session_setting_oneval(sdb, PZ_MAXRECS);
667     const char *opt_sru         = session_setting_oneval(sdb, PZ_SRU);
668     const char *opt_sort        = session_setting_oneval(sdb, PZ_SORT);
669     const char *opt_preferred   = session_setting_oneval(sdb, PZ_PREFERRED);
670     char maxrecs_str[24], startrecs_str[24];
671
672     assert(link);
673
674     cl->hits = -1;
675     cl->record_offset = 0;
676     cl->diagnostic = 0;
677
678     if (opt_preferred) {
679         cl->preferred = atoi(opt_preferred);
680         if (cl->preferred)
681             yaz_log(YLOG_LOG, "Target %s has preferred status: %d", sdb->database->url, cl->preferred);
682     }
683     client_set_state(cl, Client_Working);
684
685     if (*opt_piggyback)
686         ZOOM_connection_option_set(link, "piggyback", opt_piggyback);
687     else
688         ZOOM_connection_option_set(link, "piggyback", "1");
689     if (*opt_queryenc)
690         ZOOM_connection_option_set(link, "rpnCharset", opt_queryenc);
691     if (*opt_sru && *opt_elements)
692         ZOOM_connection_option_set(link, "schema", opt_elements);
693     else if (*opt_elements)
694         ZOOM_connection_option_set(link, "elementSetName", opt_elements);
695     if (*opt_requestsyn)
696         ZOOM_connection_option_set(link, "preferredRecordSyntax", opt_requestsyn);
697
698     if (opt_maxrecs && *opt_maxrecs)
699     {
700         cl->maxrecs = atoi(opt_maxrecs);
701     }
702
703     /* convert back to string representation used in ZOOM API */
704     sprintf(maxrecs_str, "%d", cl->maxrecs);
705     ZOOM_connection_option_set(link, "count", maxrecs_str);
706
707     if (cl->maxrecs > 20)
708         ZOOM_connection_option_set(link, "presentChunk", "20");
709     else
710         ZOOM_connection_option_set(link, "presentChunk", maxrecs_str);
711
712     sprintf(startrecs_str, "%d", cl->startrecs);
713     ZOOM_connection_option_set(link, "start", startrecs_str);
714
715     if (databaseName)
716         ZOOM_connection_option_set(link, "databaseName", databaseName);
717
718     /* TODO Verify does it break something for CQL targets(non-SOLR) ? */
719     /* facets definition is in PQF */
720     client_set_facets_request(cl, link);
721
722     if (cl->cqlquery)
723     {
724         ZOOM_query q = ZOOM_query_create();
725         yaz_log(YLOG_LOG, "Search %s CQL: %s", sdb->database->url, cl->cqlquery);
726         ZOOM_query_cql(q, cl->cqlquery);
727         if (*opt_sort)
728             ZOOM_query_sortby(q, opt_sort);
729         rs = ZOOM_connection_search(link, q);
730         ZOOM_query_destroy(q);
731     }
732     else
733     {
734         yaz_log(YLOG_LOG, "Search %s PQF: %s", sdb->database->url, cl->pquery);
735         rs = ZOOM_connection_search_pqf(link, cl->pquery);
736     }
737     ZOOM_resultset_destroy(cl->resultset);
738     cl->resultset = rs;
739     connection_continue(co);
740 }
741
742 struct client *client_create(void)
743 {
744     struct client *cl = xmalloc(sizeof(*cl));
745     cl->maxrecs = 100;
746     cl->startrecs = 0;
747     cl->pquery = 0;
748     cl->cqlquery = 0;
749     cl->database = 0;
750     cl->connection = 0;
751     cl->session = 0;
752     cl->hits = 0;
753     cl->record_offset = 0;
754     cl->diagnostic = 0;
755     cl->state = Client_Disconnected;
756     cl->show_raw = 0;
757     cl->resultset = 0;
758     cl->mutex = 0;
759     pazpar2_mutex_create(&cl->mutex, "client");
760     cl->preferred = 0;
761     cl->ref_count = 1;
762     cl->url = 0;
763     client_use(1);
764     
765     return cl;
766 }
767
768 void client_lock(struct client *c)
769 {
770     yaz_mutex_enter(c->mutex);
771 }
772
773 void client_unlock(struct client *c)
774 {
775     yaz_mutex_leave(c->mutex);
776 }
777
778 void client_incref(struct client *c)
779 {
780     pazpar2_incref(&c->ref_count, c->mutex);
781     yaz_log(YLOG_DEBUG, "client_incref c=%p %s cnt=%d",
782             c, client_get_url(c), c->ref_count);
783 }
784
785 int client_destroy(struct client *c)
786 {
787     if (c)
788     {
789         yaz_log(YLOG_DEBUG, "client_destroy c=%p %s cnt=%d",
790                 c, client_get_url(c), c->ref_count);
791         if (!pazpar2_decref(&c->ref_count, c->mutex))
792         {
793             xfree(c->pquery);
794             c->pquery = 0;
795             xfree(c->cqlquery);
796             c->cqlquery = 0;
797             xfree(c->url);
798             assert(!c->connection);
799
800             if (c->resultset)
801             {
802                 ZOOM_resultset_destroy(c->resultset);
803             }
804             yaz_mutex_destroy(&c->mutex);
805             xfree(c);
806             client_use(-1);
807             return 1;
808         }
809     }
810     return 0;
811 }
812
813 void client_set_connection(struct client *cl, struct connection *con)
814 {
815     if (cl->resultset)
816         ZOOM_resultset_release(cl->resultset);
817     if (con)
818     {
819         assert(cl->connection == 0);
820         cl->connection = con;
821         client_incref(cl);
822     }
823     else
824     {
825         cl->connection = con;
826         client_destroy(cl);
827     }
828 }
829
830 void client_disconnect(struct client *cl)
831 {
832     if (cl->state != Client_Idle)
833         client_set_state(cl, Client_Disconnected);
834     client_set_connection(cl, 0);
835 }
836
837 // Extract terms from query into null-terminated termlist
838 static void extract_terms(NMEM nmem, struct ccl_rpn_node *query, char **termlist)
839 {
840     int num = 0;
841
842     pull_terms(nmem, query, termlist, &num);
843     termlist[num] = 0;
844 }
845
846 // Initialize CCL map for a target
847 static CCL_bibset prepare_cclmap(struct client *cl)
848 {
849     struct session_database *sdb = client_get_database(cl);
850     struct setting *s;
851     CCL_bibset res;
852
853     if (!sdb->settings)
854         return 0;
855     res = ccl_qual_mk();
856     for (s = sdb->settings[PZ_CCLMAP]; s; s = s->next)
857     {
858         char *p = strchr(s->name + 3, ':');
859         if (!p)
860         {
861             yaz_log(YLOG_WARN, "Malformed cclmap name: %s", s->name);
862             ccl_qual_rm(&res);
863             return 0;
864         }
865         p++;
866         ccl_qual_fitem(res, s->value, p);
867     }
868     return res;
869 }
870
871 // returns a xmalloced CQL query corresponding to the pquery in client
872 static char *make_cqlquery(struct client *cl)
873 {
874     cql_transform_t cqlt = cql_transform_create();
875     Z_RPNQuery *zquery;
876     char *r;
877     WRBUF wrb = wrbuf_alloc();
878     int status;
879     ODR odr_out = odr_createmem(ODR_ENCODE);
880
881     zquery = p_query_rpn(odr_out, cl->pquery);
882     yaz_log(YLOG_LOG, "PQF: %s", cl->pquery);
883     if ((status = cql_transform_rpn2cql_wrbuf(cqlt, wrb, zquery)))
884     {
885         yaz_log(YLOG_WARN, "Failed to generate CQL query, code=%d", status);
886         r = 0;
887     }
888     else
889     {
890         r = xstrdup(wrbuf_cstr(wrb));
891     }     
892     wrbuf_destroy(wrb);
893     odr_destroy(odr_out);
894     cql_transform_close(cqlt);
895     return r;
896 }
897
898 // returns a xmalloced SOLR query corresponding to the pquery in client
899 // TODO Could prob. be merge with the similar make_cqlquery
900 static char *make_solrquery(struct client *cl)
901 {
902     solr_transform_t sqlt = solr_transform_create();
903     Z_RPNQuery *zquery;
904     char *r;
905     WRBUF wrb = wrbuf_alloc();
906     int status;
907     ODR odr_out = odr_createmem(ODR_ENCODE);
908
909     zquery = p_query_rpn(odr_out, cl->pquery);
910     yaz_log(YLOG_LOG, "PQF: %s", cl->pquery);
911     if ((status = solr_transform_rpn2solr_wrbuf(sqlt, wrb, zquery)))
912     {
913         yaz_log(YLOG_WARN, "Failed to generate SOLR query, code=%d", status);
914         r = 0;
915     }
916     else
917     {
918         r = xstrdup(wrbuf_cstr(wrb));
919     }
920     wrbuf_destroy(wrb);
921     odr_destroy(odr_out);
922     solr_transform_close(sqlt);
923     return r;
924 }
925
926 // Parse the query given the settings specific to this client
927 int client_parse_query(struct client *cl, const char *query)
928 {
929     struct session *se = client_get_session(cl);
930     struct session_database *sdb = client_get_database(cl);
931     struct ccl_rpn_node *cn;
932     int cerror, cpos;
933     CCL_bibset ccl_map = prepare_cclmap(cl);
934     const char *sru = session_setting_oneval(sdb, PZ_SRU);
935     const char *pqf_prefix = session_setting_oneval(sdb, PZ_PQF_PREFIX);
936     const char *pqf_strftime = session_setting_oneval(sdb, PZ_PQF_STRFTIME);
937
938     if (!ccl_map)
939         return -1;
940
941     cn = ccl_find_str(ccl_map, query, &cerror, &cpos);
942     ccl_qual_rm(&ccl_map);
943     if (!cn)
944     {
945         client_set_state(cl, Client_Error);
946         session_log(se, YLOG_WARN, "Failed to parse CCL query '%s' for %s",
947                 query,
948                 client_get_database(cl)->database->url);
949         return -1;
950     }
951     wrbuf_rewind(se->wrbuf);
952     if (*pqf_prefix)
953     {
954         wrbuf_puts(se->wrbuf, pqf_prefix);
955         wrbuf_puts(se->wrbuf, " ");
956     }
957     if (!pqf_strftime || !*pqf_strftime)
958         ccl_pquery(se->wrbuf, cn);
959     else
960     {
961         time_t cur_time = time(0);
962         struct tm *tm =  localtime(&cur_time);
963         char tmp_str[300];
964         const char *cp = tmp_str;
965
966         /* see man strftime(3) for things .. In particular %% gets converted
967          to %.. And That's our original query .. */
968         strftime(tmp_str, sizeof(tmp_str)-1, pqf_strftime, tm);
969         for (; *cp; cp++)
970         {
971             if (cp[0] == '%')
972                 ccl_pquery(se->wrbuf, cn);
973             else
974                 wrbuf_putc(se->wrbuf, cp[0]);
975         }
976     }
977     xfree(cl->pquery);
978     cl->pquery = xstrdup(wrbuf_cstr(se->wrbuf));
979
980     xfree(cl->cqlquery);
981     if (*sru)
982     {
983         if (!strcmp(sru, "solr")) {
984             if (!(cl->cqlquery = make_solrquery(cl)))
985                 return -1;
986         }
987         else {
988             if (!(cl->cqlquery = make_cqlquery(cl)))
989                 return -1;
990         }
991     }
992     else
993         cl->cqlquery = 0;
994
995     /* TODO FIX Not thread safe */
996     if (!se->relevance)
997     {
998         // Initialize relevance structure with query terms
999         char *p[512];
1000         extract_terms(se->nmem, cn, p);
1001         se->relevance = relevance_create(
1002             se->service->relevance_pct,
1003             se->nmem, (const char **) p);
1004     }
1005
1006     ccl_rpn_delete(cn);
1007     return 0;
1008 }
1009
1010 void client_set_session(struct client *cl, struct session *se)
1011 {
1012     cl->session = se;
1013 }
1014
1015 int client_is_active(struct client *cl)
1016 {
1017     if (cl->connection && (cl->state == Client_Connecting ||
1018                            cl->state == Client_Working))
1019         return 1;
1020     return 0;
1021 }
1022
1023 int client_is_active_preferred(struct client *cl)
1024 {
1025     /* only count if this is a preferred target. */
1026     if (!cl->preferred)
1027         return 0;
1028     /* TODO No sure this the condition that Seb wants */
1029     if (cl->connection && (cl->state == Client_Connecting ||
1030                            cl->state == Client_Working))
1031         return 1;
1032     return 0;
1033 }
1034
1035
1036 Odr_int client_get_hits(struct client *cl)
1037 {
1038     return cl->hits;
1039 }
1040
1041 int client_get_num_records(struct client *cl)
1042 {
1043     return cl->record_offset;
1044 }
1045
1046 void client_set_diagnostic(struct client *cl, int diagnostic)
1047 {
1048     cl->diagnostic = diagnostic;
1049 }
1050
1051 int client_get_diagnostic(struct client *cl)
1052 {
1053     return cl->diagnostic;
1054 }
1055
1056 void client_set_database(struct client *cl, struct session_database *db)
1057 {
1058     cl->database = db;
1059     /* Copy the URL for safe logging even after session is gone */
1060     if (db) {
1061         cl->url = xstrdup(db->database->url);
1062     }
1063 }
1064
1065 struct host *client_get_host(struct client *cl)
1066 {
1067     return client_get_database(cl)->database->host;
1068 }
1069
1070 const char *client_get_url(struct client *cl)
1071 {
1072     if (cl->url)
1073         return cl->url;
1074     else
1075         /* This must not happen anymore, as the url is present until destruction of client  */
1076         return "NOURL";
1077 }
1078
1079 void client_set_maxrecs(struct client *cl, int v)
1080 {
1081     cl->maxrecs = v;
1082 }
1083
1084 int client_get_maxrecs(struct client *cl)
1085 {
1086     return cl->maxrecs;
1087 }
1088
1089 void client_set_startrecs(struct client *cl, int v)
1090 {
1091     cl->startrecs = v;
1092 }
1093
1094 void client_set_preferred(struct client *cl, int v)
1095 {
1096     cl->preferred = v;
1097 }
1098
1099
1100 /*
1101  * Local variables:
1102  * c-basic-offset: 4
1103  * c-file-style: "Stroustrup"
1104  * indent-tabs-mode: nil
1105  * End:
1106  * vim: shiftwidth=4 tabstop=8 expandtab
1107  */
1108