Honor memcached options= --server=localhost as redis conf
[yaz-moved-to-github.git] / src / zoom-memcached.c
1 /* This file is part of the YAZ toolkit.
2  * Copyright (C) Index Data
3  * See the file LICENSE for details.
4  */
5 /**
6  * \file zoom-memcached.c
7  * \brief Implements query/record caching using memcached
8  */
9 #if HAVE_CONFIG_H
10 #include <config.h>
11 #endif
12
13 #include <assert.h>
14 #include <string.h>
15 #include <errno.h>
16 #include "zoom-p.h"
17
18 #include <yaz/yaz-util.h>
19 #include <yaz/xmalloc.h>
20 #include <yaz/log.h>
21 #include <yaz/diagbib1.h>
22
23 #if HAVE_LIBMEMCACHED_MEMCACHED_H
24 #if HAVE_MEMCACHED_RETURN_T
25 #else
26 typedef memcached_return memcached_return_t;
27 #endif
28 #endif
29
30 void ZOOM_memcached_init(ZOOM_connection c)
31 {
32 #if HAVE_LIBMEMCACHED_MEMCACHED_H
33     c->mc_st = 0;
34 #endif
35 #if HAVE_HIREDIS
36     c->redis_c = 0;
37 #endif
38 }
39
40 void ZOOM_memcached_destroy(ZOOM_connection c)
41 {
42 #if HAVE_LIBMEMCACHED_MEMCACHED_H
43     if (c->mc_st)
44         memcached_free(c->mc_st);
45 #endif
46 #if HAVE_HIREDIS
47     if (c->redis_c)
48         redisFree(c->redis_c);
49 #endif
50 }
51
52 #if HAVE_LIBMEMCACHED_MEMCACHED_H
53 /* memcached wrapper.. Because memcached function do not exist in older libs */
54 static memcached_st *yaz_memcached_wrap(const char *conf)
55 {
56 #if HAVE_MEMCACHED_FUNC
57     return memcached(conf, strlen(conf));
58 #else
59     char **darray;
60     int i, num;
61     memcached_st *mc = memcached_create(0);
62     NMEM nmem = nmem_create();
63     memcached_return_t rc;
64
65     nmem_strsplit_blank(nmem, conf, &darray, &num);
66     for (i = 0; mc && i < num; i++)
67     {
68         if (!yaz_strncasecmp(darray[i], "--SERVER=", 9))
69         {
70             char *host = darray[i] + 9;
71             char *port = strchr(host, ':');
72             char *weight = strstr(host, "/?");
73             if (port)
74                 *port++ = '\0';
75             if (weight)
76             {
77                 *weight = '\0';
78                 weight += 2;
79             }
80             rc = memcached_server_add(mc, host, port ? atoi(port) : 11211);
81             yaz_log(YLOG_LOG, "memcached_server_add host=%s rc=%u %s",
82                     host, (unsigned) rc, memcached_strerror(mc, rc));
83             if (rc != MEMCACHED_SUCCESS)
84             {
85                 memcached_free(mc);
86                 mc = 0;
87             }
88         }
89         else
90         {
91             /* bad directive */
92             memcached_free(mc);
93             mc = 0;
94         }
95     }
96     nmem_destroy(nmem);
97     return mc;
98 #endif
99 }
100 #endif
101
102 #if HAVE_HIREDIS
103 static redisContext *create_redis(const char *conf)
104 {
105     char **darray;
106     int i, num;
107     NMEM nmem = nmem_create();
108     redisContext *context = 0;
109
110     nmem_strsplit_blank(nmem, conf, &darray, &num);
111     for (i = 0; i < num; i++)
112     {
113         if (!yaz_strncasecmp(darray[i], "--SERVER=", 9))
114         {
115             struct timeval timeout = { 1, 500000 }; /* 1.5 seconds */
116             char *host = darray[i] + 9;
117             char *port = strchr(host, ':');
118             char *weight = strstr(host, "/?");
119             if (port)
120                 *port++ = '\0';
121             if (weight)
122             {
123                 *weight = '\0';
124                 weight += 2;
125             }
126
127             context = redisConnectWithTimeout(host,
128                                               port ? atoi(port) : 6379,
129                                               timeout);
130         }
131     }
132     nmem_destroy(nmem);
133     return context;
134 }
135 #endif
136
137 int ZOOM_memcached_configure(ZOOM_connection c)
138 {
139     const char *val;
140 #if HAVE_HIREDIS
141     if (c->redis_c)
142     {
143         redisFree(c->redis_c);
144         c->redis_c = 0;
145     }
146 #endif
147 #if HAVE_LIBMEMCACHED_MEMCACHED_H
148     if (c->mc_st)
149     {
150         memcached_free(c->mc_st);
151         c->mc_st = 0;
152     }
153 #endif
154
155     val = ZOOM_options_get(c->options, "redis");
156     if (val && *val)
157     {
158 #if HAVE_HIREDIS
159         struct timeval timeout = { 1, 500000 }; /* 1.5 seconds */
160
161         c->redis_c = redisConnectWithTimeout(val, 6379, timeout);
162         if (c->redis_c == 0 || c->redis_c->err)
163         {
164             ZOOM_set_error(c, ZOOM_ERROR_MEMCACHED,
165                            "could not create redis");
166             return -1;
167         }
168         return 0; /* don't bother with memcached if redis is enabled */
169 #else
170         ZOOM_set_error(c, ZOOM_ERROR_MEMCACHED, "not enabled");
171         return -1;
172 #endif
173     }
174     val = ZOOM_options_get(c->options, "memcached");
175     if (val && *val)
176     {
177 #if HAVE_LIBMEMCACHED_MEMCACHED_H
178         c->mc_st = yaz_memcached_wrap(val);
179         if (!c->mc_st)
180         {
181             ZOOM_set_error(c, ZOOM_ERROR_MEMCACHED,
182                            "could not create memcached");
183             return -1;
184         }
185         memcached_behavior_set(c->mc_st, MEMCACHED_BEHAVIOR_BINARY_PROTOCOL, 1);
186 #else
187 #if HAVE_HIREDIS
188         c->redis_c = create_redis(val);
189         if (c->redis_c == 0 || c->redis_c->err)
190         {
191             ZOOM_set_error(c, ZOOM_ERROR_MEMCACHED,
192                            "could not create redis");
193             return -1;
194         }
195 #else
196         ZOOM_set_error(c, ZOOM_ERROR_MEMCACHED, "not enabled");
197         return -1;
198 #endif
199 #endif
200     }
201     return 0;
202 }
203
204 #if HAVE_GCRYPT_H
205 static void wrbuf_vary_puts(WRBUF w, const char *v)
206 {
207     if (v)
208     {
209         if (strlen(v) > 40)
210         {
211             wrbuf_sha1_puts(w, v, 1);
212         }
213         else
214         {
215             wrbuf_puts(w, v);
216         }
217     }
218 }
219 #endif
220
221 void ZOOM_memcached_resultset(ZOOM_resultset r, ZOOM_query q)
222 {
223 #if HAVE_GCRYPT_H
224     ZOOM_connection c = r->connection;
225
226     r->mc_key = wrbuf_alloc();
227     wrbuf_puts(r->mc_key, "1;");
228     wrbuf_vary_puts(r->mc_key, c->host_port);
229     wrbuf_puts(r->mc_key, ";");
230     wrbuf_vary_puts(r->mc_key, ZOOM_resultset_option_get(r, "extraArgs"));
231     wrbuf_puts(r->mc_key, ";");
232     wrbuf_vary_puts(r->mc_key, c->user);
233     wrbuf_puts(r->mc_key, ";");
234     wrbuf_vary_puts(r->mc_key, c->group);
235     wrbuf_puts(r->mc_key, ";");
236     if (c->password)
237         wrbuf_sha1_puts(r->mc_key, c->password, 1);
238     wrbuf_puts(r->mc_key, ";");
239     {
240         WRBUF w = wrbuf_alloc();
241         ZOOM_query_get_hash(q, w);
242         wrbuf_sha1_puts(r->mc_key, wrbuf_cstr(w), 1);
243         wrbuf_destroy(w);
244     }
245     wrbuf_puts(r->mc_key, ";");
246     wrbuf_vary_puts(r->mc_key, r->req_facets);
247 #endif
248 }
249
250 void ZOOM_memcached_search(ZOOM_connection c, ZOOM_resultset resultset)
251 {
252 #if HAVE_HIREDIS
253     if (c->redis_c && resultset->live_set == 0)
254     {
255         redisReply *reply;
256         const char *argv[2];
257
258         argv[0] = "GET";
259         argv[1] = wrbuf_cstr(resultset->mc_key);
260
261         reply = redisCommandArgv(c->redis_c, 2, argv, 0);
262         /* count;precision (ASCII) + '\0' + BER buffer for otherInformation */
263         if (reply && reply->type == REDIS_REPLY_STRING)
264         {
265             char *v = reply->str;
266             int v_len = reply->len;
267             ZOOM_Event event;
268             size_t lead_len = strlen(v) + 1;
269
270             resultset->size = odr_atoi(v);
271
272             yaz_log(YLOG_LOG, "For key %s got value %s lead_len=%d len=%d",
273                     wrbuf_cstr(resultset->mc_key), v, (int) lead_len,
274                     (int) v_len);
275             if (v_len > lead_len)
276             {
277                 Z_OtherInformation *oi = 0;
278                 int oi_len = v_len - lead_len;
279                 odr_setbuf(resultset->odr, v + lead_len, oi_len, 0);
280                 if (!z_OtherInformation(resultset->odr, &oi, 0, 0))
281                 {
282                     yaz_log(YLOG_WARN, "oi decoding failed");
283                     freeReplyObject(reply);
284                     return;
285                 }
286                 ZOOM_handle_search_result(c, resultset, oi);
287                 ZOOM_handle_facet_result(c, resultset, oi);
288             }
289             event = ZOOM_Event_create(ZOOM_EVENT_RECV_SEARCH);
290             ZOOM_connection_put_event(c, event);
291             resultset->live_set = 1;
292         }
293         freeReplyObject(reply);
294     }
295 #endif
296 #if HAVE_LIBMEMCACHED_MEMCACHED_H
297     if (c->mc_st && resultset->live_set == 0)
298     {
299         size_t v_len;
300         uint32_t flags;
301         memcached_return_t rc;
302         char *v = memcached_get(c->mc_st, wrbuf_buf(resultset->mc_key),
303                                 wrbuf_len(resultset->mc_key),
304                                 &v_len, &flags, &rc);
305         /* count;precision (ASCII) + '\0' + BER buffer for otherInformation */
306         if (v)
307         {
308             ZOOM_Event event;
309             size_t lead_len = strlen(v) + 1;
310
311             resultset->size = odr_atoi(v);
312
313             yaz_log(YLOG_LOG, "For key %s got value %s lead_len=%d len=%d",
314                     wrbuf_cstr(resultset->mc_key), v, (int) lead_len,
315                     (int) v_len);
316             if (v_len > lead_len)
317             {
318                 Z_OtherInformation *oi = 0;
319                 int oi_len = v_len - lead_len;
320                 odr_setbuf(resultset->odr, v + lead_len, oi_len, 0);
321                 if (!z_OtherInformation(resultset->odr, &oi, 0, 0))
322                 {
323                     yaz_log(YLOG_WARN, "oi decoding failed");
324                     free(v);
325                     return;
326                 }
327                 ZOOM_handle_search_result(c, resultset, oi);
328                 ZOOM_handle_facet_result(c, resultset, oi);
329             }
330             free(v);
331             event = ZOOM_Event_create(ZOOM_EVENT_RECV_SEARCH);
332             ZOOM_connection_put_event(c, event);
333             resultset->live_set = 1;
334         }
335     }
336 #endif
337 }
338
339 void ZOOM_memcached_hitcount(ZOOM_connection c, ZOOM_resultset resultset,
340                              Z_OtherInformation *oi, const char *precision)
341 {
342 #if HAVE_HIREDIS
343     if (c->redis_c && resultset->live_set == 0)
344     {
345         char *str;
346         ODR odr = odr_createmem(ODR_ENCODE);
347         char *oi_buf = 0;
348         int oi_len = 0;
349         char *key;
350
351         str = odr_malloc(odr, 20 + strlen(precision));
352         /* count;precision (ASCII) + '\0' + BER buffer for otherInformation */
353         sprintf(str, ODR_INT_PRINTF ";%s", resultset->size, precision);
354         if (oi)
355         {
356             z_OtherInformation(odr, &oi, 0, 0);
357             oi_buf = odr_getbuf(odr, &oi_len, 0);
358         }
359         key = odr_malloc(odr, strlen(str) + 1 + oi_len);
360         strcpy(key, str);
361         if (oi_len)
362             memcpy(key + strlen(str) + 1, oi_buf, oi_len);
363
364         {
365             redisReply *reply;
366             const char *argv[3];
367             size_t argvlen[3];
368             argv[0] = "SET";
369             argvlen[0] = 3;
370             argv[1] = wrbuf_buf(resultset->mc_key);
371             argvlen[1] = wrbuf_len(resultset->mc_key);
372             argv[2] = key;
373             argvlen[2] = strlen(str) + 1 + oi_len;
374             reply = redisCommandArgv(c->redis_c, 3, argv, argvlen);
375             freeReplyObject(reply);
376         }
377         odr_destroy(odr);
378     }
379 #endif
380 #if HAVE_LIBMEMCACHED_MEMCACHED_H
381     if (c->mc_st && resultset->live_set == 0)
382     {
383         uint32_t flags = 0;
384         memcached_return_t rc;
385         time_t expiration = 36000;
386         char *str;
387         ODR odr = odr_createmem(ODR_ENCODE);
388         char *oi_buf = 0;
389         int oi_len = 0;
390         char *key;
391
392         str = odr_malloc(odr, 20 + strlen(precision));
393         /* count;precision (ASCII) + '\0' + BER buffer for otherInformation */
394         sprintf(str, ODR_INT_PRINTF ";%s", resultset->size, precision);
395         if (oi)
396         {
397             z_OtherInformation(odr, &oi, 0, 0);
398             oi_buf = odr_getbuf(odr, &oi_len, 0);
399         }
400         key = odr_malloc(odr, strlen(str) + 1 + oi_len);
401         strcpy(key, str);
402         if (oi_len)
403             memcpy(key + strlen(str) + 1, oi_buf, oi_len);
404
405         rc = memcached_set(c->mc_st,
406                            wrbuf_buf(resultset->mc_key),
407                            wrbuf_len(resultset->mc_key),
408                            key, strlen(str) + 1 + oi_len, expiration, flags);
409         yaz_log(YLOG_LOG, "Store hit count key=%s value=%s oi_len=%d rc=%u %s",
410                 wrbuf_cstr(resultset->mc_key), str, oi_len, (unsigned) rc,
411                 memcached_strerror(c->mc_st, rc));
412         odr_destroy(odr);
413     }
414 #endif
415 }
416
417 void ZOOM_memcached_add(ZOOM_resultset r, Z_NamePlusRecord *npr,
418                         int pos,
419                         const char *syntax, const char *elementSetName,
420                         const char *schema,
421                         Z_SRW_diagnostic *diag)
422 {
423 #if HAVE_HIREDIS
424     if (r->connection->redis_c &&
425         !diag && npr->which == Z_NamePlusRecord_databaseRecord)
426     {
427         WRBUF k = wrbuf_alloc();
428         WRBUF rec_sha1 = wrbuf_alloc();
429         ODR odr = odr_createmem(ODR_ENCODE);
430         char *rec_buf;
431         int rec_len;
432         const char *argv[3];
433         size_t argvlen[3];
434         redisReply *reply;
435
436         z_NamePlusRecord(odr, &npr, 0, 0);
437         rec_buf = odr_getbuf(odr, &rec_len, 0);
438
439         wrbuf_write(k, wrbuf_buf(r->mc_key), wrbuf_len(r->mc_key));
440         wrbuf_printf(k, ";%d;%s;%s;%s", pos,
441                      syntax ? syntax : "",
442                      elementSetName ? elementSetName : "",
443                      schema ? schema : "");
444
445         wrbuf_sha1_write(rec_sha1, rec_buf, rec_len, 1);
446
447         argv[0] = "SET";
448         argvlen[0] = 3;
449         argv[1] = wrbuf_buf(k);
450         argvlen[1] = wrbuf_len(k);
451         argv[2] = wrbuf_buf(rec_sha1);
452         argvlen[2] = wrbuf_len(rec_sha1);
453
454         reply = redisCommandArgv(r->connection->redis_c, 3, argv, argvlen);
455         yaz_log(YLOG_LOG, "Store record key=%s val=%s",
456                 wrbuf_cstr(k), wrbuf_cstr(rec_sha1));
457         freeReplyObject(reply);
458
459         argv[1] = wrbuf_buf(rec_sha1);
460         argvlen[1] = wrbuf_len(rec_sha1);
461         argv[2] = rec_buf;
462         argvlen[2] = rec_len;
463
464         reply = redisCommandArgv(r->connection->redis_c, 3, argv, argvlen);
465         yaz_log(YLOG_LOG, "Add record key=%s rec_len=%d",
466                 wrbuf_cstr(rec_sha1), rec_len);
467         freeReplyObject(reply);
468
469         odr_destroy(odr);
470         wrbuf_destroy(k);
471         wrbuf_destroy(rec_sha1);
472     }
473 #endif
474 #if HAVE_LIBMEMCACHED_MEMCACHED_H
475     if (r->connection->mc_st &&
476         !diag && npr->which == Z_NamePlusRecord_databaseRecord)
477     {
478         WRBUF k = wrbuf_alloc();
479         WRBUF rec_sha1 = wrbuf_alloc();
480         uint32_t flags = 0;
481         memcached_return_t rc;
482         time_t expiration = 36000;
483         ODR odr = odr_createmem(ODR_ENCODE);
484         char *rec_buf;
485         int rec_len;
486
487         z_NamePlusRecord(odr, &npr, 0, 0);
488         rec_buf = odr_getbuf(odr, &rec_len, 0);
489
490         wrbuf_write(k, wrbuf_buf(r->mc_key), wrbuf_len(r->mc_key));
491         wrbuf_printf(k, ";%d;%s;%s;%s", pos,
492                      syntax ? syntax : "",
493                      elementSetName ? elementSetName : "",
494                      schema ? schema : "");
495
496         wrbuf_sha1_write(rec_sha1, rec_buf, rec_len, 1);
497
498         rc = memcached_set(r->connection->mc_st,
499                            wrbuf_buf(k), wrbuf_len(k),
500                            wrbuf_buf(rec_sha1), wrbuf_len(rec_sha1),
501                            expiration, flags);
502
503         yaz_log(YLOG_LOG, "Store record key=%s val=%s rc=%u %s",
504                 wrbuf_cstr(k), wrbuf_cstr(rec_sha1), (unsigned) rc,
505                 memcached_strerror(r->connection->mc_st, rc));
506
507         rc = memcached_add(r->connection->mc_st,
508                            wrbuf_buf(rec_sha1), wrbuf_len(rec_sha1),
509                            rec_buf, rec_len,
510                            expiration, flags);
511
512         yaz_log(YLOG_LOG, "Add record key=%s rec_len=%d rc=%u %s",
513                 wrbuf_cstr(rec_sha1), rec_len, (unsigned) rc,
514                 memcached_strerror(r->connection->mc_st, rc));
515
516         odr_destroy(odr);
517         wrbuf_destroy(k);
518         wrbuf_destroy(rec_sha1);
519     }
520 #endif
521 }
522
523 Z_NamePlusRecord *ZOOM_memcached_lookup(ZOOM_resultset r, int pos,
524                                         const char *syntax,
525                                         const char *elementSetName,
526                                         const char *schema)
527 {
528 #if HAVE_HIREDIS
529     if (r->connection && r->connection->redis_c)
530     {
531         WRBUF k = wrbuf_alloc();
532         const char *argv[2];
533         size_t argvlen[2];
534         redisReply *reply1;
535
536         wrbuf_write(k, wrbuf_buf(r->mc_key), wrbuf_len(r->mc_key));
537         wrbuf_printf(k, ";%d;%s;%s;%s", pos,
538                      syntax ? syntax : "",
539                      elementSetName ? elementSetName : "",
540                      schema ? schema : "");
541
542         yaz_log(YLOG_LOG, "Lookup record %s", wrbuf_cstr(k));
543         argv[0] = "GET";
544         argvlen[0] = 3;
545         argv[1] = wrbuf_buf(k);
546         argvlen[1] = wrbuf_len(k);
547         reply1 = redisCommandArgv(r->connection->redis_c, 2, argv, argvlen);
548
549         wrbuf_destroy(k);
550         if (reply1 && reply1->type == REDIS_REPLY_STRING)
551         {
552             redisReply *reply2;
553             char *sha1_buf = reply1->str;
554             int sha1_len = reply1->len;
555
556             yaz_log(YLOG_LOG, "Lookup record %.*s", (int) sha1_len, sha1_buf);
557
558             argv[0] = "GET";
559             argvlen[0] = 3;
560             argv[1] = sha1_buf;
561             argvlen[1] = sha1_len;
562
563             reply2 = redisCommandArgv(r->connection->redis_c, 2, argv, argvlen);
564             if (reply2 && reply2->type == REDIS_REPLY_STRING)
565             {
566                 Z_NamePlusRecord *npr = 0;
567                 char *v_buf = reply2->str;
568                 int v_len = reply2->len;
569
570                 odr_setbuf(r->odr, v_buf, v_len, 0);
571                 z_NamePlusRecord(r->odr, &npr, 0, 0);
572                 if (npr)
573                     yaz_log(YLOG_LOG, "returned redis copy");
574                 freeReplyObject(reply2);
575                 freeReplyObject(reply1);
576                 return npr;
577             }
578             freeReplyObject(reply2);
579         }
580         freeReplyObject(reply1);
581     }
582 #endif
583 #if HAVE_LIBMEMCACHED_MEMCACHED_H
584     if (r->connection && r->connection->mc_st)
585     {
586         WRBUF k = wrbuf_alloc();
587         char *sha1_buf;
588         size_t sha1_len;
589         uint32_t flags;
590         memcached_return_t rc;
591
592         wrbuf_write(k, wrbuf_buf(r->mc_key), wrbuf_len(r->mc_key));
593         wrbuf_printf(k, ";%d;%s;%s;%s", pos,
594                      syntax ? syntax : "",
595                      elementSetName ? elementSetName : "",
596                      schema ? schema : "");
597
598         yaz_log(YLOG_LOG, "Lookup record %s", wrbuf_cstr(k));
599         sha1_buf = memcached_get(r->connection->mc_st,
600                                  wrbuf_buf(k), wrbuf_len(k),
601                                  &sha1_len, &flags, &rc);
602
603         wrbuf_destroy(k);
604         if (sha1_buf)
605         {
606             size_t v_len;
607             char *v_buf;
608
609             yaz_log(YLOG_LOG, "Lookup record %.*s", (int) sha1_len, sha1_buf);
610             v_buf = memcached_get(r->connection->mc_st, sha1_buf, sha1_len,
611                                   &v_len, &flags, &rc);
612             free(sha1_buf);
613             if (v_buf)
614             {
615                 Z_NamePlusRecord *npr = 0;
616
617                 odr_setbuf(r->odr, v_buf, v_len, 0);
618                 z_NamePlusRecord(r->odr, &npr, 0, 0);
619                 free(v_buf);
620                 if (npr)
621                     yaz_log(YLOG_LOG, "returned memcached copy");
622                 return npr;
623             }
624         }
625     }
626 #endif
627     return 0;
628
629 }
630 /*
631  * Local variables:
632  * c-basic-offset: 4
633  * c-file-style: "Stroustrup"
634  * indent-tabs-mode: nil
635  * End:
636  * vim: shiftwidth=4 tabstop=8 expandtab
637  */
638