Observe number of concurrent record ingests
[pazpar2-moved-to-github.git] / src / client.c
1 /* This file is part of Pazpar2.
2    Copyright (C) 2006-2010 Index Data
3
4 Pazpar2 is free software; you can redistribute it and/or modify it under
5 the terms of the GNU General Public License as published by the Free
6 Software Foundation; either version 2, or (at your option) any later
7 version.
8
9 Pazpar2 is distributed in the hope that it will be useful, but WITHOUT ANY
10 WARRANTY; without even the implied warranty of MERCHANTABILITY or
11 FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
12 for more details.
13
14 You should have received a copy of the GNU General Public License
15 along with this program; if not, write to the Free Software
16 Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA  02110-1301  USA
17
18 */
19
20 /** \file client.c
21     \brief Z39.50 client 
22 */
23
24 #if HAVE_CONFIG_H
25 #include <config.h>
26 #endif
27 #include <pthread.h>
28 #include <stdlib.h>
29 #include <stdio.h>
30 #include <string.h>
31 #if HAVE_SYS_TIME_H
32 #include <sys/time.h>
33 #endif
34 #if HAVE_UNISTD_H
35 #include <unistd.h>
36 #endif
37 #include <signal.h>
38 #include <assert.h>
39
40 #include <yaz/marcdisp.h>
41 #include <yaz/comstack.h>
42 #include <yaz/tcpip.h>
43 #include <yaz/proto.h>
44 #include <yaz/readconf.h>
45 #include <yaz/pquery.h>
46 #include <yaz/otherinfo.h>
47 #include <yaz/yaz-util.h>
48 #include <yaz/nmem.h>
49 #include <yaz/query-charset.h>
50 #include <yaz/querytowrbuf.h>
51 #include <yaz/oid_db.h>
52 #include <yaz/diagbib1.h>
53 #include <yaz/snprintf.h>
54 #include <yaz/rpn2cql.h>
55
56 #define USE_TIMING 0
57 #if USE_TIMING
58 #include <yaz/timing.h>
59 #endif
60
61 #include "session.h"
62 #include "parameters.h"
63 #include "client.h"
64 #include "connection.h"
65 #include "settings.h"
66 #include "relevance.h"
67 #include "incref.h"
68
69 /** \brief Represents client state for a connection to one search target */
70 struct client {
71     struct session_database *database;
72     struct connection *connection;
73     struct session *session;
74     char *pquery; // Current search
75     char *cqlquery; // used for SRU targets only
76     Odr_int hits;
77     int record_offset;
78     int maxrecs;
79     int startrecs;
80     int diagnostic;
81     enum client_state state;
82     struct show_raw *show_raw;
83     struct client *next;     // next client in session or next in free list
84     ZOOM_resultset resultset;
85     YAZ_MUTEX mutex;
86     int ref_count;
87 };
88
89 struct show_raw {
90     int active; // whether this request has been sent to the server
91     int position;
92     int binary;
93     char *syntax;
94     char *esn;
95     void (*error_handler)(void *data, const char *addinfo);
96     void (*record_handler)(void *data, const char *buf, size_t sz);
97     void *data;
98     struct show_raw *next;
99 };
100
101 static const char *client_states[] = {
102     "Client_Connecting",
103     "Client_Idle",
104     "Client_Working",
105     "Client_Error",
106     "Client_Failed",
107     "Client_Disconnected"
108 };
109
110 const char *client_get_state_str(struct client *cl)
111 {
112     return client_states[cl->state];
113 }
114
115 enum client_state client_get_state(struct client *cl)
116 {
117     return cl->state;
118 }
119
120 void client_set_state(struct client *cl, enum client_state st)
121 {
122     cl->state = st;
123     /* no need to check for all client being non-active if this one
124        already is. Note that session_active_clients also LOCKS session */
125 #if 0
126     if (!client_is_active(cl) && cl->session)
127     {
128         int no_active = session_active_clients(cl->session);
129         if (no_active == 0)
130             session_alert_watch(cl->session, SESSION_WATCH_SHOW);
131     }
132 #endif
133 }
134
135 static void client_show_raw_error(struct client *cl, const char *addinfo);
136
137 // Close connection and set state to error
138 void client_fatal(struct client *cl)
139 {
140     yaz_log(YLOG_WARN, "Fatal error from %s", client_get_url(cl));
141     connection_destroy(cl->connection);
142     client_set_state(cl, Client_Error);
143 }
144
145 struct connection *client_get_connection(struct client *cl)
146 {
147     return cl->connection;
148 }
149
150 struct session_database *client_get_database(struct client *cl)
151 {
152     return cl->database;
153 }
154
155 struct session *client_get_session(struct client *cl)
156 {
157     return cl->session;
158 }
159
160 const char *client_get_pquery(struct client *cl)
161 {
162     return cl->pquery;
163 }
164
165 static void client_send_raw_present(struct client *cl);
166 static int nativesyntax_to_type(struct session_database *sdb, char *type,
167                                 ZOOM_record rec);
168
169 static void client_show_immediate(
170     ZOOM_resultset resultset, struct session_database *sdb, int position,
171     void *data,
172     void (*error_handler)(void *data, const char *addinfo),
173     void (*record_handler)(void *data, const char *buf, size_t sz),
174     int binary)
175 {
176     ZOOM_record rec = 0;
177     char type[80];
178     const char *buf;
179     int len;
180
181     if (!resultset)
182     {
183         error_handler(data, "no resultset");
184         return;
185     }
186     rec = ZOOM_resultset_record(resultset, position-1);
187     if (!rec)
188     {
189         error_handler(data, "no record");
190         return;
191     }
192     if (binary)
193         strcpy(type, "raw");
194     else
195         nativesyntax_to_type(sdb, type, rec);
196     buf = ZOOM_record_get(rec, type, &len);
197     if (!buf)
198     {
199         error_handler(data, "no record");
200         return;
201     }
202     record_handler(data, buf, len);
203 }
204
205
206 int client_show_raw_begin(struct client *cl, int position,
207                           const char *syntax, const char *esn,
208                           void *data,
209                           void (*error_handler)(void *data, const char *addinfo),
210                           void (*record_handler)(void *data, const char *buf,
211                                                  size_t sz),
212                           int binary)
213 {
214     if (syntax == 0 && esn == 0)
215         client_show_immediate(cl->resultset, client_get_database(cl),
216                               position, data,
217                               error_handler, record_handler,
218                               binary);
219     else
220     {
221         struct show_raw *rr, **rrp;
222
223         if (!cl->connection)
224             return -1;
225     
226
227         rr = xmalloc(sizeof(*rr));
228         rr->position = position;
229         rr->active = 0;
230         rr->data = data;
231         rr->error_handler = error_handler;
232         rr->record_handler = record_handler;
233         rr->binary = binary;
234         if (syntax)
235             rr->syntax = xstrdup(syntax);
236         else
237             rr->syntax = 0;
238         if (esn)
239             rr->esn = xstrdup(esn);
240         else
241             rr->esn = 0;
242         rr->next = 0;
243         
244         for (rrp = &cl->show_raw; *rrp; rrp = &(*rrp)->next)
245             ;
246         *rrp = rr;
247         
248         if (cl->state == Client_Failed)
249         {
250             client_show_raw_error(cl, "client failed");
251         }
252         else if (cl->state == Client_Disconnected)
253         {
254             client_show_raw_error(cl, "client disconnected");
255         }
256         else
257         {
258             client_send_raw_present(cl);
259         }
260     }
261     return 0;
262 }
263
264 static void client_show_raw_delete(struct show_raw *r)
265 {
266     xfree(r->syntax);
267     xfree(r->esn);
268     xfree(r);
269 }
270
271 void client_show_raw_remove(struct client *cl, void *data)
272 {
273     struct show_raw *rr = data;
274     struct show_raw **rrp = &cl->show_raw;
275     while (*rrp != rr)
276         rrp = &(*rrp)->next;
277     if (*rrp)
278     {
279         *rrp = rr->next;
280         client_show_raw_delete(rr);
281     }
282 }
283
284 void client_show_raw_dequeue(struct client *cl)
285 {
286     struct show_raw *rr = cl->show_raw;
287
288     cl->show_raw = rr->next;
289     client_show_raw_delete(rr);
290 }
291
292 static void client_show_raw_error(struct client *cl, const char *addinfo)
293 {
294     while (cl->show_raw)
295     {
296         cl->show_raw->error_handler(cl->show_raw->data, addinfo);
297         client_show_raw_dequeue(cl);
298     }
299 }
300
301 static void client_send_raw_present(struct client *cl)
302 {
303     struct session_database *sdb = client_get_database(cl);
304     struct connection *co = client_get_connection(cl);
305     ZOOM_resultset set = cl->resultset;
306
307     int offset = cl->show_raw->position;
308     const char *syntax = 0;
309     const char *elements = 0;
310
311     assert(cl->show_raw);
312     assert(set);
313
314     yaz_log(YLOG_DEBUG, "%s: trying to present %d record(s) from %d",
315             client_get_url(cl), 1, offset);
316
317     if (cl->show_raw->syntax)
318         syntax = cl->show_raw->syntax;
319     else
320         syntax = session_setting_oneval(sdb, PZ_REQUESTSYNTAX);
321     ZOOM_resultset_option_set(set, "preferredRecordSyntax", syntax);
322
323     if (cl->show_raw->esn)
324         elements = cl->show_raw->esn;
325     else
326         elements = session_setting_oneval(sdb, PZ_ELEMENTS);
327     if (elements && *elements)
328         ZOOM_resultset_option_set(set, "elementSetName", elements);
329
330     ZOOM_resultset_records(set, 0, offset-1, 1);
331     cl->show_raw->active = 1;
332
333     connection_continue(co);
334 }
335
336 static int nativesyntax_to_type(struct session_database *sdb, char *type,
337                                 ZOOM_record rec)
338 {
339     const char *s = session_setting_oneval(sdb, PZ_NATIVESYNTAX);
340
341     if (s && *s)
342     {
343         if (!strncmp(s, "iso2709", 7))
344         {
345             const char *cp = strchr(s, ';');
346             yaz_snprintf(type, 80, "xml; charset=%s", cp ? cp+1 : "marc-8s");
347         }
348         else if (!strncmp(s, "xml", 3))
349         {
350             strcpy(type, "xml");
351         }
352         else
353             return -1;
354         return 0;
355     }
356     else  /* attempt to deduce structure */
357     {
358         const char *syntax = ZOOM_record_get(rec, "syntax", NULL);
359         if (syntax)
360         {
361             if (!strcmp(syntax, "XML"))
362             {
363                 strcpy(type, "xml");
364                 return 0;
365             }
366             else if (!strcmp(syntax, "USmarc") || !strcmp(syntax, "MARC21"))
367             {
368                 strcpy(type, "xml; charset=marc8-s");
369                 return 0;
370             }
371             else return -1;
372         }
373         else return -1;
374     }
375 }
376
377 static void ingest_raw_record(struct client *cl, ZOOM_record rec)
378 {
379     const char *buf;
380     int len;
381     char type[80];
382
383     if (cl->show_raw->binary)
384         strcpy(type, "raw");
385     else
386     {
387         struct session_database *sdb = client_get_database(cl);
388         nativesyntax_to_type(sdb, type, rec);
389     }
390
391     buf = ZOOM_record_get(rec, type, &len);
392     cl->show_raw->record_handler(cl->show_raw->data,  buf, len);
393     client_show_raw_dequeue(cl);
394 }
395
396 void client_search_response(struct client *cl)
397 {
398     struct connection *co = cl->connection;
399     struct session *se = cl->session;
400     ZOOM_connection link = connection_get_link(co);
401     ZOOM_resultset resultset = cl->resultset;
402     const char *error, *addinfo = 0;
403     
404     if (ZOOM_connection_error(link, &error, &addinfo))
405     {
406         cl->hits = 0;
407         client_set_state(cl, Client_Error);
408         yaz_log(YLOG_WARN, "Search error %s (%s): %s",
409                 error, addinfo, client_get_url(cl));
410     }
411     else
412     {
413         cl->record_offset = cl->startrecs;
414         cl->hits = ZOOM_resultset_size(resultset);
415         if (se)
416             se->total_hits += cl->hits;
417     }
418 }
419
420 void client_got_records(struct client *cl)
421 {
422     if (cl->session)
423     {
424         session_alert_watch(cl->session, SESSION_WATCH_SHOW);
425         session_alert_watch(cl->session, SESSION_WATCH_RECORD);
426     }
427 }
428
429 void client_record_response(struct client *cl)
430 {
431     static pthread_mutex_t ingest_mutex = PTHREAD_MUTEX_INITIALIZER;
432     static int ingest_counter = 0, ingest_max = 0;
433     struct connection *co = cl->connection;
434     ZOOM_connection link = connection_get_link(co);
435     ZOOM_resultset resultset = cl->resultset;
436     const char *error, *addinfo;
437
438     if (ZOOM_connection_error(link, &error, &addinfo))
439     {
440         client_set_state(cl, Client_Error);
441         yaz_log(YLOG_WARN, "Search error %s (%s): %s",
442             error, addinfo, client_get_url(cl));
443     }
444     else
445     {
446         ZOOM_record rec = 0;
447         const char *msg, *addinfo;
448         
449         if (cl->show_raw && cl->show_raw->active)
450         {
451             if ((rec = ZOOM_resultset_record(resultset,
452                                              cl->show_raw->position-1)))
453             {
454                 cl->show_raw->active = 0;
455                 ingest_raw_record(cl, rec);
456             }
457             else
458             {
459                 yaz_log(YLOG_WARN, "Expected record, but got NULL, offset=%d",
460                         cl->show_raw->position-1);
461             }
462         }
463         else
464         {
465             int offset = cl->record_offset;
466             if ((rec = ZOOM_resultset_record(resultset, offset)))
467             {
468                 cl->record_offset++;
469                 if (cl->session == 0)
470                     ;
471                 else if (ZOOM_record_error(rec, &msg, &addinfo, 0))
472                     yaz_log(YLOG_WARN, "Record error %s (%s): %s (rec #%d)",
473                             error, addinfo, client_get_url(cl),
474                             cl->record_offset);
475                 else
476                 {
477                     struct session_database *sdb = client_get_database(cl);
478                     NMEM nmem = nmem_create();
479                     const char *xmlrec;
480                     int new_max = 0;
481                     char type[80];
482                     yaz_log(YLOG_LOG, "Record ingest begin client=%p session=%p", cl, cl->session);
483                     pthread_mutex_lock(&ingest_mutex);
484                     ++ingest_counter;
485                     if (ingest_counter > ingest_max)
486                     {
487                         ingest_max = ingest_counter;
488                         new_max = ingest_max;
489                     }
490                     pthread_mutex_unlock(&ingest_mutex);
491                     if (new_max)
492                         yaz_log(YLOG_LOG, "New max client=%p new_max=%d", cl, new_max);
493                     if (nativesyntax_to_type(sdb, type, rec))
494                         yaz_log(YLOG_WARN, "Failed to determine record type");
495                     xmlrec = ZOOM_record_get(rec, type, NULL);
496                     if (!xmlrec)
497                         yaz_log(YLOG_WARN, "ZOOM_record_get failed from %s",
498                                 client_get_url(cl));
499                     else
500                     {
501                         if (ingest_record(cl, xmlrec, cl->record_offset, nmem))
502                             yaz_log(YLOG_WARN, "Failed to ingest from %s",
503                                     client_get_url(cl));
504                     }
505                     pthread_mutex_lock(&ingest_mutex);
506                     --ingest_counter;
507                     pthread_mutex_unlock(&ingest_mutex);
508                     nmem_destroy(nmem);
509                     yaz_log(YLOG_LOG, "Record ingest end client=%p session=%p max=%d", cl, cl->session, ingest_max);
510                 }
511             }
512             else
513             {
514                 yaz_log(YLOG_WARN, "Expected record, but got NULL, offset=%d",
515                         offset);
516             }
517         }
518     }
519 }
520
521 void client_start_search(struct client *cl)
522 {
523     struct session_database *sdb = client_get_database(cl);
524     struct connection *co = client_get_connection(cl);
525     ZOOM_connection link = connection_get_link(co);
526     ZOOM_resultset rs;
527     char *databaseName = sdb->database->databases[0];
528     const char *opt_piggyback = session_setting_oneval(sdb, PZ_PIGGYBACK);
529     const char *opt_queryenc = session_setting_oneval(sdb, PZ_QUERYENCODING);
530     const char *opt_elements = session_setting_oneval(sdb, PZ_ELEMENTS);
531     const char *opt_requestsyn = session_setting_oneval(sdb, PZ_REQUESTSYNTAX);
532     const char *opt_maxrecs = session_setting_oneval(sdb, PZ_MAXRECS);
533     const char *opt_sru = session_setting_oneval(sdb, PZ_SRU);
534     const char *opt_sort = session_setting_oneval(sdb, PZ_SORT);
535     char maxrecs_str[24], startrecs_str[24];
536
537     assert(link);
538
539     cl->hits = -1;
540     cl->record_offset = 0;
541     cl->diagnostic = 0;
542     client_set_state(cl, Client_Working);
543
544     if (*opt_piggyback)
545         ZOOM_connection_option_set(link, "piggyback", opt_piggyback);
546     else
547         ZOOM_connection_option_set(link, "piggyback", "1");
548     if (*opt_queryenc)
549         ZOOM_connection_option_set(link, "rpnCharset", opt_queryenc);
550     if (*opt_sru && *opt_elements)
551         ZOOM_connection_option_set(link, "schema", opt_elements);
552     else if (*opt_elements)
553         ZOOM_connection_option_set(link, "elementSetName", opt_elements);
554     if (*opt_requestsyn)
555         ZOOM_connection_option_set(link, "preferredRecordSyntax", opt_requestsyn);
556
557     if (!*opt_maxrecs)
558     {
559         sprintf(maxrecs_str, "%d", cl->maxrecs);
560         opt_maxrecs = maxrecs_str;
561     }
562     ZOOM_connection_option_set(link, "count", opt_maxrecs);
563
564
565     if (atoi(opt_maxrecs) > 20)
566         ZOOM_connection_option_set(link, "presentChunk", "20");
567     else
568         ZOOM_connection_option_set(link, "presentChunk", opt_maxrecs);
569
570     sprintf(startrecs_str, "%d", cl->startrecs);
571     ZOOM_connection_option_set(link, "start", startrecs_str);
572
573     if (databaseName)
574         ZOOM_connection_option_set(link, "databaseName", databaseName);
575
576     if (cl->cqlquery)
577     {
578         ZOOM_query q = ZOOM_query_create();
579         yaz_log(YLOG_LOG, "Search %s CQL: %s", sdb->database->url, cl->cqlquery);
580         ZOOM_query_cql(q, cl->cqlquery);
581         if (*opt_sort)
582             ZOOM_query_sortby(q, opt_sort);
583         rs = ZOOM_connection_search(link, q);
584         ZOOM_query_destroy(q);
585     }
586     else
587     {
588         yaz_log(YLOG_LOG, "Search %s PQF: %s", sdb->database->url, cl->pquery);
589         rs = ZOOM_connection_search_pqf(link, cl->pquery);
590     }
591     ZOOM_resultset_destroy(cl->resultset);
592     cl->resultset = rs;
593     connection_continue(co);
594 }
595
596 struct client *client_create(void)
597 {
598     struct client *r = xmalloc(sizeof(*r));
599     r->maxrecs = 100;
600     r->startrecs = 0;
601     r->pquery = 0;
602     r->cqlquery = 0;
603     r->database = 0;
604     r->connection = 0;
605     r->session = 0;
606     r->hits = 0;
607     r->record_offset = 0;
608     r->diagnostic = 0;
609     r->state = Client_Disconnected;
610     r->show_raw = 0;
611     r->resultset = 0;
612     r->next = 0;
613     r->mutex = 0;
614     yaz_mutex_create(&r->mutex);
615     yaz_mutex_set_name(r->mutex, "client");
616
617     r->ref_count = 1;
618     
619     return r;
620 }
621
622 void client_incref(struct client *c)
623 {
624     pazpar2_incref(&c->ref_count, c->mutex);
625     yaz_log(YLOG_DEBUG, "client_incref %s %d", client_get_url(c), c->ref_count);
626 }
627
628 int client_destroy(struct client *c)
629 {
630     if (c)
631     {
632         yaz_log(YLOG_DEBUG, "client_destroy %s %d",
633                 client_get_url(c), c->ref_count);
634         if (!pazpar2_decref(&c->ref_count, c->mutex))
635         {
636             c->next = 0;
637             xfree(c->pquery);
638             c->pquery = 0;
639             xfree(c->cqlquery);
640             c->cqlquery = 0;
641
642             ZOOM_resultset_destroy(c->resultset);
643             yaz_mutex_destroy(&c->mutex);
644             xfree(c);
645             return 1;
646         }
647     }
648     return 0;
649 }
650
651 void client_set_connection(struct client *cl, struct connection *con)
652 {
653     if (con)
654     {
655         assert(cl->connection == 0);
656         cl->connection = con;
657         client_incref(cl);
658     }
659     else
660     {
661         cl->connection = con;
662         client_destroy(cl);
663     }
664 }
665
666 void client_disconnect(struct client *cl)
667 {
668     if (cl->state != Client_Idle)
669         client_set_state(cl, Client_Disconnected);
670     client_set_connection(cl, 0);
671 }
672
673 // Extract terms from query into null-terminated termlist
674 static void extract_terms(NMEM nmem, struct ccl_rpn_node *query, char **termlist)
675 {
676     int num = 0;
677
678     pull_terms(nmem, query, termlist, &num);
679     termlist[num] = 0;
680 }
681
682 // Initialize CCL map for a target
683 static CCL_bibset prepare_cclmap(struct client *cl)
684 {
685     struct session_database *sdb = client_get_database(cl);
686     struct setting *s;
687     CCL_bibset res;
688
689     if (!sdb->settings)
690         return 0;
691     res = ccl_qual_mk();
692     for (s = sdb->settings[PZ_CCLMAP]; s; s = s->next)
693     {
694         char *p = strchr(s->name + 3, ':');
695         if (!p)
696         {
697             yaz_log(YLOG_WARN, "Malformed cclmap name: %s", s->name);
698             ccl_qual_rm(&res);
699             return 0;
700         }
701         p++;
702         ccl_qual_fitem(res, s->value, p);
703     }
704     return res;
705 }
706
707 // returns a xmalloced CQL query corresponding to the pquery in client
708 static char *make_cqlquery(struct client *cl)
709 {
710     cql_transform_t cqlt = cql_transform_create();
711     Z_RPNQuery *zquery;
712     char *r;
713     WRBUF wrb = wrbuf_alloc();
714     int status;
715     ODR odr_out = odr_createmem(ODR_ENCODE);
716
717     zquery = p_query_rpn(odr_out, cl->pquery);
718     yaz_log(YLOG_LOG, "PQF: %s", cl->pquery);
719     if ((status = cql_transform_rpn2cql_wrbuf(cqlt, wrb, zquery)))
720     {
721         yaz_log(YLOG_WARN, "Failed to generate CQL query, code=%d", status);
722         r = 0;
723     }
724     else
725     {
726         r = xstrdup(wrbuf_cstr(wrb));
727     }     
728     wrbuf_destroy(wrb);
729     odr_destroy(odr_out);
730     cql_transform_close(cqlt);
731     return r;
732 }
733
734 // Parse the query given the settings specific to this client
735 int client_parse_query(struct client *cl, const char *query)
736 {
737     struct session *se = client_get_session(cl);
738     struct session_database *sdb = client_get_database(cl);
739     struct ccl_rpn_node *cn;
740     int cerror, cpos;
741     CCL_bibset ccl_map = prepare_cclmap(cl);
742     const char *sru = session_setting_oneval(sdb, PZ_SRU);
743     const char *pqf_prefix = session_setting_oneval(sdb, PZ_PQF_PREFIX);
744     const char *pqf_strftime = session_setting_oneval(sdb, PZ_PQF_STRFTIME);
745
746     if (!ccl_map)
747         return -1;
748
749     cn = ccl_find_str(ccl_map, query, &cerror, &cpos);
750     ccl_qual_rm(&ccl_map);
751     if (!cn)
752     {
753         client_set_state(cl, Client_Error);
754         yaz_log(YLOG_WARN, "Failed to parse CCL query %s for %s",
755                 query,
756                 client_get_database(cl)->database->url);
757         return -1;
758     }
759     wrbuf_rewind(se->wrbuf);
760     if (*pqf_prefix)
761     {
762         wrbuf_puts(se->wrbuf, pqf_prefix);
763         wrbuf_puts(se->wrbuf, " ");
764     }
765     if (!pqf_strftime || !*pqf_strftime)
766         ccl_pquery(se->wrbuf, cn);
767     else
768     {
769         time_t cur_time = time(0);
770         struct tm *tm =  localtime(&cur_time);
771         char tmp_str[300];
772         const char *cp = tmp_str;
773
774         /* see man strftime(3) for things .. In particular %% gets converted
775          to %.. And That's our original query .. */
776         strftime(tmp_str, sizeof(tmp_str)-1, pqf_strftime, tm);
777         for (; *cp; cp++)
778         {
779             if (cp[0] == '%')
780                 ccl_pquery(se->wrbuf, cn);
781             else
782                 wrbuf_putc(se->wrbuf, cp[0]);
783         }
784     }
785     xfree(cl->pquery);
786     cl->pquery = xstrdup(wrbuf_cstr(se->wrbuf));
787
788     xfree(cl->cqlquery);
789     if (*sru)
790     {
791         if (!(cl->cqlquery = make_cqlquery(cl)))
792             return -1;
793     }
794     else
795         cl->cqlquery = 0;
796
797     if (!se->relevance)
798     {
799         // Initialize relevance structure with query terms
800         char *p[512];
801         extract_terms(se->nmem, cn, p);
802         se->relevance = relevance_create(
803             se->service->relevance_pct,
804             se->nmem, (const char **) p);
805     }
806
807     ccl_rpn_delete(cn);
808     return 0;
809 }
810
811
812 void client_remove_from_session(struct client *c)
813 {
814     struct session *se;
815     client_incref(c);
816
817     se = c->session;
818     assert(se);
819     if (se)
820     {
821         struct client **ccp = &se->clients;
822         
823         while (*ccp && *ccp != c)
824             ccp = &(*ccp)->next;
825         assert(*ccp == c);
826         *ccp = c->next;
827         
828         c->database = 0;
829         c->session = 0;
830         c->next = 0;
831     }
832     client_destroy(c);
833 }
834
835 void client_set_session(struct client *cl, struct session *se)
836 {
837     cl->session = se;
838     cl->next = se->clients;
839     se->clients = cl;
840 }
841
842 int client_is_active(struct client *cl)
843 {
844     if (cl->connection && (cl->state == Client_Connecting ||
845                            cl->state == Client_Working))
846         return 1;
847     return 0;
848 }
849
850 struct client *client_next_in_session(struct client *cl)
851 {
852     if (cl)
853         return cl->next;
854     return 0;
855
856 }
857
858 Odr_int client_get_hits(struct client *cl)
859 {
860     return cl->hits;
861 }
862
863 int client_get_num_records(struct client *cl)
864 {
865     return cl->record_offset;
866 }
867
868 void client_set_diagnostic(struct client *cl, int diagnostic)
869 {
870     cl->diagnostic = diagnostic;
871 }
872
873 int client_get_diagnostic(struct client *cl)
874 {
875     return cl->diagnostic;
876 }
877
878 void client_set_database(struct client *cl, struct session_database *db)
879 {
880     cl->database = db;
881 }
882
883 struct host *client_get_host(struct client *cl)
884 {
885     return client_get_database(cl)->database->host;
886 }
887
888 const char *client_get_url(struct client *cl)
889 {
890     if (cl->database)
891         return client_get_database(cl)->database->url;
892     else
893         return "NOURL";
894 }
895
896 void client_set_maxrecs(struct client *cl, int v)
897 {
898     cl->maxrecs = v;
899 }
900
901 void client_set_startrecs(struct client *cl, int v)
902 {
903     cl->startrecs = v;
904 }
905
906 /*
907  * Local variables:
908  * c-basic-offset: 4
909  * c-file-style: "Stroustrup"
910  * indent-tabs-mode: nil
911  * End:
912  * vim: shiftwidth=4 tabstop=8 expandtab
913  */
914