Allow records to be zlib-compressed
[idzebra-moved-to-github.git] / index / records.c
index c39ac40..60908db 100644 (file)
@@ -1,8 +1,5 @@
-/* $Id: records.c,v 1.3 2007-11-28 11:16:32 adam Exp $
-   Copyright (C) 1995-2007
-   Index Data ApS
-
-This file is part of the Zebra server.
+/* This file is part of the Zebra server.
+   Copyright (C) 1994-2009 Index Data
 
 Zebra is free software; you can redistribute it and/or modify it under
 the terms of the GNU General Public License as published by the Free
@@ -21,7 +18,7 @@ Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA  02110-1301  USA
 */
 
 /*
- *  Format of first block
+ *  Format of first block (assumes a 512 block size)
  *      next       (8 bytes)
  *      ref_count  (2 bytes)
  *      block      (500 bytes)
@@ -47,6 +44,9 @@ Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA  02110-1301  USA
 #if HAVE_BZLIB_H
 #include <bzlib.h>
 #endif
+#if HAVE_ZLIB_H
+#include <zlib.h>
+#endif
 
 #define REC_BLOCK_TYPES 2
 #define REC_HEAD_MAGIC "recindex"
@@ -69,6 +69,8 @@ struct records_info {
     int cache_cur;
     int cache_max;
 
+    int compression_chunk_size;
+
     Zebra_mutex mutex;
 
     struct records_head {
@@ -172,6 +174,7 @@ static ZEBRA_RES rec_release_blocks(Records p, zint sysno)
            memcpy(block_and_ref + sizeof(freeblock), &ref, sizeof(ref));
            if (ref)
            {
+                /* there is still a reference to this block.. */
                if (bf_write(p->data_BFile[dst_type], freeblock, 0,
                              sizeof(block_and_ref), block_and_ref))
                {
@@ -180,7 +183,8 @@ static ZEBRA_RES rec_release_blocks(Records p, zint sysno)
                }
                return ZEBRA_OK;
            }
-           first = 0;
+            /* the list of blocks can all be removed (ref == 0) */
+            first = 0;
        }
        
         if (bf_write(p->data_BFile[dst_type], freeblock, 0, sizeof(freeblock),
@@ -287,8 +291,11 @@ Records rec_open(BFiles bfs, int rw, int compression_method)
     memset(&p->head, '\0', sizeof(p->head));
     p->compression_method = compression_method;
     p->rw = rw;
-    p->tmp_size = 1024;
+    p->tmp_size = 4096;
     p->tmp_buf = (char *) xmalloc(p->tmp_size);
+    p->compression_chunk_size = 0;
+    if (compression_method == REC_COMPRESS_BZIP2)
+        p->compression_chunk_size = 90000;
     p->recindex = recindex_open(bfs, rw, 0 /* 1=isamb for recindex */);
     r = recindex_read_head(p->recindex, p->tmp_buf);
     switch (r)
@@ -306,12 +313,12 @@ Records rec_open(BFiles bfs, int rw, int compression_method)
             p->head.block_last[i] = 1;
             p->head.block_used[i] = 0;
         }
-        p->head.block_size[0] = 128;
+        p->head.block_size[0] = 256;
         p->head.block_move[0] = 0;
         for (i = 1; i<REC_BLOCK_TYPES; i++)
         {
-            p->head.block_size[i] = p->head.block_size[i-1] * 4;
-            p->head.block_move[i] = p->head.block_size[i] * 24;
+            p->head.block_size[i] = p->head.block_size[i-1] * 8;
+            p->head.block_move[i] = p->head.block_size[i] * 2;
         }
         if (rw)
        {
@@ -477,62 +484,45 @@ static void rec_cache_flush_block1(Records p, Record rec, Record last_rec,
     }
 }
 
-static ZEBRA_RES rec_write_multiple(Records p, int saveCount)
+static ZEBRA_RES rec_flush_shared(Records p, short ref_count, zint *sysnos,
+                                  char *out_buf, int out_offset)
 {
-    int i;
-    short ref_count = 0;
-    char compression_method;
-    Record last_rec = 0;
-    int out_size = 1000;
-    int out_offset = 0;
-    char *out_buf = (char *) xmalloc(out_size);
-    zint *sysnos = (zint *) xmalloc(sizeof(*sysnos) * (p->cache_cur + 1));
-    zint *sysnop = sysnos;
     ZEBRA_RES ret = ZEBRA_OK;
-
-    for (i = 0; i<p->cache_cur - saveCount; i++)
-    {
-        struct record_cache_entry *e = p->record_cache + i;
-        switch (e->flag)
-        {
-        case recordFlagNew:
-            rec_cache_flush_block1(p, e->rec, last_rec, &out_buf,
-                                   &out_size, &out_offset);
-           *sysnop++ = rec_sysno_to_int(e->rec->sysno);
-           ref_count++;
-           e->flag = recordFlagNop;
-           last_rec = e->rec;
-            break;
-        case recordFlagWrite:
-           if (rec_release_blocks(p, rec_sysno_to_int(e->rec->sysno))
-               != ZEBRA_OK)
-               ret = ZEBRA_FAIL;
-
-            rec_cache_flush_block1(p, e->rec, last_rec, &out_buf,
-                                   &out_size, &out_offset);
-           *sysnop++ = rec_sysno_to_int(e->rec->sysno);
-           ref_count++;
-           e->flag = recordFlagNop;
-           last_rec = e->rec;
-            break;
-        case recordFlagDelete:
-            if (rec_delete_single(p, e->rec) != ZEBRA_OK)
-               ret = ZEBRA_FAIL;
-
-           e->flag = recordFlagNop;
-            break;
-       default:
-           break;
-        }
-    }
-
-    *sysnop = -1;
     if (ref_count)
     {
+        int i;
        unsigned int csize = 0;  /* indicate compression "not performed yet" */
-       compression_method = p->compression_method;
+       char compression_method = p->compression_method;
        switch (compression_method)
        {
+        case REC_COMPRESS_ZLIB:
+#if HAVE_ZLIB_H
+            csize = out_offset + (out_offset >> 6) + 620;
+            while (1)
+            {
+                int r;
+                uLongf destLen = csize;
+                rec_tmp_expand(p, csize);
+                r = compress((Bytef *) p->tmp_buf+sizeof(zint)+sizeof(short)+
+                             sizeof(char),
+                             &destLen, (const Bytef *) out_buf, out_offset);
+                if (r == Z_OK)
+                {
+                    csize = destLen;    /* actual compressed length */
+                    yaz_log(YLOG_LOG, "compress %4d %5d %5d", ref_count,
+                            out_offset, csize);
+                    break;
+                }
+                if (r != Z_BUF_ERROR)
+                {   /* only a too-small output buffer is worth retrying */
+                    yaz_log(YLOG_WARN, "compress error: %d", r);
+                    csize = 0;          /* 0 = compression not performed */
+                    break;
+                }
+                csize = csize * 2;      /* Z_BUF_ERROR: retry, twice as big */
+            }
+#endif
+            break;
        case REC_COMPRESS_BZIP2:
 #if HAVE_BZLIB_H       
            csize = out_offset + (out_offset >> 6) + 620;
@@ -550,8 +540,8 @@ static ZEBRA_RES rec_write_multiple(Records p, int saveCount)
                yaz_log(YLOG_WARN, "bzBuffToBuffCompress error code=%d", i);
                csize = 0;
            }
-           yaz_log(YLOG_LOG, "compress %4d %5d %5d", ref_count, out_offset,
-                 csize);
+           yaz_log(YLOG_LOG, "compress %4d %5d %5d", ref_count,
+                    out_offset, csize);
 #endif
            break;
        case REC_COMPRESS_NONE:
@@ -576,6 +566,61 @@ static ZEBRA_RES rec_write_multiple(Records p, int saveCount)
            != ZEBRA_OK)
            ret = ZEBRA_FAIL;
     }
+    return ret;
+}
+
+/* Handle the first (cache_cur - saveCount) entries of the record cache.
+ * New and rewritten records are appended to one output buffer by
+ * rec_cache_flush_block1 and that buffer is passed to rec_flush_shared;
+ * deleted records go to rec_delete_single.  Handled entries are reset to
+ * recordFlagNop.  Returns ZEBRA_FAIL if any step failed, else ZEBRA_OK. */
+static ZEBRA_RES rec_write_multiple(Records p, int saveCount)
+{
+    int i;
+    short ref_count = 0;
+    Record last_rec = 0;
+    int out_size = 1000;
+    int out_offset = 0;
+    char *out_buf = (char *) xmalloc(out_size);
+    zint *sysnos = (zint *) xmalloc(sizeof(*sysnos) * (p->cache_cur + 1));
+    zint *sysnop = sysnos;
+    ZEBRA_RES ret = ZEBRA_OK;
+
+    for (i = 0; i<p->cache_cur - saveCount; i++)
+    {
+        struct record_cache_entry *e = p->record_cache + i;
+        switch (e->flag)
+        {
+        case recordFlagWrite:
+            if (rec_release_blocks(p, rec_sysno_to_int(e->rec->sysno))
+                != ZEBRA_OK)
+                ret = ZEBRA_FAIL;
+            /* fall through: from here on handled exactly like a new record */
+        case recordFlagNew:
+            rec_cache_flush_block1(p, e->rec, last_rec, &out_buf,
+                                   &out_size, &out_offset);
+            *sysnop++ = rec_sysno_to_int(e->rec->sysno);
+            ref_count++;
+            e->flag = recordFlagNop;
+            last_rec = e->rec;
+            break;
+        case recordFlagDelete:
+            if (rec_delete_single(p, e->rec) != ZEBRA_OK)
+                ret = ZEBRA_FAIL;
+            e->flag = recordFlagNop;
+            break;
+        case recordFlagNop:
+        default:
+            break;
+        }
+    }
+
+    *sysnop = -1;
+    /* propagate a failed flush instead of silently dropping its result */
+    if (rec_flush_shared(p, ref_count, sysnos, out_buf, out_offset)
+        != ZEBRA_OK)
+        ret = ZEBRA_FAIL;
+
     xfree(out_buf);
     xfree(sysnos);
     return ret;
@@ -638,7 +683,7 @@ static ZEBRA_RES rec_cache_insert(Records p, Record rec, enum recordCacheFlag fl
             for (j = 0; j<REC_NO_INFO; j++)
                 used += r->size[j];
         }
-        if (used > 90000)
+        if (used > p->compression_chunk_size)
             ret = rec_cache_flush(p, 1);
     }
     assert(p->cache_cur < p->cache_max);
@@ -747,6 +792,32 @@ static Record rec_get_int(Records p, zint sysno)
     in_size = entry.size - sizeof(short) - sizeof(char);
     switch (compression_method)
     {
+    case REC_COMPRESS_ZLIB:
+#if HAVE_ZLIB_H
+        bz_size = entry.size * 20 + 100;    /* first guess at output size */
+        do
+        {
+            uLongf destLen = bz_size;
+            bz_buf = (char *) xmalloc(bz_size);
+            i = uncompress((Bytef *) bz_buf, &destLen,
+                           (const Bytef *) in_buf, in_size);
+            if (i == Z_OK)
+                bz_size = destLen;          /* actual decompressed length */
+            else
+                xfree(bz_buf);
+            if (i == Z_BUF_ERROR)
+                bz_size *= 2;               /* output did not fit: retry */
+        } while (i == Z_BUF_ERROR);
+        if (i == Z_OK)
+        {
+            yaz_log(YLOG_LOG, "decompress %5d %5d", in_size, bz_size);
+            in_buf = bz_buf;
+            in_size = bz_size;
+            break;
+        }
+#endif
+        yaz_log(YLOG_FATAL, "cannot decompress record(s) in ZLIB format");
+        return 0;   /* no zlib support, or a non-retryable zlib error */
     case REC_COMPRESS_BZIP2:
 #if HAVE_BZLIB_H
        bz_size = entry.size * 20 + 100;
@@ -981,7 +1052,7 @@ char *rec_strdup(const char *s, size_t *len)
     return p;
 }
 
-void rec_prstat(Records records)
+void rec_prstat(Records records, int verbose)
 {
     int i;
     zint total_bytes = 0;
@@ -1001,6 +1072,34 @@ void rec_prstat(Records records)
               records->head.block_used[i] * records->head.block_size[i]);
         total_bytes +=
             records->head.block_used[i] * records->head.block_size[i];
+
+        yaz_log(YLOG_LOG, " Block Last " ZINT_FORMAT, records->head.block_last[i]);
+        if (verbose)
+        {   /* analyse free lists */
+            zint no_free = 0;
+            zint block_free = records->head.block_free[i];
+            WRBUF w = wrbuf_alloc();
+            while (block_free)
+            {
+                zint nblock;
+                no_free++;
+                wrbuf_printf(w, " " ZINT_FORMAT, block_free);
+                if (bf_read(records->data_BFile[i],
+                            block_free, 0, sizeof(nblock), &nblock) != 1)
+                {
+                    yaz_log(YLOG_FATAL|YLOG_ERRNO, "read in %s at free block "
+                            ZINT_FORMAT,
+                            records->data_fname[i], block_free);
+                    break;
+                }
+                block_free = nblock;
+            }
+            yaz_log (YLOG_LOG,
+                     " Number in free list       %8" ZINT_FORMAT0, no_free);
+            if (no_free)
+                yaz_log(YLOG_LOG, "%s", wrbuf_cstr(w));
+            wrbuf_destroy(w);
+        }
     }
     yaz_log (YLOG_LOG,
           "Total size of record index in bytes  %8" ZINT_FORMAT0,
@@ -1013,6 +1112,7 @@ void rec_prstat(Records records)
 /*
  * Local variables:
  * c-basic-offset: 4
+ * c-file-style: "Stroustrup"
  * indent-tabs-mode: nil
  * End:
  * vim: shiftwidth=4 tabstop=8 expandtab