Much faster merge for b-tree
authorAdam Dickmeiss <adam@indexdata.dk>
Tue, 23 Apr 2002 13:39:10 +0000 (13:39 +0000)
committerAdam Dickmeiss <adam@indexdata.dk>
Tue, 23 Apr 2002 13:39:10 +0000 (13:39 +0000)
index/kinput.c
isamb/isamb.c

index e8d154a..01a074c 100644 (file)
@@ -3,7 +3,7 @@
  * All rights reserved.
  * Sebastian Hammer, Adam Dickmeiss, Heikki Levanto
  *
- * $Id: kinput.c,v 1.49 2002-04-16 22:31:42 adam Exp $
+ * $Id: kinput.c,v 1.50 2002-04-23 13:39:10 adam Exp $
  *
  * Bugs
  *  - Allocates a lot of memory for the merge process, but never releases it.
@@ -435,6 +435,27 @@ int heap_inpc (struct heap_info *hi)
     return 0;
 } 
 
+/* for debugging only */
+static void print_dict_item (ZebraMaps zm, const char *s)
+{
+    int reg_type = s[1];
+    char keybuf[IT_MAX_WORD+1];
+    char *to = keybuf;
+    const char *from = s + 2;
+
+    while (*from)
+    {
+        const char *res = zebra_maps_output (zm, reg_type, &from);
+        if (!res)
+            *to++ = *from++;
+        else
+            while (*res)
+                *to++ = *res++;
+    }
+    *to = '\0';
+    yaz_log (LOG_LOG, "%s", keybuf);
+}
+
 int heap_inpb (struct heap_info *hi)
 {
     struct heap_cread_info hci;
@@ -457,6 +478,10 @@ int heap_inpb (struct heap_info *hi)
         strcpy (this_name, hci.cur_name);
        assert (hci.cur_name[1]);
         hi->no_diffs++;
+
+#if 0
+        print_dict_item (hi->reg->zebra_maps, hci.cur_name);
+#endif
         if ((dict_info = dict_lookup (hi->reg->dict, hci.cur_name)))
         {
             memcpy (&isamc_p, dict_info+1, sizeof(ISAMC_P));
index ed28809..4f55b02 100644 (file)
@@ -2,7 +2,7 @@
  *  Copyright (c) 1995-1998, Index Data.
  *  See the file LICENSE for details.
  *
- *  $Id: isamb.c,v 1.7 2002-04-17 09:44:56 adam Exp $
+ *  $Id: isamb.c,v 1.8 2002-04-23 13:39:10 adam Exp $
  */
 #include <yaz/xmalloc.h>
 #include <yaz/log.h>
@@ -13,13 +13,17 @@ struct ISAMB_head {
     int first_block;
     int last_block;
     int block_size;
+    int block_max;
 };
 
 #define ISAMB_DATA_OFFSET 3
 
-#define DST_BUF_SIZE 4500
 #define DST_ITEM_MAX 256
 
+/* approx 2*4 K + max size of item */
+#define DST_BUF_SIZE 8448
+
+
 struct ISAMB_file {
     BFile bf;
     int head_dirty;
@@ -41,7 +45,8 @@ struct ISAMB_block {
     int leaf;
     int dirty;
     int offset;
-    unsigned char *bytes;
+    char *bytes;
+    unsigned char *buf;
     void *decodeClientData;
 };
 
@@ -80,6 +85,7 @@ ISAMB isamb_open (BFiles bfs, const char *name, int writeflag, ISAMC_M method)
         isamb->file[i].head.first_block = 1;
         isamb->file[i].head.last_block = 1;
         isamb->file[i].head.block_size = b_size;
+        isamb->file[i].head.block_max = b_size - ISAMB_DATA_OFFSET;
         b_size = b_size * 4;
         isamb->file[i].head_dirty = 0;
         sprintf (fname, "%s-%d", name, i);
@@ -115,11 +121,12 @@ struct ISAMB_block *open_block (ISAMB b, ISAMC_P pos)
     p = xmalloc (sizeof(*p));
     p->pos = pos;
     p->cat = pos & 3;
-    p->bytes = xmalloc (b->file[cat].head.block_size);
-    bf_read (b->file[cat].bf, pos/4, 0, 0, p->bytes);
-    p->leaf = p->bytes[0];
-    p->size = p->bytes[1] + 256 * p->bytes[2];
-    p->offset = ISAMB_DATA_OFFSET;
+    p->buf = xmalloc (b->file[cat].head.block_size);
+    bf_read (b->file[cat].bf, pos/4, 0, 0, p->buf);
+    p->bytes = p->buf + ISAMB_DATA_OFFSET;
+    p->leaf = p->buf[0];
+    p->size = p->buf[1] + 256 * p->buf[2] - ISAMB_DATA_OFFSET;
+    p->offset = 0;
     p->dirty = 0;
     p->decodeClientData = (*b->method->code_start)(ISAMC_DECODE);
     return p;
@@ -136,151 +143,64 @@ struct ISAMB_block *new_block (ISAMB b, int leaf, int cat)
     p->pos = block_no * 4 + cat;
     p->cat = cat;
     b->file[cat].head_dirty = 1;
-    p->bytes = xmalloc (b->file[cat].head.block_size);
-    memset (p->bytes, 0, b->file[cat].head.block_size);
+    p->buf = xmalloc (b->file[cat].head.block_size);
+    memset (p->buf, 0, b->file[cat].head.block_size);
+    p->bytes = p->buf + ISAMB_DATA_OFFSET;
     p->leaf = leaf;
-    p->size = ISAMB_DATA_OFFSET;
+    p->size = 0;
     p->dirty = 1;
-    p->offset = ISAMB_DATA_OFFSET;
+    p->offset = 0;
     p->decodeClientData = (*b->method->code_start)(ISAMC_DECODE);
     return p;
 }
 
+struct ISAMB_block *new_leaf (ISAMB b, int cat)
+{
+    return new_block (b, 1, cat);
+}
+
+
+struct ISAMB_block *new_int (ISAMB b, int cat)
+{
+    return new_block (b, 0, cat);
+}
+
 void close_block (ISAMB b, struct ISAMB_block *p)
 {
     if (!p)
         return;
     if (p->dirty)
     {
-        p->bytes[0] = p->leaf;
-        p->bytes[1] = p->size & 255;
-        p->bytes[2] = p->size >> 8;
-        bf_write (b->file[p->cat].bf, p->pos/4, 0, 0, p->bytes);
+        int size = p->size + ISAMB_DATA_OFFSET;
+        p->buf[0] = p->leaf;
+        p->buf[1] = size & 255;
+        p->buf[2] = size >> 8;
+        bf_write (b->file[p->cat].bf, p->pos/4, 0, 0, p->buf);
     }
     (*b->method->code_stop)(ISAMC_DECODE, p->decodeClientData);
-    xfree (p->bytes);
+    xfree (p->buf);
     xfree (p);
 }
 
-void insert_sub (ISAMB b, struct ISAMB_block *p, const void *new_item,
-                 struct ISAMB_block **sp,
-                 void *sub_item, int *sub_size);
-
-void insert_leaf (ISAMB b, struct ISAMB_block *p, const void *new_item,
-                  struct ISAMB_block **sp,
-                  void *sub_item, int *sub_size)
-{
-    char dst_buf[DST_BUF_SIZE];
-    char *dst = dst_buf;
-    char *src = p->bytes + ISAMB_DATA_OFFSET;
-    char *endp = p->bytes + p->size;
-    void *c1 = (*b->method->code_start)(ISAMC_DECODE);
-    void *c2 = (*b->method->code_start)(ISAMC_ENCODE);
-    char *half1 = 0;
-    char *half2 = 0;
-    char *cut = dst_buf + p->size / 2;
-    char cut_item_buf[DST_ITEM_MAX];
-    int cut_item_size = 0;
-
-    while (src != endp)
-    {
-        char file_item_buf[DST_ITEM_MAX];
-        char *file_item = file_item_buf;
-
-        (*b->method->code_item)(ISAMC_DECODE, c1, &file_item, &src);
-        if (new_item)
-        {
-            int d = (*b->method->compare_item)(file_item_buf, new_item);
-            if (d > 0)
-            {
-                char *item_ptr = (char*) new_item;
-                (*b->method->code_item)(ISAMC_ENCODE, c2, &dst, &item_ptr);
-                new_item = 0;
-                p->dirty = 1;
-            }
-            else if (d == 0)
-            {
-                new_item = 0;
-            }
-        }
-        
-        if (!half1 && dst > cut)   
-        {
-            half1 = dst; /* candidate for splitting */
-
-            file_item = file_item_buf;
-            (*b->method->code_item)(ISAMC_ENCODE, c2, &dst, &file_item);
-
-            cut_item_size = file_item - file_item_buf;
-            memcpy (cut_item_buf, file_item_buf, cut_item_size);
-
-            half2 = dst;
-        }
-        else
-        {
-            file_item = file_item_buf;
-            (*b->method->code_item)(ISAMC_ENCODE, c2, &dst, &file_item);
-        }
-    }
-    if (new_item)
-    {
-        char *item_ptr = (char*) new_item;
-        (*b->method->code_item)(ISAMC_ENCODE, c2, &dst, &item_ptr);
-        new_item = 0;
-        p->dirty = 1;
-    }
-    p->size = dst - dst_buf + ISAMB_DATA_OFFSET;
-    if (p->size > b->file[p->cat].head.block_size)
-    {
-        char *first_dst;
-        char *cut_item = cut_item_buf;
-       /* first half */
-        p->size = half1 - dst_buf + ISAMB_DATA_OFFSET;
-        memcpy (p->bytes+ISAMB_DATA_OFFSET, dst_buf, half1 - dst_buf);
-
-        /* second half */
-        *sp = new_block (b, 1, p->cat);
-
-        (*b->method->code_reset)(c2);
-
-        first_dst = (*sp)->bytes + ISAMB_DATA_OFFSET;
-
-        (*b->method->code_item)(ISAMC_ENCODE, c2, &first_dst, &cut_item);
-
-        memcpy (first_dst, half2, dst - half2);
-
-        (*sp)->size = (first_dst - (char*) (*sp)->bytes) + (dst - half2);
-        (*sp)->dirty = 1;
-        p->dirty = 1;
-        memcpy (sub_item, cut_item_buf, cut_item_size);
-        *sub_size = cut_item_size;
-
-        yaz_log (LOG_LOG, "l split %d / %d", p->size, (*sp)->size);
-
-    }
-    else
-    {
-        assert (p->size > ISAMB_DATA_OFFSET);
-        assert (p->size <= b->file[p->cat].head.block_size);
-        memcpy (p->bytes+ISAMB_DATA_OFFSET, dst_buf, dst - dst_buf);
-        *sp = 0;
-    }
-    (*b->method->code_stop)(ISAMC_DECODE, c1);
-    (*b->method->code_stop)(ISAMC_ENCODE, c2);
-}
+int insert_sub (ISAMB b, struct ISAMB_block **p,
+                void *new_item,
+                ISAMC_I stream,
+                struct ISAMB_block **sp,
+                void *sub_item, int *sub_size,
+                void *max_item);
 
-void insert_int (ISAMB b, struct ISAMB_block *p, const void *new_item,
-                 struct ISAMB_block **sp,
-                 void *split_item, int *split_size)
+int insert_int (ISAMB b, struct ISAMB_block *p, void *lookahead_item,
+                ISAMC_I stream, struct ISAMB_block **sp,
+                void *split_item, int *split_size)
 {
-    char *startp = p->bytes + ISAMB_DATA_OFFSET;
+    char *startp = p->bytes;
     char *src = startp;
     char *endp = p->bytes + p->size;
     int pos;
     struct ISAMB_block *sub_p1 = 0, *sub_p2 = 0;
     char sub_item[DST_ITEM_MAX];
     int sub_size;
+    int more;
 
     *sp = 0;
 
@@ -290,13 +210,13 @@ void insert_int (ISAMB b, struct ISAMB_block *p, const void *new_item,
         int item_len;
         int d;
         decode_ptr (&src, &item_len);
-        d = (*b->method->compare_item)(src, new_item);
+        d = (*b->method->compare_item)(src, lookahead_item);
         if (d > 0)
         {
             sub_p1 = open_block (b, pos);
             assert (sub_p1);
-            insert_sub (b, sub_p1, new_item, &sub_p2, 
-                        sub_item, &sub_size);
+            more = insert_sub (b, &sub_p1, lookahead_item, stream, &sub_p2, 
+                               sub_item, &sub_size, src);
             break;
         }
         src += item_len;
@@ -306,15 +226,16 @@ void insert_int (ISAMB b, struct ISAMB_block *p, const void *new_item,
     {
         sub_p1 = open_block (b, pos);
         assert (sub_p1);
-        insert_sub (b, sub_p1, new_item, &sub_p2, 
-                    sub_item, &sub_size);
+        more = insert_sub (b, &sub_p1, lookahead_item, stream, &sub_p2, 
+                           sub_item, &sub_size, 0);
     }
     if (sub_p2)
     {
+        /* there was a split - must insert pointer in this one */
         char dst_buf[DST_BUF_SIZE];
         char *dst = dst_buf;
 
-        assert (sub_size < 20);
+        assert (sub_size < 30 && sub_size > 1);
 
         memcpy (dst, startp, src - startp);
                 
@@ -331,8 +252,8 @@ void insert_int (ISAMB b, struct ISAMB_block *p, const void *new_item,
             memcpy (dst, src, endp - src);
             dst += endp - src;
         }
-        p->size = dst - dst_buf + ISAMB_DATA_OFFSET;
-        if (p->size <= b->file[p->cat].head.block_size)
+        p->size = dst - dst_buf;
+        if (p->size <= b->file[p->cat].head.block_max)
         {
             memcpy (startp, dst_buf, dst - dst_buf);
         }
@@ -352,17 +273,15 @@ void insert_int (ISAMB b, struct ISAMB_block *p, const void *new_item,
                 decode_ptr (&src, &pos);
             }
             p_new_size = src - dst_buf;
-            memcpy (p->bytes + ISAMB_DATA_OFFSET, dst_buf, p_new_size);
-            p_new_size += ISAMB_DATA_OFFSET;
+            memcpy (p->bytes, dst_buf, p_new_size);
 
             decode_ptr (&src, split_size);
             memcpy (split_item, src, *split_size);
             src += *split_size;
 
-            *sp = new_block (b, 0, p->cat);
+            *sp = new_int (b, p->cat);
             (*sp)->size = endp - src;
-            memcpy ((*sp)->bytes+ISAMB_DATA_OFFSET, src, (*sp)->size);
-            (*sp)->size += ISAMB_DATA_OFFSET;
+            memcpy ((*sp)->bytes, src, (*sp)->size);
 
             yaz_log (LOG_LOG, "i split %d -> %d %d",
                      p->size, p_new_size, (*sp)->size);
@@ -372,72 +291,165 @@ void insert_int (ISAMB b, struct ISAMB_block *p, const void *new_item,
         close_block (b, sub_p2);
     }
     close_block (b, sub_p1);
+    return more;
 }
 
-void insert_sub (ISAMB b, struct ISAMB_block *p, const void *new_item,
-                struct ISAMB_block **sp,
-                void *sub_item, int *sub_size)
-{
-    if (p->leaf)
-        insert_leaf (b, p, new_item, sp, sub_item, sub_size);
-    else
-        insert_int (b, p, new_item, sp, sub_item, sub_size);
-}
 
-int insert_flat (ISAMB b, const void *new_item, ISAMC_P *posp)
+int insert_leaf (ISAMB b, struct ISAMB_block **sp1, void *lookahead_item,
+                 ISAMC_I stream, struct ISAMB_block **sp2,
+                 void *sub_item, int *sub_size,
+                 void *max_item)
 {
-    struct ISAMB_block *p = 0;
+    struct ISAMB_block *p = *sp1;
     char *src = 0, *endp = 0;
     char dst_buf[DST_BUF_SIZE], *dst = dst_buf;
     int new_size;
-    ISAMB_P pos = *posp;
     void *c1 = (*b->method->code_start)(ISAMC_DECODE);
     void *c2 = (*b->method->code_start)(ISAMC_ENCODE);
-    
-    if (pos)
-    {
-        p = open_block (b, pos);
-        if (!p)
-            return -1;
-        src = p->bytes + ISAMB_DATA_OFFSET;
-        endp = p->bytes + p->size;
+    int more = 1;
+    int quater = b->file[b->no_cat-1].head.block_max / 4;
+    char *cut = dst_buf + quater * 2;
+    char *maxp = dst_buf + b->file[b->no_cat-1].head.block_max;
+    char *half1 = 0;
+    char *half2 = 0;
+    char cut_item_buf[DST_ITEM_MAX];
+    int cut_item_size = 0;
 
-    }
-    while (p && src != endp)
+    if (p && p->size)
     {
         char file_item_buf[DST_ITEM_MAX];
         char *file_item = file_item_buf;
-
+            
+        src = p->bytes;
+        endp = p->bytes + p->size;
         (*b->method->code_item)(ISAMC_DECODE, c1, &file_item, &src);
-        if (new_item)
+        while (1)
         {
-            int d = (*b->method->compare_item)(file_item_buf, new_item);
-            if (d > 0)
+            char *dst_item = 0;
+            char *dst_0 = dst;
+            char *lookahead_next;
+            int lookahead_mode;
+            int d = -1;
+            
+            if (lookahead_item)
+                d = (*b->method->compare_item)(file_item_buf, lookahead_item);
+            
+            if (d > 0)  
+                dst_item = lookahead_item;
+            else
+                dst_item = file_item_buf;
+            if (!half1 && dst > cut)   
             {
-                char *item_ptr = (char*) new_item;
-                (*b->method->code_item)(ISAMC_ENCODE, c2, &dst, &item_ptr);
-                new_item = 0;
-                p->dirty = 1;
+                char *dst_item_0 = dst_item;
+                half1 = dst; /* candidate for splitting */
+                
+                (*b->method->code_item)(ISAMC_ENCODE, c2, &dst, &dst_item);
+                
+                cut_item_size = dst_item - dst_item_0;
+                memcpy (cut_item_buf, dst_item_0, cut_item_size);
+                
+                half2 = dst;
+            }
+            else
+                (*b->method->code_item)(ISAMC_ENCODE, c2, &dst, &dst_item);
+            if (d > 0)  
+            {
+                if (dst > maxp)
+                {
+                    dst = dst_0;
+                    lookahead_item = 0;
+                }
+                else
+                {
+                    lookahead_next = lookahead_item;
+                    if (!(*stream->read_item)(stream->clientData,
+                                              &lookahead_next,
+                                              &lookahead_mode))
+                    {
+                        lookahead_item = 0;
+                        more = 0;
+                    }
+                    if (max_item &&
+                        (*b->method->compare_item)(max_item, lookahead_item) <= 0)
+                    {
+                        assert (0);
+                        lookahead_item = 0;
+                    }
+                    
+                    p->dirty = 1;
+                }
             }
             else if (d == 0)
             {
-                new_item = 0;
+                lookahead_next = lookahead_item;
+                if (!(*stream->read_item)(stream->clientData,
+                                          &lookahead_next, &lookahead_mode))
+                {
+                    lookahead_item = 0;
+                    more = 0;
+                }
+                if (src == endp)
+                    break;
+                file_item = file_item_buf;
+                (*b->method->code_item)(ISAMC_DECODE, c1, &file_item, &src);
+            }
+            else
+            {
+                if (src == endp)
+                    break;
+                file_item = file_item_buf;
+                (*b->method->code_item)(ISAMC_DECODE, c1, &file_item, &src);
             }
         }
-        file_item = file_item_buf;
-        (*b->method->code_item)(ISAMC_ENCODE, c2, &dst, &file_item);
     }
-    if (new_item)
+    maxp = dst_buf + b->file[b->no_cat-1].head.block_max + quater;
+    while (lookahead_item)
     {
-        char *item_ptr = (char*) new_item;
-        (*b->method->code_item)(ISAMC_ENCODE, c2, &dst, &item_ptr);
-        new_item = 0;
+        int lookahead_mode;
+        char *dst_item = lookahead_item;
+        char *dst_0 = dst;
+        
+        if (max_item &&
+            (*b->method->compare_item)(max_item, lookahead_item) <= 0)
+        {
+            assert (0);
+            break;
+        }
+        if (!half1 && dst > cut)   
+        {
+            char *dst_item_0 = dst_item;
+            half1 = dst; /* candidate for splitting */
+            
+            (*b->method->code_item)(ISAMC_ENCODE, c2, &dst, &dst_item);
+            
+            cut_item_size = dst_item - dst_item_0;
+            memcpy (cut_item_buf, dst_item_0, cut_item_size);
+            
+            half2 = dst;
+        }
+        else
+            (*b->method->code_item)(ISAMC_ENCODE, c2, &dst, &dst_item);
+
+        if (dst > maxp)
+        {
+            dst = dst_0;
+            break;
+        }
         if (p)
             p->dirty = 1;
+        dst_item = lookahead_item;
+        if (!(*stream->read_item)(stream->clientData, &dst_item,
+                                  &lookahead_mode))
+        {
+            lookahead_item = 0;
+            more = 0;
+        }
     }
-    new_size = dst - dst_buf + ISAMB_DATA_OFFSET;
-    if (p && new_size > b->file[p->cat].head.block_size)
+    new_size = dst - dst_buf;
+    if (p && p->cat != b->no_cat-1 && 
+        new_size > b->file[p->cat].head.block_max)
     {
+        /* non-btree block will be removed */
         close_block (b, p);
         /* delete it too!! */
         p = 0; /* make a new one anyway */
@@ -446,40 +458,89 @@ int insert_flat (ISAMB b, const void *new_item, ISAMC_P *posp)
     {   /* must create a new one */
         int i;
         for (i = 0; i < b->no_cat; i++)
-            if (new_size <= b->file[i].head.block_size)
+            if (new_size <= b->file[i].head.block_max)
                 break;
-        p = new_block (b, 1, i);
+        if (i == b->no_cat)
+            i = b->no_cat - 1;
+        p = new_leaf (b, i);
+    }
+    if (new_size > b->file[p->cat].head.block_max)
+    {
+        char *first_dst;
+        char *cut_item = cut_item_buf;
+
+        assert (half1);
+        assert (half2);
+
+       /* first half */
+        p->size = half1 - dst_buf;
+        memcpy (p->bytes, dst_buf, half1 - dst_buf);
+
+        /* second half */
+        *sp2 = new_leaf (b, p->cat);
+
+        (*b->method->code_reset)(c2);
+
+        first_dst = (*sp2)->bytes;
+
+        (*b->method->code_item)(ISAMC_ENCODE, c2, &first_dst, &cut_item);
+
+        memcpy (first_dst, half2, dst - half2);
+
+        (*sp2)->size = (first_dst - (*sp2)->bytes) + (dst - half2);
+        (*sp2)->dirty = 1;
+        p->dirty = 1;
+        memcpy (sub_item, cut_item_buf, cut_item_size);
+        *sub_size = cut_item_size;
+
+        yaz_log (LOG_LOG, "l split %d / %d", p->size, (*sp2)->size);
+    }
+    else
+    {
+        memcpy (p->bytes, dst_buf, dst - dst_buf);
+        p->size = new_size;
     }
-    memcpy (p->bytes+ISAMB_DATA_OFFSET, dst_buf, dst - dst_buf);
-    p->size = new_size;
-    *posp = p->pos;
     (*b->method->code_stop)(ISAMC_DECODE, c1);
     (*b->method->code_stop)(ISAMC_ENCODE, c2);
-    close_block (b, p);
-    return 0;
+    *sp1 = p;
+    return more;
 }
 
-int isamb_insert_one (ISAMB b, const void *item, ISAMC_P pos)
+int insert_sub (ISAMB b, struct ISAMB_block **p, void *new_item,
+                ISAMC_I stream,
+                struct ISAMB_block **sp,
+                void *sub_item, int *sub_size,
+                void *max_item)
 {
-
-    if ((pos & 3) != b->no_cat-1)
-    {
-        /* note if pos == 0 we go here too! */
-        /* flat insert */
-        insert_flat (b, item, &pos);
-    }
+    if (!*p || (*p)->leaf)
+        return insert_leaf (b, p, new_item, stream, sp, sub_item, sub_size, 
+                            max_item);
     else
+        return insert_int (b, *p, new_item, stream, sp, sub_item, sub_size);
+}
+
+int isamb_merge (ISAMB b, ISAMC_P pos, ISAMC_I stream)
+{
+    char item_buf[DST_ITEM_MAX];
+    char *item_ptr;
+    int i_mode;
+    int more;
+
+    item_ptr = item_buf;
+    more = (*stream->read_item)(stream->clientData, &item_ptr, &i_mode);
+    while (more)
     {
-        /* b-tree insert */
-        struct ISAMB_block *p = open_block (b, pos), *sp = 0;
+        struct ISAMB_block *p = 0, *sp = 0;
         char sub_item[DST_ITEM_MAX];
         int sub_size;
-
-
-        insert_sub (b, p, item, &sp, sub_item, &sub_size);
+        
+        if (pos)
+            p = open_block (b, pos);
+        more = insert_sub (b, &p, item_buf, stream, &sp,
+                            sub_item, &sub_size, 0);
         if (sp)
         {    /* increase level of tree by one */
-            struct ISAMB_block *p2 = new_block (b, 0, p->cat);
+            struct ISAMB_block *p2 = new_int (b, p->cat);
             char *dst = p2->bytes + p2->size;
             
             encode_ptr (&dst, p->pos);
@@ -489,7 +550,7 @@ int isamb_insert_one (ISAMB b, const void *item, ISAMC_P pos)
             dst += sub_size;
             encode_ptr (&dst, sp->pos);
             
-            p2->size = dst - (char*) p2->bytes;
+            p2->size = dst - p2->bytes;
             pos = p2->pos;  /* return new super page */
             close_block (b, sp);
             close_block (b, p2);
@@ -501,19 +562,6 @@ int isamb_insert_one (ISAMB b, const void *item, ISAMC_P pos)
     return pos;
 }
 
-ISAMB_P isamb_merge (ISAMB b, ISAMB_P pos, ISAMC_I data)
-{
-    int i_mode;
-    char item_buf[DST_ITEM_MAX];
-    char *item_ptr = item_buf;
-    while ((*data->read_item)(data->clientData, &item_ptr, &i_mode))
-    {
-        item_ptr = item_buf;
-        pos = isamb_insert_one (b, item_buf, pos);
-    }
-    return pos;
-}
-
 ISAMB_PP isamb_pp_open (ISAMB isamb, ISAMB_P pos)
 {
     ISAMB_PP pp = xmalloc (sizeof(*pp));
@@ -532,7 +580,7 @@ ISAMB_PP isamb_pp_open (ISAMB isamb, ISAMB_P pos)
             break;
 
         decode_ptr (&src, &pos);
-        p->offset = src - (char*) p->bytes;
+        p->offset = src - p->bytes;
         pp->level++;
     }
     pp->block[pp->level+1] = 0;
@@ -585,7 +633,7 @@ int isamb_pp_read (ISAMB_PP pp, void *buf)
         {
             pp->block[pp->level] = p = open_block (pp->isamb, pos);
             
-            if (p->bytes[0]) /* leaf */
+            if (p->leaf) /* leaf */
             {
                 break;
             }