void ZOOM_memcached_init(ZOOM_connection c)
{
-#if HAVE_LIBMEMCACHED_MEMCACHED_H
+#if HAVE_LIBMEMCACHED
c->mc_st = 0;
#endif
+#if HAVE_HIREDIS
+ c->redis_c = 0;
+#endif
+ c->expire_search = 600;
+ c->expire_record = 1200;
}
void ZOOM_memcached_destroy(ZOOM_connection c)
{
-#if HAVE_LIBMEMCACHED_MEMCACHED_H
+#if HAVE_LIBMEMCACHED
if (c->mc_st)
memcached_free(c->mc_st);
#endif
+#if HAVE_HIREDIS
+ if (c->redis_c)
+ redisFree(c->redis_c);
+#endif
}
+#if HAVE_LIBMEMCACHED
+static memcached_st *create_memcached(const char *conf,
+ int *expire_search, int *expire_record)
+{
+ char **darray;
+ int i, num;
+ memcached_st *mc = memcached_create(0);
+ NMEM nmem = nmem_create();
+ memcached_return_t rc;
+
+ nmem_strsplit_blank(nmem, conf, &darray, &num);
+ for (i = 0; mc && i < num; i++)
+ {
+ if (!yaz_strncasecmp(darray[i], "--SERVER=", 9))
+ {
+ char *host = darray[i] + 9;
+ char *port = strchr(host, ':');
+ char *weight = strstr(host, "/?");
+ if (port)
+ *port++ = '\0';
+ if (weight)
+ {
+ *weight = '\0';
+ weight += 2;
+ }
+ rc = memcached_server_add(mc, host, port ? atoi(port) : 11211);
+ yaz_log(YLOG_LOG, "memcached_server_add host=%s rc=%u %s",
+ host, (unsigned) rc, memcached_strerror(mc, rc));
+ if (rc != MEMCACHED_SUCCESS)
+ {
+ memcached_free(mc);
+ mc = 0;
+ }
+ }
+ else if (!yaz_strncasecmp(darray[i], "--EXPIRE=", 9))
+ {
+ *expire_search = atoi(darray[i] + 9);
+ *expire_record = 600 + *expire_search;
+ }
+ else
+ {
+ /* bad directive */
+ memcached_free(mc);
+ mc = 0;
+ }
+ }
+ nmem_destroy(nmem);
+ return mc;
+}
+#endif
+
+#if HAVE_HIREDIS
+static redisContext *create_redis(const char *conf,
+ int *expire_search, int *expire_record)
+{
+ char **darray;
+ int i, num;
+ NMEM nmem = nmem_create();
+ redisContext *context = 0;
+
+ nmem_strsplit_blank(nmem, conf, &darray, &num);
+ for (i = 0; i < num; i++)
+ {
+ if (!yaz_strncasecmp(darray[i], "--SERVER=", 9))
+ {
+ struct timeval timeout = { 1, 500000 }; /* 1.5 seconds */
+ char *host = darray[i] + 9;
+ char *port = strchr(host, ':');
+ if (port)
+ *port++ = '\0';
+ context = redisConnectWithTimeout(host,
+ port ? atoi(port) : 6379,
+ timeout);
+ }
+ else if (!yaz_strncasecmp(darray[i], "--EXPIRE=", 9))
+ {
+ *expire_search = atoi(darray[i] + 9);
+ *expire_record = 600 + *expire_search;
+ }
+ }
+ nmem_destroy(nmem);
+ return context;
+}
+#endif
+
int ZOOM_memcached_configure(ZOOM_connection c)
{
const char *val;
-#if HAVE_LIBMEMCACHED_MEMCACHED_H
+#if HAVE_HIREDIS
+ if (c->redis_c)
+ {
+ redisFree(c->redis_c);
+ c->redis_c = 0;
+ }
+#endif
+#if HAVE_LIBMEMCACHED
if (c->mc_st)
{
memcached_free(c->mc_st);
c->mc_st = 0;
}
#endif
+
+ val = ZOOM_options_get(c->options, "redis");
+ if (val && *val)
+ {
+#if HAVE_HIREDIS
+ c->redis_c = create_redis(val,
+ &c->expire_search, &c->expire_record);
+ if (c->redis_c == 0 || c->redis_c->err)
+ {
+ ZOOM_set_error(c, ZOOM_ERROR_MEMCACHED,
+ "could not create redis");
+ return -1;
+ }
+ return 0; /* don't bother with memcached if redis is enabled */
+#else
+ ZOOM_set_error(c, ZOOM_ERROR_MEMCACHED, "not enabled");
+ return -1;
+#endif
+ }
val = ZOOM_options_get(c->options, "memcached");
if (val && *val)
{
-#if HAVE_LIBMEMCACHED_MEMCACHED_H
- c->mc_st = memcached(val, strlen(val));
+#if HAVE_LIBMEMCACHED
+ c->mc_st = create_memcached(val, &c->expire_search, &c->expire_record);
if (!c->mc_st)
{
- ZOOM_set_error(c, ZOOM_ERROR_MEMCACHED, val);
+ ZOOM_set_error(c, ZOOM_ERROR_MEMCACHED,
+ "could not create memcached");
return -1;
}
memcached_behavior_set(c->mc_st, MEMCACHED_BEHAVIOR_BINARY_PROTOCOL, 1);
return 0;
}
+#if HAVE_GCRYPT_H
+static void wrbuf_vary_puts(WRBUF w, const char *v)
+{
+ if (v)
+ {
+ if (strlen(v) > 40)
+ {
+ wrbuf_sha1_puts(w, v, 1);
+ }
+ else
+ {
+ wrbuf_puts(w, v);
+ }
+ }
+}
+#endif
+
void ZOOM_memcached_resultset(ZOOM_resultset r, ZOOM_query q)
{
-#if HAVE_LIBMEMCACHED_MEMCACHED_H
+#if HAVE_GCRYPT_H
ZOOM_connection c = r->connection;
+
r->mc_key = wrbuf_alloc();
- wrbuf_puts(r->mc_key, "0;");
- wrbuf_puts(r->mc_key, c->host_port);
+ wrbuf_puts(r->mc_key, "1;");
+ wrbuf_vary_puts(r->mc_key, c->host_port);
wrbuf_puts(r->mc_key, ";");
- if (c->user)
- wrbuf_puts(r->mc_key, c->user);
+ wrbuf_vary_puts(r->mc_key, ZOOM_resultset_option_get(r, "extraArgs"));
wrbuf_puts(r->mc_key, ";");
- if (c->group)
- wrbuf_puts(r->mc_key, c->group);
+ wrbuf_vary_puts(r->mc_key, c->user);
+ wrbuf_puts(r->mc_key, ";");
+ wrbuf_vary_puts(r->mc_key, c->group);
wrbuf_puts(r->mc_key, ";");
if (c->password)
wrbuf_sha1_puts(r->mc_key, c->password, 1);
wrbuf_destroy(w);
}
wrbuf_puts(r->mc_key, ";");
- if (r->req_facets)
- wrbuf_puts(r->mc_key, r->req_facets);
+ wrbuf_vary_puts(r->mc_key, r->req_facets);
#endif
}
void ZOOM_memcached_search(ZOOM_connection c, ZOOM_resultset resultset)
{
-#if HAVE_LIBMEMCACHED_MEMCACHED_H
- /* TODO: add sorting */
+#if HAVE_HIREDIS
+ if (c->redis_c && resultset->live_set == 0)
+ {
+ redisReply *reply;
+ const char *argv[2];
+
+ argv[0] = "GET";
+ argv[1] = wrbuf_cstr(resultset->mc_key);
+
+ reply = redisCommandArgv(c->redis_c, 2, argv, 0);
+ /* count;precision (ASCII) + '\0' + BER buffer for otherInformation */
+ if (reply && reply->type == REDIS_REPLY_STRING)
+ {
+ char *v = reply->str;
+ int v_len = reply->len;
+ ZOOM_Event event;
+ size_t lead_len = strlen(v) + 1;
+
+ resultset->size = odr_atoi(v);
+
+ yaz_log(YLOG_LOG, "For key %s got value %s lead_len=%d len=%d",
+ wrbuf_cstr(resultset->mc_key), v, (int) lead_len,
+ (int) v_len);
+ if (v_len > lead_len)
+ {
+ Z_OtherInformation *oi = 0;
+ int oi_len = v_len - lead_len;
+ odr_setbuf(resultset->odr, v + lead_len, oi_len, 0);
+ if (!z_OtherInformation(resultset->odr, &oi, 0, 0))
+ {
+ yaz_log(YLOG_WARN, "oi decoding failed");
+ freeReplyObject(reply);
+ return;
+ }
+ ZOOM_handle_search_result(c, resultset, oi);
+ ZOOM_handle_facet_result(c, resultset, oi);
+ }
+ event = ZOOM_Event_create(ZOOM_EVENT_RECV_SEARCH);
+ ZOOM_connection_put_event(c, event);
+ resultset->live_set = 1;
+ }
+ if (reply)
+ freeReplyObject(reply);
+ }
+#endif
+#if HAVE_LIBMEMCACHED
if (c->mc_st && resultset->live_set == 0)
{
size_t v_len;
char *v = memcached_get(c->mc_st, wrbuf_buf(resultset->mc_key),
wrbuf_len(resultset->mc_key),
&v_len, &flags, &rc);
+ /* count;precision (ASCII) + '\0' + BER buffer for otherInformation */
if (v)
{
ZOOM_Event event;
- WRBUF w = wrbuf_alloc();
-
- wrbuf_write(w, v, v_len);
- free(v);
- resultset->size = odr_atoi(wrbuf_cstr(w));
+ size_t lead_len = strlen(v) + 1;
- yaz_log(YLOG_LOG, "For key %s got value %s",
- wrbuf_cstr(resultset->mc_key), wrbuf_cstr(w));
+ resultset->size = odr_atoi(v);
- wrbuf_destroy(w);
+ yaz_log(YLOG_LOG, "For key %s got value %s lead_len=%d len=%d",
+ wrbuf_cstr(resultset->mc_key), v, (int) lead_len,
+ (int) v_len);
+ if (v_len > lead_len)
+ {
+ Z_OtherInformation *oi = 0;
+ int oi_len = v_len - lead_len;
+ odr_setbuf(resultset->odr, v + lead_len, oi_len, 0);
+ if (!z_OtherInformation(resultset->odr, &oi, 0, 0))
+ {
+ yaz_log(YLOG_WARN, "oi decoding failed");
+ free(v);
+ return;
+ }
+ ZOOM_handle_search_result(c, resultset, oi);
+ ZOOM_handle_facet_result(c, resultset, oi);
+ }
+ free(v);
event = ZOOM_Event_create(ZOOM_EVENT_RECV_SEARCH);
ZOOM_connection_put_event(c, event);
resultset->live_set = 1;
#endif
}
-void ZOOM_memcached_hitcount(ZOOM_connection c, ZOOM_resultset resultset)
+#if HAVE_HIREDIS
+static void expire_redis(redisContext *redis_c,
+ const char *buf, size_t len, int exp)
{
-#if HAVE_LIBMEMCACHED_MEMCACHED_H
+ redisReply *reply;
+ const char *argv[3];
+ size_t argvlen[3];
+ char key_val[20];
+
+ sprintf(key_val, "%d", exp);
+
+ argv[0] = "EXPIRE";
+ argvlen[0] = 6;
+ argv[1] = buf;
+ argvlen[1] = len;
+ argv[2] = key_val;
+ argvlen[2] = strlen(key_val);
+ reply = redisCommandArgv(redis_c, 3, argv, argvlen);
+ freeReplyObject(reply);
+}
+#endif
+
+void ZOOM_memcached_hitcount(ZOOM_connection c, ZOOM_resultset resultset,
+ Z_OtherInformation *oi, const char *precision)
+{
+#if HAVE_HIREDIS
+ if (c->redis_c && resultset->live_set == 0)
+ {
+ char *str;
+ ODR odr = odr_createmem(ODR_ENCODE);
+ char *oi_buf = 0;
+ int oi_len = 0;
+ char *key;
+
+ str = odr_malloc(odr, 20 + strlen(precision));
+ /* count;precision (ASCII) + '\0' + BER buffer for otherInformation */
+ sprintf(str, ODR_INT_PRINTF ";%s", resultset->size, precision);
+ if (oi)
+ {
+ z_OtherInformation(odr, &oi, 0, 0);
+ oi_buf = odr_getbuf(odr, &oi_len, 0);
+ }
+ key = odr_malloc(odr, strlen(str) + 1 + oi_len);
+ strcpy(key, str);
+ if (oi_len)
+ memcpy(key + strlen(str) + 1, oi_buf, oi_len);
+
+ {
+ redisReply *reply;
+ const char *argv[3];
+ size_t argvlen[3];
+ argv[0] = "SET";
+ argvlen[0] = 3;
+ argv[1] = wrbuf_buf(resultset->mc_key);
+ argvlen[1] = wrbuf_len(resultset->mc_key);
+ argv[2] = key;
+ argvlen[2] = strlen(str) + 1 + oi_len;
+ reply = redisCommandArgv(c->redis_c, 3, argv, argvlen);
+ freeReplyObject(reply);
+ }
+ expire_redis(c->redis_c,
+ wrbuf_buf(resultset->mc_key),
+ wrbuf_len(resultset->mc_key),
+ c->expire_search);
+ odr_destroy(odr);
+ }
+#endif
+#if HAVE_LIBMEMCACHED
if (c->mc_st && resultset->live_set == 0)
{
uint32_t flags = 0;
memcached_return_t rc;
- time_t expiration = 36000;
- char str[40];
+ char *str;
+ ODR odr = odr_createmem(ODR_ENCODE);
+ char *oi_buf = 0;
+ int oi_len = 0;
+ char *key;
+
+ str = odr_malloc(odr, 20 + strlen(precision));
+ /* count;precision (ASCII) + '\0' + BER buffer for otherInformation */
+ sprintf(str, ODR_INT_PRINTF ";%s", resultset->size, precision);
+ if (oi)
+ {
+ z_OtherInformation(odr, &oi, 0, 0);
+ oi_buf = odr_getbuf(odr, &oi_len, 0);
+ }
+ key = odr_malloc(odr, strlen(str) + 1 + oi_len);
+ strcpy(key, str);
+ if (oi_len)
+ memcpy(key + strlen(str) + 1, oi_buf, oi_len);
- sprintf(str, ODR_INT_PRINTF, resultset->size);
rc = memcached_set(c->mc_st,
- wrbuf_buf(resultset->mc_key),wrbuf_len(resultset->mc_key),
- str, strlen(str), expiration, flags);
- yaz_log(YLOG_LOG, "Store hit count key=%s value=%s rc=%u %s",
- wrbuf_cstr(resultset->mc_key), str, (unsigned) rc,
- memcached_last_error_message(c->mc_st));
+ wrbuf_buf(resultset->mc_key),
+ wrbuf_len(resultset->mc_key),
+ key, strlen(str) + 1 + oi_len,
+ c->expire_search, flags);
+ yaz_log(YLOG_LOG, "Store hit count key=%s value=%s oi_len=%d rc=%u %s",
+ wrbuf_cstr(resultset->mc_key), str, oi_len, (unsigned) rc,
+ memcached_strerror(c->mc_st, rc));
+ odr_destroy(odr);
}
#endif
}
const char *schema,
Z_SRW_diagnostic *diag)
{
-#if HAVE_LIBMEMCACHED_MEMCACHED_H
+#if HAVE_HIREDIS
+ if (r->connection->redis_c &&
+ !diag && npr->which == Z_NamePlusRecord_databaseRecord)
+ {
+ WRBUF k = wrbuf_alloc();
+ WRBUF rec_sha1 = wrbuf_alloc();
+ ODR odr = odr_createmem(ODR_ENCODE);
+ char *rec_buf;
+ int rec_len;
+ const char *argv[3];
+ size_t argvlen[3];
+ redisReply *reply;
+
+ z_NamePlusRecord(odr, &npr, 0, 0);
+ rec_buf = odr_getbuf(odr, &rec_len, 0);
+
+ wrbuf_write(k, wrbuf_buf(r->mc_key), wrbuf_len(r->mc_key));
+ wrbuf_printf(k, ";%d;%s;%s;%s", pos,
+ syntax ? syntax : "",
+ elementSetName ? elementSetName : "",
+ schema ? schema : "");
+
+ wrbuf_sha1_write(rec_sha1, rec_buf, rec_len, 1);
+
+ argv[0] = "SET";
+ argvlen[0] = 3;
+ argv[1] = wrbuf_buf(k);
+ argvlen[1] = wrbuf_len(k);
+ argv[2] = wrbuf_buf(rec_sha1);
+ argvlen[2] = wrbuf_len(rec_sha1);
+
+ reply = redisCommandArgv(r->connection->redis_c, 3, argv, argvlen);
+ yaz_log(YLOG_LOG, "Store record key=%s val=%s",
+ wrbuf_cstr(k), wrbuf_cstr(rec_sha1));
+ freeReplyObject(reply);
+
+ expire_redis(r->connection->redis_c, argv[1], argvlen[1],
+ r->connection->expire_search);
+
+ argv[1] = wrbuf_buf(rec_sha1);
+ argvlen[1] = wrbuf_len(rec_sha1);
+ argv[2] = rec_buf;
+ argvlen[2] = rec_len;
+
+ reply = redisCommandArgv(r->connection->redis_c, 3, argv, argvlen);
+ yaz_log(YLOG_LOG, "Add record key=%s rec_len=%d",
+ wrbuf_cstr(rec_sha1), rec_len);
+ freeReplyObject(reply);
+
+ expire_redis(r->connection->redis_c, argv[1], argvlen[1],
+ r->connection->expire_record);
+
+ odr_destroy(odr);
+ wrbuf_destroy(k);
+ wrbuf_destroy(rec_sha1);
+ }
+#endif
+#if HAVE_LIBMEMCACHED
if (r->connection->mc_st &&
!diag && npr->which == Z_NamePlusRecord_databaseRecord)
{
WRBUF k = wrbuf_alloc();
+ WRBUF rec_sha1 = wrbuf_alloc();
uint32_t flags = 0;
memcached_return_t rc;
- time_t expiration = 36000;
ODR odr = odr_createmem(ODR_ENCODE);
char *rec_buf;
int rec_len;
syntax ? syntax : "",
elementSetName ? elementSetName : "",
schema ? schema : "");
+
+ wrbuf_sha1_write(rec_sha1, rec_buf, rec_len, 1);
+
rc = memcached_set(r->connection->mc_st,
- wrbuf_buf(k),wrbuf_len(k),
+ wrbuf_buf(k), wrbuf_len(k),
+ wrbuf_buf(rec_sha1), wrbuf_len(rec_sha1),
+ r->connection->expire_search, flags);
+
+ yaz_log(YLOG_LOG, "Store record key=%s val=%s rc=%u %s",
+ wrbuf_cstr(k), wrbuf_cstr(rec_sha1), (unsigned) rc,
+ memcached_strerror(r->connection->mc_st, rc));
+
+ rc = memcached_add(r->connection->mc_st,
+ wrbuf_buf(rec_sha1), wrbuf_len(rec_sha1),
rec_buf, rec_len,
- expiration, flags);
+ r->connection->expire_record, flags);
+
+ yaz_log(YLOG_LOG, "Add record key=%s rec_len=%d rc=%u %s",
+ wrbuf_cstr(rec_sha1), rec_len, (unsigned) rc,
+ memcached_strerror(r->connection->mc_st, rc));
- yaz_log(YLOG_LOG, "Store record lkey=%s len=%d rc=%u %s",
- wrbuf_cstr(k), rec_len, (unsigned) rc,
- memcached_last_error_message(r->connection->mc_st));
odr_destroy(odr);
wrbuf_destroy(k);
+ wrbuf_destroy(rec_sha1);
}
#endif
}
const char *elementSetName,
const char *schema)
{
-#if HAVE_LIBMEMCACHED_MEMCACHED_H
+#if HAVE_HIREDIS
+ if (r->connection && r->connection->redis_c)
+ {
+ WRBUF k = wrbuf_alloc();
+ const char *argv[2];
+ size_t argvlen[2];
+ redisReply *reply1;
+
+ wrbuf_write(k, wrbuf_buf(r->mc_key), wrbuf_len(r->mc_key));
+ wrbuf_printf(k, ";%d;%s;%s;%s", pos,
+ syntax ? syntax : "",
+ elementSetName ? elementSetName : "",
+ schema ? schema : "");
+
+ yaz_log(YLOG_LOG, "Lookup record %s", wrbuf_cstr(k));
+ argv[0] = "GET";
+ argvlen[0] = 3;
+ argv[1] = wrbuf_buf(k);
+ argvlen[1] = wrbuf_len(k);
+ reply1 = redisCommandArgv(r->connection->redis_c, 2, argv, argvlen);
+
+ wrbuf_destroy(k);
+ if (reply1 && reply1->type == REDIS_REPLY_STRING)
+ {
+ redisReply *reply2;
+ char *sha1_buf = reply1->str;
+ int sha1_len = reply1->len;
+
+ yaz_log(YLOG_LOG, "Lookup record %.*s", (int) sha1_len, sha1_buf);
+
+ argv[0] = "GET";
+ argvlen[0] = 3;
+ argv[1] = sha1_buf;
+ argvlen[1] = sha1_len;
+
+ reply2 = redisCommandArgv(r->connection->redis_c, 2, argv, argvlen);
+ if (reply2 && reply2->type == REDIS_REPLY_STRING)
+ {
+ Z_NamePlusRecord *npr = 0;
+ char *v_buf = reply2->str;
+ int v_len = reply2->len;
+
+ odr_setbuf(r->odr, v_buf, v_len, 0);
+ z_NamePlusRecord(r->odr, &npr, 0, 0);
+ if (npr)
+ yaz_log(YLOG_LOG, "returned redis copy");
+ freeReplyObject(reply2);
+ freeReplyObject(reply1);
+ return npr;
+ }
+ freeReplyObject(reply2);
+ }
+ freeReplyObject(reply1);
+ }
+#endif
+#if HAVE_LIBMEMCACHED
if (r->connection && r->connection->mc_st)
{
WRBUF k = wrbuf_alloc();
- size_t v_len;
- char *v_buf;
+ char *sha1_buf;
+ size_t sha1_len;
uint32_t flags;
memcached_return_t rc;
schema ? schema : "");
yaz_log(YLOG_LOG, "Lookup record %s", wrbuf_cstr(k));
- v_buf = memcached_get(r->connection->mc_st, wrbuf_buf(k), wrbuf_len(k),
- &v_len, &flags, &rc);
+ sha1_buf = memcached_get(r->connection->mc_st,
+ wrbuf_buf(k), wrbuf_len(k),
+ &sha1_len, &flags, &rc);
+
wrbuf_destroy(k);
- if (v_buf)
+ if (sha1_buf)
{
- Z_NamePlusRecord *npr = 0;
+ size_t v_len;
+ char *v_buf;
+
+ yaz_log(YLOG_LOG, "Lookup record %.*s", (int) sha1_len, sha1_buf);
+ v_buf = memcached_get(r->connection->mc_st, sha1_buf, sha1_len,
+ &v_len, &flags, &rc);
+ free(sha1_buf);
+ if (v_buf)
+ {
+ Z_NamePlusRecord *npr = 0;
- odr_setbuf(r->odr, v_buf, v_len, 0);
- z_NamePlusRecord(r->odr, &npr, 0, 0);
- free(v_buf);
- if (npr)
- yaz_log(YLOG_LOG, "returned memcached copy");
- return npr;
+ odr_setbuf(r->odr, v_buf, v_len, 0);
+ z_NamePlusRecord(r->odr, &npr, 0, 0);
+ free(v_buf);
+ if (npr)
+ yaz_log(YLOG_LOG, "returned memcached copy");
+ return npr;
+ }
}
}
#endif