First work on multi value sort ISAM
authorAdam Dickmeiss <adam@indexdata.dk>
Mon, 8 Sep 2008 12:07:08 +0000 (14:07 +0200)
committerAdam Dickmeiss <adam@indexdata.dk>
Mon, 8 Sep 2008 12:07:08 +0000 (14:07 +0200)
include/sortidx.h
index/retrieve.c
index/sortidx.c
index/zebraapi.c
index/zsets.c
test/api/test_sortidx.c

index 31d210b..f944ba7 100644 (file)
@@ -23,6 +23,7 @@ Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA  02110-1301  USA
 #include <yaz/yconfig.h>
 #include <idzebra/version.h>
 #include <idzebra/bfile.h>
+#include <yaz/wrbuf.h>
 
 YAZ_BEGIN_CDECL
 
@@ -35,6 +36,7 @@ typedef struct zebra_sort_index *zebra_sort_index_t;
 
 #define ZEBRA_SORT_TYPE_FLAT 1
 #define ZEBRA_SORT_TYPE_ISAMB 2
+#define ZEBRA_SORT_TYPE_MULTI 3
 
 
 /** \brief creates sort handle
@@ -84,7 +86,7 @@ void zebra_sort_delete(zebra_sort_index_t si);
     \retval 0 could not be read
     \retval 1 could be read (found)
 */
-int zebra_sort_read(zebra_sort_index_t si, char *buf);
+int zebra_sort_read(zebra_sort_index_t si, WRBUF w);
 
 YAZ_END_CDECL
 
index f5e6bb6..5b602ed 100644 (file)
@@ -249,7 +249,7 @@ int zebra_special_sort_fetch(
     else
     {
         char dst_buf[IT_MAX_WORD];
-        char str[IT_MAX_WORD];
+        WRBUF str = wrbuf_alloc();
         const char *index_type;
         const char *db = 0;
         const char *string_index = 0;
@@ -261,7 +261,7 @@ int zebra_special_sort_fetch(
 
         zebraExplain_lookup_ord(zh->reg->zei, ord, &index_type, &db, &string_index);
         
-        zebra_term_untrans(zh, index_type, dst_buf, str);
+        zebra_term_untrans(zh, index_type, dst_buf, wrbuf_cstr(str));
 
         if (!oid_oidcmp(input_format, yaz_oid_recsyn_xml))
         {
@@ -285,6 +285,7 @@ int zebra_special_sort_fetch(
             wrbuf_printf(wrbuf, "%s %s %s\n", string_index, index_type,
                          dst_buf);
         }
+        wrbuf_destroy(str);
         return 0;
     }
 }
index 6f96c37..65bff6b 100644 (file)
@@ -29,12 +29,14 @@ Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA  02110-1301  USA
 #include "recindex.h"
 
 #define SORT_MAX_TERM 110
+#define SORT_MAX_MULTI 4096
 
 #define SORT_IDX_BLOCKSIZE 64
 
 struct sort_term {
     zint sysno;
-    char term[SORT_MAX_TERM];
+    zint length;
+    char term[SORT_MAX_MULTI];
 };
 
 
@@ -44,7 +46,8 @@ static void sort_term_log_item(int level, const void *b, const char *txt)
 
     memcpy(&a1, b, sizeof(a1));
 
-    yaz_log(level, "%s " ZINT_FORMAT " %s", txt, a1.sysno, a1.term);
+    yaz_log(level, "%s " ZINT_FORMAT " %.*s", txt, a1.sysno, 
+            (int) a1.length, a1.term);
 }
 
 static int sort_term_compare(const void *a, const void *b)
@@ -66,7 +69,7 @@ static void *sort_term_code_start(void)
     return 0;
 }
 
-static void sort_term_encode(void *p, char **dst, const char **src)
+static void sort_term_encode1(void *p, char **dst, const char **src)
 {
     struct sort_term a1;
 
@@ -78,14 +81,44 @@ static void sort_term_encode(void *p, char **dst, const char **src)
     *dst += strlen(a1.term) + 1;
 }
 
-static void sort_term_decode(void *p, char **dst, const char **src)
+static void sort_term_encode2(void *p, char **dst, const char **src)
 {
     struct sort_term a1;
 
+    memcpy(&a1, *src, sizeof(a1));
+    *src += sizeof(a1);
+
+    zebra_zint_encode(dst, a1.sysno); /* encode record id */
+    zebra_zint_encode(dst, a1.length); /* encode length */
+    memcpy(*dst, a1.term, a1.length);
+    *dst += a1.length;
+}
+
+static void sort_term_decode1(void *p, char **dst, const char **src)
+{
+    struct sort_term a1;
+    size_t slen;
+
     zebra_zint_decode(src, &a1.sysno);
 
     strcpy(a1.term, *src);
-    *src += strlen(a1.term) + 1;
+    slen = strlen(a1.term);
+    *src += slen + 1;
+    a1.length = slen;
+
+    memcpy(*dst, &a1, sizeof(a1));
+    *dst += sizeof(a1);
+}
+
+static void sort_term_decode2(void *p, char **dst, const char **src)
+{
+    struct sort_term a1;
+
+    zebra_zint_decode(src, &a1.sysno);
+    zebra_zint_decode(src, &a1.length);
+
+    memcpy(a1.term, *src, a1.length);
+    *src += a1.length;
 
     memcpy(*dst, &a1, sizeof(a1));
     *dst += sizeof(a1);
@@ -172,6 +205,7 @@ void zebra_sort_close(zebra_sort_index_t si)
            bf_close(sf->u.bf);
             break;
         case ZEBRA_SORT_TYPE_ISAMB:
+        case ZEBRA_SORT_TYPE_MULTI:
             if (sf->isam_pp)
                 isamb_pp_close(sf->isam_pp);
             isamb_set_root_ptr(sf->u.isamb, sf->isam_p);
@@ -192,6 +226,13 @@ int zebra_sort_type(zebra_sort_index_t si, int id)
     ISAMC_M method;
     char fname[80];
     struct sortFile *sf;
+
+    method.compare_item = sort_term_compare;
+    method.log_item = sort_term_log_item;
+    method.codec.reset = sort_term_code_reset;
+    method.codec.start = sort_term_code_start;
+    method.codec.stop = sort_term_code_stop;
+
     if (si->current_file && si->current_file->id == id)
        return 0;
     for (sf = si->files; sf; sf = sf->next)
@@ -227,13 +268,8 @@ int zebra_sort_type(zebra_sort_index_t si, int id)
         }
         break;
     case ZEBRA_SORT_TYPE_ISAMB:
-        method.compare_item = sort_term_compare;
-        method.log_item = sort_term_log_item;
-        method.codec.start = sort_term_code_start;
-        method.codec.encode = sort_term_encode;
-        method.codec.decode = sort_term_decode;
-        method.codec.reset = sort_term_code_reset;
-        method.codec.stop = sort_term_code_stop;
+        method.codec.encode = sort_term_encode1;
+        method.codec.decode = sort_term_decode1;
         
         sprintf(fname, "sortb%d", id);
         sf->u.isamb = isamb_open2(si->bfs, fname, si->write_flag, &method,
@@ -250,6 +286,26 @@ int zebra_sort_type(zebra_sort_index_t si, int id)
             sf->isam_p = isamb_get_root_ptr(sf->u.isamb);
         }
         break;
+    case ZEBRA_SORT_TYPE_MULTI:
+        isam_block_size = 32768;
+        method.codec.encode = sort_term_encode2;
+        method.codec.decode = sort_term_decode2;
+        
+        sprintf(fname, "sortm%d", id);
+        sf->u.isamb = isamb_open2(si->bfs, fname, si->write_flag, &method,
+                                  /* cache */ 0,
+                                  /* no_cat */ 1, &isam_block_size,
+                                  /* use_root_ptr */ 1);
+        if (!sf->u.isamb)
+        {
+            xfree(sf);
+            return -1;
+        }
+        else
+        {
+            sf->isam_p = isamb_get_root_ptr(sf->u.isamb);
+        }
+        break;
     }
     sf->isam_pp = 0;
     sf->no_inserted = 0;
@@ -266,13 +322,18 @@ void zebra_sort_sysno(zebra_sort_index_t si, zint sysno)
 
     for (sf = si->files; sf; sf = sf->next)
     {
-        sf->no_inserted = 0;
-        sf->no_deleted = 0;
-        if (sf->isam_pp && new_sysno < si->sysno && sf->isam_pp)
+        if (sf->no_inserted || sf->no_deleted)
+        {
+            isamb_pp_close(sf->isam_pp);
+            sf->isam_pp = 0;
+        }
+        else if (sf->isam_pp && new_sysno < si->sysno && sf->isam_pp)
         {
             isamb_pp_close(sf->isam_pp);
             sf->isam_pp = 0;
         }
+        sf->no_inserted = 0;
+        sf->no_deleted = 0;
     }
     si->sysno = new_sysno;
 }
@@ -290,6 +351,7 @@ void zebra_sort_delete(zebra_sort_index_t si)
         zebra_sort_add(si, "", 0);
         break;
     case ZEBRA_SORT_TYPE_ISAMB:
+    case ZEBRA_SORT_TYPE_MULTI:
         assert(sf->u.isamb);
         if (sf->no_deleted == 0)
         {
@@ -297,6 +359,7 @@ void zebra_sort_delete(zebra_sort_index_t si)
             ISAMC_I isamc_i;
 
             s.st.sysno = si->sysno;
+            s.st.length = 0;
             s.st.term[0] = '\0';
             
             s.no = 1;
@@ -344,6 +407,28 @@ void zebra_sort_add(zebra_sort_index_t si, const char *buf, int len)
                 len = SORT_MAX_TERM-1;
             memcpy(s.st.term, buf, len);
             s.st.term[len] = '\0';
+            s.st.length = len;
+            s.no = 1;
+            s.insert_flag = 1;
+            isamc_i.clientData = &s;
+            isamc_i.read_item = sort_term_code_read;
+            
+            isamb_merge(sf->u.isamb, &sf->isam_p, &isamc_i);
+            sf->no_inserted++;
+        }
+        break;
+    case ZEBRA_SORT_TYPE_MULTI:
+        assert(sf->u.isamb);
+        if (sf->no_inserted == 0)
+        {
+            struct sort_term_stream s;
+            ISAMC_I isamc_i;
+
+            s.st.sysno = si->sysno;
+            if (len >= SORT_MAX_MULTI)
+                len = SORT_MAX_MULTI-1;
+            memcpy(s.st.term, buf, len);
+            s.st.length = len;
             s.no = 1;
             s.insert_flag = 1;
             isamc_i.clientData = &s;
@@ -356,10 +441,11 @@ void zebra_sort_add(zebra_sort_index_t si, const char *buf, int len)
     }
 }
 
-int zebra_sort_read(zebra_sort_index_t si, char *buf)
+int zebra_sort_read(zebra_sort_index_t si, WRBUF w)
 {
     int r;
     struct sortFile *sf = si->current_file;
+    char tbuf[SORT_IDX_ENTRYSIZE];
 
     assert(sf);
     assert(sf->u.bf);
@@ -367,14 +453,14 @@ int zebra_sort_read(zebra_sort_index_t si, char *buf)
     switch(si->type)
     {
     case ZEBRA_SORT_TYPE_FLAT:
-        r = bf_read(sf->u.bf, si->sysno+1, 0, 0, buf);
-        if (!r)
-            memset(buf, 0, SORT_IDX_ENTRYSIZE);
-        if (buf[0] == 0)
+        r = bf_read(sf->u.bf, si->sysno+1, 0, 0, tbuf);
+        if (r && *tbuf)
+            wrbuf_puts(w, tbuf);
+        else
             return 0;
         break;
     case ZEBRA_SORT_TYPE_ISAMB:
-        memset(buf, 0, SORT_IDX_ENTRYSIZE);
+    case ZEBRA_SORT_TYPE_MULTI:
         if (!sf->isam_p)
             return 0;
         else
@@ -387,6 +473,7 @@ int zebra_sort_read(zebra_sort_index_t si, char *buf)
                 return 0;
 
             st_untilbuf.sysno = si->sysno;
+            st_untilbuf.length = 0;
             st_untilbuf.term[0] = '\0';
             r = isamb_pp_forward(sf->isam_pp, &st, &st_untilbuf);
             if (!r)
@@ -399,10 +486,7 @@ int zebra_sort_read(zebra_sort_index_t si, char *buf)
                             ZINT_FORMAT, st.sysno, si->sysno);
                     return 0;
                 }
-                if (strlen(st.term) < SORT_IDX_ENTRYSIZE)
-                    strcpy(buf, st.term);
-                else
-                    memcpy(buf, st.term, SORT_IDX_ENTRYSIZE);
+                wrbuf_write(w, st.term, st.length);
             }
         }
         break;
index fd66a24..b4026fd 100644 (file)
@@ -445,6 +445,8 @@ struct zebra_register *zebra_register_open(ZebraService zs, const char *name,
         sort_type = ZEBRA_SORT_TYPE_FLAT;
     else if (res_get_match(res, "sortindex", "i", "f"))
         sort_type = ZEBRA_SORT_TYPE_ISAMB;
+    else if (res_get_match(res, "sortindex", "m", "f"))
+        sort_type = ZEBRA_SORT_TYPE_MULTI;
     else
     {
        yaz_log(YLOG_WARN, "bad_value for 'sortindex'");
index 10ecb8c..a906705 100644 (file)
@@ -562,6 +562,7 @@ void resultSetInsertSort(ZebraHandle zh, ZebraSet sset,
     struct zset_sort_entry *new_entry = NULL;
     struct zset_sort_info *sort_info = sset->sort_info;
     int i, j;
+    WRBUF w = wrbuf_alloc();
 
     zebra_sort_sysno(zh->reg->sort_index, sysno);
     for (i = 0; i<num_criteria; i++)
@@ -574,13 +575,18 @@ void resultSetInsertSort(ZebraHandle zh, ZebraSet sset,
             yaz_log(log_level_sort, "pre zebra_sort_type ord is %d",
                     criteria[i].ord[database_no]);
             zebra_sort_type(zh->reg->sort_index, criteria[i].ord[database_no]);
-            zebra_sort_read(zh->reg->sort_index, this_entry_buf);
+            wrbuf_rewind(w);
+            zebra_sort_read(zh->reg->sort_index, w);
+            memcpy(this_entry_buf, wrbuf_buf(w),
+                   (wrbuf_len(w) >= SORT_IDX_ENTRYSIZE) ?
+                   SORT_IDX_ENTRYSIZE : wrbuf_len(w));
         }
         else
         {
             yaz_log(log_level_sort, "criteria[i].ord is -1 so not reading from sort index");
         }
     }
+    wrbuf_destroy(w);
     i = sort_info->num_entries;
     while (--i >= 0)
     {
index 5640ce6..ebd03ca 100644 (file)
@@ -25,28 +25,76 @@ static void tst1(zebra_sort_index_t si)
 {
     zint sysno = 12; /* just some sysno */
     int my_type = 2; /* just some type ID */
-    char read_buf[SORT_IDX_ENTRYSIZE];
+    WRBUF w = wrbuf_alloc();
 
     zebra_sort_type(si, my_type);
 
     zebra_sort_sysno(si, sysno);
-    YAZ_CHECK_EQ(zebra_sort_read(si, read_buf), 0);
+    YAZ_CHECK_EQ(zebra_sort_read(si, w), 0);
 
     zebra_sort_add(si, "abcde1", 6);
 
     zebra_sort_sysno(si, sysno);
-    YAZ_CHECK_EQ(zebra_sort_read(si, read_buf), 1);
-    YAZ_CHECK(!strcmp(read_buf, "abcde1"));
+    YAZ_CHECK_EQ(zebra_sort_read(si, w), 1);
+    YAZ_CHECK(!strcmp(wrbuf_cstr(w), "abcde1"));
 
     zebra_sort_sysno(si, sysno+1);
-    YAZ_CHECK_EQ(zebra_sort_read(si, read_buf), 0);
+    YAZ_CHECK_EQ(zebra_sort_read(si, w), 0);
 
     zebra_sort_sysno(si, sysno-1);
-    YAZ_CHECK_EQ(zebra_sort_read(si, read_buf), 0);
+    YAZ_CHECK_EQ(zebra_sort_read(si, w), 0);
 
     zebra_sort_sysno(si, sysno);
     zebra_sort_delete(si);
-    YAZ_CHECK_EQ(zebra_sort_read(si, read_buf), 0);
+    YAZ_CHECK_EQ(zebra_sort_read(si, w), 0);
+
+    zebra_sort_type(si, my_type);
+
+    zebra_sort_sysno(si, sysno);
+    YAZ_CHECK_EQ(zebra_sort_read(si, w), 0);
+
+    wrbuf_rewind(w);
+    zebra_sort_add(si, "abcde1", 6);
+
+    zebra_sort_sysno(si, sysno);
+    YAZ_CHECK_EQ(zebra_sort_read(si, w), 1);
+    YAZ_CHECK(!strcmp(wrbuf_cstr(w), "abcde1"));
+
+    zebra_sort_sysno(si, sysno);
+    zebra_sort_delete(si);
+
+    wrbuf_destroy(w);
+}
+
+static void tst2(zebra_sort_index_t si)
+{
+    zint sysno = 15; /* just some sysno */
+    int my_type = 2; /* just some type ID */
+    int i;
+
+    zebra_sort_type(si, my_type);
+
+    for (sysno = 1; sysno < 50; sysno++)
+    {
+        WRBUF w1 = wrbuf_alloc();
+        WRBUF w2 = wrbuf_alloc();
+        zebra_sort_sysno(si, sysno);
+        YAZ_CHECK_EQ(zebra_sort_read(si, w2), 0);
+        
+        for (i = 0; i < 600; i++) /* 600 * 6 < max size =4K */
+            wrbuf_write(w1, "12345", 6);
+        
+        zebra_sort_add(si, wrbuf_buf(w1), wrbuf_len(w1));
+        
+        zebra_sort_sysno(si, sysno);
+        
+        YAZ_CHECK_EQ(zebra_sort_read(si, w2), 1);
+        
+        YAZ_CHECK_EQ(wrbuf_len(w1), wrbuf_len(w2));
+        YAZ_CHECK(!memcmp(wrbuf_buf(w1), wrbuf_buf(w2), wrbuf_len(w2)));
+        wrbuf_destroy(w1);
+        wrbuf_destroy(w2);
+    }
 }
 
 static void tst(int argc, char **argv)
@@ -58,6 +106,17 @@ static void tst(int argc, char **argv)
     if (bfs)
     {
         bf_reset(bfs);
+        si = zebra_sort_open(bfs, 1, ZEBRA_SORT_TYPE_FLAT);
+        YAZ_CHECK(si);
+        if (si)
+        {
+            tst1(si);
+            zebra_sort_close(si);
+        }
+    }
+    if (bfs)
+    {
+        bf_reset(bfs);
         si = zebra_sort_open(bfs, 1, ZEBRA_SORT_TYPE_ISAMB);
         YAZ_CHECK(si);
         if (si)
@@ -66,15 +125,15 @@ static void tst(int argc, char **argv)
             zebra_sort_close(si);
         }
     }
-
     if (bfs)
     {
         bf_reset(bfs);
-        si = zebra_sort_open(bfs, 1, ZEBRA_SORT_TYPE_FLAT);
+        si = zebra_sort_open(bfs, 1, ZEBRA_SORT_TYPE_MULTI);
         YAZ_CHECK(si);
         if (si)
         {
             tst1(si);
+            tst2(si);
             zebra_sort_close(si);
         }
     }