Caches NamePlusRecord BER; works well
[yaz-moved-to-github.git] / src / zoom-record-cache.c
1 /* This file is part of the YAZ toolkit.
2  * Copyright (C) Index Data
3  * See the file LICENSE for details.
4  */
5 /**
6  * \file zoom-record-cache.c
7  * \brief Implements ZOOM record caching
8  */
9 #if HAVE_CONFIG_H
10 #include <config.h>
11 #endif
12
13 #include <assert.h>
14 #include <string.h>
15 #include <errno.h>
16 #include "zoom-p.h"
17
18 #include <yaz/diagbib1.h>
19 #include <yaz/record_render.h>
20 #include <yaz/shptr.h>
21
22 #if SHPTR
23 YAZ_SHPTR_TYPE(WRBUF)
24 #endif
25
26 struct ZOOM_record_p {
27     ODR odr;
28 #if SHPTR
29     struct WRBUF_shptr *record_wrbuf;
30 #else
31     WRBUF wrbuf;
32 #endif
33
34     Z_NamePlusRecord *npr;
35     const char *schema;
36
37     const char *diag_uri;
38     const char *diag_message;
39     const char *diag_details;
40     const char *diag_set;
41 };
42
43 struct ZOOM_record_cache_p {
44     struct ZOOM_record_p rec;
45     char *elementSetName;
46     char *syntax;
47     char *schema;
48     int pos;
49     ZOOM_record_cache next;
50 };
51
52 static size_t record_hash(int pos)
53 {
54     if (pos < 0)
55         pos = 0;
56     return pos % RECORD_HASH_SIZE;
57 }
58
59 static ZOOM_record record_cache_add(ZOOM_resultset r,
60                                     Z_NamePlusRecord *npr,
61                                     int pos,
62                                     const char *syntax,
63                                     const char *elementSetName,
64                                     const char *schema,
65                                     Z_SRW_diagnostic *diag)
66 {
67     ZOOM_record_cache rc = 0;
68
69     ZOOM_Event event = ZOOM_Event_create(ZOOM_EVENT_RECV_RECORD);
70     ZOOM_connection_put_event(r->connection, event);
71
72     for (rc = r->record_hash[record_hash(pos)]; rc; rc = rc->next)
73     {
74         if (pos == rc->pos
75             && yaz_strcmp_null(schema, rc->schema) == 0
76             && yaz_strcmp_null(elementSetName,rc->elementSetName) == 0
77             && yaz_strcmp_null(syntax, rc->syntax) == 0)
78             break;
79     }
80     if (!rc)
81     {
82         rc = (ZOOM_record_cache) odr_malloc(r->odr, sizeof(*rc));
83         rc->rec.odr = 0;
84 #if SHPTR
85         YAZ_SHPTR_INC(r->record_wrbuf);
86         rc->rec.record_wrbuf = r->record_wrbuf;
87 #else
88         rc->rec.wrbuf = 0;
89 #endif
90         rc->elementSetName = odr_strdup_null(r->odr, elementSetName);
91
92         rc->syntax = odr_strdup_null(r->odr, syntax);
93
94         rc->schema = odr_strdup_null(r->odr, schema);
95
96         rc->pos = pos;
97         rc->next = r->record_hash[record_hash(pos)];
98         r->record_hash[record_hash(pos)] = rc;
99
100     }
101
102     rc->rec.npr = npr;
103     rc->rec.schema = odr_strdup_null(r->odr, schema);
104     rc->rec.diag_set = 0;
105     rc->rec.diag_uri = 0;
106     rc->rec.diag_message = 0;
107     rc->rec.diag_details = 0;
108     if (diag)
109     {
110         if (diag->uri)
111         {
112             char *cp;
113             rc->rec.diag_set = odr_strdup(r->odr, diag->uri);
114             if ((cp = strrchr(rc->rec.diag_set, '/')))
115                 *cp = '\0';
116             rc->rec.diag_uri = odr_strdup(r->odr, diag->uri);
117         }
118         rc->rec.diag_message = odr_strdup_null(r->odr, diag->message);
119         rc->rec.diag_details = odr_strdup_null(r->odr, diag->details);
120     }
121     return &rc->rec;
122 }
123
124 void ZOOM_record_cache_add(ZOOM_resultset r, Z_NamePlusRecord *npr,
125                            int pos,
126                            const char *syntax, const char *elementSetName,
127                            const char *schema,
128                            Z_SRW_diagnostic *diag)
129 {
130     record_cache_add(r, npr, pos, syntax, elementSetName, schema, diag);
131 #if HAVE_LIBMEMCACHED_MEMCACHED_H
132     if (r->connection->mc_st &&
133         !diag && npr->which == Z_NamePlusRecord_databaseRecord)
134     {
135         WRBUF k = wrbuf_alloc();
136         uint32_t flags = 0;
137         memcached_return_t rc;
138         time_t expiration = 36000;
139         ODR odr = odr_createmem(ODR_ENCODE);
140         char *rec_buf;
141         int rec_len;
142
143         z_NamePlusRecord(odr, &npr, 0, 0);
144         rec_buf = odr_getbuf(odr, &rec_len, 0);
145
146         wrbuf_write(k, wrbuf_buf(r->mc_key), wrbuf_len(r->mc_key));
147         wrbuf_printf(k, ";%d;%s;%s;%s", pos,
148                      syntax ? syntax : "",
149                      elementSetName ? elementSetName : "",
150                      schema ? schema : "");
151         rc = memcached_set(r->connection->mc_st,
152                            wrbuf_buf(k),wrbuf_len(k),
153                            rec_buf, rec_len,
154                            expiration, flags);
155
156         yaz_log(YLOG_LOG, "Store record lkey=%s len=%d rc=%u %s",
157                 wrbuf_cstr(k), rec_len, (unsigned) rc,
158                 memcached_last_error_message(r->connection->mc_st));
159         odr_destroy(odr);
160         wrbuf_destroy(k);
161     }
162 #endif
163 }
164
165
166 ZOOM_record ZOOM_record_cache_lookup(ZOOM_resultset r, int pos,
167                                      const char *syntax,
168                                      const char *elementSetName,
169                                      const char *schema)
170 {
171     ZOOM_record_cache rc;
172
173     for (rc = r->record_hash[record_hash(pos)]; rc; rc = rc->next)
174     {
175         if (pos == rc->pos)
176         {
177             if (yaz_strcmp_null(schema, rc->schema))
178                 continue;
179             if (yaz_strcmp_null(elementSetName,rc->elementSetName))
180                 continue;
181             if (yaz_strcmp_null(syntax, rc->syntax))
182                 continue;
183             return &rc->rec;
184         }
185     }
186 #if HAVE_LIBMEMCACHED_MEMCACHED_H
187     if (r->connection && r->connection->mc_st)
188     {
189         WRBUF k = wrbuf_alloc();
190         size_t v_len;
191         char *v_buf;
192         uint32_t flags;
193         memcached_return_t rc;
194
195         wrbuf_write(k, wrbuf_buf(r->mc_key), wrbuf_len(r->mc_key));
196         wrbuf_printf(k, ";%d;%s;%s;%s", pos,
197                      syntax ? syntax : "",
198                      elementSetName ? elementSetName : "",
199                      schema ? schema : "");
200
201         yaz_log(YLOG_LOG, "Lookup record %s", wrbuf_cstr(k));
202         v_buf = memcached_get(r->connection->mc_st, wrbuf_buf(k), wrbuf_len(k),
203                               &v_len, &flags, &rc);
204         wrbuf_destroy(k);
205         if (v_buf)
206         {
207             Z_NamePlusRecord *npr = 0;
208
209             odr_setbuf(r->odr, v_buf, v_len, 0);
210
211             z_NamePlusRecord(r->odr, &npr, 0, 0);
212             free(v_buf);
213             if (npr)
214             {
215                 yaz_log(YLOG_LOG, "returned memcached copy");
216                 return record_cache_add(r, npr, pos, syntax, elementSetName,
217                                         schema, 0);
218             }
219             yaz_log(YLOG_WARN, "memcached_get npr failed v_len=%ld",
220                     (long) v_len);
221         }
222     }
223 #endif
224     return 0;
225 }
226
227 ZOOM_API(ZOOM_record)
228     ZOOM_record_clone(ZOOM_record srec)
229 {
230     char *buf;
231     int size;
232     ODR odr_enc;
233     ZOOM_record nrec;
234
235     odr_enc = odr_createmem(ODR_ENCODE);
236     if (!z_NamePlusRecord(odr_enc, &srec->npr, 0, 0))
237         return 0;
238     buf = odr_getbuf(odr_enc, &size, 0);
239
240     nrec = (ZOOM_record) xmalloc(sizeof(*nrec));
241     nrec->odr = odr_createmem(ODR_DECODE);
242 #if SHPTR
243     nrec->record_wrbuf = 0;
244 #else
245     nrec->wrbuf = 0;
246 #endif
247     odr_setbuf(nrec->odr, buf, size, 0);
248     z_NamePlusRecord(nrec->odr, &nrec->npr, 0, 0);
249
250     nrec->schema = odr_strdup_null(nrec->odr, srec->schema);
251     nrec->diag_uri = odr_strdup_null(nrec->odr, srec->diag_uri);
252     nrec->diag_message = odr_strdup_null(nrec->odr, srec->diag_message);
253     nrec->diag_details = odr_strdup_null(nrec->odr, srec->diag_details);
254     nrec->diag_set = odr_strdup_null(nrec->odr, srec->diag_set);
255     odr_destroy(odr_enc);
256     return nrec;
257 }
258
259 static void ZOOM_record_release(ZOOM_record rec)
260 {
261     if (!rec)
262         return;
263
264 #if SHPTR
265     if (rec->record_wrbuf)
266         YAZ_SHPTR_DEC(rec->record_wrbuf, wrbuf_destroy);
267 #else
268     if (rec->wrbuf)
269         wrbuf_destroy(rec->wrbuf);
270 #endif
271
272     if (rec->odr)
273         odr_destroy(rec->odr);
274 }
275
276 ZOOM_API(void)
277     ZOOM_resultset_cache_reset(ZOOM_resultset r)
278 {
279     int i;
280     for (i = 0; i<RECORD_HASH_SIZE; i++)
281     {
282         ZOOM_record_cache rc;
283         for (rc = r->record_hash[i]; rc; rc = rc->next)
284         {
285             ZOOM_record_release(&rc->rec);
286         }
287         r->record_hash[i] = 0;
288     }
289 }
290
291
292 ZOOM_API(const char *)
293     ZOOM_record_get(ZOOM_record rec, const char *type_spec, int *len)
294 {
295     WRBUF wrbuf;
296
297     if (len)
298         *len = 0; /* default return */
299
300     if (!rec || !rec->npr)
301         return 0;
302
303 #if SHPTR
304     if (!rec->record_wrbuf)
305     {
306         WRBUF w = wrbuf_alloc();
307         YAZ_SHPTR_INIT(rec->record_wrbuf, w);
308     }
309     wrbuf = rec->record_wrbuf->ptr;
310 #else
311     if (!rec->wrbuf)
312         rec->wrbuf = wrbuf_alloc();
313     wrbuf = rec->wrbuf;
314 #endif
315     return yaz_record_render(rec->npr, rec->schema, wrbuf, type_spec, len);
316 }
317
318 ZOOM_API(int)
319     ZOOM_record_error(ZOOM_record rec, const char **cp,
320                       const char **addinfo, const char **diagset)
321 {
322     Z_NamePlusRecord *npr;
323
324     if (!rec)
325         return 0;
326
327     npr = rec->npr;
328     if (rec->diag_uri)
329     {
330         if (cp)
331             *cp = rec->diag_message;
332         if (addinfo)
333             *addinfo = rec->diag_details;
334         if (diagset)
335             *diagset = rec->diag_set;
336         return ZOOM_uri_to_code(rec->diag_uri);
337     }
338     if (npr && npr->which == Z_NamePlusRecord_surrogateDiagnostic)
339     {
340         Z_DiagRec *diag_rec = npr->u.surrogateDiagnostic;
341         int error = YAZ_BIB1_UNSPECIFIED_ERROR;
342         const char *add = 0;
343
344         if (diag_rec->which == Z_DiagRec_defaultFormat)
345         {
346             Z_DefaultDiagFormat *ddf = diag_rec->u.defaultFormat;
347             oid_class oclass;
348
349             error = *ddf->condition;
350             switch (ddf->which)
351             {
352             case Z_DefaultDiagFormat_v2Addinfo:
353                 add = ddf->u.v2Addinfo;
354                 break;
355             case Z_DefaultDiagFormat_v3Addinfo:
356                 add = ddf->u.v3Addinfo;
357                 break;
358             }
359             if (diagset)
360                 *diagset =
361                     yaz_oid_to_string(yaz_oid_std(),
362                                       ddf->diagnosticSetId, &oclass);
363         }
364         else
365         {
366             if (diagset)
367                 *diagset = "Bib-1";
368         }
369         if (addinfo)
370             *addinfo = add ? add : "";
371         if (cp)
372             *cp = diagbib1_str(error);
373         return error;
374     }
375     return 0;
376 }
377
378 ZOOM_API(void)
379     ZOOM_record_destroy(ZOOM_record rec)
380 {
381     ZOOM_record_release(rec);
382     xfree(rec);
383 }
384
385 /*
386  * Local variables:
387  * c-basic-offset: 4
388  * c-file-style: "Stroustrup"
389  * indent-tabs-mode: nil
390  * End:
391  * vim: shiftwidth=4 tabstop=8 expandtab
392  */
393