Redis functional including configure + code
[yaz-moved-to-github.git] / src / zoom-memcached.c
1 /* This file is part of the YAZ toolkit.
2  * Copyright (C) Index Data
3  * See the file LICENSE for details.
4  */
5 /**
6  * \file zoom-memcached.c
7  * \brief Implements query/record caching using memcached
8  */
9 #if HAVE_CONFIG_H
10 #include <config.h>
11 #endif
12
13 #include <assert.h>
14 #include <string.h>
15 #include <errno.h>
16 #include "zoom-p.h"
17
18 #include <yaz/yaz-util.h>
19 #include <yaz/xmalloc.h>
20 #include <yaz/log.h>
21 #include <yaz/diagbib1.h>
22
#if HAVE_LIBMEMCACHED_MEMCACHED_H
#if !HAVE_MEMCACHED_RETURN_T
/* configure did not find memcached_return_t: alias it to memcached_return
 * so the rest of this file can use a single type name. */
typedef memcached_return memcached_return_t;
#endif
#endif
29
30 void ZOOM_memcached_init(ZOOM_connection c)
31 {
32 #if HAVE_LIBMEMCACHED_MEMCACHED_H
33     c->mc_st = 0;
34 #endif
35 #if HAVE_HIREDIS
36     c->redis_c = 0;
37 #endif
38 }
39
40 void ZOOM_memcached_destroy(ZOOM_connection c)
41 {
42 #if HAVE_LIBMEMCACHED_MEMCACHED_H
43     if (c->mc_st)
44         memcached_free(c->mc_st);
45 #endif
46 #if HAVE_HIREDIS
47     if (c->redis_c)
48         redisFree(c->redis_c);
49 #endif
50 }
51
#if HAVE_LIBMEMCACHED_MEMCACHED_H
/**
 * \brief Creates a memcached handle from a configuration string.
 * \param conf blank separated list of directives
 * \returns new handle, or 0 on failure
 *
 * When the library offers memcached() the string is handed to it as is.
 * Otherwise (the memcached function does not exist in older libs) the
 * string is parsed here and only directives of the form
 *
 *   --SERVER=host[:port][/?weight]
 *
 * are accepted. The port defaults to 11211. The weight part is cut off
 * but not applied to the server. Any other directive, or a server that
 * cannot be added, makes the whole call fail.
 */
static memcached_st *yaz_memcached_wrap(const char *conf)
{
#if HAVE_MEMCACHED_FUNC
    return memcached(conf, strlen(conf));
#else
    char **directives;
    int no_directives;
    int idx = 0;
    memcached_st *handle = memcached_create(0);
    NMEM nmem = nmem_create();

    nmem_strsplit_blank(nmem, conf, &directives, &no_directives);
    while (handle && idx < no_directives)
    {
        char *directive = directives[idx++];
        char *host;
        char *colon;
        char *weight_sep;
        memcached_return_t rc;

        if (yaz_strncasecmp(directive, "--SERVER=", 9) != 0)
        {
            /* bad directive */
            memcached_free(handle);
            handle = 0;
            continue;
        }
        /* locate both separators before either of them is overwritten */
        host = directive + 9;
        colon = strchr(host, ':');
        weight_sep = strstr(host, "/?");
        if (colon)
            *colon++ = '\0';
        if (weight_sep)
            *weight_sep = '\0';

        rc = memcached_server_add(handle, host, colon ? atoi(colon) : 11211);
        yaz_log(YLOG_LOG, "memcached_server_add host=%s rc=%u %s",
                host, (unsigned) rc, memcached_strerror(handle, rc));
        if (rc != MEMCACHED_SUCCESS)
        {
            memcached_free(handle);
            handle = 0;
        }
    }
    nmem_destroy(nmem);
    return handle;
#endif
}
#endif
101
102 int ZOOM_memcached_configure(ZOOM_connection c)
103 {
104     const char *val;
105 #if HAVE_HIREDIS
106     if (c->redis_c)
107     {
108         redisFree(c->redis_c);
109         c->redis_c = 0;
110     }
111 #endif
112 #if HAVE_LIBMEMCACHED_MEMCACHED_H
113     if (c->mc_st)
114     {
115         memcached_free(c->mc_st);
116         c->mc_st = 0;
117     }
118 #endif
119
120     val = ZOOM_options_get(c->options, "redis");
121     if (val && *val)
122     {
123 #if HAVE_HIREDIS
124         struct timeval timeout = { 1, 500000 }; /* 1.5 seconds */
125
126         c->redis_c = redisConnectWithTimeout(val, 6379, timeout);
127         if (c->redis_c == 0 || c->redis_c->err)
128         {
129             ZOOM_set_error(c, ZOOM_ERROR_MEMCACHED,
130                            "could not create redis");
131             return -1;
132         }
133         return 0; /* don't bother with memcached if redis is enabled */
134 #else
135         ZOOM_set_error(c, ZOOM_ERROR_MEMCACHED, "not enabled");
136         return -1;
137 #endif
138     }
139     val = ZOOM_options_get(c->options, "memcached");
140     if (val && *val)
141     {
142 #if HAVE_LIBMEMCACHED_MEMCACHED_H
143         c->mc_st = yaz_memcached_wrap(val);
144         if (!c->mc_st)
145         {
146             ZOOM_set_error(c, ZOOM_ERROR_MEMCACHED,
147                            "could not create memcached");
148             return -1;
149         }
150         memcached_behavior_set(c->mc_st, MEMCACHED_BEHAVIOR_BINARY_PROTOCOL, 1);
151 #else
152         ZOOM_set_error(c, ZOOM_ERROR_MEMCACHED, "not enabled");
153         return -1;
154 #endif
155     }
156     return 0;
157 }
158
#if HAVE_GCRYPT_H
/**
 * \brief Appends one component of a cache key.
 * \param w buffer to append to
 * \param v component; 0 appends nothing
 *
 * Components of at most 40 characters are appended verbatim. Longer
 * ones are appended through wrbuf_sha1_puts instead (presumably as a
 * hex SHA-1 digest, which would keep the key short -- TODO confirm).
 */
static void wrbuf_vary_puts(WRBUF w, const char *v)
{
    if (!v)
        return;
    if (strlen(v) <= 40)
        wrbuf_puts(w, v);
    else
        wrbuf_sha1_puts(w, v, 1);
}
#endif
175
176 void ZOOM_memcached_resultset(ZOOM_resultset r, ZOOM_query q)
177 {
178 #if HAVE_GCRYPT_H
179     ZOOM_connection c = r->connection;
180
181     r->mc_key = wrbuf_alloc();
182     wrbuf_puts(r->mc_key, "1;");
183     wrbuf_vary_puts(r->mc_key, c->host_port);
184     wrbuf_puts(r->mc_key, ";");
185     wrbuf_vary_puts(r->mc_key, ZOOM_resultset_option_get(r, "extraArgs"));
186     wrbuf_puts(r->mc_key, ";");
187     wrbuf_vary_puts(r->mc_key, c->user);
188     wrbuf_puts(r->mc_key, ";");
189     wrbuf_vary_puts(r->mc_key, c->group);
190     wrbuf_puts(r->mc_key, ";");
191     if (c->password)
192         wrbuf_sha1_puts(r->mc_key, c->password, 1);
193     wrbuf_puts(r->mc_key, ";");
194     {
195         WRBUF w = wrbuf_alloc();
196         ZOOM_query_get_hash(q, w);
197         wrbuf_sha1_puts(r->mc_key, wrbuf_cstr(w), 1);
198         wrbuf_destroy(w);
199     }
200     wrbuf_puts(r->mc_key, ";");
201     wrbuf_vary_puts(r->mc_key, r->req_facets);
202 #endif
203 }
204
205 void ZOOM_memcached_search(ZOOM_connection c, ZOOM_resultset resultset)
206 {
207 #if HAVE_HIREDIS
208     if (c->redis_c && resultset->live_set == 0)
209     {
210         redisReply *reply;
211         const char *argv[2];
212
213         argv[0] = "GET";
214         argv[1] = wrbuf_cstr(resultset->mc_key);
215
216         reply = redisCommandArgv(c->redis_c, 2, argv, 0);
217         /* count;precision (ASCII) + '\0' + BER buffer for otherInformation */
218         if (reply && reply->type == REDIS_REPLY_STRING)
219         {
220             char *v = reply->str;
221             int v_len = reply->len;
222             ZOOM_Event event;
223             size_t lead_len = strlen(v) + 1;
224
225             resultset->size = odr_atoi(v);
226
227             yaz_log(YLOG_LOG, "For key %s got value %s lead_len=%d len=%d",
228                     wrbuf_cstr(resultset->mc_key), v, (int) lead_len,
229                     (int) v_len);
230             if (v_len > lead_len)
231             {
232                 Z_OtherInformation *oi = 0;
233                 int oi_len = v_len - lead_len;
234                 odr_setbuf(resultset->odr, v + lead_len, oi_len, 0);
235                 if (!z_OtherInformation(resultset->odr, &oi, 0, 0))
236                 {
237                     yaz_log(YLOG_WARN, "oi decoding failed");
238                     freeReplyObject(reply);
239                     return;
240                 }
241                 ZOOM_handle_search_result(c, resultset, oi);
242                 ZOOM_handle_facet_result(c, resultset, oi);
243             }
244             event = ZOOM_Event_create(ZOOM_EVENT_RECV_SEARCH);
245             ZOOM_connection_put_event(c, event);
246             resultset->live_set = 1;
247         }
248         freeReplyObject(reply);
249     }
250 #endif
251 #if HAVE_LIBMEMCACHED_MEMCACHED_H
252     if (c->mc_st && resultset->live_set == 0)
253     {
254         size_t v_len;
255         uint32_t flags;
256         memcached_return_t rc;
257         char *v = memcached_get(c->mc_st, wrbuf_buf(resultset->mc_key),
258                                 wrbuf_len(resultset->mc_key),
259                                 &v_len, &flags, &rc);
260         /* count;precision (ASCII) + '\0' + BER buffer for otherInformation */
261         if (v)
262         {
263             ZOOM_Event event;
264             size_t lead_len = strlen(v) + 1;
265
266             resultset->size = odr_atoi(v);
267
268             yaz_log(YLOG_LOG, "For key %s got value %s lead_len=%d len=%d",
269                     wrbuf_cstr(resultset->mc_key), v, (int) lead_len,
270                     (int) v_len);
271             if (v_len > lead_len)
272             {
273                 Z_OtherInformation *oi = 0;
274                 int oi_len = v_len - lead_len;
275                 odr_setbuf(resultset->odr, v + lead_len, oi_len, 0);
276                 if (!z_OtherInformation(resultset->odr, &oi, 0, 0))
277                 {
278                     yaz_log(YLOG_WARN, "oi decoding failed");
279                     free(v);
280                     return;
281                 }
282                 ZOOM_handle_search_result(c, resultset, oi);
283                 ZOOM_handle_facet_result(c, resultset, oi);
284             }
285             free(v);
286             event = ZOOM_Event_create(ZOOM_EVENT_RECV_SEARCH);
287             ZOOM_connection_put_event(c, event);
288             resultset->live_set = 1;
289         }
290     }
291 #endif
292 }
293
294 void ZOOM_memcached_hitcount(ZOOM_connection c, ZOOM_resultset resultset,
295                              Z_OtherInformation *oi, const char *precision)
296 {
297 #if HAVE_HIREDIS
298     if (c->redis_c && resultset->live_set == 0)
299     {
300         char *str;
301         ODR odr = odr_createmem(ODR_ENCODE);
302         char *oi_buf = 0;
303         int oi_len = 0;
304         char *key;
305
306         str = odr_malloc(odr, 20 + strlen(precision));
307         /* count;precision (ASCII) + '\0' + BER buffer for otherInformation */
308         sprintf(str, ODR_INT_PRINTF ";%s", resultset->size, precision);
309         if (oi)
310         {
311             z_OtherInformation(odr, &oi, 0, 0);
312             oi_buf = odr_getbuf(odr, &oi_len, 0);
313         }
314         key = odr_malloc(odr, strlen(str) + 1 + oi_len);
315         strcpy(key, str);
316         if (oi_len)
317             memcpy(key + strlen(str) + 1, oi_buf, oi_len);
318
319         {
320             redisReply *reply;
321             const char *argv[3];
322             size_t argvlen[3];
323             argv[0] = "SET";
324             argvlen[0] = 3;
325             argv[1] = wrbuf_buf(resultset->mc_key);
326             argvlen[1] = wrbuf_len(resultset->mc_key);
327             argv[2] = key;
328             argvlen[2] = strlen(str) + 1 + oi_len;
329             reply = redisCommandArgv(c->redis_c, 3, argv, argvlen);
330             freeReplyObject(reply);
331         }
332         odr_destroy(odr);
333     }
334 #endif
335 #if HAVE_LIBMEMCACHED_MEMCACHED_H
336     if (c->mc_st && resultset->live_set == 0)
337     {
338         uint32_t flags = 0;
339         memcached_return_t rc;
340         time_t expiration = 36000;
341         char *str;
342         ODR odr = odr_createmem(ODR_ENCODE);
343         char *oi_buf = 0;
344         int oi_len = 0;
345         char *key;
346
347         str = odr_malloc(odr, 20 + strlen(precision));
348         /* count;precision (ASCII) + '\0' + BER buffer for otherInformation */
349         sprintf(str, ODR_INT_PRINTF ";%s", resultset->size, precision);
350         if (oi)
351         {
352             z_OtherInformation(odr, &oi, 0, 0);
353             oi_buf = odr_getbuf(odr, &oi_len, 0);
354         }
355         key = odr_malloc(odr, strlen(str) + 1 + oi_len);
356         strcpy(key, str);
357         if (oi_len)
358             memcpy(key + strlen(str) + 1, oi_buf, oi_len);
359
360         rc = memcached_set(c->mc_st,
361                            wrbuf_buf(resultset->mc_key),
362                            wrbuf_len(resultset->mc_key),
363                            key, strlen(str) + 1 + oi_len, expiration, flags);
364         yaz_log(YLOG_LOG, "Store hit count key=%s value=%s oi_len=%d rc=%u %s",
365                 wrbuf_cstr(resultset->mc_key), str, oi_len, (unsigned) rc,
366                 memcached_strerror(c->mc_st, rc));
367         odr_destroy(odr);
368     }
369 #endif
370 }
371
372 void ZOOM_memcached_add(ZOOM_resultset r, Z_NamePlusRecord *npr,
373                         int pos,
374                         const char *syntax, const char *elementSetName,
375                         const char *schema,
376                         Z_SRW_diagnostic *diag)
377 {
378 #if HAVE_HIREDIS
379     if (r->connection->redis_c &&
380         !diag && npr->which == Z_NamePlusRecord_databaseRecord)
381     {
382         WRBUF k = wrbuf_alloc();
383         WRBUF rec_sha1 = wrbuf_alloc();
384         ODR odr = odr_createmem(ODR_ENCODE);
385         char *rec_buf;
386         int rec_len;
387         const char *argv[3];
388         size_t argvlen[3];
389         redisReply *reply;
390
391         z_NamePlusRecord(odr, &npr, 0, 0);
392         rec_buf = odr_getbuf(odr, &rec_len, 0);
393
394         wrbuf_write(k, wrbuf_buf(r->mc_key), wrbuf_len(r->mc_key));
395         wrbuf_printf(k, ";%d;%s;%s;%s", pos,
396                      syntax ? syntax : "",
397                      elementSetName ? elementSetName : "",
398                      schema ? schema : "");
399
400         wrbuf_sha1_write(rec_sha1, rec_buf, rec_len, 1);
401
402         argv[0] = "SET";
403         argvlen[0] = 3;
404         argv[1] = wrbuf_buf(k);
405         argvlen[1] = wrbuf_len(k);
406         argv[2] = wrbuf_buf(rec_sha1);
407         argvlen[2] = wrbuf_len(rec_sha1);
408
409         reply = redisCommandArgv(r->connection->redis_c, 3, argv, argvlen);
410         yaz_log(YLOG_LOG, "Store record key=%s val=%s",
411                 wrbuf_cstr(k), wrbuf_cstr(rec_sha1));
412         freeReplyObject(reply);
413
414         argv[1] = wrbuf_buf(rec_sha1);
415         argvlen[1] = wrbuf_len(rec_sha1);
416         argv[2] = rec_buf;
417         argvlen[2] = rec_len;
418
419         reply = redisCommandArgv(r->connection->redis_c, 3, argv, argvlen);
420         yaz_log(YLOG_LOG, "Add record key=%s rec_len=%d",
421                 wrbuf_cstr(rec_sha1), rec_len);
422         freeReplyObject(reply);
423
424         odr_destroy(odr);
425         wrbuf_destroy(k);
426         wrbuf_destroy(rec_sha1);
427     }
428 #endif
429 #if HAVE_LIBMEMCACHED_MEMCACHED_H
430     if (r->connection->mc_st &&
431         !diag && npr->which == Z_NamePlusRecord_databaseRecord)
432     {
433         WRBUF k = wrbuf_alloc();
434         WRBUF rec_sha1 = wrbuf_alloc();
435         uint32_t flags = 0;
436         memcached_return_t rc;
437         time_t expiration = 36000;
438         ODR odr = odr_createmem(ODR_ENCODE);
439         char *rec_buf;
440         int rec_len;
441
442         z_NamePlusRecord(odr, &npr, 0, 0);
443         rec_buf = odr_getbuf(odr, &rec_len, 0);
444
445         wrbuf_write(k, wrbuf_buf(r->mc_key), wrbuf_len(r->mc_key));
446         wrbuf_printf(k, ";%d;%s;%s;%s", pos,
447                      syntax ? syntax : "",
448                      elementSetName ? elementSetName : "",
449                      schema ? schema : "");
450
451         wrbuf_sha1_write(rec_sha1, rec_buf, rec_len, 1);
452
453         rc = memcached_set(r->connection->mc_st,
454                            wrbuf_buf(k), wrbuf_len(k),
455                            wrbuf_buf(rec_sha1), wrbuf_len(rec_sha1),
456                            expiration, flags);
457
458         yaz_log(YLOG_LOG, "Store record key=%s val=%s rc=%u %s",
459                 wrbuf_cstr(k), wrbuf_cstr(rec_sha1), (unsigned) rc,
460                 memcached_strerror(r->connection->mc_st, rc));
461
462         rc = memcached_add(r->connection->mc_st,
463                            wrbuf_buf(rec_sha1), wrbuf_len(rec_sha1),
464                            rec_buf, rec_len,
465                            expiration, flags);
466
467         yaz_log(YLOG_LOG, "Add record key=%s rec_len=%d rc=%u %s",
468                 wrbuf_cstr(rec_sha1), rec_len, (unsigned) rc,
469                 memcached_strerror(r->connection->mc_st, rc));
470
471         odr_destroy(odr);
472         wrbuf_destroy(k);
473         wrbuf_destroy(rec_sha1);
474     }
475 #endif
476 }
477
478 Z_NamePlusRecord *ZOOM_memcached_lookup(ZOOM_resultset r, int pos,
479                                         const char *syntax,
480                                         const char *elementSetName,
481                                         const char *schema)
482 {
483 #if HAVE_HIREDIS
484     if (r->connection && r->connection->redis_c)
485     {
486         WRBUF k = wrbuf_alloc();
487         const char *argv[2];
488         size_t argvlen[2];
489         redisReply *reply1;
490
491         wrbuf_write(k, wrbuf_buf(r->mc_key), wrbuf_len(r->mc_key));
492         wrbuf_printf(k, ";%d;%s;%s;%s", pos,
493                      syntax ? syntax : "",
494                      elementSetName ? elementSetName : "",
495                      schema ? schema : "");
496
497         yaz_log(YLOG_LOG, "Lookup record %s", wrbuf_cstr(k));
498         argv[0] = "GET";
499         argvlen[0] = 3;
500         argv[1] = wrbuf_buf(k);
501         argvlen[1] = wrbuf_len(k);
502         reply1 = redisCommandArgv(r->connection->redis_c, 2, argv, argvlen);
503
504         wrbuf_destroy(k);
505         if (reply1 && reply1->type == REDIS_REPLY_STRING)
506         {
507             redisReply *reply2;
508             char *sha1_buf = reply1->str;
509             int sha1_len = reply1->len;
510
511             yaz_log(YLOG_LOG, "Lookup record %.*s", (int) sha1_len, sha1_buf);
512
513             argv[0] = "GET";
514             argvlen[0] = 3;
515             argv[1] = sha1_buf;
516             argvlen[1] = sha1_len;
517
518             reply2 = redisCommandArgv(r->connection->redis_c, 2, argv, argvlen);
519             if (reply2 && reply2->type == REDIS_REPLY_STRING)
520             {
521                 Z_NamePlusRecord *npr = 0;
522                 char *v_buf = reply2->str;
523                 int v_len = reply2->len;
524
525                 odr_setbuf(r->odr, v_buf, v_len, 0);
526                 z_NamePlusRecord(r->odr, &npr, 0, 0);
527                 if (npr)
528                     yaz_log(YLOG_LOG, "returned redis copy");
529                 freeReplyObject(reply2);
530                 freeReplyObject(reply1);
531                 return npr;
532             }
533             freeReplyObject(reply2);
534         }
535         freeReplyObject(reply1);
536     }
537 #endif
538 #if HAVE_LIBMEMCACHED_MEMCACHED_H
539     if (r->connection && r->connection->mc_st)
540     {
541         WRBUF k = wrbuf_alloc();
542         char *sha1_buf;
543         size_t sha1_len;
544         uint32_t flags;
545         memcached_return_t rc;
546
547         wrbuf_write(k, wrbuf_buf(r->mc_key), wrbuf_len(r->mc_key));
548         wrbuf_printf(k, ";%d;%s;%s;%s", pos,
549                      syntax ? syntax : "",
550                      elementSetName ? elementSetName : "",
551                      schema ? schema : "");
552
553         yaz_log(YLOG_LOG, "Lookup record %s", wrbuf_cstr(k));
554         sha1_buf = memcached_get(r->connection->mc_st,
555                                  wrbuf_buf(k), wrbuf_len(k),
556                                  &sha1_len, &flags, &rc);
557
558         wrbuf_destroy(k);
559         if (sha1_buf)
560         {
561             size_t v_len;
562             char *v_buf;
563
564             yaz_log(YLOG_LOG, "Lookup record %.*s", (int) sha1_len, sha1_buf);
565             v_buf = memcached_get(r->connection->mc_st, sha1_buf, sha1_len,
566                                   &v_len, &flags, &rc);
567             free(sha1_buf);
568             if (v_buf)
569             {
570                 Z_NamePlusRecord *npr = 0;
571
572                 odr_setbuf(r->odr, v_buf, v_len, 0);
573                 z_NamePlusRecord(r->odr, &npr, 0, 0);
574                 free(v_buf);
575                 if (npr)
576                     yaz_log(YLOG_LOG, "returned memcached copy");
577                 return npr;
578             }
579         }
580     }
581 #endif
582     return 0;
583
584 }
585 /*
586  * Local variables:
587  * c-basic-offset: 4
588  * c-file-style: "Stroustrup"
589  * indent-tabs-mode: nil
590  * End:
591  * vim: shiftwidth=4 tabstop=8 expandtab
592  */
593