fbdc886d85b6afe6e4cbeaf5516ebd3173af463d
[yaz-moved-to-github.git] / src / zoom-memcached.c
1 /* This file is part of the YAZ toolkit.
2  * Copyright (C) Index Data
3  * See the file LICENSE for details.
4  */
5 /**
6  * \file zoom-memcached.c
7  * \brief Implements query/record caching using memcached
8  */
9 #if HAVE_CONFIG_H
10 #include <config.h>
11 #endif
12
13 #include <assert.h>
14 #include <string.h>
15 #include <errno.h>
16 #include "zoom-p.h"
17
18 #include <yaz/yaz-util.h>
19 #include <yaz/xmalloc.h>
20 #include <yaz/log.h>
21 #include <yaz/diagbib1.h>
22
#if HAVE_LIBMEMCACHED
/* HAVE_MEMCACHED_FUNC selects how yaz_memcached_wrap() builds its handle:
 * 1 when the library reports version hex 0x01000000 or later (memcached()
 * is called with the configuration string), 0 for anything older (the
 * configuration string is parsed by hand). */
#if LIBMEMCACHED_VERSION_HEX < 0x01000000
#define HAVE_MEMCACHED_FUNC 0
#else
#define HAVE_MEMCACHED_FUNC 1
#endif
#endif
30
31 void ZOOM_memcached_init(ZOOM_connection c)
32 {
33 #if HAVE_LIBMEMCACHED
34     c->mc_st = 0;
35 #endif
36 #if HAVE_HIREDIS
37     c->redis_c = 0;
38 #endif
39 }
40
41 void ZOOM_memcached_destroy(ZOOM_connection c)
42 {
43 #if HAVE_LIBMEMCACHED
44     if (c->mc_st)
45         memcached_free(c->mc_st);
46 #endif
47 #if HAVE_HIREDIS
48     if (c->redis_c)
49         redisFree(c->redis_c);
50 #endif
51 }
52
#if HAVE_LIBMEMCACHED
/**
 * Creates a memcached handle from a configuration string.
 *
 * With HAVE_MEMCACHED_FUNC the string is passed to memcached() unchanged.
 * Without it the string is split on blanks and every item must be a
 * "--SERVER=host[:port][/?weight]" directive: the port defaults to 11211
 * and the "/?weight" suffix is cut off and otherwise ignored. Any other
 * directive, or a server that memcached_server_add rejects, discards the
 * handle.
 *
 * \param conf configuration string
 * \returns the new handle, or 0 if none could be created
 */
static memcached_st *yaz_memcached_wrap(const char *conf)
{
#if HAVE_MEMCACHED_FUNC
    return memcached(conf, strlen(conf));
#else
    memcached_st *handle = memcached_create(0);
    NMEM scratch = nmem_create();
    char **directives;
    int count, idx;

    nmem_strsplit_blank(scratch, conf, &directives, &count);
    for (idx = 0; handle && idx < count; idx++)
    {
        char *directive = directives[idx];
        int accepted = 0; /* stays 0 for a bad directive or a failed add */

        if (yaz_strncasecmp(directive, "--SERVER=", 9) == 0)
        {
            char *host = directive + 9;
            /* locate both separators before the string is cut anywhere */
            char *port = strchr(host, ':');
            char *weight = strstr(host, "/?");
            memcached_return_t rc;

            if (port)
                *port++ = '\0';
            if (weight)
                *weight = '\0'; /* weight itself is not used */
            rc = memcached_server_add(handle, host,
                                      port ? atoi(port) : 11211);
            yaz_log(YLOG_LOG, "memcached_server_add host=%s rc=%u %s",
                    host, (unsigned) rc, memcached_strerror(handle, rc));
            accepted = (rc == MEMCACHED_SUCCESS);
        }
        if (!accepted)
        {
            /* clearing the handle also ends the loop */
            memcached_free(handle);
            handle = 0;
        }
    }
    nmem_destroy(scratch);
    return handle;
#endif
}
#endif
102
#if HAVE_HIREDIS
/**
 * Creates a redis context from a configuration string.
 *
 * The string is split on blanks; every "--SERVER=host[:port]" item triggers
 * a connection attempt (port defaults to 6379, timeout 1.5 seconds). Items
 * of any other form are skipped. When several servers are given, the
 * context of the last one is returned and the earlier ones are freed
 * rather than leaked.
 *
 * \param conf configuration string
 * \returns the context of the last server directive, or 0 if there was no
 *          such directive or redisConnectWithTimeout returned 0. The caller
 *          is expected to inspect the err member of a returned context.
 */
static redisContext *create_redis(const char *conf)
{
    char **darray;
    int i, num;
    NMEM nmem = nmem_create();
    redisContext *context = 0;

    nmem_strsplit_blank(nmem, conf, &darray, &num);
    for (i = 0; i < num; i++)
    {
        if (!yaz_strncasecmp(darray[i], "--SERVER=", 9))
        {
            struct timeval timeout = { 1, 500000 }; /* 1.5 seconds */
            char *host = darray[i] + 9;
            char *port = strchr(host, ':');
            if (port)
                *port++ = '\0';
            /* release the context of an earlier directive before it is
               replaced; otherwise it would never be freed */
            if (context)
                redisFree(context);
            context = redisConnectWithTimeout(host,
                                              port ? atoi(port) : 6379,
                                              timeout);
        }
    }
    nmem_destroy(nmem);
    return context;
}
#endif
130
131 int ZOOM_memcached_configure(ZOOM_connection c)
132 {
133     const char *val;
134 #if HAVE_HIREDIS
135     if (c->redis_c)
136     {
137         redisFree(c->redis_c);
138         c->redis_c = 0;
139     }
140 #endif
141 #if HAVE_LIBMEMCACHED
142     if (c->mc_st)
143     {
144         memcached_free(c->mc_st);
145         c->mc_st = 0;
146     }
147 #endif
148
149     val = ZOOM_options_get(c->options, "redis");
150     if (val && *val)
151     {
152 #if HAVE_HIREDIS
153         c->redis_c = create_redis(val);
154         if (c->redis_c == 0 || c->redis_c->err)
155         {
156             ZOOM_set_error(c, ZOOM_ERROR_MEMCACHED,
157                            "could not create redis");
158             return -1;
159         }
160         return 0; /* don't bother with memcached if redis is enabled */
161 #else
162         ZOOM_set_error(c, ZOOM_ERROR_MEMCACHED, "not enabled");
163         return -1;
164 #endif
165     }
166     val = ZOOM_options_get(c->options, "memcached");
167     if (val && *val)
168     {
169 #if HAVE_LIBMEMCACHED
170         c->mc_st = yaz_memcached_wrap(val);
171         if (!c->mc_st)
172         {
173             ZOOM_set_error(c, ZOOM_ERROR_MEMCACHED,
174                            "could not create memcached");
175             return -1;
176         }
177         memcached_behavior_set(c->mc_st, MEMCACHED_BEHAVIOR_BINARY_PROTOCOL, 1);
178 #else
179         ZOOM_set_error(c, ZOOM_ERROR_MEMCACHED, "not enabled");
180         return -1;
181 #endif
182     }
183     return 0;
184 }
185
#if HAVE_GCRYPT_H
/**
 * Appends one component of a cache key to a buffer.
 *
 * A null value appends nothing. A value of at most 40 characters is
 * appended as is; a longer one is appended via wrbuf_sha1_puts instead.
 *
 * \param w buffer to append to
 * \param v value to append; may be null
 */
static void wrbuf_vary_puts(WRBUF w, const char *v)
{
    if (!v)
        return;
    if (strlen(v) <= 40)
        wrbuf_puts(w, v);
    else
        wrbuf_sha1_puts(w, v, 1);
}
#endif
202
203 void ZOOM_memcached_resultset(ZOOM_resultset r, ZOOM_query q)
204 {
205 #if HAVE_GCRYPT_H
206     ZOOM_connection c = r->connection;
207
208     r->mc_key = wrbuf_alloc();
209     wrbuf_puts(r->mc_key, "1;");
210     wrbuf_vary_puts(r->mc_key, c->host_port);
211     wrbuf_puts(r->mc_key, ";");
212     wrbuf_vary_puts(r->mc_key, ZOOM_resultset_option_get(r, "extraArgs"));
213     wrbuf_puts(r->mc_key, ";");
214     wrbuf_vary_puts(r->mc_key, c->user);
215     wrbuf_puts(r->mc_key, ";");
216     wrbuf_vary_puts(r->mc_key, c->group);
217     wrbuf_puts(r->mc_key, ";");
218     if (c->password)
219         wrbuf_sha1_puts(r->mc_key, c->password, 1);
220     wrbuf_puts(r->mc_key, ";");
221     {
222         WRBUF w = wrbuf_alloc();
223         ZOOM_query_get_hash(q, w);
224         wrbuf_sha1_puts(r->mc_key, wrbuf_cstr(w), 1);
225         wrbuf_destroy(w);
226     }
227     wrbuf_puts(r->mc_key, ";");
228     wrbuf_vary_puts(r->mc_key, r->req_facets);
229 #endif
230 }
231
232 void ZOOM_memcached_search(ZOOM_connection c, ZOOM_resultset resultset)
233 {
234 #if HAVE_HIREDIS
235     if (c->redis_c && resultset->live_set == 0)
236     {
237         redisReply *reply;
238         const char *argv[2];
239
240         argv[0] = "GET";
241         argv[1] = wrbuf_cstr(resultset->mc_key);
242
243         reply = redisCommandArgv(c->redis_c, 2, argv, 0);
244         /* count;precision (ASCII) + '\0' + BER buffer for otherInformation */
245         if (reply && reply->type == REDIS_REPLY_STRING)
246         {
247             char *v = reply->str;
248             int v_len = reply->len;
249             ZOOM_Event event;
250             size_t lead_len = strlen(v) + 1;
251
252             resultset->size = odr_atoi(v);
253
254             yaz_log(YLOG_LOG, "For key %s got value %s lead_len=%d len=%d",
255                     wrbuf_cstr(resultset->mc_key), v, (int) lead_len,
256                     (int) v_len);
257             if (v_len > lead_len)
258             {
259                 Z_OtherInformation *oi = 0;
260                 int oi_len = v_len - lead_len;
261                 odr_setbuf(resultset->odr, v + lead_len, oi_len, 0);
262                 if (!z_OtherInformation(resultset->odr, &oi, 0, 0))
263                 {
264                     yaz_log(YLOG_WARN, "oi decoding failed");
265                     freeReplyObject(reply);
266                     return;
267                 }
268                 ZOOM_handle_search_result(c, resultset, oi);
269                 ZOOM_handle_facet_result(c, resultset, oi);
270             }
271             event = ZOOM_Event_create(ZOOM_EVENT_RECV_SEARCH);
272             ZOOM_connection_put_event(c, event);
273             resultset->live_set = 1;
274         }
275         freeReplyObject(reply);
276     }
277 #endif
278 #if HAVE_LIBMEMCACHED
279     if (c->mc_st && resultset->live_set == 0)
280     {
281         size_t v_len;
282         uint32_t flags;
283         memcached_return_t rc;
284         char *v = memcached_get(c->mc_st, wrbuf_buf(resultset->mc_key),
285                                 wrbuf_len(resultset->mc_key),
286                                 &v_len, &flags, &rc);
287         /* count;precision (ASCII) + '\0' + BER buffer for otherInformation */
288         if (v)
289         {
290             ZOOM_Event event;
291             size_t lead_len = strlen(v) + 1;
292
293             resultset->size = odr_atoi(v);
294
295             yaz_log(YLOG_LOG, "For key %s got value %s lead_len=%d len=%d",
296                     wrbuf_cstr(resultset->mc_key), v, (int) lead_len,
297                     (int) v_len);
298             if (v_len > lead_len)
299             {
300                 Z_OtherInformation *oi = 0;
301                 int oi_len = v_len - lead_len;
302                 odr_setbuf(resultset->odr, v + lead_len, oi_len, 0);
303                 if (!z_OtherInformation(resultset->odr, &oi, 0, 0))
304                 {
305                     yaz_log(YLOG_WARN, "oi decoding failed");
306                     free(v);
307                     return;
308                 }
309                 ZOOM_handle_search_result(c, resultset, oi);
310                 ZOOM_handle_facet_result(c, resultset, oi);
311             }
312             free(v);
313             event = ZOOM_Event_create(ZOOM_EVENT_RECV_SEARCH);
314             ZOOM_connection_put_event(c, event);
315             resultset->live_set = 1;
316         }
317     }
318 #endif
319 }
320
321 void ZOOM_memcached_hitcount(ZOOM_connection c, ZOOM_resultset resultset,
322                              Z_OtherInformation *oi, const char *precision)
323 {
324 #if HAVE_HIREDIS
325     if (c->redis_c && resultset->live_set == 0)
326     {
327         char *str;
328         ODR odr = odr_createmem(ODR_ENCODE);
329         char *oi_buf = 0;
330         int oi_len = 0;
331         char *key;
332
333         str = odr_malloc(odr, 20 + strlen(precision));
334         /* count;precision (ASCII) + '\0' + BER buffer for otherInformation */
335         sprintf(str, ODR_INT_PRINTF ";%s", resultset->size, precision);
336         if (oi)
337         {
338             z_OtherInformation(odr, &oi, 0, 0);
339             oi_buf = odr_getbuf(odr, &oi_len, 0);
340         }
341         key = odr_malloc(odr, strlen(str) + 1 + oi_len);
342         strcpy(key, str);
343         if (oi_len)
344             memcpy(key + strlen(str) + 1, oi_buf, oi_len);
345
346         {
347             redisReply *reply;
348             const char *argv[3];
349             size_t argvlen[3];
350             argv[0] = "SET";
351             argvlen[0] = 3;
352             argv[1] = wrbuf_buf(resultset->mc_key);
353             argvlen[1] = wrbuf_len(resultset->mc_key);
354             argv[2] = key;
355             argvlen[2] = strlen(str) + 1 + oi_len;
356             reply = redisCommandArgv(c->redis_c, 3, argv, argvlen);
357             freeReplyObject(reply);
358         }
359         odr_destroy(odr);
360     }
361 #endif
362 #if HAVE_LIBMEMCACHED
363     if (c->mc_st && resultset->live_set == 0)
364     {
365         uint32_t flags = 0;
366         memcached_return_t rc;
367         time_t expiration = 36000;
368         char *str;
369         ODR odr = odr_createmem(ODR_ENCODE);
370         char *oi_buf = 0;
371         int oi_len = 0;
372         char *key;
373
374         str = odr_malloc(odr, 20 + strlen(precision));
375         /* count;precision (ASCII) + '\0' + BER buffer for otherInformation */
376         sprintf(str, ODR_INT_PRINTF ";%s", resultset->size, precision);
377         if (oi)
378         {
379             z_OtherInformation(odr, &oi, 0, 0);
380             oi_buf = odr_getbuf(odr, &oi_len, 0);
381         }
382         key = odr_malloc(odr, strlen(str) + 1 + oi_len);
383         strcpy(key, str);
384         if (oi_len)
385             memcpy(key + strlen(str) + 1, oi_buf, oi_len);
386
387         rc = memcached_set(c->mc_st,
388                            wrbuf_buf(resultset->mc_key),
389                            wrbuf_len(resultset->mc_key),
390                            key, strlen(str) + 1 + oi_len, expiration, flags);
391         yaz_log(YLOG_LOG, "Store hit count key=%s value=%s oi_len=%d rc=%u %s",
392                 wrbuf_cstr(resultset->mc_key), str, oi_len, (unsigned) rc,
393                 memcached_strerror(c->mc_st, rc));
394         odr_destroy(odr);
395     }
396 #endif
397 }
398
399 void ZOOM_memcached_add(ZOOM_resultset r, Z_NamePlusRecord *npr,
400                         int pos,
401                         const char *syntax, const char *elementSetName,
402                         const char *schema,
403                         Z_SRW_diagnostic *diag)
404 {
405 #if HAVE_HIREDIS
406     if (r->connection->redis_c &&
407         !diag && npr->which == Z_NamePlusRecord_databaseRecord)
408     {
409         WRBUF k = wrbuf_alloc();
410         WRBUF rec_sha1 = wrbuf_alloc();
411         ODR odr = odr_createmem(ODR_ENCODE);
412         char *rec_buf;
413         int rec_len;
414         const char *argv[3];
415         size_t argvlen[3];
416         redisReply *reply;
417
418         z_NamePlusRecord(odr, &npr, 0, 0);
419         rec_buf = odr_getbuf(odr, &rec_len, 0);
420
421         wrbuf_write(k, wrbuf_buf(r->mc_key), wrbuf_len(r->mc_key));
422         wrbuf_printf(k, ";%d;%s;%s;%s", pos,
423                      syntax ? syntax : "",
424                      elementSetName ? elementSetName : "",
425                      schema ? schema : "");
426
427         wrbuf_sha1_write(rec_sha1, rec_buf, rec_len, 1);
428
429         argv[0] = "SET";
430         argvlen[0] = 3;
431         argv[1] = wrbuf_buf(k);
432         argvlen[1] = wrbuf_len(k);
433         argv[2] = wrbuf_buf(rec_sha1);
434         argvlen[2] = wrbuf_len(rec_sha1);
435
436         reply = redisCommandArgv(r->connection->redis_c, 3, argv, argvlen);
437         yaz_log(YLOG_LOG, "Store record key=%s val=%s",
438                 wrbuf_cstr(k), wrbuf_cstr(rec_sha1));
439         freeReplyObject(reply);
440
441         argv[1] = wrbuf_buf(rec_sha1);
442         argvlen[1] = wrbuf_len(rec_sha1);
443         argv[2] = rec_buf;
444         argvlen[2] = rec_len;
445
446         reply = redisCommandArgv(r->connection->redis_c, 3, argv, argvlen);
447         yaz_log(YLOG_LOG, "Add record key=%s rec_len=%d",
448                 wrbuf_cstr(rec_sha1), rec_len);
449         freeReplyObject(reply);
450
451         odr_destroy(odr);
452         wrbuf_destroy(k);
453         wrbuf_destroy(rec_sha1);
454     }
455 #endif
456 #if HAVE_LIBMEMCACHED
457     if (r->connection->mc_st &&
458         !diag && npr->which == Z_NamePlusRecord_databaseRecord)
459     {
460         WRBUF k = wrbuf_alloc();
461         WRBUF rec_sha1 = wrbuf_alloc();
462         uint32_t flags = 0;
463         memcached_return_t rc;
464         time_t expiration = 36000;
465         ODR odr = odr_createmem(ODR_ENCODE);
466         char *rec_buf;
467         int rec_len;
468
469         z_NamePlusRecord(odr, &npr, 0, 0);
470         rec_buf = odr_getbuf(odr, &rec_len, 0);
471
472         wrbuf_write(k, wrbuf_buf(r->mc_key), wrbuf_len(r->mc_key));
473         wrbuf_printf(k, ";%d;%s;%s;%s", pos,
474                      syntax ? syntax : "",
475                      elementSetName ? elementSetName : "",
476                      schema ? schema : "");
477
478         wrbuf_sha1_write(rec_sha1, rec_buf, rec_len, 1);
479
480         rc = memcached_set(r->connection->mc_st,
481                            wrbuf_buf(k), wrbuf_len(k),
482                            wrbuf_buf(rec_sha1), wrbuf_len(rec_sha1),
483                            expiration, flags);
484
485         yaz_log(YLOG_LOG, "Store record key=%s val=%s rc=%u %s",
486                 wrbuf_cstr(k), wrbuf_cstr(rec_sha1), (unsigned) rc,
487                 memcached_strerror(r->connection->mc_st, rc));
488
489         rc = memcached_add(r->connection->mc_st,
490                            wrbuf_buf(rec_sha1), wrbuf_len(rec_sha1),
491                            rec_buf, rec_len,
492                            expiration, flags);
493
494         yaz_log(YLOG_LOG, "Add record key=%s rec_len=%d rc=%u %s",
495                 wrbuf_cstr(rec_sha1), rec_len, (unsigned) rc,
496                 memcached_strerror(r->connection->mc_st, rc));
497
498         odr_destroy(odr);
499         wrbuf_destroy(k);
500         wrbuf_destroy(rec_sha1);
501     }
502 #endif
503 }
504
505 Z_NamePlusRecord *ZOOM_memcached_lookup(ZOOM_resultset r, int pos,
506                                         const char *syntax,
507                                         const char *elementSetName,
508                                         const char *schema)
509 {
510 #if HAVE_HIREDIS
511     if (r->connection && r->connection->redis_c)
512     {
513         WRBUF k = wrbuf_alloc();
514         const char *argv[2];
515         size_t argvlen[2];
516         redisReply *reply1;
517
518         wrbuf_write(k, wrbuf_buf(r->mc_key), wrbuf_len(r->mc_key));
519         wrbuf_printf(k, ";%d;%s;%s;%s", pos,
520                      syntax ? syntax : "",
521                      elementSetName ? elementSetName : "",
522                      schema ? schema : "");
523
524         yaz_log(YLOG_LOG, "Lookup record %s", wrbuf_cstr(k));
525         argv[0] = "GET";
526         argvlen[0] = 3;
527         argv[1] = wrbuf_buf(k);
528         argvlen[1] = wrbuf_len(k);
529         reply1 = redisCommandArgv(r->connection->redis_c, 2, argv, argvlen);
530
531         wrbuf_destroy(k);
532         if (reply1 && reply1->type == REDIS_REPLY_STRING)
533         {
534             redisReply *reply2;
535             char *sha1_buf = reply1->str;
536             int sha1_len = reply1->len;
537
538             yaz_log(YLOG_LOG, "Lookup record %.*s", (int) sha1_len, sha1_buf);
539
540             argv[0] = "GET";
541             argvlen[0] = 3;
542             argv[1] = sha1_buf;
543             argvlen[1] = sha1_len;
544
545             reply2 = redisCommandArgv(r->connection->redis_c, 2, argv, argvlen);
546             if (reply2 && reply2->type == REDIS_REPLY_STRING)
547             {
548                 Z_NamePlusRecord *npr = 0;
549                 char *v_buf = reply2->str;
550                 int v_len = reply2->len;
551
552                 odr_setbuf(r->odr, v_buf, v_len, 0);
553                 z_NamePlusRecord(r->odr, &npr, 0, 0);
554                 if (npr)
555                     yaz_log(YLOG_LOG, "returned redis copy");
556                 freeReplyObject(reply2);
557                 freeReplyObject(reply1);
558                 return npr;
559             }
560             freeReplyObject(reply2);
561         }
562         freeReplyObject(reply1);
563     }
564 #endif
565 #if HAVE_LIBMEMCACHED
566     if (r->connection && r->connection->mc_st)
567     {
568         WRBUF k = wrbuf_alloc();
569         char *sha1_buf;
570         size_t sha1_len;
571         uint32_t flags;
572         memcached_return_t rc;
573
574         wrbuf_write(k, wrbuf_buf(r->mc_key), wrbuf_len(r->mc_key));
575         wrbuf_printf(k, ";%d;%s;%s;%s", pos,
576                      syntax ? syntax : "",
577                      elementSetName ? elementSetName : "",
578                      schema ? schema : "");
579
580         yaz_log(YLOG_LOG, "Lookup record %s", wrbuf_cstr(k));
581         sha1_buf = memcached_get(r->connection->mc_st,
582                                  wrbuf_buf(k), wrbuf_len(k),
583                                  &sha1_len, &flags, &rc);
584
585         wrbuf_destroy(k);
586         if (sha1_buf)
587         {
588             size_t v_len;
589             char *v_buf;
590
591             yaz_log(YLOG_LOG, "Lookup record %.*s", (int) sha1_len, sha1_buf);
592             v_buf = memcached_get(r->connection->mc_st, sha1_buf, sha1_len,
593                                   &v_len, &flags, &rc);
594             free(sha1_buf);
595             if (v_buf)
596             {
597                 Z_NamePlusRecord *npr = 0;
598
599                 odr_setbuf(r->odr, v_buf, v_len, 0);
600                 z_NamePlusRecord(r->odr, &npr, 0, 0);
601                 free(v_buf);
602                 if (npr)
603                     yaz_log(YLOG_LOG, "returned memcached copy");
604                 return npr;
605             }
606         }
607     }
608 #endif
609     return 0;
610
611 }
612 /*
613  * Local variables:
614  * c-basic-offset: 4
615  * c-file-style: "Stroustrup"
616  * indent-tabs-mode: nil
617  * End:
618  * vim: shiftwidth=4 tabstop=8 expandtab
619  */
620