ZOOM: no memcached/redis hack. Options for redis context
[yaz-moved-to-github.git] / src / zoom-memcached.c
1 /* This file is part of the YAZ toolkit.
2  * Copyright (C) Index Data
3  * See the file LICENSE for details.
4  */
5 /**
6  * \file zoom-memcached.c
7  * \brief Implements query/record caching using memcached or redis
8  */
9 #if HAVE_CONFIG_H
10 #include <config.h>
11 #endif
12
13 #include <assert.h>
14 #include <string.h>
15 #include <errno.h>
16 #include "zoom-p.h"
17
18 #include <yaz/yaz-util.h>
19 #include <yaz/xmalloc.h>
20 #include <yaz/log.h>
21 #include <yaz/diagbib1.h>
22
#if HAVE_LIBMEMCACHED_MEMCACHED_H && !HAVE_MEMCACHED_RETURN_T
/* The build configuration reports that the installed libmemcached has no
 * memcached_return_t; alias it to memcached_return so the rest of this
 * file can use a single name. */
typedef memcached_return memcached_return_t;
#endif
29
30 void ZOOM_memcached_init(ZOOM_connection c)
31 {
32 #if HAVE_LIBMEMCACHED_MEMCACHED_H
33     c->mc_st = 0;
34 #endif
35 #if HAVE_HIREDIS
36     c->redis_c = 0;
37 #endif
38 }
39
40 void ZOOM_memcached_destroy(ZOOM_connection c)
41 {
42 #if HAVE_LIBMEMCACHED_MEMCACHED_H
43     if (c->mc_st)
44         memcached_free(c->mc_st);
45 #endif
46 #if HAVE_HIREDIS
47     if (c->redis_c)
48         redisFree(c->redis_c);
49 #endif
50 }
51
#if HAVE_LIBMEMCACHED_MEMCACHED_H
/*
 * Creates a memcached handle from a configuration string.
 *
 * When the library offers memcached() (HAVE_MEMCACHED_FUNC) the string is
 * handed to it unchanged. Otherwise the string is split on blanks and only
 * directives of the form --SERVER=host[:port][/?weight] are understood:
 * the port defaults to 11211 and the weight part is cut off and ignored.
 *
 * Returns the handle, or 0 if the handle could not be created, a server
 * could not be added, or an unknown directive was seen.
 */
static memcached_st *yaz_memcached_wrap(const char *conf)
{
#if HAVE_MEMCACHED_FUNC
    return memcached(conf, strlen(conf));
#else
    char **tokens;
    int n_tokens;
    int idx = 0;
    memcached_st *mc = memcached_create(0);
    NMEM nmem = nmem_create();

    nmem_strsplit_blank(nmem, conf, &tokens, &n_tokens);
    while (mc && idx < n_tokens)
    {
        char *directive = tokens[idx++];
        int accepted = 0; /* stays 0 for unknown directives */

        if (yaz_strncasecmp(directive, "--SERVER=", 9) == 0)
        {
            char *host = directive + 9;
            /* locate both separators before either one is overwritten */
            char *colon = strchr(host, ':');
            char *weight_sep = strstr(host, "/?");
            memcached_return_t rc;
            int port_no;

            if (colon)
                *colon = '\0';
            if (weight_sep)
                *weight_sep = '\0'; /* weight is dropped, not applied */
            port_no = colon ? atoi(colon + 1) : 11211;

            rc = memcached_server_add(mc, host, port_no);
            yaz_log(YLOG_LOG, "memcached_server_add host=%s rc=%u %s",
                    host, (unsigned) rc, memcached_strerror(mc, rc));
            accepted = (rc == MEMCACHED_SUCCESS);
        }
        if (!accepted)
        {
            /* failed server or bad directive: give up on the handle,
               which also terminates the loop */
            memcached_free(mc);
            mc = 0;
        }
    }
    nmem_destroy(nmem);
    return mc;
#endif
}
#endif
101
#if HAVE_HIREDIS
/*
 * Opens a redis context from a configuration string.
 *
 * The string is split on blanks; each --SERVER=host[:port] directive opens
 * a connection (port defaults to 6379, connect timeout 1.5 seconds). Other
 * directives are ignored. If several --SERVER directives are given the last
 * one wins; contexts opened for earlier ones are freed rather than leaked.
 *
 * Returns the context of the last --SERVER directive, or 0 if there was
 * none (or redisConnectWithTimeout returned 0). The caller must still
 * inspect the context's err field, as the connect may have failed.
 */
static redisContext *create_redis(const char *conf)
{
    char **darray;
    int i, num;
    NMEM nmem = nmem_create();
    redisContext *context = 0;

    nmem_strsplit_blank(nmem, conf, &darray, &num);
    for (i = 0; i < num; i++)
    {
        if (!yaz_strncasecmp(darray[i], "--SERVER=", 9))
        {
            struct timeval timeout = { 1, 500000 }; /* 1.5 seconds */
            char *host = darray[i] + 9;
            char *port = strchr(host, ':');
            if (port)
                *port++ = '\0';
            /* drop the context of a previous --SERVER before replacing it */
            if (context)
                redisFree(context);
            context = redisConnectWithTimeout(host,
                                              port ? atoi(port) : 6379,
                                              timeout);
        }
    }
    nmem_destroy(nmem);
    return context;
}
#endif
129
130 int ZOOM_memcached_configure(ZOOM_connection c)
131 {
132     const char *val;
133 #if HAVE_HIREDIS
134     if (c->redis_c)
135     {
136         redisFree(c->redis_c);
137         c->redis_c = 0;
138     }
139 #endif
140 #if HAVE_LIBMEMCACHED_MEMCACHED_H
141     if (c->mc_st)
142     {
143         memcached_free(c->mc_st);
144         c->mc_st = 0;
145     }
146 #endif
147
148     val = ZOOM_options_get(c->options, "redis");
149     if (val && *val)
150     {
151 #if HAVE_HIREDIS
152         c->redis_c = create_redis(val);
153         if (c->redis_c == 0 || c->redis_c->err)
154         {
155             ZOOM_set_error(c, ZOOM_ERROR_MEMCACHED,
156                            "could not create redis");
157             return -1;
158         }
159         return 0; /* don't bother with memcached if redis is enabled */
160 #else
161         ZOOM_set_error(c, ZOOM_ERROR_MEMCACHED, "not enabled");
162         return -1;
163 #endif
164     }
165     val = ZOOM_options_get(c->options, "memcached");
166     if (val && *val)
167     {
168 #if HAVE_LIBMEMCACHED_MEMCACHED_H
169         c->mc_st = yaz_memcached_wrap(val);
170         if (!c->mc_st)
171         {
172             ZOOM_set_error(c, ZOOM_ERROR_MEMCACHED,
173                            "could not create memcached");
174             return -1;
175         }
176         memcached_behavior_set(c->mc_st, MEMCACHED_BEHAVIOR_BINARY_PROTOCOL, 1);
177 #else
178         ZOOM_set_error(c, ZOOM_ERROR_MEMCACHED, "not enabled");
179         return -1;
180 #endif
181     }
182     return 0;
183 }
184
#if HAVE_GCRYPT_H
/*
 * Appends one component of a cache key to w.
 *
 * A NULL value appends nothing. Values of up to 40 characters are appended
 * verbatim; longer ones are passed to wrbuf_sha1_puts() instead, which keeps
 * the key component short.
 */
static void wrbuf_vary_puts(WRBUF w, const char *v)
{
    if (!v)
        return;
    if (strlen(v) <= 40)
        wrbuf_puts(w, v);
    else
        wrbuf_sha1_puts(w, v, 1);
}
#endif
201
202 void ZOOM_memcached_resultset(ZOOM_resultset r, ZOOM_query q)
203 {
204 #if HAVE_GCRYPT_H
205     ZOOM_connection c = r->connection;
206
207     r->mc_key = wrbuf_alloc();
208     wrbuf_puts(r->mc_key, "1;");
209     wrbuf_vary_puts(r->mc_key, c->host_port);
210     wrbuf_puts(r->mc_key, ";");
211     wrbuf_vary_puts(r->mc_key, ZOOM_resultset_option_get(r, "extraArgs"));
212     wrbuf_puts(r->mc_key, ";");
213     wrbuf_vary_puts(r->mc_key, c->user);
214     wrbuf_puts(r->mc_key, ";");
215     wrbuf_vary_puts(r->mc_key, c->group);
216     wrbuf_puts(r->mc_key, ";");
217     if (c->password)
218         wrbuf_sha1_puts(r->mc_key, c->password, 1);
219     wrbuf_puts(r->mc_key, ";");
220     {
221         WRBUF w = wrbuf_alloc();
222         ZOOM_query_get_hash(q, w);
223         wrbuf_sha1_puts(r->mc_key, wrbuf_cstr(w), 1);
224         wrbuf_destroy(w);
225     }
226     wrbuf_puts(r->mc_key, ";");
227     wrbuf_vary_puts(r->mc_key, r->req_facets);
228 #endif
229 }
230
231 void ZOOM_memcached_search(ZOOM_connection c, ZOOM_resultset resultset)
232 {
233 #if HAVE_HIREDIS
234     if (c->redis_c && resultset->live_set == 0)
235     {
236         redisReply *reply;
237         const char *argv[2];
238
239         argv[0] = "GET";
240         argv[1] = wrbuf_cstr(resultset->mc_key);
241
242         reply = redisCommandArgv(c->redis_c, 2, argv, 0);
243         /* count;precision (ASCII) + '\0' + BER buffer for otherInformation */
244         if (reply && reply->type == REDIS_REPLY_STRING)
245         {
246             char *v = reply->str;
247             int v_len = reply->len;
248             ZOOM_Event event;
249             size_t lead_len = strlen(v) + 1;
250
251             resultset->size = odr_atoi(v);
252
253             yaz_log(YLOG_LOG, "For key %s got value %s lead_len=%d len=%d",
254                     wrbuf_cstr(resultset->mc_key), v, (int) lead_len,
255                     (int) v_len);
256             if (v_len > lead_len)
257             {
258                 Z_OtherInformation *oi = 0;
259                 int oi_len = v_len - lead_len;
260                 odr_setbuf(resultset->odr, v + lead_len, oi_len, 0);
261                 if (!z_OtherInformation(resultset->odr, &oi, 0, 0))
262                 {
263                     yaz_log(YLOG_WARN, "oi decoding failed");
264                     freeReplyObject(reply);
265                     return;
266                 }
267                 ZOOM_handle_search_result(c, resultset, oi);
268                 ZOOM_handle_facet_result(c, resultset, oi);
269             }
270             event = ZOOM_Event_create(ZOOM_EVENT_RECV_SEARCH);
271             ZOOM_connection_put_event(c, event);
272             resultset->live_set = 1;
273         }
274         freeReplyObject(reply);
275     }
276 #endif
277 #if HAVE_LIBMEMCACHED_MEMCACHED_H
278     if (c->mc_st && resultset->live_set == 0)
279     {
280         size_t v_len;
281         uint32_t flags;
282         memcached_return_t rc;
283         char *v = memcached_get(c->mc_st, wrbuf_buf(resultset->mc_key),
284                                 wrbuf_len(resultset->mc_key),
285                                 &v_len, &flags, &rc);
286         /* count;precision (ASCII) + '\0' + BER buffer for otherInformation */
287         if (v)
288         {
289             ZOOM_Event event;
290             size_t lead_len = strlen(v) + 1;
291
292             resultset->size = odr_atoi(v);
293
294             yaz_log(YLOG_LOG, "For key %s got value %s lead_len=%d len=%d",
295                     wrbuf_cstr(resultset->mc_key), v, (int) lead_len,
296                     (int) v_len);
297             if (v_len > lead_len)
298             {
299                 Z_OtherInformation *oi = 0;
300                 int oi_len = v_len - lead_len;
301                 odr_setbuf(resultset->odr, v + lead_len, oi_len, 0);
302                 if (!z_OtherInformation(resultset->odr, &oi, 0, 0))
303                 {
304                     yaz_log(YLOG_WARN, "oi decoding failed");
305                     free(v);
306                     return;
307                 }
308                 ZOOM_handle_search_result(c, resultset, oi);
309                 ZOOM_handle_facet_result(c, resultset, oi);
310             }
311             free(v);
312             event = ZOOM_Event_create(ZOOM_EVENT_RECV_SEARCH);
313             ZOOM_connection_put_event(c, event);
314             resultset->live_set = 1;
315         }
316     }
317 #endif
318 }
319
320 void ZOOM_memcached_hitcount(ZOOM_connection c, ZOOM_resultset resultset,
321                              Z_OtherInformation *oi, const char *precision)
322 {
323 #if HAVE_HIREDIS
324     if (c->redis_c && resultset->live_set == 0)
325     {
326         char *str;
327         ODR odr = odr_createmem(ODR_ENCODE);
328         char *oi_buf = 0;
329         int oi_len = 0;
330         char *key;
331
332         str = odr_malloc(odr, 20 + strlen(precision));
333         /* count;precision (ASCII) + '\0' + BER buffer for otherInformation */
334         sprintf(str, ODR_INT_PRINTF ";%s", resultset->size, precision);
335         if (oi)
336         {
337             z_OtherInformation(odr, &oi, 0, 0);
338             oi_buf = odr_getbuf(odr, &oi_len, 0);
339         }
340         key = odr_malloc(odr, strlen(str) + 1 + oi_len);
341         strcpy(key, str);
342         if (oi_len)
343             memcpy(key + strlen(str) + 1, oi_buf, oi_len);
344
345         {
346             redisReply *reply;
347             const char *argv[3];
348             size_t argvlen[3];
349             argv[0] = "SET";
350             argvlen[0] = 3;
351             argv[1] = wrbuf_buf(resultset->mc_key);
352             argvlen[1] = wrbuf_len(resultset->mc_key);
353             argv[2] = key;
354             argvlen[2] = strlen(str) + 1 + oi_len;
355             reply = redisCommandArgv(c->redis_c, 3, argv, argvlen);
356             freeReplyObject(reply);
357         }
358         odr_destroy(odr);
359     }
360 #endif
361 #if HAVE_LIBMEMCACHED_MEMCACHED_H
362     if (c->mc_st && resultset->live_set == 0)
363     {
364         uint32_t flags = 0;
365         memcached_return_t rc;
366         time_t expiration = 36000;
367         char *str;
368         ODR odr = odr_createmem(ODR_ENCODE);
369         char *oi_buf = 0;
370         int oi_len = 0;
371         char *key;
372
373         str = odr_malloc(odr, 20 + strlen(precision));
374         /* count;precision (ASCII) + '\0' + BER buffer for otherInformation */
375         sprintf(str, ODR_INT_PRINTF ";%s", resultset->size, precision);
376         if (oi)
377         {
378             z_OtherInformation(odr, &oi, 0, 0);
379             oi_buf = odr_getbuf(odr, &oi_len, 0);
380         }
381         key = odr_malloc(odr, strlen(str) + 1 + oi_len);
382         strcpy(key, str);
383         if (oi_len)
384             memcpy(key + strlen(str) + 1, oi_buf, oi_len);
385
386         rc = memcached_set(c->mc_st,
387                            wrbuf_buf(resultset->mc_key),
388                            wrbuf_len(resultset->mc_key),
389                            key, strlen(str) + 1 + oi_len, expiration, flags);
390         yaz_log(YLOG_LOG, "Store hit count key=%s value=%s oi_len=%d rc=%u %s",
391                 wrbuf_cstr(resultset->mc_key), str, oi_len, (unsigned) rc,
392                 memcached_strerror(c->mc_st, rc));
393         odr_destroy(odr);
394     }
395 #endif
396 }
397
398 void ZOOM_memcached_add(ZOOM_resultset r, Z_NamePlusRecord *npr,
399                         int pos,
400                         const char *syntax, const char *elementSetName,
401                         const char *schema,
402                         Z_SRW_diagnostic *diag)
403 {
404 #if HAVE_HIREDIS
405     if (r->connection->redis_c &&
406         !diag && npr->which == Z_NamePlusRecord_databaseRecord)
407     {
408         WRBUF k = wrbuf_alloc();
409         WRBUF rec_sha1 = wrbuf_alloc();
410         ODR odr = odr_createmem(ODR_ENCODE);
411         char *rec_buf;
412         int rec_len;
413         const char *argv[3];
414         size_t argvlen[3];
415         redisReply *reply;
416
417         z_NamePlusRecord(odr, &npr, 0, 0);
418         rec_buf = odr_getbuf(odr, &rec_len, 0);
419
420         wrbuf_write(k, wrbuf_buf(r->mc_key), wrbuf_len(r->mc_key));
421         wrbuf_printf(k, ";%d;%s;%s;%s", pos,
422                      syntax ? syntax : "",
423                      elementSetName ? elementSetName : "",
424                      schema ? schema : "");
425
426         wrbuf_sha1_write(rec_sha1, rec_buf, rec_len, 1);
427
428         argv[0] = "SET";
429         argvlen[0] = 3;
430         argv[1] = wrbuf_buf(k);
431         argvlen[1] = wrbuf_len(k);
432         argv[2] = wrbuf_buf(rec_sha1);
433         argvlen[2] = wrbuf_len(rec_sha1);
434
435         reply = redisCommandArgv(r->connection->redis_c, 3, argv, argvlen);
436         yaz_log(YLOG_LOG, "Store record key=%s val=%s",
437                 wrbuf_cstr(k), wrbuf_cstr(rec_sha1));
438         freeReplyObject(reply);
439
440         argv[1] = wrbuf_buf(rec_sha1);
441         argvlen[1] = wrbuf_len(rec_sha1);
442         argv[2] = rec_buf;
443         argvlen[2] = rec_len;
444
445         reply = redisCommandArgv(r->connection->redis_c, 3, argv, argvlen);
446         yaz_log(YLOG_LOG, "Add record key=%s rec_len=%d",
447                 wrbuf_cstr(rec_sha1), rec_len);
448         freeReplyObject(reply);
449
450         odr_destroy(odr);
451         wrbuf_destroy(k);
452         wrbuf_destroy(rec_sha1);
453     }
454 #endif
455 #if HAVE_LIBMEMCACHED_MEMCACHED_H
456     if (r->connection->mc_st &&
457         !diag && npr->which == Z_NamePlusRecord_databaseRecord)
458     {
459         WRBUF k = wrbuf_alloc();
460         WRBUF rec_sha1 = wrbuf_alloc();
461         uint32_t flags = 0;
462         memcached_return_t rc;
463         time_t expiration = 36000;
464         ODR odr = odr_createmem(ODR_ENCODE);
465         char *rec_buf;
466         int rec_len;
467
468         z_NamePlusRecord(odr, &npr, 0, 0);
469         rec_buf = odr_getbuf(odr, &rec_len, 0);
470
471         wrbuf_write(k, wrbuf_buf(r->mc_key), wrbuf_len(r->mc_key));
472         wrbuf_printf(k, ";%d;%s;%s;%s", pos,
473                      syntax ? syntax : "",
474                      elementSetName ? elementSetName : "",
475                      schema ? schema : "");
476
477         wrbuf_sha1_write(rec_sha1, rec_buf, rec_len, 1);
478
479         rc = memcached_set(r->connection->mc_st,
480                            wrbuf_buf(k), wrbuf_len(k),
481                            wrbuf_buf(rec_sha1), wrbuf_len(rec_sha1),
482                            expiration, flags);
483
484         yaz_log(YLOG_LOG, "Store record key=%s val=%s rc=%u %s",
485                 wrbuf_cstr(k), wrbuf_cstr(rec_sha1), (unsigned) rc,
486                 memcached_strerror(r->connection->mc_st, rc));
487
488         rc = memcached_add(r->connection->mc_st,
489                            wrbuf_buf(rec_sha1), wrbuf_len(rec_sha1),
490                            rec_buf, rec_len,
491                            expiration, flags);
492
493         yaz_log(YLOG_LOG, "Add record key=%s rec_len=%d rc=%u %s",
494                 wrbuf_cstr(rec_sha1), rec_len, (unsigned) rc,
495                 memcached_strerror(r->connection->mc_st, rc));
496
497         odr_destroy(odr);
498         wrbuf_destroy(k);
499         wrbuf_destroy(rec_sha1);
500     }
501 #endif
502 }
503
504 Z_NamePlusRecord *ZOOM_memcached_lookup(ZOOM_resultset r, int pos,
505                                         const char *syntax,
506                                         const char *elementSetName,
507                                         const char *schema)
508 {
509 #if HAVE_HIREDIS
510     if (r->connection && r->connection->redis_c)
511     {
512         WRBUF k = wrbuf_alloc();
513         const char *argv[2];
514         size_t argvlen[2];
515         redisReply *reply1;
516
517         wrbuf_write(k, wrbuf_buf(r->mc_key), wrbuf_len(r->mc_key));
518         wrbuf_printf(k, ";%d;%s;%s;%s", pos,
519                      syntax ? syntax : "",
520                      elementSetName ? elementSetName : "",
521                      schema ? schema : "");
522
523         yaz_log(YLOG_LOG, "Lookup record %s", wrbuf_cstr(k));
524         argv[0] = "GET";
525         argvlen[0] = 3;
526         argv[1] = wrbuf_buf(k);
527         argvlen[1] = wrbuf_len(k);
528         reply1 = redisCommandArgv(r->connection->redis_c, 2, argv, argvlen);
529
530         wrbuf_destroy(k);
531         if (reply1 && reply1->type == REDIS_REPLY_STRING)
532         {
533             redisReply *reply2;
534             char *sha1_buf = reply1->str;
535             int sha1_len = reply1->len;
536
537             yaz_log(YLOG_LOG, "Lookup record %.*s", (int) sha1_len, sha1_buf);
538
539             argv[0] = "GET";
540             argvlen[0] = 3;
541             argv[1] = sha1_buf;
542             argvlen[1] = sha1_len;
543
544             reply2 = redisCommandArgv(r->connection->redis_c, 2, argv, argvlen);
545             if (reply2 && reply2->type == REDIS_REPLY_STRING)
546             {
547                 Z_NamePlusRecord *npr = 0;
548                 char *v_buf = reply2->str;
549                 int v_len = reply2->len;
550
551                 odr_setbuf(r->odr, v_buf, v_len, 0);
552                 z_NamePlusRecord(r->odr, &npr, 0, 0);
553                 if (npr)
554                     yaz_log(YLOG_LOG, "returned redis copy");
555                 freeReplyObject(reply2);
556                 freeReplyObject(reply1);
557                 return npr;
558             }
559             freeReplyObject(reply2);
560         }
561         freeReplyObject(reply1);
562     }
563 #endif
564 #if HAVE_LIBMEMCACHED_MEMCACHED_H
565     if (r->connection && r->connection->mc_st)
566     {
567         WRBUF k = wrbuf_alloc();
568         char *sha1_buf;
569         size_t sha1_len;
570         uint32_t flags;
571         memcached_return_t rc;
572
573         wrbuf_write(k, wrbuf_buf(r->mc_key), wrbuf_len(r->mc_key));
574         wrbuf_printf(k, ";%d;%s;%s;%s", pos,
575                      syntax ? syntax : "",
576                      elementSetName ? elementSetName : "",
577                      schema ? schema : "");
578
579         yaz_log(YLOG_LOG, "Lookup record %s", wrbuf_cstr(k));
580         sha1_buf = memcached_get(r->connection->mc_st,
581                                  wrbuf_buf(k), wrbuf_len(k),
582                                  &sha1_len, &flags, &rc);
583
584         wrbuf_destroy(k);
585         if (sha1_buf)
586         {
587             size_t v_len;
588             char *v_buf;
589
590             yaz_log(YLOG_LOG, "Lookup record %.*s", (int) sha1_len, sha1_buf);
591             v_buf = memcached_get(r->connection->mc_st, sha1_buf, sha1_len,
592                                   &v_len, &flags, &rc);
593             free(sha1_buf);
594             if (v_buf)
595             {
596                 Z_NamePlusRecord *npr = 0;
597
598                 odr_setbuf(r->odr, v_buf, v_len, 0);
599                 z_NamePlusRecord(r->odr, &npr, 0, 0);
600                 free(v_buf);
601                 if (npr)
602                     yaz_log(YLOG_LOG, "returned memcached copy");
603                 return npr;
604             }
605         }
606     }
607 #endif
608     return 0;
609
610 }
611 /*
612  * Local variables:
613  * c-basic-offset: 4
614  * c-file-style: "Stroustrup"
615  * indent-tabs-mode: nil
616  * End:
617  * vim: shiftwidth=4 tabstop=8 expandtab
618  */
619