More work on multi-map for sort
[idzebra-moved-to-github.git] / index / sortidx.c
index 962841a..ef15209 100644 (file)
@@ -1,8 +1,5 @@
-/* $Id: sortidx.c,v 1.24 2007-01-15 15:10:17 adam Exp $
-   Copyright (C) 1995-2007
-   Index Data ApS
-
-This file is part of the Zebra server.
+/* This file is part of the Zebra server.
+   Copyright (C) 1995-2008 Index Data
 
 Zebra is free software; you can redistribute it and/or modify it under
 the terms of the GNU General Public License as published by the Free
@@ -32,12 +29,14 @@ Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA  02110-1301  USA
 #include "recindex.h"
 
 #define SORT_MAX_TERM 110
+#define SORT_MAX_MULTI 4096
 
 #define SORT_IDX_BLOCKSIZE 64
 
 struct sort_term {
     zint sysno;
-    char term[SORT_MAX_TERM];
+    zint length;
+    char term[SORT_MAX_MULTI];
 };
 
 
@@ -47,10 +46,11 @@ static void sort_term_log_item(int level, const void *b, const char *txt)
 
     memcpy(&a1, b, sizeof(a1));
 
-    yaz_log(level, "%s " ZINT_FORMAT " %s", txt, a1.sysno, a1.term);
+    yaz_log(level, "%s " ZINT_FORMAT " %.*s", txt, a1.sysno, 
+            (int) a1.length, a1.term);
 }
 
-int sort_term_compare(const void *a, const void *b)
+static int sort_term_compare(const void *a, const void *b)
 {
     struct sort_term a1, b1;
 
@@ -64,12 +64,12 @@ int sort_term_compare(const void *a, const void *b)
     return 0;
 }
 
-void *sort_term_code_start(void)
+static void *sort_term_code_start(void)
 {
     return 0;
 }
 
-void sort_term_encode(void *p, char **dst, const char **src)
+static void sort_term_encode1(void *p, char **dst, const char **src)
 {
     struct sort_term a1;
 
@@ -81,27 +81,56 @@ void sort_term_encode(void *p, char **dst, const char **src)
     *dst += strlen(a1.term) + 1;
 }
 
-void sort_term_decode(void *p, char **dst, const char **src)
+static void sort_term_encode2(void *p, char **dst, const char **src)
 {
     struct sort_term a1;
 
+    memcpy(&a1, *src, sizeof(a1));
+    *src += sizeof(a1);
+
+    zebra_zint_encode(dst, a1.sysno); /* encode record id */
+    zebra_zint_encode(dst, a1.length); /* encode length */
+    memcpy(*dst, a1.term, a1.length);
+    *dst += a1.length;
+}
+
+static void sort_term_decode1(void *p, char **dst, const char **src)
+{
+    struct sort_term a1;
+    size_t slen;
+
     zebra_zint_decode(src, &a1.sysno);
 
     strcpy(a1.term, *src);
-    *src += strlen(a1.term) + 1;
+    slen = strlen(a1.term);
+    *src += slen + 1;
+    a1.length = slen;
 
     memcpy(*dst, &a1, sizeof(a1));
     *dst += sizeof(a1);
 }
 
-void sort_term_code_reset(void *p)
+static void sort_term_decode2(void *p, char **dst, const char **src)
 {
+    struct sort_term a1;
+
+    zebra_zint_decode(src, &a1.sysno);
+    zebra_zint_decode(src, &a1.length);
+
+    memcpy(a1.term, *src, a1.length);
+    *src += a1.length;
+
+    memcpy(*dst, &a1, sizeof(a1));
+    *dst += sizeof(a1);
 }
 
-void sort_term_code_stop(void *p)
+static void sort_term_code_reset(void *p)
 {
 }
 
+static void sort_term_code_stop(void *p)
+{
+}
 
 struct sort_term_stream {
     int no;
@@ -109,7 +138,7 @@ struct sort_term_stream {
     struct sort_term st;
 };
 
-int sort_term_code_read(void *vp, char **dst, int *insertMode)
+static int sort_term_code_read(void *vp, char **dst, int *insertMode)
 {
     struct sort_term_stream *s = (struct sort_term_stream *) vp;
 
@@ -124,7 +153,6 @@ int sort_term_code_read(void *vp, char **dst, int *insertMode)
     return 1;
 }
 
-        
 struct sortFileHead {
     zint sysno_max;
 };
@@ -177,6 +205,7 @@ void zebra_sort_close(zebra_sort_index_t si)
            bf_close(sf->u.bf);
             break;
         case ZEBRA_SORT_TYPE_ISAMB:
+        case ZEBRA_SORT_TYPE_MULTI:
             if (sf->isam_pp)
                 isamb_pp_close(sf->isam_pp);
             isamb_set_root_ptr(sf->u.isamb, sf->isam_p);
@@ -193,9 +222,17 @@ void zebra_sort_close(zebra_sort_index_t si)
 int zebra_sort_type(zebra_sort_index_t si, int id)
 {
     int isam_block_size = 4096;
+
     ISAMC_M method;
     char fname[80];
     struct sortFile *sf;
+
+    method.compare_item = sort_term_compare;
+    method.log_item = sort_term_log_item;
+    method.codec.reset = sort_term_code_reset;
+    method.codec.start = sort_term_code_start;
+    method.codec.stop = sort_term_code_stop;
+
     if (si->current_file && si->current_file->id == id)
        return 0;
     for (sf = si->files; sf; sf = sf->next)
@@ -207,14 +244,6 @@ int zebra_sort_type(zebra_sort_index_t si, int id)
     sf = (struct sortFile *) xmalloc(sizeof(*sf));
     sf->id = id;
 
-    method.compare_item = sort_term_compare;
-    method.log_item = sort_term_log_item;
-    method.codec.start = sort_term_code_start;
-    method.codec.encode = sort_term_encode;
-    method.codec.decode = sort_term_decode;
-    method.codec.reset = sort_term_code_reset;
-    method.codec.stop = sort_term_code_stop;
-   
     switch(si->type)
     {
     case ZEBRA_SORT_TYPE_FLAT:
@@ -239,8 +268,30 @@ int zebra_sort_type(zebra_sort_index_t si, int id)
         }
         break;
     case ZEBRA_SORT_TYPE_ISAMB:
+        method.codec.encode = sort_term_encode1;
+        method.codec.decode = sort_term_decode1;
+        
         sprintf(fname, "sortb%d", id);
+        sf->u.isamb = isamb_open2(si->bfs, fname, si->write_flag, &method,
+                                  /* cache */ 0,
+                                  /* no_cat */ 1, &isam_block_size,
+                                  /* use_root_ptr */ 1);
+        if (!sf->u.isamb)
+        {
+            xfree(sf);
+            return -1;
+        }
+        else
+        {
+            sf->isam_p = isamb_get_root_ptr(sf->u.isamb);
+        }
+        break;
+    case ZEBRA_SORT_TYPE_MULTI:
+        isam_block_size = 32768;
+        method.codec.encode = sort_term_encode2;
+        method.codec.decode = sort_term_decode2;
         
+        sprintf(fname, "sortm%d", id);
         sf->u.isamb = isamb_open2(si->bfs, fname, si->write_flag, &method,
                                   /* cache */ 0,
                                   /* no_cat */ 1, &isam_block_size,
@@ -271,13 +322,18 @@ void zebra_sort_sysno(zebra_sort_index_t si, zint sysno)
 
     for (sf = si->files; sf; sf = sf->next)
     {
-        sf->no_inserted = 0;
-        sf->no_deleted = 0;
-        if (sf->isam_pp && new_sysno < si->sysno && sf->isam_pp)
+        if (sf->no_inserted || sf->no_deleted)
         {
             isamb_pp_close(sf->isam_pp);
             sf->isam_pp = 0;
         }
+        else if (sf->isam_pp && new_sysno < si->sysno && sf->isam_pp)
+        {
+            isamb_pp_close(sf->isam_pp);
+            sf->isam_pp = 0;
+        }
+        sf->no_inserted = 0;
+        sf->no_deleted = 0;
     }
     si->sysno = new_sysno;
 }
@@ -295,6 +351,7 @@ void zebra_sort_delete(zebra_sort_index_t si)
         zebra_sort_add(si, "", 0);
         break;
     case ZEBRA_SORT_TYPE_ISAMB:
+    case ZEBRA_SORT_TYPE_MULTI:
         assert(sf->u.isamb);
         if (sf->no_deleted == 0)
         {
@@ -302,6 +359,7 @@ void zebra_sort_delete(zebra_sort_index_t si)
             ISAMC_I isamc_i;
 
             s.st.sysno = si->sysno;
+            s.st.length = 0;
             s.st.term[0] = '\0';
             
             s.no = 1;
@@ -316,6 +374,77 @@ void zebra_sort_delete(zebra_sort_index_t si)
     }
 }
 
+void zebra_sort_add_ent(zebra_sort_index_t si, struct zebra_sort_ent *ent)
+{
+    struct sortFile *sf = si->current_file;
+    int len;
+
+    if (!sf || !sf->u.bf)
+        return;
+    switch(si->type)
+    {
+    case ZEBRA_SORT_TYPE_FLAT:
+        /* take first entry from wrbuf - itself is 0-terminated */
+        len = strlen(wrbuf_buf(ent->wrbuf));
+        if (len > SORT_IDX_ENTRYSIZE)
+            len = SORT_IDX_ENTRYSIZE;
+        
+        memcpy(si->entry_buf, wrbuf_buf(ent->wrbuf), len);
+        if (len < SORT_IDX_ENTRYSIZE-len)
+            memset(si->entry_buf+len, 0, SORT_IDX_ENTRYSIZE-len);
+        bf_write(sf->u.bf, si->sysno+1, 0, 0, si->entry_buf);
+        break;
+    case ZEBRA_SORT_TYPE_ISAMB:
+        assert(sf->u.isamb);
+
+        assert(sf->no_inserted == 0);
+        if (sf->no_inserted == 0)
+        {
+            struct sort_term_stream s;
+            ISAMC_I isamc_i;
+            /* take first entry from wrbuf - itself is 0-terminated */
+            len = strlen(wrbuf_buf(ent->wrbuf)); 
+
+            s.st.sysno = si->sysno;
+            if (len >= SORT_MAX_TERM)
+                len = SORT_MAX_TERM-1;
+            memcpy(s.st.term, wrbuf_buf(ent->wrbuf), len);
+            s.st.term[len] = '\0';
+            s.st.length = len;
+            s.no = 1;
+            s.insert_flag = 1;
+            isamc_i.clientData = &s;
+            isamc_i.read_item = sort_term_code_read;
+            
+            isamb_merge(sf->u.isamb, &sf->isam_p, &isamc_i);
+            sf->no_inserted++;
+        }
+        break;
+    case ZEBRA_SORT_TYPE_MULTI:
+        assert(sf->u.isamb);
+        if (sf->no_inserted == 0)
+        {
+            struct sort_term_stream s;
+            ISAMC_I isamc_i;
+            len = wrbuf_len(ent->wrbuf);
+
+            s.st.sysno = si->sysno;
+            if (len >= SORT_MAX_MULTI)
+                len = SORT_MAX_MULTI-1;
+            memcpy(s.st.term, wrbuf_buf(ent->wrbuf), len);
+            s.st.length = len;
+            s.no = 1;
+            s.insert_flag = 1;
+            isamc_i.clientData = &s;
+            isamc_i.read_item = sort_term_code_read;
+            
+            isamb_merge(sf->u.isamb, &sf->isam_p, &isamc_i);
+            sf->no_inserted++;
+        }
+        break;
+    }
+}
+
 void zebra_sort_add(zebra_sort_index_t si, const char *buf, int len)
 {
     struct sortFile *sf = si->current_file;
@@ -349,6 +478,28 @@ void zebra_sort_add(zebra_sort_index_t si, const char *buf, int len)
                 len = SORT_MAX_TERM-1;
             memcpy(s.st.term, buf, len);
             s.st.term[len] = '\0';
+            s.st.length = len;
+            s.no = 1;
+            s.insert_flag = 1;
+            isamc_i.clientData = &s;
+            isamc_i.read_item = sort_term_code_read;
+            
+            isamb_merge(sf->u.isamb, &sf->isam_p, &isamc_i);
+            sf->no_inserted++;
+        }
+        break;
+    case ZEBRA_SORT_TYPE_MULTI:
+        assert(sf->u.isamb);
+        if (sf->no_inserted == 0)
+        {
+            struct sort_term_stream s;
+            ISAMC_I isamc_i;
+
+            s.st.sysno = si->sysno;
+            if (len >= SORT_MAX_MULTI)
+                len = SORT_MAX_MULTI-1;
+            memcpy(s.st.term, buf, len);
+            s.st.length = len;
             s.no = 1;
             s.insert_flag = 1;
             isamc_i.clientData = &s;
@@ -361,66 +512,57 @@ void zebra_sort_add(zebra_sort_index_t si, const char *buf, int len)
     }
 }
 
-void zebra_sort_read(zebra_sort_index_t si, char *buf)
+int zebra_sort_read(zebra_sort_index_t si, WRBUF w)
 {
     int r;
     struct sortFile *sf = si->current_file;
+    char tbuf[SORT_IDX_ENTRYSIZE];
 
     assert(sf);
+    assert(sf->u.bf);
 
     switch(si->type)
     {
     case ZEBRA_SORT_TYPE_FLAT:
-        r = bf_read(sf->u.bf, si->sysno+1, 0, 0, buf);
-        if (!r)
-            memset(buf, 0, SORT_IDX_ENTRYSIZE);
+        r = bf_read(sf->u.bf, si->sysno+1, 0, 0, tbuf);
+        if (r && *tbuf)
+            wrbuf_puts(w, tbuf);
+        else
+            return 0;
         break;
     case ZEBRA_SORT_TYPE_ISAMB:
-        memset(buf, 0, SORT_IDX_ENTRYSIZE);
-        assert(sf->u.bf);
-        if (sf->u.bf)
+    case ZEBRA_SORT_TYPE_MULTI:
+        if (!sf->isam_p)
+            return 0;
+        else
         {
             struct sort_term st, st_untilbuf;
 
             if (!sf->isam_pp)
                 sf->isam_pp = isamb_pp_open(sf->u.isamb, sf->isam_p, 1);
             if (!sf->isam_pp)
-                return;
+                return 0;
 
-#if 0
-            while (1)
-            {
-                r = isamb_pp_read(sf->isam_pp, &st);
-                if (!r)
-                    break;
-                if (st.sysno == si->sysno)
-                    break;
-                yaz_log(YLOG_LOG, "Received sysno=" ZINT_FORMAT " looking for "
-                        ZINT_FORMAT, st.sysno, si->sysno);
-            }
-#else
             st_untilbuf.sysno = si->sysno;
+            st_untilbuf.length = 0;
             st_untilbuf.term[0] = '\0';
             r = isamb_pp_forward(sf->isam_pp, &st, &st_untilbuf);
             if (!r)
-                return;
-#endif
+                return 0;
             if (r)
             {
                 if (st.sysno != si->sysno)
                 {
                     yaz_log(YLOG_LOG, "Received sysno=" ZINT_FORMAT " looking for "
                             ZINT_FORMAT, st.sysno, si->sysno);
-                    return;
+                    return 0;
                 }
-                if (strlen(st.term) < SORT_IDX_ENTRYSIZE)
-                    strcpy(buf, st.term);
-                else
-                    memcpy(buf, st.term, SORT_IDX_ENTRYSIZE);
+                wrbuf_write(w, st.term, st.length);
             }
         }
         break;
     }
+    return 1;
 }
 /*
  * Local variables: