No more manifest files
[yaz-moved-to-github.git] / src / zoom-memcached.c
1 /* This file is part of the YAZ toolkit.
2  * Copyright (C) Index Data
3  * See the file LICENSE for details.
4  */
5 /**
6  * \file zoom-memcached.c
7  * \brief Implements query/record caching using memcached
8  */
9 #if HAVE_CONFIG_H
10 #include <config.h>
11 #endif
12
13 #include <assert.h>
14 #include <string.h>
15 #include <errno.h>
16 #include "zoom-p.h"
17
18 #include <yaz/yaz-util.h>
19 #include <yaz/xmalloc.h>
20 #include <yaz/log.h>
21 #include <yaz/diagbib1.h>
22
23 void ZOOM_memcached_init(ZOOM_connection c)
24 {
25 #if HAVE_LIBMEMCACHED
26     c->mc_st = 0;
27 #endif
28 #if HAVE_HIREDIS
29     c->redis_c = 0;
30 #endif
31     c->expire_search = 600;
32     c->expire_record = 1200;
33 }
34
35 void ZOOM_memcached_destroy(ZOOM_connection c)
36 {
37 #if HAVE_LIBMEMCACHED
38     if (c->mc_st)
39         memcached_free(c->mc_st);
40 #endif
41 #if HAVE_HIREDIS
42     if (c->redis_c)
43         redisFree(c->redis_c);
44 #endif
45 }
46
#if HAVE_LIBMEMCACHED
/**
 * \brief Builds a memcached handle from a configuration string
 * \param conf blank-separated list of directives
 * \param expire_search updated when an --EXPIRE= directive is seen
 * \param expire_record set to 600 + *expire_search on --EXPIRE=
 * \returns memcached handle; 0 if a server could not be added, a
 * directive was not recognized or the handle could not be created
 *
 * Recognized directives (case-insensitive prefix match):
 *   --SERVER=host[:port][/?weight]   port defaults to 11211
 *   --EXPIRE=n
 */
static memcached_st *create_memcached(const char *conf,
                                      int *expire_search, int *expire_record)
{
    memcached_st *mc = memcached_create(0);
    NMEM nmem = nmem_create();
    char **directives;
    int count;
    int idx = 0;

    nmem_strsplit_blank(nmem, conf, &directives, &count);
    while (mc && idx < count)
    {
        char *directive = directives[idx++];

        if (yaz_strncasecmp(directive, "--SERVER=", 9) == 0)
        {
            memcached_return_t rc;
            char *host = directive + 9;
            /* locate both separators before the string is cut anywhere */
            char *port = strchr(host, ':');
            char *weight = strstr(host, "/?");

            if (port)
                *port++ = '\0';
            if (weight)
                *weight = '\0'; /* the weight itself is not used */
            rc = memcached_server_add(mc, host, port ? atoi(port) : 11211);
            yaz_log(YLOG_LOG, "memcached_server_add host=%s rc=%u %s",
                    host, (unsigned) rc, memcached_strerror(mc, rc));
            if (rc == MEMCACHED_SUCCESS)
                continue;
        }
        else if (yaz_strncasecmp(directive, "--EXPIRE=", 9) == 0)
        {
            *expire_search = atoi(directive + 9);
            *expire_record = 600 + *expire_search;
            continue;
        }
        /* server could not be added or bad directive: give up */
        memcached_free(mc);
        mc = 0;
    }
    nmem_destroy(nmem);
    return mc;
}
#endif
97
#if HAVE_HIREDIS
/**
 * \brief Connects to redis as described by a configuration string
 * \param conf blank-separated list of directives
 * \param expire_search updated when an --EXPIRE= directive is seen
 * \param expire_record set to 600 + *expire_search on --EXPIRE=
 * \returns context of the last --SERVER= directive; 0 if there was none.
 * The caller must inspect the err member of the returned context.
 *
 * Recognized directives (case-insensitive prefix match):
 *   --SERVER=host[:port]   port defaults to 6379
 *   --EXPIRE=n
 * Other directives are ignored.
 */
static redisContext *create_redis(const char *conf,
                                  int *expire_search, int *expire_record)
{
    char **darray;
    int i, num;
    NMEM nmem = nmem_create();
    redisContext *context = 0;

    nmem_strsplit_blank(nmem, conf, &darray, &num);
    for (i = 0; i < num; i++)
    {
        if (!yaz_strncasecmp(darray[i], "--SERVER=", 9))
        {
            struct timeval timeout = { 1, 500000 }; /* 1.5 seconds */
            char *host = darray[i] + 9;
            char *port = strchr(host, ':');
            if (port)
                *port++ = '\0';
            if (context)
            {
                /* only one context can be returned; release the one
                   made for an earlier --SERVER= rather than leak it */
                redisFree(context);
                context = 0;
            }
            context = redisConnectWithTimeout(host,
                                              port ? atoi(port) : 6379,
                                              timeout);
        }
        else if (!yaz_strncasecmp(darray[i], "--EXPIRE=", 9))
        {
            *expire_search = atoi(darray[i] + 9);
            *expire_record = 600 + *expire_search;
        }
    }
    nmem_destroy(nmem);
    return context;
}
#endif
131
132 int ZOOM_memcached_configure(ZOOM_connection c)
133 {
134     const char *val;
135 #if HAVE_HIREDIS
136     if (c->redis_c)
137     {
138         redisFree(c->redis_c);
139         c->redis_c = 0;
140     }
141 #endif
142 #if HAVE_LIBMEMCACHED
143     if (c->mc_st)
144     {
145         memcached_free(c->mc_st);
146         c->mc_st = 0;
147     }
148 #endif
149
150     val = ZOOM_options_get(c->options, "redis");
151     if (val && *val)
152     {
153 #if HAVE_HIREDIS
154         c->redis_c = create_redis(val,
155                                   &c->expire_search, &c->expire_record);
156         if (c->redis_c == 0 || c->redis_c->err)
157         {
158             ZOOM_set_error(c, ZOOM_ERROR_MEMCACHED,
159                            "could not create redis");
160             return -1;
161         }
162         return 0; /* don't bother with memcached if redis is enabled */
163 #else
164         ZOOM_set_error(c, ZOOM_ERROR_MEMCACHED, "not enabled");
165         return -1;
166 #endif
167     }
168     val = ZOOM_options_get(c->options, "memcached");
169     if (val && *val)
170     {
171 #if HAVE_LIBMEMCACHED
172         c->mc_st = create_memcached(val, &c->expire_search, &c->expire_record);
173         if (!c->mc_st)
174         {
175             ZOOM_set_error(c, ZOOM_ERROR_MEMCACHED,
176                            "could not create memcached");
177             return -1;
178         }
179         memcached_behavior_set(c->mc_st, MEMCACHED_BEHAVIOR_BINARY_PROTOCOL, 1);
180 #else
181         ZOOM_set_error(c, ZOOM_ERROR_MEMCACHED, "not enabled");
182         return -1;
183 #endif
184     }
185     return 0;
186 }
187
188 static void wrbuf_vary_puts(WRBUF w, const char *v)
189 {
190     if (v)
191     {
192         if (strlen(v) > 40)
193         {
194             wrbuf_sha1_puts(w, v, 1);
195         }
196         else
197         {
198             wrbuf_puts(w, v);
199         }
200     }
201 }
202
203 void ZOOM_memcached_resultset(ZOOM_resultset r, ZOOM_query q)
204 {
205     ZOOM_connection c = r->connection;
206
207     r->mc_key = wrbuf_alloc();
208     wrbuf_puts(r->mc_key, "1;");
209     wrbuf_vary_puts(r->mc_key, c->host_port);
210     wrbuf_puts(r->mc_key, ";");
211     wrbuf_vary_puts(r->mc_key, ZOOM_resultset_option_get(r, "extraArgs"));
212     wrbuf_puts(r->mc_key, ";");
213     wrbuf_vary_puts(r->mc_key, c->user);
214     wrbuf_puts(r->mc_key, ";");
215     wrbuf_vary_puts(r->mc_key, c->group);
216     wrbuf_puts(r->mc_key, ";");
217     if (c->password)
218         wrbuf_sha1_puts(r->mc_key, c->password, 1);
219     wrbuf_puts(r->mc_key, ";");
220     {
221         WRBUF w = wrbuf_alloc();
222         ZOOM_query_get_hash(q, w);
223         wrbuf_sha1_puts(r->mc_key, wrbuf_cstr(w), 1);
224         wrbuf_destroy(w);
225     }
226     wrbuf_puts(r->mc_key, ";");
227     wrbuf_vary_puts(r->mc_key, r->req_facets);
228 }
229
230 void ZOOM_memcached_search(ZOOM_connection c, ZOOM_resultset resultset)
231 {
232 #if HAVE_HIREDIS
233     if (c->redis_c && resultset->live_set == 0)
234     {
235         redisReply *reply;
236         const char *argv[2];
237
238         argv[0] = "GET";
239         argv[1] = wrbuf_cstr(resultset->mc_key);
240
241         reply = redisCommandArgv(c->redis_c, 2, argv, 0);
242         /* count;precision (ASCII) + '\0' + BER buffer for otherInformation */
243         if (reply && reply->type == REDIS_REPLY_STRING)
244         {
245             char *v = reply->str;
246             int v_len = reply->len;
247             ZOOM_Event event;
248             size_t lead_len = strlen(v) + 1;
249
250             resultset->size = odr_atoi(v);
251
252             yaz_log(YLOG_LOG, "For key %s got value %s lead_len=%d len=%d",
253                     wrbuf_cstr(resultset->mc_key), v, (int) lead_len,
254                     (int) v_len);
255             if (v_len > lead_len)
256             {
257                 Z_OtherInformation *oi = 0;
258                 int oi_len = v_len - lead_len;
259                 odr_setbuf(resultset->odr, v + lead_len, oi_len, 0);
260                 if (!z_OtherInformation(resultset->odr, &oi, 0, 0))
261                 {
262                     yaz_log(YLOG_WARN, "oi decoding failed");
263                     freeReplyObject(reply);
264                     return;
265                 }
266                 ZOOM_handle_search_result(c, resultset, oi);
267                 ZOOM_handle_facet_result(c, resultset, oi);
268             }
269             event = ZOOM_Event_create(ZOOM_EVENT_RECV_SEARCH);
270             ZOOM_connection_put_event(c, event);
271             resultset->live_set = 1;
272         }
273         if (reply)
274             freeReplyObject(reply);
275     }
276 #endif
277 #if HAVE_LIBMEMCACHED
278     if (c->mc_st && resultset->live_set == 0)
279     {
280         size_t v_len;
281         uint32_t flags;
282         memcached_return_t rc;
283         char *v = memcached_get(c->mc_st, wrbuf_buf(resultset->mc_key),
284                                 wrbuf_len(resultset->mc_key),
285                                 &v_len, &flags, &rc);
286         /* count;precision (ASCII) + '\0' + BER buffer for otherInformation */
287         if (v)
288         {
289             ZOOM_Event event;
290             size_t lead_len = strlen(v) + 1;
291
292             resultset->size = odr_atoi(v);
293
294             yaz_log(YLOG_LOG, "For key %s got value %s lead_len=%d len=%d",
295                     wrbuf_cstr(resultset->mc_key), v, (int) lead_len,
296                     (int) v_len);
297             if (v_len > lead_len)
298             {
299                 Z_OtherInformation *oi = 0;
300                 int oi_len = v_len - lead_len;
301                 odr_setbuf(resultset->odr, v + lead_len, oi_len, 0);
302                 if (!z_OtherInformation(resultset->odr, &oi, 0, 0))
303                 {
304                     yaz_log(YLOG_WARN, "oi decoding failed");
305                     free(v);
306                     return;
307                 }
308                 ZOOM_handle_search_result(c, resultset, oi);
309                 ZOOM_handle_facet_result(c, resultset, oi);
310             }
311             free(v);
312             event = ZOOM_Event_create(ZOOM_EVENT_RECV_SEARCH);
313             ZOOM_connection_put_event(c, event);
314             resultset->live_set = 1;
315         }
316     }
317 #endif
318 }
319
#if HAVE_HIREDIS
/**
 * \brief Issues a redis EXPIRE command for a key
 * \param redis_c redis context
 * \param buf key (need not be null-terminated)
 * \param len length of key
 * \param exp expiry value sent as the EXPIRE argument
 *
 * The outcome of the command is not reported to the caller.
 */
static void expire_redis(redisContext *redis_c,
                         const char *buf, size_t len, int exp)
{
    redisReply *reply;
    const char *argv[3];
    size_t argvlen[3];
    char key_val[20];

    sprintf(key_val, "%d", exp);

    argv[0] = "EXPIRE";
    argvlen[0] = 6;
    argv[1] = buf;
    argvlen[1] = len;
    argv[2] = key_val;
    argvlen[2] = strlen(key_val);
    reply = redisCommandArgv(redis_c, 3, argv, argvlen);
    /* no reply object is returned if the command could not be carried
       out; guard the free like ZOOM_memcached_search does */
    if (reply)
        freeReplyObject(reply);
}
#endif
341
342 void ZOOM_memcached_hitcount(ZOOM_connection c, ZOOM_resultset resultset,
343                              Z_OtherInformation *oi, const char *precision)
344 {
345 #if HAVE_HIREDIS
346     if (c->redis_c && resultset->live_set == 0)
347     {
348         char *str;
349         ODR odr = odr_createmem(ODR_ENCODE);
350         char *oi_buf = 0;
351         int oi_len = 0;
352         char *key;
353
354         str = odr_malloc(odr, 20 + strlen(precision));
355         /* count;precision (ASCII) + '\0' + BER buffer for otherInformation */
356         sprintf(str, ODR_INT_PRINTF ";%s", resultset->size, precision);
357         if (oi)
358         {
359             z_OtherInformation(odr, &oi, 0, 0);
360             oi_buf = odr_getbuf(odr, &oi_len, 0);
361         }
362         key = odr_malloc(odr, strlen(str) + 1 + oi_len);
363         strcpy(key, str);
364         if (oi_len)
365             memcpy(key + strlen(str) + 1, oi_buf, oi_len);
366
367         {
368             redisReply *reply;
369             const char *argv[3];
370             size_t argvlen[3];
371             argv[0] = "SET";
372             argvlen[0] = 3;
373             argv[1] = wrbuf_buf(resultset->mc_key);
374             argvlen[1] = wrbuf_len(resultset->mc_key);
375             argv[2] = key;
376             argvlen[2] = strlen(str) + 1 + oi_len;
377             reply = redisCommandArgv(c->redis_c, 3, argv, argvlen);
378             freeReplyObject(reply);
379         }
380         expire_redis(c->redis_c,
381                      wrbuf_buf(resultset->mc_key),
382                      wrbuf_len(resultset->mc_key),
383                      c->expire_search);
384         odr_destroy(odr);
385     }
386 #endif
387 #if HAVE_LIBMEMCACHED
388     if (c->mc_st && resultset->live_set == 0)
389     {
390         uint32_t flags = 0;
391         memcached_return_t rc;
392         char *str;
393         ODR odr = odr_createmem(ODR_ENCODE);
394         char *oi_buf = 0;
395         int oi_len = 0;
396         char *key;
397
398         str = odr_malloc(odr, 20 + strlen(precision));
399         /* count;precision (ASCII) + '\0' + BER buffer for otherInformation */
400         sprintf(str, ODR_INT_PRINTF ";%s", resultset->size, precision);
401         if (oi)
402         {
403             z_OtherInformation(odr, &oi, 0, 0);
404             oi_buf = odr_getbuf(odr, &oi_len, 0);
405         }
406         key = odr_malloc(odr, strlen(str) + 1 + oi_len);
407         strcpy(key, str);
408         if (oi_len)
409             memcpy(key + strlen(str) + 1, oi_buf, oi_len);
410
411         rc = memcached_set(c->mc_st,
412                            wrbuf_buf(resultset->mc_key),
413                            wrbuf_len(resultset->mc_key),
414                            key, strlen(str) + 1 + oi_len,
415                            c->expire_search, flags);
416         yaz_log(YLOG_LOG, "Store hit count key=%s value=%s oi_len=%d rc=%u %s",
417                 wrbuf_cstr(resultset->mc_key), str, oi_len, (unsigned) rc,
418                 memcached_strerror(c->mc_st, rc));
419         odr_destroy(odr);
420     }
421 #endif
422 }
423
424 void ZOOM_memcached_add(ZOOM_resultset r, Z_NamePlusRecord *npr,
425                         int pos,
426                         const char *syntax, const char *elementSetName,
427                         const char *schema,
428                         Z_SRW_diagnostic *diag)
429 {
430 #if HAVE_HIREDIS
431     if (r->connection->redis_c &&
432         !diag && npr->which == Z_NamePlusRecord_databaseRecord)
433     {
434         WRBUF k = wrbuf_alloc();
435         WRBUF rec_sha1 = wrbuf_alloc();
436         ODR odr = odr_createmem(ODR_ENCODE);
437         char *rec_buf;
438         int rec_len;
439         const char *argv[3];
440         size_t argvlen[3];
441         redisReply *reply;
442
443         z_NamePlusRecord(odr, &npr, 0, 0);
444         rec_buf = odr_getbuf(odr, &rec_len, 0);
445
446         wrbuf_write(k, wrbuf_buf(r->mc_key), wrbuf_len(r->mc_key));
447         wrbuf_printf(k, ";%d;%s;%s;%s", pos,
448                      syntax ? syntax : "",
449                      elementSetName ? elementSetName : "",
450                      schema ? schema : "");
451
452         wrbuf_sha1_write(rec_sha1, rec_buf, rec_len, 1);
453
454         argv[0] = "SET";
455         argvlen[0] = 3;
456         argv[1] = wrbuf_buf(k);
457         argvlen[1] = wrbuf_len(k);
458         argv[2] = wrbuf_buf(rec_sha1);
459         argvlen[2] = wrbuf_len(rec_sha1);
460
461         reply = redisCommandArgv(r->connection->redis_c, 3, argv, argvlen);
462         yaz_log(YLOG_LOG, "Store record key=%s val=%s",
463                 wrbuf_cstr(k), wrbuf_cstr(rec_sha1));
464         freeReplyObject(reply);
465
466         expire_redis(r->connection->redis_c, argv[1], argvlen[1],
467                      r->connection->expire_search);
468
469         argv[1] = wrbuf_buf(rec_sha1);
470         argvlen[1] = wrbuf_len(rec_sha1);
471         argv[2] = rec_buf;
472         argvlen[2] = rec_len;
473
474         reply = redisCommandArgv(r->connection->redis_c, 3, argv, argvlen);
475         yaz_log(YLOG_LOG, "Add record key=%s rec_len=%d",
476                 wrbuf_cstr(rec_sha1), rec_len);
477         freeReplyObject(reply);
478
479         expire_redis(r->connection->redis_c, argv[1], argvlen[1],
480                      r->connection->expire_record);
481
482         odr_destroy(odr);
483         wrbuf_destroy(k);
484         wrbuf_destroy(rec_sha1);
485     }
486 #endif
487 #if HAVE_LIBMEMCACHED
488     if (r->connection->mc_st &&
489         !diag && npr->which == Z_NamePlusRecord_databaseRecord)
490     {
491         WRBUF k = wrbuf_alloc();
492         WRBUF rec_sha1 = wrbuf_alloc();
493         uint32_t flags = 0;
494         memcached_return_t rc;
495         ODR odr = odr_createmem(ODR_ENCODE);
496         char *rec_buf;
497         int rec_len;
498
499         z_NamePlusRecord(odr, &npr, 0, 0);
500         rec_buf = odr_getbuf(odr, &rec_len, 0);
501
502         wrbuf_write(k, wrbuf_buf(r->mc_key), wrbuf_len(r->mc_key));
503         wrbuf_printf(k, ";%d;%s;%s;%s", pos,
504                      syntax ? syntax : "",
505                      elementSetName ? elementSetName : "",
506                      schema ? schema : "");
507
508         wrbuf_sha1_write(rec_sha1, rec_buf, rec_len, 1);
509
510         rc = memcached_set(r->connection->mc_st,
511                            wrbuf_buf(k), wrbuf_len(k),
512                            wrbuf_buf(rec_sha1), wrbuf_len(rec_sha1),
513                            r->connection->expire_search, flags);
514
515         yaz_log(YLOG_LOG, "Store record key=%s val=%s rc=%u %s",
516                 wrbuf_cstr(k), wrbuf_cstr(rec_sha1), (unsigned) rc,
517                 memcached_strerror(r->connection->mc_st, rc));
518
519         rc = memcached_add(r->connection->mc_st,
520                            wrbuf_buf(rec_sha1), wrbuf_len(rec_sha1),
521                            rec_buf, rec_len,
522                            r->connection->expire_record, flags);
523
524         yaz_log(YLOG_LOG, "Add record key=%s rec_len=%d rc=%u %s",
525                 wrbuf_cstr(rec_sha1), rec_len, (unsigned) rc,
526                 memcached_strerror(r->connection->mc_st, rc));
527
528         odr_destroy(odr);
529         wrbuf_destroy(k);
530         wrbuf_destroy(rec_sha1);
531     }
532 #endif
533 }
534
535 Z_NamePlusRecord *ZOOM_memcached_lookup(ZOOM_resultset r, int pos,
536                                         const char *syntax,
537                                         const char *elementSetName,
538                                         const char *schema)
539 {
540 #if HAVE_HIREDIS
541     if (r->connection && r->connection->redis_c)
542     {
543         WRBUF k = wrbuf_alloc();
544         const char *argv[2];
545         size_t argvlen[2];
546         redisReply *reply1;
547
548         wrbuf_write(k, wrbuf_buf(r->mc_key), wrbuf_len(r->mc_key));
549         wrbuf_printf(k, ";%d;%s;%s;%s", pos,
550                      syntax ? syntax : "",
551                      elementSetName ? elementSetName : "",
552                      schema ? schema : "");
553
554         yaz_log(YLOG_LOG, "Lookup record %s", wrbuf_cstr(k));
555         argv[0] = "GET";
556         argvlen[0] = 3;
557         argv[1] = wrbuf_buf(k);
558         argvlen[1] = wrbuf_len(k);
559         reply1 = redisCommandArgv(r->connection->redis_c, 2, argv, argvlen);
560
561         wrbuf_destroy(k);
562         if (reply1 && reply1->type == REDIS_REPLY_STRING)
563         {
564             redisReply *reply2;
565             char *sha1_buf = reply1->str;
566             int sha1_len = reply1->len;
567
568             yaz_log(YLOG_LOG, "Lookup record %.*s", (int) sha1_len, sha1_buf);
569
570             argv[0] = "GET";
571             argvlen[0] = 3;
572             argv[1] = sha1_buf;
573             argvlen[1] = sha1_len;
574
575             reply2 = redisCommandArgv(r->connection->redis_c, 2, argv, argvlen);
576             if (reply2 && reply2->type == REDIS_REPLY_STRING)
577             {
578                 Z_NamePlusRecord *npr = 0;
579                 char *v_buf = reply2->str;
580                 int v_len = reply2->len;
581
582                 odr_setbuf(r->odr, v_buf, v_len, 0);
583                 z_NamePlusRecord(r->odr, &npr, 0, 0);
584                 if (npr)
585                     yaz_log(YLOG_LOG, "returned redis copy");
586                 freeReplyObject(reply2);
587                 freeReplyObject(reply1);
588                 return npr;
589             }
590             freeReplyObject(reply2);
591         }
592         freeReplyObject(reply1);
593     }
594 #endif
595 #if HAVE_LIBMEMCACHED
596     if (r->connection && r->connection->mc_st)
597     {
598         WRBUF k = wrbuf_alloc();
599         char *sha1_buf;
600         size_t sha1_len;
601         uint32_t flags;
602         memcached_return_t rc;
603
604         wrbuf_write(k, wrbuf_buf(r->mc_key), wrbuf_len(r->mc_key));
605         wrbuf_printf(k, ";%d;%s;%s;%s", pos,
606                      syntax ? syntax : "",
607                      elementSetName ? elementSetName : "",
608                      schema ? schema : "");
609
610         yaz_log(YLOG_LOG, "Lookup record %s", wrbuf_cstr(k));
611         sha1_buf = memcached_get(r->connection->mc_st,
612                                  wrbuf_buf(k), wrbuf_len(k),
613                                  &sha1_len, &flags, &rc);
614
615         wrbuf_destroy(k);
616         if (sha1_buf)
617         {
618             size_t v_len;
619             char *v_buf;
620
621             yaz_log(YLOG_LOG, "Lookup record %.*s", (int) sha1_len, sha1_buf);
622             v_buf = memcached_get(r->connection->mc_st, sha1_buf, sha1_len,
623                                   &v_len, &flags, &rc);
624             free(sha1_buf);
625             if (v_buf)
626             {
627                 Z_NamePlusRecord *npr = 0;
628
629                 odr_setbuf(r->odr, v_buf, v_len, 0);
630                 z_NamePlusRecord(r->odr, &npr, 0, 0);
631                 free(v_buf);
632                 if (npr)
633                     yaz_log(YLOG_LOG, "returned memcached copy");
634                 return npr;
635             }
636         }
637     }
638 #endif
639     return 0;
640
641 }
642 /*
643  * Local variables:
644  * c-basic-offset: 4
645  * c-file-style: "Stroustrup"
646  * indent-tabs-mode: nil
647  * End:
648  * vim: shiftwidth=4 tabstop=8 expandtab
649  */
650