Fix yaz-marcdump segfault YAZ-801
[yaz-moved-to-github.git] / src / zoom-memcached.c
1 /* This file is part of the YAZ toolkit.
2  * Copyright (C) Index Data
3  * See the file LICENSE for details.
4  */
5 /**
6  * \file zoom-memcached.c
7  * \brief Implements query/record caching using memcached
8  */
9 #if HAVE_CONFIG_H
10 #include <config.h>
11 #endif
12
13 #include <assert.h>
14 #include <string.h>
15 #include <errno.h>
16 #include "zoom-p.h"
17
18 #include <yaz/yaz-util.h>
19 #include <yaz/xmalloc.h>
20 #include <yaz/log.h>
21 #include <yaz/diagbib1.h>
22
23 void ZOOM_memcached_init(ZOOM_connection c)
24 {
25 #if HAVE_LIBMEMCACHED
26     c->mc_st = 0;
27 #endif
28 #if HAVE_HIREDIS
29     c->redis_c = 0;
30 #endif
31     c->expire_search = 600;
32     c->expire_record = 1200;
33 }
34
35 void ZOOM_memcached_destroy(ZOOM_connection c)
36 {
37 #if HAVE_LIBMEMCACHED
38     if (c->mc_st)
39         memcached_free(c->mc_st);
40 #endif
41 #if HAVE_HIREDIS
42     if (c->redis_c)
43         redisFree(c->redis_c);
44 #endif
45 }
46
#if HAVE_LIBMEMCACHED
/**
 * \brief Builds a memcached handle from a blank-separated directive string
 * \param conf directives "--SERVER=host[:port]" and "--EXPIRE=n"
 * \param expire_search set to n when an --EXPIRE directive is seen
 * \param expire_record set to n + 600 when an --EXPIRE directive is seen
 * \return handle, or 0 if the handle could not be created, a server could
 *         not be added, or a directive was not recognized
 *
 * Directive names are matched case-insensitively. The port defaults to
 * 11211. Anything from "/?" onwards in a server specification is cut off
 * and not used.
 */
static memcached_st *create_memcached(const char *conf,
                                      int *expire_search, int *expire_record)
{
    char **tokens;
    int n_tokens;
    int idx = 0;
    memcached_st *handle = memcached_create(0);
    NMEM nmem = nmem_create();

    nmem_strsplit_blank(nmem, conf, &tokens, &n_tokens);
    while (handle && idx < n_tokens)
    {
        char *tok = tokens[idx++];
        int ok = 1;

        if (yaz_strncasecmp(tok, "--SERVER=", 9) == 0)
        {
            char *host = tok + 9;
            /* locate both separators before either one is overwritten */
            char *colon = strchr(host, ':');
            char *tail = strstr(host, "/?");
            int port_no = 11211;
            memcached_return_t rc;

            if (colon)
                *colon++ = '\0';
            if (tail)
                *tail = '\0';
            if (colon)
                port_no = atoi(colon);

            rc = memcached_server_add(handle, host, port_no);
            yaz_log(YLOG_LOG, "memcached_server_add host=%s rc=%u %s",
                    host, (unsigned) rc, memcached_strerror(handle, rc));
            ok = (rc == MEMCACHED_SUCCESS);
        }
        else if (yaz_strncasecmp(tok, "--EXPIRE=", 9) == 0)
        {
            *expire_search = atoi(tok + 9);
            *expire_record = 600 + *expire_search;
        }
        else
        {
            /* bad directive */
            ok = 0;
        }

        if (!ok)
        {
            /* clearing the handle also terminates the loop */
            memcached_free(handle);
            handle = 0;
        }
    }
    nmem_destroy(nmem);
    return handle;
}
#endif
97
#if HAVE_HIREDIS
/**
 * \brief Connects to redis as described by a blank-separated directive string
 * \param conf directives "--SERVER=host[:port]" and "--EXPIRE=n"
 * \param expire_search set to n when an --EXPIRE directive is seen
 * \param expire_record set to n + 600 when an --EXPIRE directive is seen
 * \return context from redisConnectWithTimeout, or 0 if no --SERVER
 *         directive was given; the caller must inspect the err member
 *
 * Directive names are matched case-insensitively and unrecognized
 * directives are ignored. The port defaults to 6379. If several --SERVER
 * directives are given, the last one wins and earlier contexts are freed.
 */
static redisContext *create_redis(const char *conf,
                                  int *expire_search, int *expire_record)
{
    char **darray;
    int i, num;
    NMEM nmem = nmem_create();
    redisContext *context = 0;

    nmem_strsplit_blank(nmem, conf, &darray, &num);
    for (i = 0; i < num; i++)
    {
        if (!yaz_strncasecmp(darray[i], "--SERVER=", 9))
        {
            struct timeval timeout = { 1, 500000 }; /* 1.5 seconds */
            char *host = darray[i] + 9;
            char *port = strchr(host, ':');
            if (port)
                *port++ = '\0';
            /* do not leak the context of an earlier --SERVER directive */
            if (context)
                redisFree(context);
            context = redisConnectWithTimeout(host,
                                              port ? atoi(port) : 6379,
                                              timeout);
        }
        else if (!yaz_strncasecmp(darray[i], "--EXPIRE=", 9))
        {
            *expire_search = atoi(darray[i] + 9);
            *expire_record = 600 + *expire_search;
        }
    }
    nmem_destroy(nmem);
    return context;
}
#endif
131
132 int ZOOM_memcached_configure(ZOOM_connection c)
133 {
134     const char *val;
135 #if HAVE_HIREDIS
136     if (c->redis_c)
137     {
138         redisFree(c->redis_c);
139         c->redis_c = 0;
140     }
141 #endif
142 #if HAVE_LIBMEMCACHED
143     if (c->mc_st)
144     {
145         memcached_free(c->mc_st);
146         c->mc_st = 0;
147     }
148 #endif
149
150     val = ZOOM_options_get(c->options, "redis");
151     if (val && *val)
152     {
153 #if HAVE_HIREDIS
154         c->redis_c = create_redis(val,
155                                   &c->expire_search, &c->expire_record);
156         if (c->redis_c == 0 || c->redis_c->err)
157         {
158             ZOOM_set_error(c, ZOOM_ERROR_MEMCACHED,
159                            "could not create redis");
160             return -1;
161         }
162         return 0; /* don't bother with memcached if redis is enabled */
163 #else
164         ZOOM_set_error(c, ZOOM_ERROR_MEMCACHED, "not enabled");
165         return -1;
166 #endif
167     }
168     val = ZOOM_options_get(c->options, "memcached");
169     if (val && *val)
170     {
171 #if HAVE_LIBMEMCACHED
172         c->mc_st = create_memcached(val, &c->expire_search, &c->expire_record);
173         if (!c->mc_st)
174         {
175             ZOOM_set_error(c, ZOOM_ERROR_MEMCACHED,
176                            "could not create memcached");
177             return -1;
178         }
179         memcached_behavior_set(c->mc_st, MEMCACHED_BEHAVIOR_BINARY_PROTOCOL, 1);
180 #else
181         ZOOM_set_error(c, ZOOM_ERROR_MEMCACHED, "not enabled");
182         return -1;
183 #endif
184     }
185     return 0;
186 }
187
#if HAVE_GCRYPT_H
/**
 * \brief Appends a key component to a WRBUF, keeping long values short
 * \param w buffer appended to
 * \param v component; a null pointer appends nothing
 *
 * Values of at most 40 characters are appended verbatim; longer values are
 * appended through wrbuf_sha1_puts instead.
 */
static void wrbuf_vary_puts(WRBUF w, const char *v)
{
    if (!v)
        return;
    if (strlen(v) <= 40)
        wrbuf_puts(w, v);
    else
        wrbuf_sha1_puts(w, v, 1);
}
#endif
204
205 void ZOOM_memcached_resultset(ZOOM_resultset r, ZOOM_query q)
206 {
207 #if HAVE_GCRYPT_H
208     ZOOM_connection c = r->connection;
209
210     r->mc_key = wrbuf_alloc();
211     wrbuf_puts(r->mc_key, "1;");
212     wrbuf_vary_puts(r->mc_key, c->host_port);
213     wrbuf_puts(r->mc_key, ";");
214     wrbuf_vary_puts(r->mc_key, ZOOM_resultset_option_get(r, "extraArgs"));
215     wrbuf_puts(r->mc_key, ";");
216     wrbuf_vary_puts(r->mc_key, c->user);
217     wrbuf_puts(r->mc_key, ";");
218     wrbuf_vary_puts(r->mc_key, c->group);
219     wrbuf_puts(r->mc_key, ";");
220     if (c->password)
221         wrbuf_sha1_puts(r->mc_key, c->password, 1);
222     wrbuf_puts(r->mc_key, ";");
223     {
224         WRBUF w = wrbuf_alloc();
225         ZOOM_query_get_hash(q, w);
226         wrbuf_sha1_puts(r->mc_key, wrbuf_cstr(w), 1);
227         wrbuf_destroy(w);
228     }
229     wrbuf_puts(r->mc_key, ";");
230     wrbuf_vary_puts(r->mc_key, r->req_facets);
231 #endif
232 }
233
234 void ZOOM_memcached_search(ZOOM_connection c, ZOOM_resultset resultset)
235 {
236 #if HAVE_HIREDIS
237     if (c->redis_c && resultset->live_set == 0)
238     {
239         redisReply *reply;
240         const char *argv[2];
241
242         argv[0] = "GET";
243         argv[1] = wrbuf_cstr(resultset->mc_key);
244
245         reply = redisCommandArgv(c->redis_c, 2, argv, 0);
246         /* count;precision (ASCII) + '\0' + BER buffer for otherInformation */
247         if (reply && reply->type == REDIS_REPLY_STRING)
248         {
249             char *v = reply->str;
250             int v_len = reply->len;
251             ZOOM_Event event;
252             size_t lead_len = strlen(v) + 1;
253
254             resultset->size = odr_atoi(v);
255
256             yaz_log(YLOG_LOG, "For key %s got value %s lead_len=%d len=%d",
257                     wrbuf_cstr(resultset->mc_key), v, (int) lead_len,
258                     (int) v_len);
259             if (v_len > lead_len)
260             {
261                 Z_OtherInformation *oi = 0;
262                 int oi_len = v_len - lead_len;
263                 odr_setbuf(resultset->odr, v + lead_len, oi_len, 0);
264                 if (!z_OtherInformation(resultset->odr, &oi, 0, 0))
265                 {
266                     yaz_log(YLOG_WARN, "oi decoding failed");
267                     freeReplyObject(reply);
268                     return;
269                 }
270                 ZOOM_handle_search_result(c, resultset, oi);
271                 ZOOM_handle_facet_result(c, resultset, oi);
272             }
273             event = ZOOM_Event_create(ZOOM_EVENT_RECV_SEARCH);
274             ZOOM_connection_put_event(c, event);
275             resultset->live_set = 1;
276         }
277         if (reply)
278             freeReplyObject(reply);
279     }
280 #endif
281 #if HAVE_LIBMEMCACHED
282     if (c->mc_st && resultset->live_set == 0)
283     {
284         size_t v_len;
285         uint32_t flags;
286         memcached_return_t rc;
287         char *v = memcached_get(c->mc_st, wrbuf_buf(resultset->mc_key),
288                                 wrbuf_len(resultset->mc_key),
289                                 &v_len, &flags, &rc);
290         /* count;precision (ASCII) + '\0' + BER buffer for otherInformation */
291         if (v)
292         {
293             ZOOM_Event event;
294             size_t lead_len = strlen(v) + 1;
295
296             resultset->size = odr_atoi(v);
297
298             yaz_log(YLOG_LOG, "For key %s got value %s lead_len=%d len=%d",
299                     wrbuf_cstr(resultset->mc_key), v, (int) lead_len,
300                     (int) v_len);
301             if (v_len > lead_len)
302             {
303                 Z_OtherInformation *oi = 0;
304                 int oi_len = v_len - lead_len;
305                 odr_setbuf(resultset->odr, v + lead_len, oi_len, 0);
306                 if (!z_OtherInformation(resultset->odr, &oi, 0, 0))
307                 {
308                     yaz_log(YLOG_WARN, "oi decoding failed");
309                     free(v);
310                     return;
311                 }
312                 ZOOM_handle_search_result(c, resultset, oi);
313                 ZOOM_handle_facet_result(c, resultset, oi);
314             }
315             free(v);
316             event = ZOOM_Event_create(ZOOM_EVENT_RECV_SEARCH);
317             ZOOM_connection_put_event(c, event);
318             resultset->live_set = 1;
319         }
320     }
321 #endif
322 }
323
#if HAVE_HIREDIS
/**
 * \brief Issues the redis EXPIRE command for a key
 * \param redis_c redis context
 * \param buf key bytes
 * \param len length of key
 * \param exp expiry value sent as the EXPIRE argument
 *
 * The outcome of the command is not reported to the caller.
 */
static void expire_redis(redisContext *redis_c,
                         const char *buf, size_t len, int exp)
{
    redisReply *reply;
    const char *argv[3];
    size_t argvlen[3];
    char key_val[20];

    sprintf(key_val, "%d", exp);

    argv[0] = "EXPIRE";
    argvlen[0] = 6;
    argv[1] = buf;
    argvlen[1] = len;
    argv[2] = key_val;
    argvlen[2] = strlen(key_val);
    reply = redisCommandArgv(redis_c, 3, argv, argvlen);
    /* no reply object is returned when the command could not be sent */
    if (reply)
        freeReplyObject(reply);
}
#endif
345
346 void ZOOM_memcached_hitcount(ZOOM_connection c, ZOOM_resultset resultset,
347                              Z_OtherInformation *oi, const char *precision)
348 {
349 #if HAVE_HIREDIS
350     if (c->redis_c && resultset->live_set == 0)
351     {
352         char *str;
353         ODR odr = odr_createmem(ODR_ENCODE);
354         char *oi_buf = 0;
355         int oi_len = 0;
356         char *key;
357
358         str = odr_malloc(odr, 20 + strlen(precision));
359         /* count;precision (ASCII) + '\0' + BER buffer for otherInformation */
360         sprintf(str, ODR_INT_PRINTF ";%s", resultset->size, precision);
361         if (oi)
362         {
363             z_OtherInformation(odr, &oi, 0, 0);
364             oi_buf = odr_getbuf(odr, &oi_len, 0);
365         }
366         key = odr_malloc(odr, strlen(str) + 1 + oi_len);
367         strcpy(key, str);
368         if (oi_len)
369             memcpy(key + strlen(str) + 1, oi_buf, oi_len);
370
371         {
372             redisReply *reply;
373             const char *argv[3];
374             size_t argvlen[3];
375             argv[0] = "SET";
376             argvlen[0] = 3;
377             argv[1] = wrbuf_buf(resultset->mc_key);
378             argvlen[1] = wrbuf_len(resultset->mc_key);
379             argv[2] = key;
380             argvlen[2] = strlen(str) + 1 + oi_len;
381             reply = redisCommandArgv(c->redis_c, 3, argv, argvlen);
382             freeReplyObject(reply);
383         }
384         expire_redis(c->redis_c,
385                      wrbuf_buf(resultset->mc_key),
386                      wrbuf_len(resultset->mc_key),
387                      c->expire_search);
388         odr_destroy(odr);
389     }
390 #endif
391 #if HAVE_LIBMEMCACHED
392     if (c->mc_st && resultset->live_set == 0)
393     {
394         uint32_t flags = 0;
395         memcached_return_t rc;
396         char *str;
397         ODR odr = odr_createmem(ODR_ENCODE);
398         char *oi_buf = 0;
399         int oi_len = 0;
400         char *key;
401
402         str = odr_malloc(odr, 20 + strlen(precision));
403         /* count;precision (ASCII) + '\0' + BER buffer for otherInformation */
404         sprintf(str, ODR_INT_PRINTF ";%s", resultset->size, precision);
405         if (oi)
406         {
407             z_OtherInformation(odr, &oi, 0, 0);
408             oi_buf = odr_getbuf(odr, &oi_len, 0);
409         }
410         key = odr_malloc(odr, strlen(str) + 1 + oi_len);
411         strcpy(key, str);
412         if (oi_len)
413             memcpy(key + strlen(str) + 1, oi_buf, oi_len);
414
415         rc = memcached_set(c->mc_st,
416                            wrbuf_buf(resultset->mc_key),
417                            wrbuf_len(resultset->mc_key),
418                            key, strlen(str) + 1 + oi_len,
419                            c->expire_search, flags);
420         yaz_log(YLOG_LOG, "Store hit count key=%s value=%s oi_len=%d rc=%u %s",
421                 wrbuf_cstr(resultset->mc_key), str, oi_len, (unsigned) rc,
422                 memcached_strerror(c->mc_st, rc));
423         odr_destroy(odr);
424     }
425 #endif
426 }
427
428 void ZOOM_memcached_add(ZOOM_resultset r, Z_NamePlusRecord *npr,
429                         int pos,
430                         const char *syntax, const char *elementSetName,
431                         const char *schema,
432                         Z_SRW_diagnostic *diag)
433 {
434 #if HAVE_HIREDIS
435     if (r->connection->redis_c &&
436         !diag && npr->which == Z_NamePlusRecord_databaseRecord)
437     {
438         WRBUF k = wrbuf_alloc();
439         WRBUF rec_sha1 = wrbuf_alloc();
440         ODR odr = odr_createmem(ODR_ENCODE);
441         char *rec_buf;
442         int rec_len;
443         const char *argv[3];
444         size_t argvlen[3];
445         redisReply *reply;
446
447         z_NamePlusRecord(odr, &npr, 0, 0);
448         rec_buf = odr_getbuf(odr, &rec_len, 0);
449
450         wrbuf_write(k, wrbuf_buf(r->mc_key), wrbuf_len(r->mc_key));
451         wrbuf_printf(k, ";%d;%s;%s;%s", pos,
452                      syntax ? syntax : "",
453                      elementSetName ? elementSetName : "",
454                      schema ? schema : "");
455
456         wrbuf_sha1_write(rec_sha1, rec_buf, rec_len, 1);
457
458         argv[0] = "SET";
459         argvlen[0] = 3;
460         argv[1] = wrbuf_buf(k);
461         argvlen[1] = wrbuf_len(k);
462         argv[2] = wrbuf_buf(rec_sha1);
463         argvlen[2] = wrbuf_len(rec_sha1);
464
465         reply = redisCommandArgv(r->connection->redis_c, 3, argv, argvlen);
466         yaz_log(YLOG_LOG, "Store record key=%s val=%s",
467                 wrbuf_cstr(k), wrbuf_cstr(rec_sha1));
468         freeReplyObject(reply);
469
470         expire_redis(r->connection->redis_c, argv[1], argvlen[1],
471                      r->connection->expire_search);
472
473         argv[1] = wrbuf_buf(rec_sha1);
474         argvlen[1] = wrbuf_len(rec_sha1);
475         argv[2] = rec_buf;
476         argvlen[2] = rec_len;
477
478         reply = redisCommandArgv(r->connection->redis_c, 3, argv, argvlen);
479         yaz_log(YLOG_LOG, "Add record key=%s rec_len=%d",
480                 wrbuf_cstr(rec_sha1), rec_len);
481         freeReplyObject(reply);
482
483         expire_redis(r->connection->redis_c, argv[1], argvlen[1],
484                      r->connection->expire_record);
485
486         odr_destroy(odr);
487         wrbuf_destroy(k);
488         wrbuf_destroy(rec_sha1);
489     }
490 #endif
491 #if HAVE_LIBMEMCACHED
492     if (r->connection->mc_st &&
493         !diag && npr->which == Z_NamePlusRecord_databaseRecord)
494     {
495         WRBUF k = wrbuf_alloc();
496         WRBUF rec_sha1 = wrbuf_alloc();
497         uint32_t flags = 0;
498         memcached_return_t rc;
499         ODR odr = odr_createmem(ODR_ENCODE);
500         char *rec_buf;
501         int rec_len;
502
503         z_NamePlusRecord(odr, &npr, 0, 0);
504         rec_buf = odr_getbuf(odr, &rec_len, 0);
505
506         wrbuf_write(k, wrbuf_buf(r->mc_key), wrbuf_len(r->mc_key));
507         wrbuf_printf(k, ";%d;%s;%s;%s", pos,
508                      syntax ? syntax : "",
509                      elementSetName ? elementSetName : "",
510                      schema ? schema : "");
511
512         wrbuf_sha1_write(rec_sha1, rec_buf, rec_len, 1);
513
514         rc = memcached_set(r->connection->mc_st,
515                            wrbuf_buf(k), wrbuf_len(k),
516                            wrbuf_buf(rec_sha1), wrbuf_len(rec_sha1),
517                            r->connection->expire_search, flags);
518
519         yaz_log(YLOG_LOG, "Store record key=%s val=%s rc=%u %s",
520                 wrbuf_cstr(k), wrbuf_cstr(rec_sha1), (unsigned) rc,
521                 memcached_strerror(r->connection->mc_st, rc));
522
523         rc = memcached_add(r->connection->mc_st,
524                            wrbuf_buf(rec_sha1), wrbuf_len(rec_sha1),
525                            rec_buf, rec_len,
526                            r->connection->expire_record, flags);
527
528         yaz_log(YLOG_LOG, "Add record key=%s rec_len=%d rc=%u %s",
529                 wrbuf_cstr(rec_sha1), rec_len, (unsigned) rc,
530                 memcached_strerror(r->connection->mc_st, rc));
531
532         odr_destroy(odr);
533         wrbuf_destroy(k);
534         wrbuf_destroy(rec_sha1);
535     }
536 #endif
537 }
538
539 Z_NamePlusRecord *ZOOM_memcached_lookup(ZOOM_resultset r, int pos,
540                                         const char *syntax,
541                                         const char *elementSetName,
542                                         const char *schema)
543 {
544 #if HAVE_HIREDIS
545     if (r->connection && r->connection->redis_c)
546     {
547         WRBUF k = wrbuf_alloc();
548         const char *argv[2];
549         size_t argvlen[2];
550         redisReply *reply1;
551
552         wrbuf_write(k, wrbuf_buf(r->mc_key), wrbuf_len(r->mc_key));
553         wrbuf_printf(k, ";%d;%s;%s;%s", pos,
554                      syntax ? syntax : "",
555                      elementSetName ? elementSetName : "",
556                      schema ? schema : "");
557
558         yaz_log(YLOG_LOG, "Lookup record %s", wrbuf_cstr(k));
559         argv[0] = "GET";
560         argvlen[0] = 3;
561         argv[1] = wrbuf_buf(k);
562         argvlen[1] = wrbuf_len(k);
563         reply1 = redisCommandArgv(r->connection->redis_c, 2, argv, argvlen);
564
565         wrbuf_destroy(k);
566         if (reply1 && reply1->type == REDIS_REPLY_STRING)
567         {
568             redisReply *reply2;
569             char *sha1_buf = reply1->str;
570             int sha1_len = reply1->len;
571
572             yaz_log(YLOG_LOG, "Lookup record %.*s", (int) sha1_len, sha1_buf);
573
574             argv[0] = "GET";
575             argvlen[0] = 3;
576             argv[1] = sha1_buf;
577             argvlen[1] = sha1_len;
578
579             reply2 = redisCommandArgv(r->connection->redis_c, 2, argv, argvlen);
580             if (reply2 && reply2->type == REDIS_REPLY_STRING)
581             {
582                 Z_NamePlusRecord *npr = 0;
583                 char *v_buf = reply2->str;
584                 int v_len = reply2->len;
585
586                 odr_setbuf(r->odr, v_buf, v_len, 0);
587                 z_NamePlusRecord(r->odr, &npr, 0, 0);
588                 if (npr)
589                     yaz_log(YLOG_LOG, "returned redis copy");
590                 freeReplyObject(reply2);
591                 freeReplyObject(reply1);
592                 return npr;
593             }
594             freeReplyObject(reply2);
595         }
596         freeReplyObject(reply1);
597     }
598 #endif
599 #if HAVE_LIBMEMCACHED
600     if (r->connection && r->connection->mc_st)
601     {
602         WRBUF k = wrbuf_alloc();
603         char *sha1_buf;
604         size_t sha1_len;
605         uint32_t flags;
606         memcached_return_t rc;
607
608         wrbuf_write(k, wrbuf_buf(r->mc_key), wrbuf_len(r->mc_key));
609         wrbuf_printf(k, ";%d;%s;%s;%s", pos,
610                      syntax ? syntax : "",
611                      elementSetName ? elementSetName : "",
612                      schema ? schema : "");
613
614         yaz_log(YLOG_LOG, "Lookup record %s", wrbuf_cstr(k));
615         sha1_buf = memcached_get(r->connection->mc_st,
616                                  wrbuf_buf(k), wrbuf_len(k),
617                                  &sha1_len, &flags, &rc);
618
619         wrbuf_destroy(k);
620         if (sha1_buf)
621         {
622             size_t v_len;
623             char *v_buf;
624
625             yaz_log(YLOG_LOG, "Lookup record %.*s", (int) sha1_len, sha1_buf);
626             v_buf = memcached_get(r->connection->mc_st, sha1_buf, sha1_len,
627                                   &v_len, &flags, &rc);
628             free(sha1_buf);
629             if (v_buf)
630             {
631                 Z_NamePlusRecord *npr = 0;
632
633                 odr_setbuf(r->odr, v_buf, v_len, 0);
634                 z_NamePlusRecord(r->odr, &npr, 0, 0);
635                 free(v_buf);
636                 if (npr)
637                     yaz_log(YLOG_LOG, "returned memcached copy");
638                 return npr;
639             }
640         }
641     }
642 #endif
643     return 0;
644
645 }
646 /*
647  * Local variables:
648  * c-basic-offset: 4
649  * c-file-style: "Stroustrup"
650  * indent-tabs-mode: nil
651  * End:
652  * vim: shiftwidth=4 tabstop=8 expandtab
653  */
654