1 /* This file is part of the YAZ toolkit.
2 * Copyright (C) Index Data
3 * See the file LICENSE for details.
6 * \file zoom-memcached.c
7 * \brief Implements query/record caching using memcached
18 #include <yaz/yaz-util.h>
19 #include <yaz/xmalloc.h>
21 #include <yaz/diagbib1.h>
/* HAVE_MEMCACHED_FUNC records whether the memcached() constructor may be
 * called: it is defined as 1 when LIBMEMCACHED_VERSION_HEX >= 0x01000000.
 * NOTE(review): the definition as 0 is presumably the #else branch; the
 * #else/#endif lines are not visible here - confirm. */
24 #if LIBMEMCACHED_VERSION_HEX >= 0x01000000
25 #define HAVE_MEMCACHED_FUNC 1
27 #define HAVE_MEMCACHED_FUNC 0
/* Set up the cache-related state of connection c.
 * NOTE(review): only the signature is visible here; presumably this clears
 * the handles that ZOOM_memcached_destroy() later frees - confirm. */
31 void ZOOM_memcached_init(ZOOM_connection c)
/* Release the cache handles held by connection c: the memcached handle
 * c->mc_st via memcached_free() and the redis context c->redis_c via
 * redisFree().
 * NOTE(review): any guards around these two calls are not visible here. */
41 void ZOOM_memcached_destroy(ZOOM_connection c)
45 memcached_free(c->mc_st);
49 redisFree(c->redis_c);
54 /* memcached wrapper, needed because the memcached() function does not exist in older libs */
/* Build a memcached_st handle from the configuration string conf.
 * With HAVE_MEMCACHED_FUNC set, conf is handed straight to memcached().
 * Otherwise the handle comes from memcached_create() and conf is split on
 * blanks; every item starting with "--SERVER=" is registered through
 * memcached_server_add(). */
55 static memcached_st *yaz_memcached_wrap(const char *conf)
57 #if HAVE_MEMCACHED_FUNC
58 return memcached(conf, strlen(conf));
/* Fallback path: create the handle and parse conf by hand. */
62 memcached_st *mc = memcached_create(0);
63 NMEM nmem = nmem_create();
64 memcached_return_t rc;
66 nmem_strsplit_blank(nmem, conf, &darray, &num);
/* The "mc &&" test stops the loop as soon as mc is no longer set. */
67 for (i = 0; mc && i < num; i++)
/* Only the first 9 characters (the "--SERVER=" prefix) are compared. */
69 if (!yaz_strncasecmp(darray[i], "--SERVER=", 9))
/* host starts right after the prefix; ':' is searched for a port part and
 * "/?" for a weight part.
 * NOTE(review): how port and weight are split off is not visible here. */
71 char *host = darray[i] + 9;
72 char *port = strchr(host, ':');
73 char *weight = strstr(host, "/?");
/* 11211 is the port used when the item has no ':' part. */
81 rc = memcached_server_add(mc, host, port ? atoi(port) : 11211);
82 yaz_log(YLOG_LOG, "memcached_server_add host=%s rc=%u %s",
83 host, (unsigned) rc, memcached_strerror(mc, rc));
/* NOTE(review): the failure handling under this test is not visible here. */
84 if (rc != MEMCACHED_SUCCESS)
/* Create a redis connection from the configuration string conf.
 * conf is split on blanks and each item starting with "--SERVER=" is passed
 * to redisConnectWithTimeout(); context starts out as 0.
 * NOTE(review): the return statement and nmem cleanup are not visible here. */
104 static redisContext *create_redis(const char *conf)
108 NMEM nmem = nmem_create();
109 redisContext *context = 0;
111 nmem_strsplit_blank(nmem, conf, &darray, &num);
112 for (i = 0; i < num; i++)
/* Only the first 9 characters (the "--SERVER=" prefix) are compared. */
114 if (!yaz_strncasecmp(darray[i], "--SERVER=", 9))
116 struct timeval timeout = { 1, 500000 }; /* 1.5 seconds */
/* host starts right after the prefix; ':' is searched for a port part. */
117 char *host = darray[i] + 9;
118 char *port = strchr(host, ':');
/* 6379 is the port used when the item has no ':' part. */
121 context = redisConnectWithTimeout(host,
122 port ? atoi(port) : 6379,
/* (Re)configure caching for connection c from its "redis" and "memcached"
 * options. Handles already held by c are freed before the options are read.
 * Problems are reported through ZOOM_set_error() with ZOOM_ERROR_MEMCACHED.
 * Returns 0 once redis has been set up, in which case the "memcached"
 * option is not consulted.
 * NOTE(review): the remaining return statements are not visible here. */
131 int ZOOM_memcached_configure(ZOOM_connection c)
/* Drop any handles left over from an earlier configuration. */
137 redisFree(c->redis_c);
141 #if HAVE_LIBMEMCACHED
144 memcached_free(c->mc_st);
/* redis is tried first. */
149 val = ZOOM_options_get(c->options, "redis");
153 c->redis_c = create_redis(val);
/* Both a missing context and a context flagged with err count as failure. */
154 if (c->redis_c == 0 || c->redis_c->err)
156 ZOOM_set_error(c, ZOOM_ERROR_MEMCACHED,
157 "could not create redis");
160 return 0; /* don't bother with memcached if redis is enabled */
/* NOTE(review): presumably the branch used when redis support is not
 * compiled in - the surrounding #if/#else is not visible here. */
162 ZOOM_set_error(c, ZOOM_ERROR_MEMCACHED, "not enabled");
166 val = ZOOM_options_get(c->options, "memcached");
169 #if HAVE_LIBMEMCACHED
170 c->mc_st = yaz_memcached_wrap(val);
173 ZOOM_set_error(c, ZOOM_ERROR_MEMCACHED,
174 "could not create memcached");
/* Turn on MEMCACHED_BEHAVIOR_BINARY_PROTOCOL for the new handle. */
177 memcached_behavior_set(c->mc_st, MEMCACHED_BEHAVIOR_BINARY_PROTOCOL, 1);
/* NOTE(review): presumably the branch used when HAVE_LIBMEMCACHED is off. */
179 ZOOM_set_error(c, ZOOM_ERROR_MEMCACHED, "not enabled");
/* Append the string v to the key buffer w; the visible path does so through
 * wrbuf_sha1_puts().
 * NOTE(review): the conditions guarding that call (for instance a null or
 * short v) are not visible here - confirm. */
187 static void wrbuf_vary_puts(WRBUF w, const char *v)
193 wrbuf_sha1_puts(w, v, 1);
/* Build the cache key r->mc_key for result set r and query q.
 * The key is a ';'-separated sequence: the literal "1", the connection's
 * host_port, the "extraArgs" option, user, group, password, the query hash
 * and the requested facets. Password and query hash go through
 * wrbuf_sha1_puts(); the other fields go through wrbuf_vary_puts().
 * NOTE(review): any guards around the individual fields are not visible. */
203 void ZOOM_memcached_resultset(ZOOM_resultset r, ZOOM_query q)
206 ZOOM_connection c = r->connection;
208 r->mc_key = wrbuf_alloc();
/* Leading "1" field; presumably a key-format version - confirm. */
209 wrbuf_puts(r->mc_key, "1;");
210 wrbuf_vary_puts(r->mc_key, c->host_port);
211 wrbuf_puts(r->mc_key, ";");
212 wrbuf_vary_puts(r->mc_key, ZOOM_resultset_option_get(r, "extraArgs"));
213 wrbuf_puts(r->mc_key, ";");
214 wrbuf_vary_puts(r->mc_key, c->user);
215 wrbuf_puts(r->mc_key, ";");
216 wrbuf_vary_puts(r->mc_key, c->group);
217 wrbuf_puts(r->mc_key, ";");
219 wrbuf_sha1_puts(r->mc_key, c->password, 1);
220 wrbuf_puts(r->mc_key, ";");
/* The query contributes via ZOOM_query_get_hash() into a scratch buffer. */
222 WRBUF w = wrbuf_alloc();
223 ZOOM_query_get_hash(q, w);
224 wrbuf_sha1_puts(r->mc_key, wrbuf_cstr(w), 1);
227 wrbuf_puts(r->mc_key, ";");
228 wrbuf_vary_puts(r->mc_key, r->req_facets);
/* Try to answer a search from the cache instead of the server.
 * For a result set that is not yet live (live_set == 0) the value stored
 * under resultset->mc_key is fetched, first from redis and then from
 * memcached. On a hit the hit count is taken from the leading ASCII part,
 * any trailing otherInformation is decoded and handed to the search/facet
 * result handlers, a ZOOM_EVENT_RECV_SEARCH event is queued and the result
 * set is marked live. */
232 void ZOOM_memcached_search(ZOOM_connection c, ZOOM_resultset resultset)
/* redis path */
235 if (c->redis_c && resultset->live_set == 0)
241 argv[1] = wrbuf_cstr(resultset->mc_key);
/* No explicit argument lengths are passed (last argument is 0). */
243 reply = redisCommandArgv(c->redis_c, 2, argv, 0);
244 /* count;precision (ASCII) + '\0' + BER buffer for otherInformation */
245 if (reply && reply->type == REDIS_REPLY_STRING)
247 char *v = reply->str;
248 int v_len = reply->len;
/* lead_len covers the ASCII part including its terminating '\0'. */
250 size_t lead_len = strlen(v) + 1;
252 resultset->size = odr_atoi(v);
254 yaz_log(YLOG_LOG, "For key %s got value %s lead_len=%d len=%d",
255 wrbuf_cstr(resultset->mc_key), v, (int) lead_len,
/* Anything beyond the lead part is the encoded otherInformation.
 * NOTE(review): v_len is int and lead_len is size_t, so this comparison is
 * carried out in unsigned arithmetic. */
257 if (v_len > lead_len)
259 Z_OtherInformation *oi = 0;
260 int oi_len = v_len - lead_len;
261 odr_setbuf(resultset->odr, v + lead_len, oi_len, 0);
262 if (!z_OtherInformation(resultset->odr, &oi, 0, 0))
264 yaz_log(YLOG_WARN, "oi decoding failed");
265 freeReplyObject(reply);
268 ZOOM_handle_search_result(c, resultset, oi);
269 ZOOM_handle_facet_result(c, resultset, oi);
271 event = ZOOM_Event_create(ZOOM_EVENT_RECV_SEARCH);
272 ZOOM_connection_put_event(c, event);
/* Marking the set live keeps the memcached path below from running too. */
273 resultset->live_set = 1;
275 freeReplyObject(reply);
/* memcached path: same value layout as the redis path above. */
278 #if HAVE_LIBMEMCACHED
279 if (c->mc_st && resultset->live_set == 0)
283 memcached_return_t rc;
284 char *v = memcached_get(c->mc_st, wrbuf_buf(resultset->mc_key),
285 wrbuf_len(resultset->mc_key),
286 &v_len, &flags, &rc);
287 /* count;precision (ASCII) + '\0' + BER buffer for otherInformation */
/* NOTE(review): the check that v is non-null before strlen() is not
 * visible here - confirm it exists. */
291 size_t lead_len = strlen(v) + 1;
293 resultset->size = odr_atoi(v);
295 yaz_log(YLOG_LOG, "For key %s got value %s lead_len=%d len=%d",
296 wrbuf_cstr(resultset->mc_key), v, (int) lead_len,
298 if (v_len > lead_len)
300 Z_OtherInformation *oi = 0;
301 int oi_len = v_len - lead_len;
302 odr_setbuf(resultset->odr, v + lead_len, oi_len, 0);
303 if (!z_OtherInformation(resultset->odr, &oi, 0, 0))
305 yaz_log(YLOG_WARN, "oi decoding failed");
309 ZOOM_handle_search_result(c, resultset, oi);
310 ZOOM_handle_facet_result(c, resultset, oi);
313 event = ZOOM_Event_create(ZOOM_EVENT_RECV_SEARCH);
314 ZOOM_connection_put_event(c, event);
315 resultset->live_set = 1;
/* Store the hit count of a search in the cache under resultset->mc_key.
 * Only done for a result set that is not live (live_set == 0), i.e. one that
 * was not itself served from the cache by ZOOM_memcached_search().
 * The stored value is "<count>;<precision>" in ASCII, a '\0', then the
 * encoded otherInformation oi. The redis path runs first, then the
 * memcached path. */
321 void ZOOM_memcached_hitcount(ZOOM_connection c, ZOOM_resultset resultset,
322 Z_OtherInformation *oi, const char *precision)
/* redis path */
325 if (c->redis_c && resultset->live_set == 0)
328 ODR odr = odr_createmem(ODR_ENCODE);
/* NOTE(review): the 20 bytes of headroom must cover the formatted count,
 * the ';' and the terminating '\0' - confirm for the widest ODR_INT. */
333 str = odr_malloc(odr, 20 + strlen(precision));
334 /* count;precision (ASCII) + '\0' + BER buffer for otherInformation */
335 sprintf(str, ODR_INT_PRINTF ";%s", resultset->size, precision);
338 z_OtherInformation(odr, &oi, 0, 0);
339 oi_buf = odr_getbuf(odr, &oi_len, 0);
/* Room for str, its '\0' and the encoded otherInformation. */
341 key = odr_malloc(odr, strlen(str) + 1 + oi_len);
344 memcpy(key + strlen(str) + 1, oi_buf, oi_len);
352 argv[1] = wrbuf_buf(resultset->mc_key);
353 argvlen[1] = wrbuf_len(resultset->mc_key);
355 argvlen[2] = strlen(str) + 1 + oi_len;
356 reply = redisCommandArgv(c->redis_c, 3, argv, argvlen);
357 freeReplyObject(reply);
/* memcached path: builds the same value and stores it with memcached_set. */
362 #if HAVE_LIBMEMCACHED
363 if (c->mc_st && resultset->live_set == 0)
366 memcached_return_t rc;
/* Passed to memcached_set() as the expiration argument. */
367 time_t expiration = 36000;
369 ODR odr = odr_createmem(ODR_ENCODE);
374 str = odr_malloc(odr, 20 + strlen(precision));
375 /* count;precision (ASCII) + '\0' + BER buffer for otherInformation */
376 sprintf(str, ODR_INT_PRINTF ";%s", resultset->size, precision);
379 z_OtherInformation(odr, &oi, 0, 0);
380 oi_buf = odr_getbuf(odr, &oi_len, 0);
382 key = odr_malloc(odr, strlen(str) + 1 + oi_len);
385 memcpy(key + strlen(str) + 1, oi_buf, oi_len);
/* Despite its name, "key" is the value being stored here; the cache key
 * is resultset->mc_key. */
387 rc = memcached_set(c->mc_st,
388 wrbuf_buf(resultset->mc_key),
389 wrbuf_len(resultset->mc_key),
390 key, strlen(str) + 1 + oi_len, expiration, flags);
391 yaz_log(YLOG_LOG, "Store hit count key=%s value=%s oi_len=%d rc=%u %s",
392 wrbuf_cstr(resultset->mc_key), str, oi_len, (unsigned) rc,
393 memcached_strerror(c->mc_st, rc));
/* Cache a retrieved record.
 * Only done when there is no diagnostic (diag is null) and npr is a
 * database record. Two entries are written per record:
 *   1. "<mc_key>;<pos>;<syntax>;<elementSetName>;<schema>" -> digest of the
 *      encoded record (produced by wrbuf_sha1_write());
 *   2. that digest -> the encoded record itself.
 * Null syntax/elementSetName/schema are written as empty strings.
 * The redis path runs first, then the memcached path.
 * NOTE(review): part of the parameter list (pos, schema) is not visible. */
399 void ZOOM_memcached_add(ZOOM_resultset r, Z_NamePlusRecord *npr,
401 const char *syntax, const char *elementSetName,
403 Z_SRW_diagnostic *diag)
/* redis path */
406 if (r->connection->redis_c &&
407 !diag && npr->which == Z_NamePlusRecord_databaseRecord)
409 WRBUF k = wrbuf_alloc();
410 WRBUF rec_sha1 = wrbuf_alloc();
411 ODR odr = odr_createmem(ODR_ENCODE);
/* Encode the record; rec_buf/rec_len describe the encoded bytes. */
418 z_NamePlusRecord(odr, &npr, 0, 0);
419 rec_buf = odr_getbuf(odr, &rec_len, 0);
/* Position key: result-set key plus position and record format fields. */
421 wrbuf_write(k, wrbuf_buf(r->mc_key), wrbuf_len(r->mc_key));
422 wrbuf_printf(k, ";%d;%s;%s;%s", pos,
423 syntax ? syntax : "",
424 elementSetName ? elementSetName : "",
425 schema ? schema : "");
427 wrbuf_sha1_write(rec_sha1, rec_buf, rec_len, 1);
/* First command: position key -> record digest. */
431 argv[1] = wrbuf_buf(k);
432 argvlen[1] = wrbuf_len(k);
433 argv[2] = wrbuf_buf(rec_sha1);
434 argvlen[2] = wrbuf_len(rec_sha1);
436 reply = redisCommandArgv(r->connection->redis_c, 3, argv, argvlen);
437 yaz_log(YLOG_LOG, "Store record key=%s val=%s",
438 wrbuf_cstr(k), wrbuf_cstr(rec_sha1));
439 freeReplyObject(reply);
/* Second command: record digest -> encoded record (rec_len bytes). */
441 argv[1] = wrbuf_buf(rec_sha1);
442 argvlen[1] = wrbuf_len(rec_sha1);
444 argvlen[2] = rec_len;
446 reply = redisCommandArgv(r->connection->redis_c, 3, argv, argvlen);
447 yaz_log(YLOG_LOG, "Add record key=%s rec_len=%d",
448 wrbuf_cstr(rec_sha1), rec_len);
449 freeReplyObject(reply);
453 wrbuf_destroy(rec_sha1);
/* memcached path: same two entries, via memcached_set and memcached_add. */
456 #if HAVE_LIBMEMCACHED
457 if (r->connection->mc_st &&
458 !diag && npr->which == Z_NamePlusRecord_databaseRecord)
460 WRBUF k = wrbuf_alloc();
461 WRBUF rec_sha1 = wrbuf_alloc();
463 memcached_return_t rc;
464 time_t expiration = 36000;
465 ODR odr = odr_createmem(ODR_ENCODE);
469 z_NamePlusRecord(odr, &npr, 0, 0);
470 rec_buf = odr_getbuf(odr, &rec_len, 0);
472 wrbuf_write(k, wrbuf_buf(r->mc_key), wrbuf_len(r->mc_key));
473 wrbuf_printf(k, ";%d;%s;%s;%s", pos,
474 syntax ? syntax : "",
475 elementSetName ? elementSetName : "",
476 schema ? schema : "");
478 wrbuf_sha1_write(rec_sha1, rec_buf, rec_len, 1);
/* Position key -> record digest. */
480 rc = memcached_set(r->connection->mc_st,
481 wrbuf_buf(k), wrbuf_len(k),
482 wrbuf_buf(rec_sha1), wrbuf_len(rec_sha1),
485 yaz_log(YLOG_LOG, "Store record key=%s val=%s rc=%u %s",
486 wrbuf_cstr(k), wrbuf_cstr(rec_sha1), (unsigned) rc,
487 memcached_strerror(r->connection->mc_st, rc));
/* Record digest -> encoded record, written with memcached_add. */
489 rc = memcached_add(r->connection->mc_st,
490 wrbuf_buf(rec_sha1), wrbuf_len(rec_sha1),
494 yaz_log(YLOG_LOG, "Add record key=%s rec_len=%d rc=%u %s",
495 wrbuf_cstr(rec_sha1), rec_len, (unsigned) rc,
496 memcached_strerror(r->connection->mc_st, rc));
500 wrbuf_destroy(rec_sha1);
/* Look up a cached record for position pos of result set r.
 * The lookup takes two steps, mirroring ZOOM_memcached_add():
 *   1. "<mc_key>;<pos>;<syntax>;<elementSetName>;<schema>" yields the
 *      record digest;
 *   2. the digest yields the encoded record, which is decoded into r->odr
 *      with z_NamePlusRecord().
 * redis is consulted first, then memcached.
 * NOTE(review): the return statements and part of the parameter list
 * (syntax, schema) are not visible here. */
505 Z_NamePlusRecord *ZOOM_memcached_lookup(ZOOM_resultset r, int pos,
507 const char *elementSetName,
/* redis path */
511 if (r->connection && r->connection->redis_c)
513 WRBUF k = wrbuf_alloc();
/* Same position key layout as used when the record was stored. */
518 wrbuf_write(k, wrbuf_buf(r->mc_key), wrbuf_len(r->mc_key));
519 wrbuf_printf(k, ";%d;%s;%s;%s", pos,
520 syntax ? syntax : "",
521 elementSetName ? elementSetName : "",
522 schema ? schema : "");
524 yaz_log(YLOG_LOG, "Lookup record %s", wrbuf_cstr(k));
/* Step 1: position key -> record digest. */
527 argv[1] = wrbuf_buf(k);
528 argvlen[1] = wrbuf_len(k);
529 reply1 = redisCommandArgv(r->connection->redis_c, 2, argv, argvlen);
532 if (reply1 && reply1->type == REDIS_REPLY_STRING)
535 char *sha1_buf = reply1->str;
536 int sha1_len = reply1->len;
538 yaz_log(YLOG_LOG, "Lookup record %.*s", (int) sha1_len, sha1_buf);
/* Step 2: record digest -> encoded record. */
543 argvlen[1] = sha1_len;
545 reply2 = redisCommandArgv(r->connection->redis_c, 2, argv, argvlen);
546 if (reply2 && reply2->type == REDIS_REPLY_STRING)
548 Z_NamePlusRecord *npr = 0;
549 char *v_buf = reply2->str;
550 int v_len = reply2->len;
552 odr_setbuf(r->odr, v_buf, v_len, 0);
553 z_NamePlusRecord(r->odr, &npr, 0, 0);
555 yaz_log(YLOG_LOG, "returned redis copy");
/* Both replies are released on the hit path ... */
556 freeReplyObject(reply2);
557 freeReplyObject(reply1);
/* ... and again on the miss paths below. */
560 freeReplyObject(reply2);
562 freeReplyObject(reply1);
/* memcached path: the same two-step lookup through memcached_get. */
565 #if HAVE_LIBMEMCACHED
566 if (r->connection && r->connection->mc_st)
568 WRBUF k = wrbuf_alloc();
572 memcached_return_t rc;
574 wrbuf_write(k, wrbuf_buf(r->mc_key), wrbuf_len(r->mc_key));
575 wrbuf_printf(k, ";%d;%s;%s;%s", pos,
576 syntax ? syntax : "",
577 elementSetName ? elementSetName : "",
578 schema ? schema : "");
580 yaz_log(YLOG_LOG, "Lookup record %s", wrbuf_cstr(k));
/* Step 1: position key -> record digest. */
581 sha1_buf = memcached_get(r->connection->mc_st,
582 wrbuf_buf(k), wrbuf_len(k),
583 &sha1_len, &flags, &rc);
591 yaz_log(YLOG_LOG, "Lookup record %.*s", (int) sha1_len, sha1_buf);
/* Step 2: record digest -> encoded record. */
592 v_buf = memcached_get(r->connection->mc_st, sha1_buf, sha1_len,
593 &v_len, &flags, &rc);
597 Z_NamePlusRecord *npr = 0;
599 odr_setbuf(r->odr, v_buf, v_len, 0);
600 z_NamePlusRecord(r->odr, &npr, 0, 0);
603 yaz_log(YLOG_LOG, "returned memcached copy");
615 * c-file-style: "Stroustrup"
616 * indent-tabs-mode: nil
618 * vim: shiftwidth=4 tabstop=8 expandtab