Index statistics working again
[idzebra-moved-to-github.git] / isamb / isamb.c
index c0940ad..5627af4 100644 (file)
@@ -1,4 +1,9 @@
-
+/*
+ *  Copyright (c) 2000-2002, Index Data.
+ *  See the file LICENSE for details.
+ *
+ *  $Id: isamb.c,v 1.10 2002-04-26 08:44:47 adam Exp $
+ */
 #include <yaz/xmalloc.h>
 #include <yaz/log.h>
 #include <isamb.h>
@@ -8,26 +13,40 @@ struct ISAMB_head {
     int first_block;
     int last_block;
     int block_size;
+    int block_max;
 };
 
 #define ISAMB_DATA_OFFSET 3
 
+#define DST_ITEM_MAX 256
+
+/* approx 2*4 K + max size of item */
+#define DST_BUF_SIZE 8448
+
+
+struct ISAMB_file {
+    BFile bf;
+    int head_dirty;
+    struct ISAMB_head head;
+};
+
 struct ISAMB_s {
     BFiles bfs;
-    BFile bf;
     ISAMC_M method;
-    int head_dirty;
 
-    struct ISAMB_head head;
+    struct ISAMB_file *file;
+    int no_cat;
 };
 
 struct ISAMB_block {
     int pos;
+    int cat;
     int size;
     int leaf;
     int dirty;
     int offset;
-    unsigned char *bytes;
+    char *bytes;
+    unsigned char *buf;
     void *decodeClientData;
 };
 
@@ -52,200 +71,136 @@ void decode_ptr (char **src, int *pos)
 ISAMB isamb_open (BFiles bfs, const char *name, int writeflag, ISAMC_M method)
 {
     ISAMB isamb = xmalloc (sizeof(*isamb));
+    int i, b_size = 64;
 
     isamb->bfs = bfs;
     isamb->method = (ISAMC_M) xmalloc (sizeof(*method));
     memcpy (isamb->method, method, sizeof(*method));
+    isamb->no_cat = 4;
 
-    isamb->head.first_block = 1;
-    isamb->head.last_block = 1;
-    isamb->head.block_size = 1024;
-    isamb->head_dirty = 0;
-
-    isamb->bf = bf_open (bfs, name, isamb->head.block_size, writeflag);
+    isamb->file = xmalloc (sizeof(*isamb->file) * isamb->no_cat);
+    for (i = 0; i<isamb->no_cat; i++)
+    {
+        char fname[DST_BUF_SIZE];
+        isamb->file[i].head.first_block = 1;
+        isamb->file[i].head.last_block = 1;
+        isamb->file[i].head.block_size = b_size;
+        isamb->file[i].head.block_max = b_size - ISAMB_DATA_OFFSET;
+        b_size = b_size * 4;
+        isamb->file[i].head_dirty = 0;
+        sprintf (fname, "%s-%d", name, i);
+        isamb->file[i].bf =
+            bf_open (bfs, fname, isamb->file[i].head.block_size, writeflag);
     
-    bf_read (isamb->bf, 0, 0, sizeof(struct ISAMB_head),
-             &isamb->head);
+        bf_read (isamb->file[i].bf, 0, 0, sizeof(struct ISAMB_head),
+                 &isamb->file[i].head);
+    }
     return isamb;
 }
 
 void isamb_close (ISAMB isamb)
 {
-    if (isamb->head_dirty)
-        bf_write (isamb->bf, 0, 0, sizeof(struct ISAMB_head), &isamb->head);
+    int i;
+    for (i = 0; i<isamb->no_cat; i++)
+    {
+        if (isamb->file[i].head_dirty)
+            bf_write (isamb->file[i].bf, 0, 0,
+                      sizeof(struct ISAMB_head), &isamb->file[i].head);
+    }
+    xfree (isamb->file);
     xfree (isamb->method);
     xfree (isamb);
 }
 
 struct ISAMB_block *open_block (ISAMB b, ISAMC_P pos)
 {
+    int cat = pos&3;
     struct ISAMB_block *p;
     if (!pos)
         return 0;
     p = xmalloc (sizeof(*p));
     p->pos = pos;
-    p->bytes = xmalloc (b->head.block_size);
-    bf_read (b->bf, pos, 0, 0, p->bytes);
-    p->leaf = p->bytes[0];
-    p->size = p->bytes[1] + 256 * p->bytes[2];
-    p->offset = ISAMB_DATA_OFFSET;
+    p->cat = pos & 3;
+    p->buf = xmalloc (b->file[cat].head.block_size);
+    bf_read (b->file[cat].bf, pos/4, 0, 0, p->buf);
+    p->bytes = p->buf + ISAMB_DATA_OFFSET;
+    p->leaf = p->buf[0];
+    p->size = p->buf[1] + 256 * p->buf[2] - ISAMB_DATA_OFFSET;
+    p->offset = 0;
     p->dirty = 0;
     p->decodeClientData = (*b->method->code_start)(ISAMC_DECODE);
     return p;
 }
 
-struct ISAMB_block *new_block (ISAMB b, int leaf)
+struct ISAMB_block *new_block (ISAMB b, int leaf, int cat)
 {
     struct ISAMB_block *p;
+    int block_no;
     
     p = xmalloc (sizeof(*p));
-    p->pos = b->head.last_block++;
-    b->head_dirty = 1;
-    p->bytes = xmalloc (b->head.block_size);
-    memset (p->bytes, 0, b->head.block_size);
+    block_no = b->file[cat].head.last_block++;
+    p->cat = cat;
+    p->pos = block_no * 4 + cat;
+    p->cat = cat;
+    b->file[cat].head_dirty = 1;
+    p->buf = xmalloc (b->file[cat].head.block_size);
+    memset (p->buf, 0, b->file[cat].head.block_size);
+    p->bytes = p->buf + ISAMB_DATA_OFFSET;
     p->leaf = leaf;
-    p->size = ISAMB_DATA_OFFSET;
+    p->size = 0;
     p->dirty = 1;
-    p->offset = ISAMB_DATA_OFFSET;
+    p->offset = 0;
     p->decodeClientData = (*b->method->code_start)(ISAMC_DECODE);
     return p;
 }
 
+struct ISAMB_block *new_leaf (ISAMB b, int cat)
+{
+    return new_block (b, 1, cat);
+}
+
+
+struct ISAMB_block *new_int (ISAMB b, int cat)
+{
+    return new_block (b, 0, cat);
+}
+
 void close_block (ISAMB b, struct ISAMB_block *p)
 {
     if (!p)
         return;
     if (p->dirty)
     {
-        p->bytes[0] = p->leaf;
-        p->bytes[1] = p->size & 255;
-        p->bytes[2] = p->size >> 8;
-        bf_write (b->bf, p->pos, 0, 0, p->bytes);
+        int size = p->size + ISAMB_DATA_OFFSET;
+        p->buf[0] = p->leaf;
+        p->buf[1] = size & 255;
+        p->buf[2] = size >> 8;
+        bf_write (b->file[p->cat].bf, p->pos/4, 0, 0, p->buf);
     }
     (*b->method->code_stop)(ISAMC_DECODE, p->decodeClientData);
-    xfree (p->bytes);
+    xfree (p->buf);
     xfree (p);
 }
 
-void insert_sub (ISAMB b, struct ISAMB_block *p, const void *new_item,
-                 struct ISAMB_block **sp,
-                 void *sub_item, int *sub_size);
-
-void insert_leaf (ISAMB b, struct ISAMB_block *p, const void *new_item,
-                  struct ISAMB_block **sp,
-                  void *sub_item, int *sub_size)
-{
-    char dst_buf[2048];
-    char *dst = dst_buf;
-    char *src = p->bytes + ISAMB_DATA_OFFSET;
-    char *endp = p->bytes + p->size;
-    void *c1 = (*b->method->code_start)(ISAMC_DECODE);
-    void *c2 = (*b->method->code_start)(ISAMC_ENCODE);
-    char *half1 = 0;
-    char *half2 = 0;
-    char *cut = dst_buf + p->size / 2;
-    char cut_item_buf[256];
-    int cut_item_size = 0;
-
-    while (src != endp)
-    {
-        char file_item_buf[256];
-        char *file_item = file_item_buf;
-
-        (*b->method->code_item)(ISAMC_DECODE, c1, &file_item, &src);
-        if (new_item)
-        {
-            int d = (*b->method->compare_item)(file_item_buf, new_item);
-            if (d > 0)
-            {
-                char *item_ptr = (char*) new_item;
-                (*b->method->code_item)(ISAMC_ENCODE, c2, &dst, &item_ptr);
-                new_item = 0;
-                p->dirty = 1;
-            }
-            else if (d == 0)
-            {
-                new_item = 0;
-            }
-        }
-        
-        if (!half1 && dst > cut)   
-        {
-            half1 = dst; /* candidate for splitting */
-
-            file_item = file_item_buf;
-            (*b->method->code_item)(ISAMC_ENCODE, c2, &dst, &file_item);
-
-            cut_item_size = file_item - file_item_buf;
-            memcpy (cut_item_buf, file_item_buf, cut_item_size);
-
-            half2 = dst;
-        }
-        else
-        {
-            file_item = file_item_buf;
-            (*b->method->code_item)(ISAMC_ENCODE, c2, &dst, &file_item);
-        }
-    }
-    if (new_item)
-    {
-        char *item_ptr = (char*) new_item;
-        (*b->method->code_item)(ISAMC_ENCODE, c2, &dst, &item_ptr);
-        new_item = 0;
-        p->dirty = 1;
-    }
-    p->size = dst - dst_buf + ISAMB_DATA_OFFSET;
-    if (p->size > b->head.block_size)
-    {
-        char *first_dst;
-        char *cut_item = cut_item_buf;
-       /* first half */
-        p->size = half1 - dst_buf + ISAMB_DATA_OFFSET;
-        memcpy (p->bytes+ISAMB_DATA_OFFSET, dst_buf, half1 - dst_buf);
-
-        /* second half */
-        *sp = new_block (b, 1);
-
-        (*b->method->code_reset)(c2);
-
-        first_dst = (*sp)->bytes + ISAMB_DATA_OFFSET;
-
-        (*b->method->code_item)(ISAMC_ENCODE, c2, &first_dst, &cut_item);
-
-        memcpy (first_dst, half2, dst - half2);
-
-        (*sp)->size = (first_dst - (char*) (*sp)->bytes) + (dst - half2);
-        (*sp)->dirty = 1;
-        p->dirty = 1;
-        memcpy (sub_item, cut_item_buf, cut_item_size);
-        *sub_size = cut_item_size;
-
-        yaz_log (LOG_LOG, "l split %d / %d", p->size, (*sp)->size);
-
-    }
-    else
-    {
-        assert (p->size > ISAMB_DATA_OFFSET);
-        assert (p->size <= b->head.block_size);
-        memcpy (p->bytes+ISAMB_DATA_OFFSET, dst_buf, dst - dst_buf);
-        *sp = 0;
-    }
-    (*b->method->code_stop)(ISAMC_DECODE, c1);
-    (*b->method->code_stop)(ISAMC_ENCODE, c2);
-}
+int insert_sub (ISAMB b, struct ISAMB_block **p,
+                void *new_item,
+                ISAMC_I stream,
+                struct ISAMB_block **sp,
+                void *sub_item, int *sub_size,
+                void *max_item);
 
-void insert_int (ISAMB b, struct ISAMB_block *p, const void *new_item,
-                 struct ISAMB_block **sp,
-                 void *split_item, int *split_size)
+int insert_int (ISAMB b, struct ISAMB_block *p, void *lookahead_item,
+                ISAMC_I stream, struct ISAMB_block **sp,
+                void *split_item, int *split_size)
 {
-    char *startp = p->bytes + ISAMB_DATA_OFFSET;
+    char *startp = p->bytes;
     char *src = startp;
     char *endp = p->bytes + p->size;
     int pos;
     struct ISAMB_block *sub_p1 = 0, *sub_p2 = 0;
-    char sub_item[256];
+    char sub_item[DST_ITEM_MAX];
     int sub_size;
+    int more;
 
     *sp = 0;
 
@@ -255,13 +210,13 @@ void insert_int (ISAMB b, struct ISAMB_block *p, const void *new_item,
         int item_len;
         int d;
         decode_ptr (&src, &item_len);
-        d = (*b->method->compare_item)(src, new_item);
+        d = (*b->method->compare_item)(src, lookahead_item);
         if (d > 0)
         {
             sub_p1 = open_block (b, pos);
             assert (sub_p1);
-            insert_sub (b, sub_p1, new_item, &sub_p2, 
-                        sub_item, &sub_size);
+            more = insert_sub (b, &sub_p1, lookahead_item, stream, &sub_p2, 
+                               sub_item, &sub_size, src);
             break;
         }
         src += item_len;
@@ -271,15 +226,16 @@ void insert_int (ISAMB b, struct ISAMB_block *p, const void *new_item,
     {
         sub_p1 = open_block (b, pos);
         assert (sub_p1);
-        insert_sub (b, sub_p1, new_item, &sub_p2, 
-                    sub_item, &sub_size);
+        more = insert_sub (b, &sub_p1, lookahead_item, stream, &sub_p2, 
+                           sub_item, &sub_size, 0);
     }
     if (sub_p2)
     {
-        char dst_buf[2048];
+        /* there was a split - must insert pointer in this one */
+        char dst_buf[DST_BUF_SIZE];
         char *dst = dst_buf;
 
-        assert (sub_size < 20);
+        assert (sub_size < 30 && sub_size > 1);
 
         memcpy (dst, startp, src - startp);
                 
@@ -296,8 +252,8 @@ void insert_int (ISAMB b, struct ISAMB_block *p, const void *new_item,
             memcpy (dst, src, endp - src);
             dst += endp - src;
         }
-        p->size = dst - dst_buf + ISAMB_DATA_OFFSET;
-        if (p->size <= b->head.block_size)
+        p->size = dst - dst_buf;
+        if (p->size <= b->file[p->cat].head.block_max)
         {
             memcpy (startp, dst_buf, dst - dst_buf);
         }
@@ -308,7 +264,7 @@ void insert_int (ISAMB b, struct ISAMB_block *p, const void *new_item,
             src = dst_buf;
             endp = dst;
 
-            half = src + b->head.block_size/2;
+            half = src + b->file[p->cat].head.block_size/2;
             decode_ptr (&src, &pos);
             while (src <= half)
             {
@@ -317,84 +273,287 @@ void insert_int (ISAMB b, struct ISAMB_block *p, const void *new_item,
                 decode_ptr (&src, &pos);
             }
             p_new_size = src - dst_buf;
-            memcpy (p->bytes + ISAMB_DATA_OFFSET, dst_buf, p_new_size);
-            p_new_size += ISAMB_DATA_OFFSET;
+            memcpy (p->bytes, dst_buf, p_new_size);
 
             decode_ptr (&src, split_size);
             memcpy (split_item, src, *split_size);
             src += *split_size;
 
-            *sp = new_block (b, 0);
+            *sp = new_int (b, p->cat);
             (*sp)->size = endp - src;
-            memcpy ((*sp)->bytes+ISAMB_DATA_OFFSET, src, (*sp)->size);
-            (*sp)->size += ISAMB_DATA_OFFSET;
+            memcpy ((*sp)->bytes, src, (*sp)->size);
 
-            yaz_log (LOG_LOG, "i split %d -> %d %d",
-                     p->size, p_new_size, (*sp)->size);
             p->size = p_new_size;
         }
         p->dirty = 1;
         close_block (b, sub_p2);
     }
     close_block (b, sub_p1);
+    return more;
 }
 
-void insert_sub (ISAMB b, struct ISAMB_block *p, const void *new_item,
-                struct ISAMB_block **sp,
-                void *sub_item, int *sub_size)
-{
-    if (p->leaf)
-        insert_leaf (b, p, new_item, sp, sub_item, sub_size);
-    else
-        insert_int (b, p, new_item, sp, sub_item, sub_size);
-}
 
-int isamb_insert_one (ISAMB b, const void *item, ISAMC_P pos)
+int insert_leaf (ISAMB b, struct ISAMB_block **sp1, void *lookahead_item,
+                 ISAMC_I stream, struct ISAMB_block **sp2,
+                 void *sub_item, int *sub_size,
+                 void *max_item)
 {
-    struct ISAMB_block *p, *sp = 0;
-    char sub_item[256];
-    int sub_size;
+    struct ISAMB_block *p = *sp1;
+    char *src = 0, *endp = 0;
+    char dst_buf[DST_BUF_SIZE], *dst = dst_buf;
+    int new_size;
+    void *c1 = (*b->method->code_start)(ISAMC_DECODE);
+    void *c2 = (*b->method->code_start)(ISAMC_ENCODE);
+    int more = 1;
+    int quater = b->file[b->no_cat-1].head.block_max / 4;
+    char *cut = dst_buf + quater * 2;
+    char *maxp = dst_buf + b->file[b->no_cat-1].head.block_max;
+    char *half1 = 0;
+    char *half2 = 0;
+    char cut_item_buf[DST_ITEM_MAX];
+    int cut_item_size = 0;
 
-    if (!pos)
-        p = new_block (b, 1);
-    else
-        p = open_block (b, pos);
+    if (p && p->size)
+    {
+        char file_item_buf[DST_ITEM_MAX];
+        char *file_item = file_item_buf;
+            
+        src = p->bytes;
+        endp = p->bytes + p->size;
+        (*b->method->code_item)(ISAMC_DECODE, c1, &file_item, &src);
+        while (1)
+        {
+            char *dst_item = 0;
+            char *dst_0 = dst;
+            char *lookahead_next;
+            int lookahead_mode;
+            int d = -1;
+            
+            if (lookahead_item)
+                d = (*b->method->compare_item)(file_item_buf, lookahead_item);
+            
+            if (d > 0)  
+                dst_item = lookahead_item;
+            else
+                dst_item = file_item_buf;
+            if (!half1 && dst > cut)   
+            {
+                char *dst_item_0 = dst_item;
+                half1 = dst; /* candidate for splitting */
+                
+                (*b->method->code_item)(ISAMC_ENCODE, c2, &dst, &dst_item);
+                
+                cut_item_size = dst_item - dst_item_0;
+                memcpy (cut_item_buf, dst_item_0, cut_item_size);
+                
+                half2 = dst;
+            }
+            else
+                (*b->method->code_item)(ISAMC_ENCODE, c2, &dst, &dst_item);
+            if (d > 0)  
+            {
+                if (dst > maxp)
+                {
+                    dst = dst_0;
+                    lookahead_item = 0;
+                }
+                else
+                {
+                    lookahead_next = lookahead_item;
+                    if (!(*stream->read_item)(stream->clientData,
+                                              &lookahead_next,
+                                              &lookahead_mode))
+                    {
+                        lookahead_item = 0;
+                        more = 0;
+                    }
+                    if (max_item &&
+                        (*b->method->compare_item)(max_item, lookahead_item) <= 0)
+                    {
+                       yaz_log (LOG_LOG, "max_item 1");
+                        lookahead_item = 0;
+                    }
+                    
+                    p->dirty = 1;
+                }
+            }
+            else if (d == 0)
+            {
+                lookahead_next = lookahead_item;
+                if (!(*stream->read_item)(stream->clientData,
+                                          &lookahead_next, &lookahead_mode))
+                {
+                    lookahead_item = 0;
+                    more = 0;
+                }
+                if (src == endp)
+                    break;
+                file_item = file_item_buf;
+                (*b->method->code_item)(ISAMC_DECODE, c1, &file_item, &src);
+            }
+            else
+            {
+                if (src == endp)
+                    break;
+                file_item = file_item_buf;
+                (*b->method->code_item)(ISAMC_DECODE, c1, &file_item, &src);
+            }
+        }
+    }
+    maxp = dst_buf + b->file[b->no_cat-1].head.block_max + quater;
+    while (lookahead_item)
+    {
+        int lookahead_mode;
+        char *dst_item = lookahead_item;
+        char *dst_0 = dst;
+        
+        if (max_item &&
+            (*b->method->compare_item)(max_item, lookahead_item) <= 0)
+        {
+           yaz_log (LOG_LOG, "max_item 2");
+            break;
+        }
+        if (!half1 && dst > cut)   
+        {
+            char *dst_item_0 = dst_item;
+            half1 = dst; /* candidate for splitting */
+            
+            (*b->method->code_item)(ISAMC_ENCODE, c2, &dst, &dst_item);
+            
+            cut_item_size = dst_item - dst_item_0;
+            memcpy (cut_item_buf, dst_item_0, cut_item_size);
+            
+            half2 = dst;
+        }
+        else
+            (*b->method->code_item)(ISAMC_ENCODE, c2, &dst, &dst_item);
+
+        if (dst > maxp)
+        {
+            dst = dst_0;
+            break;
+        }
+        if (p)
+            p->dirty = 1;
+        dst_item = lookahead_item;
+        if (!(*stream->read_item)(stream->clientData, &dst_item,
+                                  &lookahead_mode))
+        {
+            lookahead_item = 0;
+            more = 0;
+        }
+    }
+    new_size = dst - dst_buf;
+    if (p && p->cat != b->no_cat-1 && 
+        new_size > b->file[p->cat].head.block_max)
+    {
+        /* non-btree block will be removed */
+        close_block (b, p);
+        /* delete it too!! */
+        p = 0; /* make a new one anyway */
+    }
     if (!p)
-        return -1;
+    {   /* must create a new one */
+        int i;
+        for (i = 0; i < b->no_cat; i++)
+            if (new_size <= b->file[i].head.block_max)
+                break;
+        if (i == b->no_cat)
+            i = b->no_cat - 1;
+        p = new_leaf (b, i);
+    }
+    if (new_size > b->file[p->cat].head.block_max)
+    {
+        char *first_dst;
+        char *cut_item = cut_item_buf;
 
-    insert_sub (b, p, item, &sp, sub_item, &sub_size);
-    if (sp)
-    {    /* increase level of tree by one */
-        struct ISAMB_block *p2 = new_block (b, 0);
-        char *dst = p2->bytes + p2->size;
-        
-        encode_ptr (&dst, p->pos);
-        assert (sub_size < 20);
-        encode_ptr (&dst, sub_size);
-        memcpy (dst, sub_item, sub_size);
-        dst += sub_size;
-        encode_ptr (&dst, sp->pos);
+        assert (half1);
+        assert (half2);
+
+       /* first half */
+        p->size = half1 - dst_buf;
+        memcpy (p->bytes, dst_buf, half1 - dst_buf);
+
+        /* second half */
+        *sp2 = new_leaf (b, p->cat);
+
+        (*b->method->code_reset)(c2);
+
+        first_dst = (*sp2)->bytes;
+
+        (*b->method->code_item)(ISAMC_ENCODE, c2, &first_dst, &cut_item);
+
+        memcpy (first_dst, half2, dst - half2);
 
-        p2->size = dst - (char*) p2->bytes;
-        pos = p2->pos;  /* return new super page */
-        close_block (b, sp);
-        close_block (b, p2);
+        (*sp2)->size = (first_dst - (*sp2)->bytes) + (dst - half2);
+        (*sp2)->dirty = 1;
+        p->dirty = 1;
+        memcpy (sub_item, cut_item_buf, cut_item_size);
+        *sub_size = cut_item_size;
     }
     else
-        pos = p->pos;   /* return current one (again) */
-    close_block (b, p);
-    return pos;
+    {
+        memcpy (p->bytes, dst_buf, dst - dst_buf);
+        p->size = new_size;
+    }
+    (*b->method->code_stop)(ISAMC_DECODE, c1);
+    (*b->method->code_stop)(ISAMC_ENCODE, c2);
+    *sp1 = p;
+    return more;
 }
 
-ISAMB_P isamb_merge (ISAMB b, ISAMB_P pos, ISAMC_I data)
+int insert_sub (ISAMB b, struct ISAMB_block **p, void *new_item,
+                ISAMC_I stream,
+                struct ISAMB_block **sp,
+                void *sub_item, int *sub_size,
+                void *max_item)
 {
+    if (!*p || (*p)->leaf)
+        return insert_leaf (b, p, new_item, stream, sp, sub_item, sub_size, 
+                            max_item);
+    else
+        return insert_int (b, *p, new_item, stream, sp, sub_item, sub_size);
+}
+
+int isamb_merge (ISAMB b, ISAMC_P pos, ISAMC_I stream)
+{
+    char item_buf[DST_ITEM_MAX];
+    char *item_ptr;
     int i_mode;
-    char item_buf[256];
-    char *item_ptr = item_buf;
-    while ((*data->read_item)(data->clientData, &item_ptr, &i_mode))
+    int more;
+
+    item_ptr = item_buf;
+    more = (*stream->read_item)(stream->clientData, &item_ptr, &i_mode);
+    while (more)
     {
-        item_ptr = item_buf;
-        pos = isamb_insert_one (b, item_buf, pos);
+        struct ISAMB_block *p = 0, *sp = 0;
+        char sub_item[DST_ITEM_MAX];
+        int sub_size;
+        
+        if (pos)
+            p = open_block (b, pos);
+        more = insert_sub (b, &p, item_buf, stream, &sp,
+                            sub_item, &sub_size, 0);
+        if (sp)
+        {    /* increase level of tree by one */
+            struct ISAMB_block *p2 = new_int (b, p->cat);
+            char *dst = p2->bytes + p2->size;
+            
+            encode_ptr (&dst, p->pos);
+            assert (sub_size < 20);
+            encode_ptr (&dst, sub_size);
+            memcpy (dst, sub_item, sub_size);
+            dst += sub_size;
+            encode_ptr (&dst, sp->pos);
+            
+            p2->size = dst - p2->bytes;
+            pos = p2->pos;  /* return new super page */
+            close_block (b, sp);
+            close_block (b, p2);
+        }
+        else
+            pos = p->pos;   /* return current one (again) */
+        close_block (b, p);
     }
     return pos;
 }
@@ -417,7 +576,7 @@ ISAMB_PP isamb_pp_open (ISAMB isamb, ISAMB_P pos)
             break;
 
         decode_ptr (&src, &pos);
-        p->offset = src - (char*) p->bytes;
+        p->offset = src - p->bytes;
         pp->level++;
     }
     pp->block[pp->level+1] = 0;
@@ -470,7 +629,7 @@ int isamb_pp_read (ISAMB_PP pp, void *buf)
         {
             pp->block[pp->level] = p = open_block (pp->isamb, pos);
             
-            if (p->bytes[0]) /* leaf */
+            if (p->leaf) /* leaf */
             {
                 break;
             }