Work on bug #550: Avoid exit. In particular the mfile/cfile/bfile has
[idzebra-moved-to-github.git] / isamb / isamb.c
1 /* $Id: isamb.c,v 1.85 2006-11-14 08:12:08 adam Exp $
2    Copyright (C) 1995-2006
3    Index Data ApS
4
5 This file is part of the Zebra server.
6
7 Zebra is free software; you can redistribute it and/or modify it under
8 the terms of the GNU General Public License as published by the Free
9 Software Foundation; either version 2, or (at your option) any later
10 version.
11
12 Zebra is distributed in the hope that it will be useful, but WITHOUT ANY
13 WARRANTY; without even the implied warranty of MERCHANTABILITY or
14 FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
15 for more details.
16
17 You should have received a copy of the GNU General Public License
18 along with this program; if not, write to the Free Software
19 Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA  02110-1301  USA
20
21 */
22
23 #include <stdlib.h>
24 #include <string.h>
25 #include <yaz/log.h>
26 #include <yaz/xmalloc.h>
27 #include <idzebra/isamb.h>
28 #include <assert.h>
29
30 #ifndef ISAMB_DEBUG
31 #define ISAMB_DEBUG 0
32 #endif
33
34
35 #define ISAMB_MAJOR_VERSION 3
36 #define ISAMB_MINOR_VERSION 0
37
38 struct ISAMB_head {
39     zint first_block;
40     zint last_block;
41     zint free_list;
42     zint no_items;
43     int block_size;
44     int block_max;
45     int block_offset;
46 };
47
48 /* if 1, upper nodes items are encoded; 0 if not encoded */
49 #define INT_ENCODE 1
50
51 /* maximum size of encoded buffer */
52 #define DST_ITEM_MAX 256
53
54 #define ISAMB_MAX_LEVEL 10
55 /* approx 2*max page + max size of item */
56 #define DST_BUF_SIZE 16840
57
58 #define ISAMB_CACHE_ENTRY_SIZE 4096
59
60 /* CAT_MAX: _must_ be power of 2 */
61 #define CAT_MAX 4
62 #define CAT_MASK (CAT_MAX-1)
63 /* CAT_NO: <= CAT_MAX */
64 #define CAT_NO 4
65
66 /* Smallest block size */
67 #define ISAMB_MIN_SIZE 32
68 /* Size factor */
69 #define ISAMB_FAC_SIZE 4
70
71 /* ISAMB_PTR_CODEC = 1 var, =0 fixed */
72 #define ISAMB_PTR_CODEC 1
73
74 struct ISAMB_cache_entry {
75     ISAM_P pos;
76     unsigned char *buf;
77     int dirty;
78     int hits;
79     struct ISAMB_cache_entry *next;
80 };
81
82 struct ISAMB_file {
83     BFile bf;
84     int head_dirty;
85     struct ISAMB_head head;
86     struct ISAMB_cache_entry *cache_entries;
87 };
88
89 struct ISAMB_s {
90     BFiles bfs;
91     ISAMC_M *method;
92
93     struct ISAMB_file *file;
94     int no_cat;
95     int cache; /* 0 = no cache, 1 = use cache, -1 = dummy isam (for testing only) */
96     int log_io;        /* log level for bf_read/bf_write calls */
97     int log_freelist;  /* log level for freelist handling */
98     zint skipped_numbers; /* on a leaf node */
99     zint returned_numbers; 
100     zint skipped_nodes[ISAMB_MAX_LEVEL]; /* [0]=skipped leaves, 1 = higher etc */
101     zint accessed_nodes[ISAMB_MAX_LEVEL]; /* nodes we did not skip */
102 };
103
104 struct ISAMB_block {
105     ISAM_P pos;
106     int cat;
107     int size;
108     int leaf;
109     int dirty;
110     int deleted;
111     int offset;
112     zint no_items;  /* number of nodes in this + children */
113     char *bytes;
114     char *cbuf;
115     unsigned char *buf;
116     void *decodeClientData;
117     int log_rw;
118 };
119
120 struct ISAMB_PP_s {
121     ISAMB isamb;
122     ISAM_P pos;
123     int level;
124     int maxlevel; /* total depth */
125     zint total_size;
126     zint no_blocks;
127     zint skipped_numbers; /* on a leaf node */
128     zint returned_numbers; 
129     zint skipped_nodes[ISAMB_MAX_LEVEL]; /* [0]=skipped leaves, 1 = higher etc */
130     zint accessed_nodes[ISAMB_MAX_LEVEL]; /* nodes we did not skip */
131     struct ISAMB_block **block;
132     int scope;  /* on what level we forward */
133 };
134
135
136 #define encode_item_len encode_ptr
137 #if ISAMB_PTR_CODEC
138 static void encode_ptr(char **dst, zint pos)
139 {
140     unsigned char *bp = (unsigned char*) *dst;
141
142     while (pos > 127)
143     {
144          *bp++ = (unsigned char) (128 | (pos & 127));
145          pos = pos >> 7;
146     }
147     *bp++ = (unsigned char) pos;
148     *dst = (char *) bp;
149 }
150 #else
151 static void encode_ptr(char **dst, zint pos)
152 {
153     memcpy(*dst, &pos, sizeof(pos));
154     (*dst) += sizeof(pos);
155 }
156 #endif
157
158 #define decode_item_len decode_ptr
159 #if ISAMB_PTR_CODEC
160 static void decode_ptr(const char **src, zint *pos)
161 {
162     zint d = 0;
163     unsigned char c;
164     unsigned r = 0;
165
166     while (((c = *(const unsigned char *)((*src)++)) & 128))
167     {
168         d += ((zint) (c & 127) << r);
169         r += 7;
170     }
171     d += ((zint) c << r);
172     *pos = d;
173 }
174 #else
175 static void decode_ptr(const char **src, zint *pos)
176 {
177      memcpy(pos, *src, sizeof(*pos));
178      (*src) += sizeof(*pos);
179 }
180 #endif
181
182 ISAMB isamb_open(BFiles bfs, const char *name, int writeflag, ISAMC_M *method,
183                  int cache)
184 {
185     ISAMB isamb = xmalloc(sizeof(*isamb));
186     int i, b_size = ISAMB_MIN_SIZE;
187
188     isamb->bfs = bfs;
189     isamb->method = (ISAMC_M *) xmalloc(sizeof(*method));
190     memcpy(isamb->method, method, sizeof(*method));
191     isamb->no_cat = CAT_NO;
192     isamb->log_io = 0;
193     isamb->log_freelist = 0;
194     isamb->cache = cache;
195     isamb->skipped_numbers = 0;
196     isamb->returned_numbers = 0;
197     for (i = 0; i<ISAMB_MAX_LEVEL; i++)
198         isamb->skipped_nodes[i] = isamb->accessed_nodes[i] = 0;
199
200     assert(cache == 0);
201     isamb->file = xmalloc(sizeof(*isamb->file) * isamb->no_cat);
202
203     for (i = 0; i < isamb->no_cat; i++)
204     {
205         isamb->file[i].bf = 0;
206         isamb->file[i].head_dirty = 0;
207         isamb->file[i].cache_entries = 0;
208     }
209
210     for (i = 0; i < isamb->no_cat; i++)
211     {
212         char fname[DST_BUF_SIZE];
213         char hbuf[DST_BUF_SIZE];
214
215         sprintf(fname, "%s%c", name, i+'A');
216         if (cache)
217             isamb->file[i].bf = bf_open(bfs, fname, ISAMB_CACHE_ENTRY_SIZE,
218                                          writeflag);
219         else
220             isamb->file[i].bf = bf_open(bfs, fname, b_size, writeflag);
221
222         if (!isamb->file[i].bf)
223         {
224             isamb_close(isamb);
225             return 0;
226         }
227
228         /* fill-in default values (for empty isamb) */
229         isamb->file[i].head.first_block = ISAMB_CACHE_ENTRY_SIZE/b_size+1;
230         isamb->file[i].head.last_block = isamb->file[i].head.first_block;
231         isamb->file[i].head.block_size = b_size;
232 #if ISAMB_PTR_CODEC
233         if (i == isamb->no_cat-1 || b_size > 128)
234             isamb->file[i].head.block_offset = 8;
235         else
236             isamb->file[i].head.block_offset = 4;
237 #else
238         isamb->file[i].head.block_offset = 11;
239 #endif
240         isamb->file[i].head.block_max =
241             b_size - isamb->file[i].head.block_offset;
242         isamb->file[i].head.free_list = 0;
243         if (bf_read(isamb->file[i].bf, 0, 0, 0, hbuf))
244         {
245             /* got header assume "isamb"major minor len can fit in 16 bytes */
246             zint zint_tmp;
247             int major, minor, len, pos = 0;
248             int left;
249             const char *src = 0;
250             if (memcmp(hbuf, "isamb", 5))
251             {
252                 yaz_log(YLOG_WARN, "bad isamb header for file %s", fname);
253                 return 0;
254             }
255             if (sscanf(hbuf+5, "%d %d %d", &major, &minor, &len) != 3)
256             {
257                 yaz_log(YLOG_WARN, "bad isamb header for file %s", fname);
258                 return 0;
259             }
260             if (major != ISAMB_MAJOR_VERSION)
261             {
262                 yaz_log(YLOG_WARN, "bad major version for file %s %d, must be %d",
263                      fname, major, ISAMB_MAJOR_VERSION);
264                 return 0;
265             }
266             for (left = len - b_size; left > 0; left = left - b_size)
267             {
268                 pos++;
269                 if (!bf_read(isamb->file[i].bf, pos, 0, 0, hbuf + pos*b_size))
270                 {
271                     yaz_log(YLOG_WARN, "truncated isamb header for " 
272                          "file=%s len=%d pos=%d",
273                          fname, len, pos);
274                     return 0;
275                 }
276             }
277             src = hbuf + 16;
278             decode_ptr(&src, &isamb->file[i].head.first_block);
279             decode_ptr(&src, &isamb->file[i].head.last_block);
280             decode_ptr(&src, &zint_tmp);
281             isamb->file[i].head.block_size = (int) zint_tmp;
282             decode_ptr(&src, &zint_tmp);
283             isamb->file[i].head.block_max = (int) zint_tmp;
284             decode_ptr(&src, &isamb->file[i].head.free_list);
285         }
286         assert (isamb->file[i].head.block_size >= isamb->file[i].head.block_offset);
287         isamb->file[i].head_dirty = 0;
288         assert(isamb->file[i].head.block_size == b_size);
289         b_size = b_size * ISAMB_FAC_SIZE;
290     }
291 #if ISAMB_DEBUG
292     yaz_log(YLOG_WARN, "isamb debug enabled. Things will be slower than usual");
293 #endif
294     return isamb;
295 }
296
297 static void flush_blocks (ISAMB b, int cat)
298 {
299     while (b->file[cat].cache_entries)
300     {
301         struct ISAMB_cache_entry *ce_this = b->file[cat].cache_entries;
302         b->file[cat].cache_entries = ce_this->next;
303
304         if (ce_this->dirty)
305         {
306             yaz_log(b->log_io, "bf_write: flush_blocks");
307             bf_write(b->file[cat].bf, ce_this->pos, 0, 0, ce_this->buf);
308         }
309         xfree(ce_this->buf);
310         xfree(ce_this);
311     }
312 }
313
314 static int cache_block (ISAMB b, ISAM_P pos, unsigned char *userbuf, int wr)
315 {
316     int cat = (int) (pos&CAT_MASK);
317     int off = (int) (((pos/CAT_MAX) & 
318                (ISAMB_CACHE_ENTRY_SIZE / b->file[cat].head.block_size - 1))
319         * b->file[cat].head.block_size);
320     zint norm = pos / (CAT_MASK*ISAMB_CACHE_ENTRY_SIZE / b->file[cat].head.block_size);
321     int no = 0;
322     struct ISAMB_cache_entry **ce, *ce_this = 0, **ce_last = 0;
323
324     if (!b->cache)
325         return 0;
326
327     assert (ISAMB_CACHE_ENTRY_SIZE >= b->file[cat].head.block_size);
328     for (ce = &b->file[cat].cache_entries; *ce; ce = &(*ce)->next, no++)
329     {
330         ce_last = ce;
331         if ((*ce)->pos == norm)
332         {
333             ce_this = *ce;
334             *ce = (*ce)->next;   /* remove from list */
335             
336             ce_this->next = b->file[cat].cache_entries;  /* move to front */
337             b->file[cat].cache_entries = ce_this;
338             
339             if (wr)
340             {
341                 memcpy (ce_this->buf + off, userbuf, 
342                         b->file[cat].head.block_size);
343                 ce_this->dirty = 1;
344             }
345             else
346                 memcpy (userbuf, ce_this->buf + off,
347                         b->file[cat].head.block_size);
348             return 1;
349         }
350     }
351     if (no >= 40)
352     {
353         assert (no == 40);
354         assert (ce_last && *ce_last);
355         ce_this = *ce_last;
356         *ce_last = 0;  /* remove the last entry from list */
357         if (ce_this->dirty)
358         {
359             yaz_log(b->log_io, "bf_write: cache_block");
360             bf_write(b->file[cat].bf, ce_this->pos, 0, 0, ce_this->buf);
361         }
362         xfree(ce_this->buf);
363         xfree(ce_this);
364     }
365     ce_this = xmalloc(sizeof(*ce_this));
366     ce_this->next = b->file[cat].cache_entries;
367     b->file[cat].cache_entries = ce_this;
368     ce_this->buf = xmalloc(ISAMB_CACHE_ENTRY_SIZE);
369     ce_this->pos = norm;
370     yaz_log(b->log_io, "bf_read: cache_block");
371     if (!bf_read(b->file[cat].bf, norm, 0, 0, ce_this->buf))
372         memset (ce_this->buf, 0, ISAMB_CACHE_ENTRY_SIZE);
373     if (wr)
374     {
375         memcpy (ce_this->buf + off, userbuf, b->file[cat].head.block_size);
376         ce_this->dirty = 1;
377     }
378     else
379     {
380         ce_this->dirty = 0;
381         memcpy (userbuf, ce_this->buf + off, b->file[cat].head.block_size);
382     }
383     return 1;
384 }
385
386
387 void isamb_close (ISAMB isamb)
388 {
389     int i;
390     for (i = 0; isamb->accessed_nodes[i]; i++)
391         yaz_log(YLOG_DEBUG, "isamb_close  level leaf-%d: "ZINT_FORMAT" read, "
392                         ZINT_FORMAT" skipped",
393              i, isamb->accessed_nodes[i], isamb->skipped_nodes[i]);
394     yaz_log(YLOG_DEBUG, "isamb_close returned "ZINT_FORMAT" values, "
395                    "skipped "ZINT_FORMAT,
396          isamb->skipped_numbers, isamb->returned_numbers);
397     for (i = 0; i<isamb->no_cat; i++)
398     {
399         flush_blocks (isamb, i);
400         if (isamb->file[i].head_dirty)
401         {
402             char hbuf[DST_BUF_SIZE];
403             int major = ISAMB_MAJOR_VERSION;
404             int minor = ISAMB_MINOR_VERSION;
405             int len = 16;
406             char *dst = hbuf + 16;
407             int pos = 0, left;
408             int b_size = isamb->file[i].head.block_size;
409
410             encode_ptr(&dst, isamb->file[i].head.first_block);
411             encode_ptr(&dst, isamb->file[i].head.last_block);
412             encode_ptr(&dst, isamb->file[i].head.block_size);
413             encode_ptr(&dst, isamb->file[i].head.block_max);
414             encode_ptr(&dst, isamb->file[i].head.free_list);
415             memset(dst, '\0', b_size); /* ensure no random bytes are written */
416
417             len = dst - hbuf;
418
419             /* print exactly 16 bytes (including trailing 0) */
420             sprintf(hbuf, "isamb%02d %02d %02d\r\n", major, minor, len);
421
422             bf_write(isamb->file[i].bf, pos, 0, 0, hbuf);
423
424             for (left = len - b_size; left > 0; left = left - b_size)
425             {
426                 pos++;
427                 bf_write(isamb->file[i].bf, pos, 0, 0, hbuf + pos*b_size);
428             }
429         }
430         if (isamb->file[i].bf)
431             bf_close (isamb->file[i].bf);
432     }
433     xfree(isamb->file);
434     xfree(isamb->method);
435     xfree(isamb);
436 }
437
438 /* open_block: read one block at pos.
439    Decode leading sys bytes .. consisting of
440    Offset:Meaning
441    0: leader byte, != 0 leaf, == 0, non-leaf
442    1-2: used size of block
443    3-7*: number of items and all children
444    
445    * Reserve 5 bytes for large block sizes. 1 for small ones .. Number
446    of items. We can thus have at most 2^40 nodes. 
447 */
448 static struct ISAMB_block *open_block(ISAMB b, ISAM_P pos)
449 {
450     int cat = (int) (pos&CAT_MASK);
451     const char *src;
452     int offset = b->file[cat].head.block_offset;
453     struct ISAMB_block *p;
454     if (!pos)
455         return 0;
456     p = xmalloc(sizeof(*p));
457     p->pos = pos;
458     p->cat = (int) (pos & CAT_MASK);
459     p->buf = xmalloc(b->file[cat].head.block_size);
460     p->cbuf = 0;
461
462     if (!cache_block (b, pos, p->buf, 0))
463     {
464         yaz_log(b->log_io, "bf_read: open_block");
465         if (!bf_read(b->file[cat].bf, pos/CAT_MAX, 0, 0, p->buf))
466         {
467             yaz_log(YLOG_FATAL, "isamb: read fail for pos=%ld block=%ld",
468                      (long) pos, (long) pos/CAT_MAX);
469             abort();
470         }
471     }
472     p->bytes = (char *)p->buf + offset;
473     p->leaf = p->buf[0];
474     p->size = (p->buf[1] + 256 * p->buf[2]) - offset;
475     if (p->size < 0)
476     {
477         yaz_log(YLOG_FATAL, "Bad block size %d in pos=" ZINT_FORMAT "\n",
478                  p->size, pos);
479     }
480     assert (p->size >= 0);
481     src = (char*) p->buf + 3;
482     decode_ptr(&src, &p->no_items);
483
484     p->offset = 0;
485     p->dirty = 0;
486     p->deleted = 0;
487     p->decodeClientData = (*b->method->codec.start)();
488     return p;
489 }
490
491 struct ISAMB_block *new_block (ISAMB b, int leaf, int cat)
492 {
493     struct ISAMB_block *p;
494
495     p = xmalloc(sizeof(*p));
496     p->buf = xmalloc(b->file[cat].head.block_size);
497
498     if (!b->file[cat].head.free_list)
499     {
500         zint block_no;
501         block_no = b->file[cat].head.last_block++;
502         p->pos = block_no * CAT_MAX + cat;
503     }
504     else
505     {
506         p->pos = b->file[cat].head.free_list;
507         assert((p->pos & CAT_MASK) == cat);
508         if (!cache_block (b, p->pos, p->buf, 0))
509         {
510             yaz_log(b->log_io, "bf_read: new_block");
511             if (!bf_read(b->file[cat].bf, p->pos/CAT_MAX, 0, 0, p->buf))
512             {
513                 yaz_log(YLOG_FATAL, "isamb: read fail for pos=%ld block=%ld",
514                          (long) p->pos/CAT_MAX, (long) p->pos/CAT_MAX);
515                 abort ();
516             }
517         }
518         yaz_log(b->log_freelist, "got block " ZINT_FORMAT " from freelist %d:" ZINT_FORMAT, p->pos,
519                  cat, p->pos/CAT_MAX);
520         memcpy (&b->file[cat].head.free_list, p->buf, sizeof(zint));
521     }
522     p->cat = cat;
523     b->file[cat].head_dirty = 1;
524     memset (p->buf, 0, b->file[cat].head.block_size);
525     p->bytes = (char*)p->buf + b->file[cat].head.block_offset;
526     p->leaf = leaf;
527     p->size = 0;
528     p->dirty = 1;
529     p->deleted = 0;
530     p->offset = 0;
531     p->no_items = 0;
532     p->decodeClientData = (*b->method->codec.start)();
533     return p;
534 }
535
536 struct ISAMB_block *new_leaf (ISAMB b, int cat)
537 {
538     return new_block (b, 1, cat);
539 }
540
541
542 struct ISAMB_block *new_int (ISAMB b, int cat)
543 {
544     return new_block (b, 0, cat);
545 }
546
547 static void check_block (ISAMB b, struct ISAMB_block *p)
548 {
549     assert(b); /* mostly to make the compiler shut up about unused b */
550     if (p->leaf)
551     {
552         ;
553     }
554     else
555     {
556         /* sanity check */
557         char *startp = p->bytes;
558         const char *src = startp;
559         char *endp = p->bytes + p->size;
560         ISAM_P pos;
561         void *c1 = (*b->method->codec.start)();
562             
563         decode_ptr(&src, &pos);
564         assert ((pos&CAT_MASK) == p->cat);
565         while (src != endp)
566         {
567 #if INT_ENCODE
568             char file_item_buf[DST_ITEM_MAX];
569             char *file_item = file_item_buf;
570             (*b->method->codec.reset)(c1);
571             (*b->method->codec.decode)(c1, &file_item, &src);
572 #else
573             zint item_len;
574             decode_item_len(&src, &item_len);
575             assert (item_len > 0 && item_len < 80);
576             src += item_len;
577 #endif
578             decode_ptr(&src, &pos);
579             if ((pos&CAT_MASK) != p->cat)
580             {
581                 assert ((pos&CAT_MASK) == p->cat);
582             }
583         }
584         (*b->method->codec.stop)(c1);
585     }
586 }
587
588 void close_block(ISAMB b, struct ISAMB_block *p)
589 {
590     if (!p)
591         return;
592     if (p->deleted)
593     {
594         yaz_log(b->log_freelist, "release block " ZINT_FORMAT " from freelist %d:" ZINT_FORMAT,
595                  p->pos, p->cat, p->pos/CAT_MAX);
596         memcpy (p->buf, &b->file[p->cat].head.free_list, sizeof(zint));
597         b->file[p->cat].head.free_list = p->pos;
598         if (!cache_block (b, p->pos, p->buf, 1))
599         {
600             yaz_log(b->log_io, "bf_write: close_block (deleted)");
601             bf_write(b->file[p->cat].bf, p->pos/CAT_MAX, 0, 0, p->buf);
602         }
603     }
604     else if (p->dirty)
605     {
606         int offset = b->file[p->cat].head.block_offset;
607         int size = p->size + offset;
608         char *dst =  (char*)p->buf + 3;
609         assert (p->size >= 0);
610         
611         /* memset becuase encode_ptr usually does not write all bytes */
612         memset(p->buf, 0, b->file[p->cat].head.block_offset);
613         p->buf[0] = p->leaf;
614         p->buf[1] = size & 255;
615         p->buf[2] = size >> 8;
616         encode_ptr(&dst, p->no_items);
617         check_block(b, p);
618         if (!cache_block (b, p->pos, p->buf, 1))
619         {
620             yaz_log(b->log_io, "bf_write: close_block");
621             bf_write(b->file[p->cat].bf, p->pos/CAT_MAX, 0, 0, p->buf);
622         }
623     }
624     (*b->method->codec.stop)(p->decodeClientData);
625     xfree(p->buf);
626     xfree(p);
627 }
628
629 int insert_sub (ISAMB b, struct ISAMB_block **p,
630                 void *new_item, int *mode,
631                 ISAMC_I *stream,
632                 struct ISAMB_block **sp,
633                 void *sub_item, int *sub_size,
634                 const void *max_item);
635
636 int insert_int (ISAMB b, struct ISAMB_block *p, void *lookahead_item,
637                 int *mode,
638                 ISAMC_I *stream, struct ISAMB_block **sp,
639                 void *split_item, int *split_size, const void *last_max_item)
640 {
641     char *startp = p->bytes;
642     const char *src = startp;
643     char *endp = p->bytes + p->size;
644     ISAM_P pos;
645     struct ISAMB_block *sub_p1 = 0, *sub_p2 = 0;
646     char sub_item[DST_ITEM_MAX];
647     int sub_size;
648     int more = 0;
649     zint diff_terms = 0;
650     void *c1 = (*b->method->codec.start)();
651
652     *sp = 0;
653
654     assert(p->size >= 0);
655     decode_ptr(&src, &pos);
656     while (src != endp)
657     {
658         int d;
659         const char *src0 = src;
660 #if INT_ENCODE
661         char file_item_buf[DST_ITEM_MAX];
662         char *file_item = file_item_buf;
663         (*b->method->codec.reset)(c1);
664         (*b->method->codec.decode)(c1, &file_item, &src);
665         d = (*b->method->compare_item)(file_item_buf, lookahead_item);
666         if (d > 0)
667         {
668             sub_p1 = open_block(b, pos);
669             assert (sub_p1);
670             diff_terms -= sub_p1->no_items;
671             more = insert_sub (b, &sub_p1, lookahead_item, mode,
672                                stream, &sub_p2, 
673                                sub_item, &sub_size, file_item_buf);
674             diff_terms += sub_p1->no_items;
675             src = src0;
676             break;
677         }
678 #else
679         zint item_len;
680         decode_item_len(&src, &item_len);
681         d = (*b->method->compare_item)(src, lookahead_item);
682         if (d > 0)
683         {
684             sub_p1 = open_block(b, pos);
685             assert (sub_p1);
686             diff_terms -= sub_p1->no_items;
687             more = insert_sub (b, &sub_p1, lookahead_item, mode,
688                                stream, &sub_p2, 
689                                sub_item, &sub_size, src);
690             diff_terms += sub_p1->no_items;
691             src = src0;
692             break;
693         }
694         src += item_len;
695 #endif
696         decode_ptr(&src, &pos);
697     }
698     if (!sub_p1)
699     {
700         /* we reached the end. So lookahead > last item */
701         sub_p1 = open_block(b, pos);
702         assert (sub_p1);
703         diff_terms -= sub_p1->no_items;
704         more = insert_sub (b, &sub_p1, lookahead_item, mode, stream, &sub_p2, 
705                            sub_item, &sub_size, last_max_item);
706         diff_terms += sub_p1->no_items;
707     }
708     if (sub_p2)
709         diff_terms += sub_p2->no_items;
710     if (diff_terms)
711     {
712         p->dirty = 1;
713         p->no_items += diff_terms;
714     }
715     if (sub_p2)
716     {
717         /* there was a split - must insert pointer in this one */
718         char dst_buf[DST_BUF_SIZE];
719         char *dst = dst_buf;
720 #if INT_ENCODE
721         const char *sub_item_ptr = sub_item;
722 #endif
723         assert (sub_size < 80 && sub_size > 1);
724
725         memcpy (dst, startp, src - startp);
726                 
727         dst += src - startp;
728
729 #if INT_ENCODE
730         (*b->method->codec.reset)(c1);
731         (*b->method->codec.encode)(c1, &dst, &sub_item_ptr);
732 #else
733         encode_item_len (&dst, sub_size);      /* sub length and item */
734         memcpy (dst, sub_item, sub_size);
735         dst += sub_size;
736 #endif
737
738         encode_ptr(&dst, sub_p2->pos);   /* pos */
739
740         if (endp - src)                   /* remaining data */
741         {
742             memcpy (dst, src, endp - src);
743             dst += endp - src;
744         }
745         p->size = dst - dst_buf;
746         assert (p->size >= 0);
747
748         if (p->size <= b->file[p->cat].head.block_max)
749         {
750             /* it fits OK in this block */
751             memcpy (startp, dst_buf, dst - dst_buf);
752
753             close_block(b, sub_p2);
754         }
755         else
756         {
757             /* must split _this_ block as well .. */
758             struct ISAMB_block *sub_p3;
759 #if INT_ENCODE
760             char file_item_buf[DST_ITEM_MAX];
761             char *file_item = file_item_buf;
762 #else
763             zint split_size_tmp;
764 #endif
765             zint no_items_first_half = 0;
766             int p_new_size;
767             const char *half;
768             src = dst_buf;
769             endp = dst;
770
771             p->dirty = 1;
772             close_block(b, sub_p2);
773
774             half = src + b->file[p->cat].head.block_size/2;
775             decode_ptr(&src, &pos);
776
777             /* read sub block so we can get no_items for it */
778             sub_p3 = open_block(b, pos);
779             no_items_first_half += sub_p3->no_items;
780             close_block(b, sub_p3);
781
782             while (src <= half)
783             {
784 #if INT_ENCODE
785                 file_item = file_item_buf;
786                 (*b->method->codec.reset)(c1);
787                 (*b->method->codec.decode)(c1, &file_item, &src);
788 #else
789                 decode_item_len(&src, &split_size_tmp);
790                 *split_size = (int) split_size_tmp;
791                 src += *split_size;
792 #endif
793                 decode_ptr(&src, &pos);
794
795                 /* read sub block so we can get no_items for it */
796                 sub_p3 = open_block(b, pos);
797                 no_items_first_half += sub_p3->no_items;
798                 close_block(b, sub_p3);
799             }
800             /*  p is first half */
801             p_new_size = src - dst_buf;
802             memcpy (p->bytes, dst_buf, p_new_size);
803
804 #if INT_ENCODE
805             file_item = file_item_buf;
806             (*b->method->codec.reset)(c1);
807             (*b->method->codec.decode)(c1, &file_item, &src);
808             *split_size = file_item - file_item_buf;
809             memcpy(split_item, file_item_buf, *split_size);
810 #else
811             decode_item_len(&src, &split_size_tmp);
812             *split_size = (int) split_size_tmp;
813             memcpy (split_item, src, *split_size);
814             src += *split_size;
815 #endif
816             /*  *sp is second half */
817             *sp = new_int (b, p->cat);
818             (*sp)->size = endp - src;
819             memcpy ((*sp)->bytes, src, (*sp)->size);
820
821             p->size = p_new_size;
822
823             /*  adjust no_items in first&second half */
824             (*sp)->no_items = p->no_items - no_items_first_half;
825             p->no_items = no_items_first_half;
826         }
827         p->dirty = 1;
828     }
829     close_block(b, sub_p1);
830     (*b->method->codec.stop)(c1);
831     return more;
832 }
833
834 int insert_leaf (ISAMB b, struct ISAMB_block **sp1, void *lookahead_item,
835                  int *lookahead_mode, ISAMC_I *stream,
836                  struct ISAMB_block **sp2,
837                  void *sub_item, int *sub_size,
838                  const void *max_item)
839 {
840     struct ISAMB_block *p = *sp1;
841     char *endp = 0;
842     const char *src = 0;
843     char dst_buf[DST_BUF_SIZE], *dst = dst_buf;
844     int new_size;
845     void *c1 = (*b->method->codec.start)();
846     void *c2 = (*b->method->codec.start)();
847     int more = 1;
848     int quater = b->file[b->no_cat-1].head.block_max / 4;
849     char *mid_cut = dst_buf + quater * 2;
850     char *tail_cut = dst_buf + quater * 3;
851     char *maxp = dst_buf + b->file[b->no_cat-1].head.block_max;
852     char *half1 = 0;
853     char *half2 = 0;
854     char cut_item_buf[DST_ITEM_MAX];
855     int cut_item_size = 0;
856     int no_items = 0;    /* number of items (total) */
857     int no_items_1 = 0;  /* number of items (first half) */
858     int inserted_dst_bytes = 0;
859
860     if (p && p->size)
861     {
862         char file_item_buf[DST_ITEM_MAX];
863         char *file_item = file_item_buf;
864             
865         src = p->bytes;
866         endp = p->bytes + p->size;
867         (*b->method->codec.decode)(c1, &file_item, &src);
868         while (1)
869         {
870             const char *dst_item = 0; /* resulting item to be inserted */
871             char *lookahead_next;
872             char *dst_0 = dst;
873             int d = -1;
874             
875             if (lookahead_item)
876                 d = (*b->method->compare_item)(file_item_buf, lookahead_item);
877             
878             /* d now holds comparison between existing file item and
879                lookahead item 
880                d = 0: equal
881                d > 0: lookahead before file
882                d < 0: lookahead after file
883             */
884             if (d > 0)
885             {
886                 /* lookahead must be inserted */
887                 dst_item = lookahead_item;
888                 /* if this is not an insertion, it's really bad .. */
889                 if (!*lookahead_mode)
890                 {
891                     yaz_log(YLOG_WARN, "isamb: Inconsistent register (1)");
892                     assert (*lookahead_mode);
893                 }
894             }
895             else
896                 dst_item = file_item_buf;
897
898             if (!*lookahead_mode && d == 0)
899             {
900                 /* it's a deletion and they match so there is nothing to be
901                    inserted anyway .. But mark the thing bad (file item
902                    was part of input.. The item will not be part of output */
903                 p->dirty = 1;
904             }
905             else if (!half1 && dst > mid_cut)
906             {
907                 /* we have reached the splitting point for the first time */
908                 const char *dst_item_0 = dst_item;
909                 half1 = dst; /* candidate for splitting */
910
911                 /* encode the resulting item */
912                 (*b->method->codec.encode)(c2, &dst, &dst_item);
913                 
914                 cut_item_size = dst_item - dst_item_0;
915                 assert(cut_item_size > 0);
916                 memcpy (cut_item_buf, dst_item_0, cut_item_size);
917                 
918                 half2 = dst;
919                 no_items_1 = no_items;
920                 no_items++;
921             }
922             else
923             {
924                 /* encode the resulting item */
925                 (*b->method->codec.encode)(c2, &dst, &dst_item);
926                 no_items++;
927             }
928
929             /* now move "pointers" .. result has been encoded .. */
930             if (d > 0)  
931             {
932                 /* we must move the lookahead pointer */
933
934                 inserted_dst_bytes += (dst - dst_0);
935                 if (inserted_dst_bytes >=  quater)
936                     /* no more room. Mark lookahead as "gone".. */
937                     lookahead_item = 0;
938                 else
939                 {
940                     /* move it really.. */
941                     lookahead_next = lookahead_item;
942                     if (!(*stream->read_item)(stream->clientData,
943                                               &lookahead_next,
944                                               lookahead_mode))
945                     {
946                         /* end of stream reached: no "more" and no lookahead */
947                         lookahead_item = 0;
948                         more = 0;
949                     }
950                     if (lookahead_item && max_item &&
951                         (*b->method->compare_item)(max_item, lookahead_item) <= 0)
952                     {
953                         /* the lookahead goes beyond what we allow in this
954                            leaf. Mark it as "gone" */
955                         lookahead_item = 0;
956                     }
957                     
958                     p->dirty = 1;
959                 }
960             }
961             else if (d == 0)
962             {
963                 /* exact match .. move both pointers */
964
965                 lookahead_next = lookahead_item;
966                 if (!(*stream->read_item)(stream->clientData,
967                                           &lookahead_next, lookahead_mode))
968                 {
969                     lookahead_item = 0;
970                     more = 0;
971                 }
972                 if (src == endp)
973                     break;  /* end of file stream reached .. */
974                 file_item = file_item_buf; /* move file pointer */
975                 (*b->method->codec.decode)(c1, &file_item, &src);
976             }
977             else
978             {
979                 /* file pointer must be moved */
980                 if (src == endp)
981                     break;
982                 file_item = file_item_buf;
983                 (*b->method->codec.decode)(c1, &file_item, &src);
984             }
985         }
986     }
987
988     /* this loop runs when we are "appending" to a leaf page. That is
989        either it's empty (new) or all file items have been read in
990        previous loop */
991
992     maxp = dst_buf + b->file[b->no_cat-1].head.block_max + quater;
993     while (lookahead_item)
994     {
995         char *dst_item;
996         const char *src = lookahead_item;
997         char *dst_0 = dst;
998         
999         /* if we have a lookahead item, we stop if we exceed the value of it */
1000         if (max_item &&
1001             (*b->method->compare_item)(max_item, lookahead_item) <= 0)
1002         {
1003             /* stop if we have reached the value of max item */
1004             break;
1005         }
1006         if (!*lookahead_mode)
1007         {
1008             /* this is append. So a delete is bad */
1009             yaz_log(YLOG_WARN, "isamb: Inconsistent register (2)");
1010             abort();
1011         }
1012         else if (!half1 && dst > tail_cut)
1013         {
1014             const char *src_0 = src;
1015             half1 = dst; /* candidate for splitting */
1016             
1017             (*b->method->codec.encode)(c2, &dst, &src);
1018             
1019             cut_item_size = src - src_0;
1020             assert(cut_item_size > 0);
1021             memcpy (cut_item_buf, src_0, cut_item_size);
1022             
1023             no_items_1 = no_items;
1024             half2 = dst;
1025         }
1026         else
1027             (*b->method->codec.encode)(c2, &dst, &src);
1028
1029         if (dst > maxp)
1030         {
1031             dst = dst_0;
1032             break;
1033         }
1034         no_items++;
1035         if (p)
1036             p->dirty = 1;
1037         dst_item = lookahead_item;
1038         if (!(*stream->read_item)(stream->clientData, &dst_item,
1039                                   lookahead_mode))
1040         {
1041             lookahead_item = 0;
1042             more = 0;
1043         }
1044     }
1045     new_size = dst - dst_buf;
1046     if (p && p->cat != b->no_cat-1 && 
1047         new_size > b->file[p->cat].head.block_max)
1048     {
1049         /* non-btree block will be removed */
1050         p->deleted = 1;
1051         close_block(b, p);
1052         /* delete it too!! */
1053         p = 0; /* make a new one anyway */
1054     }
1055     if (!p)
1056     {   /* must create a new one */
1057         int i;
1058         for (i = 0; i < b->no_cat; i++)
1059             if (new_size <= b->file[i].head.block_max)
1060                 break;
1061         if (i == b->no_cat)
1062             i = b->no_cat - 1;
1063         p = new_leaf (b, i);
1064     }
1065     if (new_size > b->file[p->cat].head.block_max)
1066     {
1067         char *first_dst;
1068         const char *cut_item = cut_item_buf;
1069
1070         assert (half1);
1071         assert (half2);
1072
1073         assert(cut_item_size > 0);
1074         
1075         /* first half */
1076         p->size = half1 - dst_buf;
1077         assert(p->size <=  b->file[p->cat].head.block_max);
1078         memcpy (p->bytes, dst_buf, half1 - dst_buf);
1079         p->no_items = no_items_1;
1080
1081         /* second half */
1082         *sp2 = new_leaf (b, p->cat);
1083
1084         (*b->method->codec.reset)(c2);
1085
1086         first_dst = (*sp2)->bytes;
1087
1088         (*b->method->codec.encode)(c2, &first_dst, &cut_item);
1089
1090         memcpy (first_dst, half2, dst - half2);
1091
1092         (*sp2)->size = (first_dst - (*sp2)->bytes) + (dst - half2);
1093         assert((*sp2)->size <=  b->file[p->cat].head.block_max);
1094         (*sp2)->no_items = no_items - no_items_1;
1095         (*sp2)->dirty = 1;
1096         p->dirty = 1;
1097         memcpy (sub_item, cut_item_buf, cut_item_size);
1098         *sub_size = cut_item_size;
1099     }
1100     else
1101     {
1102         memcpy (p->bytes, dst_buf, dst - dst_buf);
1103         p->size = new_size;
1104         p->no_items = no_items;
1105     }
1106     (*b->method->codec.stop)(c1);
1107     (*b->method->codec.stop)(c2);
1108     *sp1 = p;
1109     return more;
1110 }
1111
1112 int insert_sub (ISAMB b, struct ISAMB_block **p, void *new_item,
1113                 int *mode,
1114                 ISAMC_I *stream,
1115                 struct ISAMB_block **sp,
1116                 void *sub_item, int *sub_size,
1117                 const void *max_item)
1118 {
1119     if (!*p || (*p)->leaf)
1120         return insert_leaf (b, p, new_item, mode, stream, sp, sub_item, 
1121                             sub_size, max_item);
1122     else
1123         return insert_int (b, *p, new_item, mode, stream, sp, sub_item,
1124                            sub_size, max_item);
1125 }
1126
1127 int isamb_unlink (ISAMB b, ISAM_P pos)
1128 {
1129     struct ISAMB_block *p1;
1130
1131     if (!pos)
1132         return 0;
1133     p1 = open_block(b, pos);
1134     p1->deleted = 1;
1135     if (!p1->leaf)
1136     {
1137         zint sub_p;
1138         const char *src = p1->bytes + p1->offset;
1139 #if INT_ENCODE
1140         void *c1 = (*b->method->codec.start)();
1141 #endif
1142         decode_ptr(&src, &sub_p);
1143         isamb_unlink(b, sub_p);
1144         
1145         while (src != p1->bytes + p1->size)
1146         {
1147 #if INT_ENCODE
1148             char file_item_buf[DST_ITEM_MAX];
1149             char *file_item = file_item_buf;
1150             (*b->method->codec.reset)(c1);
1151             (*b->method->codec.decode)(c1, &file_item, &src);
1152 #else
1153             zint item_len;
1154             decode_item_len(&src, &item_len);
1155             src += item_len;
1156 #endif
1157             decode_ptr(&src, &sub_p);
1158             isamb_unlink(b, sub_p);
1159         }
1160 #if INT_ENCODE
1161         (*b->method->codec.stop)(c1);
1162 #endif
1163     }
1164     close_block(b, p1);
1165     return 0;
1166 }
1167
1168 void isamb_merge(ISAMB b, ISAM_P *pos, ISAMC_I *stream)
1169 {
1170     char item_buf[DST_ITEM_MAX];
1171     char *item_ptr;
1172     int i_mode;
1173     int more;
1174     int must_delete = 0;
1175
1176     if (b->cache < 0)
1177     {
1178         int more = 1;
1179         while (more)
1180         {
1181             item_ptr = item_buf;
1182             more =
1183                 (*stream->read_item)(stream->clientData, &item_ptr, &i_mode);
1184         }
1185         *pos = 1;
1186         return;
1187     }
1188     item_ptr = item_buf;
1189     more = (*stream->read_item)(stream->clientData, &item_ptr, &i_mode);
1190     while (more)
1191     {
1192         struct ISAMB_block *p = 0, *sp = 0;
1193         char sub_item[DST_ITEM_MAX];
1194         int sub_size;
1195         
1196         if (*pos)
1197             p = open_block(b, *pos);
1198         more = insert_sub (b, &p, item_buf, &i_mode, stream, &sp,
1199                            sub_item, &sub_size, 0);
1200         if (sp)
1201         {    /* increase level of tree by one */
1202             struct ISAMB_block *p2 = new_int (b, p->cat);
1203             char *dst = p2->bytes + p2->size;
1204 #if INT_ENCODE
1205             void *c1 = (*b->method->codec.start)();
1206             const char *sub_item_ptr = sub_item;
1207 #endif
1208
1209             encode_ptr(&dst, p->pos);
1210             assert (sub_size < 80 && sub_size > 1);
1211 #if INT_ENCODE
1212             (*b->method->codec.reset)(c1);
1213             (*b->method->codec.encode)(c1, &dst, &sub_item_ptr);
1214 #else
1215             encode_item_len (&dst, sub_size);
1216             memcpy (dst, sub_item, sub_size);
1217             dst += sub_size;
1218 #endif
1219             encode_ptr(&dst, sp->pos);
1220             
1221             p2->size = dst - p2->bytes;
1222             p2->no_items = p->no_items + sp->no_items;
1223             *pos = p2->pos;  /* return new super page */
1224             close_block(b, sp);
1225             close_block(b, p2);
1226 #if INT_ENCODE
1227             (*b->method->codec.stop)(c1);
1228 #endif
1229         }
1230         else
1231         {
1232             *pos = p->pos;   /* return current one (again) */
1233         }
1234         if (p->no_items == 0)
1235             must_delete = 1;
1236         else
1237             must_delete = 0;
1238         close_block(b, p);
1239     }
1240     if (must_delete)
1241     {
1242         isamb_unlink(b, *pos);
1243         *pos = 0;
1244     }
1245 }
1246
1247 ISAMB_PP isamb_pp_open_x(ISAMB isamb, ISAM_P pos, int *level, int scope)
1248 {
1249     ISAMB_PP pp = xmalloc(sizeof(*pp));
1250     int i;
1251
1252     assert(pos);
1253
1254     pp->isamb = isamb;
1255     pp->block = xmalloc(ISAMB_MAX_LEVEL * sizeof(*pp->block));
1256
1257     pp->pos = pos;
1258     pp->level = 0;
1259     pp->maxlevel = 0;
1260     pp->total_size = 0;
1261     pp->no_blocks = 0;
1262     pp->skipped_numbers = 0;
1263     pp->returned_numbers = 0;
1264     pp->scope = scope;
1265     for (i = 0; i<ISAMB_MAX_LEVEL; i++)
1266         pp->skipped_nodes[i] = pp->accessed_nodes[i] = 0;
1267     while (1)
1268     {
1269         struct ISAMB_block *p = open_block(isamb, pos);
1270         const char *src = p->bytes + p->offset;
1271         pp->block[pp->level] = p;
1272
1273         pp->total_size += p->size;
1274         pp->no_blocks++;
1275         if (p->leaf)
1276             break;
1277         decode_ptr(&src, &pos);
1278         p->offset = src - p->bytes;
1279         pp->level++;
1280         pp->accessed_nodes[pp->level]++; 
1281     }
1282     pp->block[pp->level+1] = 0;
1283     pp->maxlevel = pp->level;
1284     if (level)
1285         *level = pp->level;
1286     return pp;
1287 }
1288
1289 ISAMB_PP isamb_pp_open (ISAMB isamb, ISAM_P pos, int scope)
1290 {
1291     return isamb_pp_open_x(isamb, pos, 0, scope);
1292 }
1293
1294 void isamb_pp_close_x(ISAMB_PP pp, zint *size, zint *blocks)
1295 {
1296     int i;
1297     if (!pp)
1298         return;
1299     yaz_log(YLOG_DEBUG, "isamb_pp_close lev=%d returned "ZINT_FORMAT" values, " 
1300                     "skipped "ZINT_FORMAT,
1301         pp->maxlevel, pp->skipped_numbers, pp->returned_numbers);
1302     for (i = pp->maxlevel; i>=0; i--)
1303         if (pp->skipped_nodes[i] || pp->accessed_nodes[i])
1304             yaz_log(YLOG_DEBUG, "isamb_pp_close  level leaf-%d: "
1305                             ZINT_FORMAT" read, "ZINT_FORMAT" skipped", i,
1306                  pp->accessed_nodes[i], pp->skipped_nodes[i]);
1307     pp->isamb->skipped_numbers += pp->skipped_numbers;
1308     pp->isamb->returned_numbers += pp->returned_numbers;
1309     for (i = pp->maxlevel; i>=0; i--)
1310     {
1311         pp->isamb->accessed_nodes[i] += pp->accessed_nodes[i];
1312         pp->isamb->skipped_nodes[i] += pp->skipped_nodes[i];
1313     }
1314     if (size)
1315         *size = pp->total_size;
1316     if (blocks)
1317         *blocks = pp->no_blocks;
1318     for (i = 0; i <= pp->level; i++)
1319         close_block(pp->isamb, pp->block[i]);
1320     xfree(pp->block);
1321     xfree(pp);
1322 }
1323
1324 int isamb_block_info (ISAMB isamb, int cat)
1325 {
1326     if (cat >= 0 && cat < isamb->no_cat)
1327         return isamb->file[cat].head.block_size;
1328     return -1;
1329 }
1330
1331 void isamb_pp_close (ISAMB_PP pp)
1332 {
1333     isamb_pp_close_x(pp, 0, 0);
1334 }
1335
1336 /* simple recursive dumper .. */
1337 static void isamb_dump_r (ISAMB b, ISAM_P pos, void (*pr)(const char *str),
1338                           int level)
1339 {
1340     char buf[1024];
1341     char prefix_str[1024];
1342     if (pos)
1343     {
1344         struct ISAMB_block *p = open_block(b, pos);
1345         sprintf(prefix_str, "%*s " ZINT_FORMAT " cat=%d size=%d max=%d items="
1346                 ZINT_FORMAT, level*2, "",
1347                 pos, p->cat, p->size, b->file[p->cat].head.block_max,
1348                 p->no_items);
1349         (*pr)(prefix_str);
1350         sprintf(prefix_str, "%*s " ZINT_FORMAT, level*2, "", pos);
1351         if (p->leaf)
1352         {
1353             while (p->offset < p->size)
1354             {
1355                 const char *src = p->bytes + p->offset;
1356                 char *dst = buf;
1357                 (*b->method->codec.decode)(p->decodeClientData, &dst, &src);
1358                 (*b->method->log_item)(YLOG_DEBUG, buf, prefix_str);
1359                 p->offset = src - (char*) p->bytes;
1360             }
1361             assert(p->offset == p->size);
1362         }
1363         else
1364         {
1365             const char *src = p->bytes + p->offset;
1366             ISAM_P sub;
1367
1368             decode_ptr(&src, &sub);
1369             p->offset = src - (char*) p->bytes;
1370
1371             isamb_dump_r(b, sub, pr, level+1);
1372             
1373             while (p->offset < p->size)
1374             {
1375 #if INT_ENCODE
1376                 char file_item_buf[DST_ITEM_MAX];
1377                 char *file_item = file_item_buf;
1378                 void *c1 = (*b->method->codec.start)();
1379                 (*b->method->codec.decode)(c1, &file_item, &src);
1380                 (*b->method->codec.stop)(c1);
1381                 (*b->method->log_item)(YLOG_DEBUG, file_item_buf, prefix_str);
1382 #else
1383                 zint item_len;
1384                 decode_item_len(&src, &item_len);
1385                 (*b->method->log_item)(YLOG_DEBUG, src, prefix_str);
1386                 src += item_len;
1387 #endif
1388                 decode_ptr(&src, &sub);
1389                 
1390                 p->offset = src - (char*) p->bytes;
1391                 
1392                 isamb_dump_r(b, sub, pr, level+1);
1393             }           
1394         }
1395         close_block(b, p);
1396     }
1397 }
1398
1399 void isamb_dump(ISAMB b, ISAM_P pos, void (*pr)(const char *str))
1400 {
1401     isamb_dump_r(b, pos, pr, 0);
1402 }
1403
1404 int isamb_pp_read(ISAMB_PP pp, void *buf)
1405 {
1406     return isamb_pp_forward(pp, buf, 0);
1407 }
1408
1409
1410 static int isamb_pp_on_right_node(ISAMB_PP pp, int level, const void *untilbuf)
1411 { /* looks one node higher to see if we should be on this node at all */
1412   /* useful in backing off quickly, and in avoiding tail descends */
1413   /* call with pp->level to begin with */
1414     struct ISAMB_block *p;
1415     int cmp;
1416     const char *src;
1417     ISAMB b = pp->isamb;
1418
1419     assert(level >= 0);
1420     if (level == 0)
1421     {
1422 #if ISAMB_DEBUG
1423             yaz_log(YLOG_DEBUG, "isamb_pp_on_right returning true for root");
1424 #endif
1425         return 1; /* we can never skip the root node */
1426     }
1427     level--;
1428     p = pp->block[level];
1429     assert(p->offset <= p->size);
1430     if (p->offset < p->size)
1431     {
1432 #if INT_ENCODE
1433         char file_item_buf[DST_ITEM_MAX];
1434         char *file_item = file_item_buf;
1435         void *c1 = (*b->method->codec.start)();
1436         assert(p->offset > 0); 
1437         src = p->bytes + p->offset;
1438         (*b->method->codec.decode)(c1, &file_item, &src);
1439         (*b->method->codec.stop)(c1);
1440         cmp = (*b->method->compare_item)(untilbuf, file_item_buf);
1441 #else
1442         zint item_len;
1443         assert(p->offset > 0); 
1444         src = p->bytes + p->offset;
1445         decode_item_len(&src, &item_len);
1446 #if ISAMB_DEBUG
1447         (*b->method->codec.log_item)(YLOG_DEBUG, untilbuf, "on_leaf: until");
1448         (*b->method->codec.log_item)(YLOG_DEBUG, src, "on_leaf: value");
1449 #endif
1450         cmp = (*b->method->compare_item)(untilbuf, src);
1451 #endif
1452         if (cmp < pp->scope)
1453         {  /* cmp<2 */
1454 #if ISAMB_DEBUG
1455             yaz_log(YLOG_DEBUG, "isamb_pp_on_right returning true "
1456                             "cmp=%d lev=%d ofs=%d", cmp, level, p->offset);
1457 #endif
1458             return 1; 
1459         }
1460         else
1461         {
1462 #if ISAMB_DEBUG
1463             yaz_log(YLOG_DEBUG, "isamb_pp_on_right returning false "
1464                             "cmp=%d lev=%d ofs=%d", cmp, level, p->offset);
1465 #endif
1466             return 0; 
1467         }
1468     }
1469     else {
1470 #if ISAMB_DEBUG
1471         yaz_log(YLOG_DEBUG, "isamb_pp_on_right at tail, looking higher "
1472                         "lev=%d", level);
1473 #endif
1474         return isamb_pp_on_right_node(pp, level, untilbuf);
1475     }
1476 } /* isamb_pp_on_right_node */
1477
1478 static int isamb_pp_read_on_leaf(ISAMB_PP pp, void *buf)
1479
1480     /* reads the next item on the current leaf, returns 0 if end of leaf*/
1481     struct ISAMB_block *p = pp->block[pp->level];
1482     char *dst;
1483     const char *src;
1484     assert(pp);
1485     assert(buf);
1486     if (p->offset == p->size)
1487     {
1488 #if ISAMB_DEBUG
1489         yaz_log(YLOG_DEBUG, "isamb_pp_read_on_leaf returning 0 on "
1490                 "node %d", p->pos);
1491 #endif
1492         return 0; /* at end of leaf */
1493     }
1494     src = p->bytes + p->offset;
1495     dst = buf;
1496     (*pp->isamb->method->codec.decode)(p->decodeClientData, &dst, &src);
1497     p->offset = src - (char*) p->bytes;
1498 #if ISAMB_DEBUG
1499     (*pp->isamb->method->codec.log_item)(YLOG_DEBUG, buf,
1500                                          "read_on_leaf returning 1");
1501 #endif
1502     pp->returned_numbers++;
1503     return 1;
1504 } /* read_on_leaf */
1505
1506 static int isamb_pp_forward_on_leaf(ISAMB_PP pp, void *buf, const void *untilbuf)
1507 { /* forwards on the current leaf, returns 0 if not found */
1508     int cmp;
1509     int skips = 0;
1510     while (1)
1511     {
1512         if (!isamb_pp_read_on_leaf(pp, buf))
1513             return 0;
1514         /* FIXME - this is an extra function call, inline the read? */
1515         cmp=(*pp->isamb->method->compare_item)(untilbuf, buf);
1516         if (cmp <pp->scope)
1517         {  /* cmp<2  found a good one */
1518 #if ISAMB_DEBUG
1519             if (skips)
1520                 yaz_log(YLOG_DEBUG, "isam_pp_fwd_on_leaf skipped %d items", skips);
1521 #endif
1522             pp->returned_numbers++;
1523             return 1;
1524         }
1525         if (!skips)
1526             if (!isamb_pp_on_right_node(pp, pp->level, untilbuf))
1527                 return 0; /* never mind the rest of this leaf */
1528         pp->skipped_numbers++;
1529         skips++;
1530     }
1531 } /* forward_on_leaf */
1532
1533 static int isamb_pp_climb_level(ISAMB_PP pp, ISAM_P *pos)
1534 { /* climbs higher in the tree, until finds a level with data left */
1535   /* returns the node to (consider to) descend to in *pos) */
1536     struct ISAMB_block *p = pp->block[pp->level];
1537     const char *src;
1538 #if ISAMB_DEBUG
1539     yaz_log(YLOG_DEBUG, "isamb_pp_climb_level starting "
1540                    "at level %d node %d ofs=%d sz=%d",
1541                     pp->level, p->pos, p->offset, p->size);
1542 #endif
1543     assert(pp->level >= 0);
1544     assert(p->offset <= p->size);
1545     if (pp->level==0)
1546     {
1547 #if ISAMB_DEBUG
1548         yaz_log(YLOG_DEBUG, "isamb_pp_climb_level returning 0 at root");
1549 #endif
1550         return 0;
1551     }
1552     assert(pp->level>0); 
1553     close_block(pp->isamb, pp->block[pp->level]);
1554     pp->block[pp->level] = 0;
1555     (pp->level)--;
1556     p = pp->block[pp->level];
1557 #if ISAMB_DEBUG
1558     yaz_log(YLOG_DEBUG, "isamb_pp_climb_level climbed to level %d node %d ofs=%d",
1559                     pp->level, p->pos, p->offset);
1560 #endif
1561     assert(!p->leaf);
1562     assert(p->offset <= p->size);
1563     if (p->offset == p->size)
1564     {
1565         /* we came from the last pointer, climb on */
1566         if (!isamb_pp_climb_level(pp, pos))
1567             return 0;
1568         p = pp->block[pp->level];
1569     }
1570     else
1571     {
1572 #if INT_ENCODE
1573         char file_item_buf[DST_ITEM_MAX];
1574         char *file_item = file_item_buf;
1575         ISAMB b = pp->isamb;
1576         void *c1 = (*b->method->codec.start)();
1577 #else
1578         zint item_len;
1579 #endif
1580         /* skip the child we just came from */
1581 #if ISAMB_DEBUG
1582         yaz_log(YLOG_DEBUG, "isam_pp_climb_level: skipping lev=%d ofs=%d sz=%d", 
1583                         pp->level, p->offset, p->size);
1584 #endif
1585         assert (p->offset < p->size);
1586         src = p->bytes + p->offset;
1587 #if INT_ENCODE
1588         (*b->method->codec.decode)(c1, &file_item, &src);
1589         (*b->method->codec.stop)(c1);
1590 #else
1591         decode_item_len(&src, &item_len);
1592         src += item_len;
1593 #endif
1594         decode_ptr(&src, pos);
1595         p->offset = src - (char *)p->bytes;
1596             
1597     }
1598     return 1;
1599 } /* climb_level */
1600
1601
1602 static zint isamb_pp_forward_unode(ISAMB_PP pp, zint pos, const void *untilbuf)
1603 { /* scans a upper node until it finds a child <= untilbuf */
1604   /* pp points to the key value, as always. pos is the child read from */
1605   /* the buffer */
1606   /* if all values are too small, returns the last child in the node */
1607   /* FIXME - this can be detected, and avoided by looking at the */
1608   /* parent node, but that gets messy. Presumably the cost is */
1609   /* pretty low anyway */
1610     ISAMB b = pp->isamb;
1611     struct ISAMB_block *p = pp->block[pp->level];
1612     const char *src = p->bytes + p->offset;
1613     int cmp;
1614     zint nxtpos;
1615 #if ISAMB_DEBUG
1616     int skips = 0;
1617     yaz_log(YLOG_DEBUG, "isamb_pp_forward_unode starting "
1618                    "at level %d node %d ofs=%di sz=%d",
1619                     pp->level, p->pos, p->offset, p->size);
1620 #endif
1621     assert(!p->leaf);
1622     assert(p->offset <= p->size);
1623     if (p->offset == p->size)
1624     {
1625 #if ISAMB_DEBUG
1626             yaz_log(YLOG_DEBUG, "isamb_pp_forward_unode returning at end "
1627                    "at level %d node %d ofs=%di sz=%d",
1628                     pp->level, p->pos, p->offset, p->size);
1629 #endif
1630         return pos; /* already at the end of it */
1631     }
1632     while(p->offset < p->size)
1633     {
1634 #if INT_ENCODE
1635         char file_item_buf[DST_ITEM_MAX];
1636         char *file_item = file_item_buf;
1637         void *c1 = (*b->method->codec.start)();
1638         (*b->method->codec.decode)(c1, &file_item, &src);
1639         (*b->method->codec.stop)(c1);
1640         cmp = (*b->method->compare_item)(untilbuf, file_item_buf);
1641 #else
1642         zint item_len;
1643         decode_item_len(&src, &item_len);
1644         cmp = (*b->method->compare_item)(untilbuf, src);
1645         src += item_len;
1646 #endif
1647         decode_ptr(&src, &nxtpos);
1648         if (cmp<pp->scope) /* cmp<2 */
1649         {
1650 #if ISAMB_DEBUG
1651             yaz_log(YLOG_DEBUG, "isamb_pp_forward_unode returning a hit "
1652                    "at level %d node %d ofs=%d sz=%d",
1653                     pp->level, p->pos, p->offset, p->size);
1654 #endif
1655             return pos;
1656         } /* found one */
1657         pos = nxtpos;
1658         p->offset = src-(char*)p->bytes;
1659         (pp->skipped_nodes[pp->maxlevel - pp->level -1])++;
1660 #if ISAMB_DEBUG
1661         skips++;
1662 #endif
1663     }
1664 #if ISAMB_DEBUG
1665             yaz_log(YLOG_DEBUG, "isamb_pp_forward_unode returning at tail "
1666                    "at level %d node %d ofs=%d sz=%d skips=%d",
1667                     pp->level, p->pos, p->offset, p->size, skips);
1668 #endif
1669     return pos; /* that's the last one in the line */
1670     
1671 } /* forward_unode */
1672
1673 static void isamb_pp_descend_to_leaf(ISAMB_PP pp, ISAM_P pos,
1674                                      const void *untilbuf)
1675 { /* climbs down the tree, from pos, to the leftmost leaf */
1676     struct ISAMB_block *p = pp->block[pp->level];
1677     const char *src;
1678     assert(!p->leaf);
1679 #if ISAMB_DEBUG
1680     yaz_log(YLOG_DEBUG, "isamb_pp_descend_to_leaf "
1681                    "starting at lev %d node %d ofs=%d lf=%d u=%p", 
1682                    pp->level, p->pos, p->offset, p->leaf, untilbuf);
1683 #endif
1684     if (untilbuf)
1685         pos = isamb_pp_forward_unode(pp, pos, untilbuf);
1686     ++(pp->level);
1687     assert(pos);
1688     p = open_block(pp->isamb, pos);
1689     pp->block[pp->level] = p;
1690     ++(pp->accessed_nodes[pp->maxlevel-pp->level]);
1691     ++(pp->no_blocks);
1692 #if ISAMB_DEBUG
1693     yaz_log(YLOG_DEBUG, "isamb_pp_descend_to_leaf "
1694                    "got lev %d node %d lf=%d", 
1695                    pp->level, p->pos, p->leaf);
1696 #endif
1697     if (p->leaf)
1698         return;
1699     assert (p->offset==0);
1700     src = p->bytes + p->offset;
1701     decode_ptr(&src, &pos);
1702     p->offset = src-(char*)p->bytes;
1703     isamb_pp_descend_to_leaf(pp, pos, untilbuf);
1704 #if ISAMB_DEBUG
1705     yaz_log(YLOG_DEBUG, "isamb_pp_descend_to_leaf "
1706                    "returning at lev %d node %d ofs=%d lf=%d", 
1707                    pp->level, p->pos, p->offset, p->leaf);
1708 #endif
1709 } /* descend_to_leaf */
1710
1711 static int isamb_pp_find_next_leaf(ISAMB_PP pp)
1712 { /* finds the next leaf by climbing up and down */
1713     ISAM_P pos;
1714     if (!isamb_pp_climb_level(pp, &pos))
1715         return 0;
1716     isamb_pp_descend_to_leaf(pp, pos, 0);
1717     return 1;
1718 }
1719
1720 static int isamb_pp_climb_desc(ISAMB_PP pp,  const void *untilbuf)
1721 { /* climbs up and descends to a leaf where values >= *untilbuf are found */
1722     ISAM_P pos;
1723 #if ISAMB_DEBUG
1724     struct ISAMB_block *p = pp->block[pp->level];
1725     yaz_log(YLOG_DEBUG, "isamb_pp_climb_desc starting "
1726                    "at level %d node %d ofs=%d sz=%d",
1727                     pp->level, p->pos, p->offset, p->size);
1728 #endif
1729     if (!isamb_pp_climb_level(pp, &pos))
1730         return 0;
1731     /* see if it would pay to climb one higher */
1732     if (!isamb_pp_on_right_node(pp, pp->level, untilbuf))
1733         if (!isamb_pp_climb_level(pp, &pos))
1734             return 0;
1735     isamb_pp_descend_to_leaf(pp, pos, untilbuf);
1736 #if ISAMB_DEBUG
1737     p = pp->block[pp->level];
1738     yaz_log(YLOG_DEBUG, "isamb_pp_climb_desc done "
1739                    "at level %d node %d ofs=%d sz=%d",
1740                     pp->level, p->pos, p->offset, p->size);
1741 #endif
1742     return 1;
1743 } /* climb_desc */
1744
1745 int isamb_pp_forward (ISAMB_PP pp, void *buf, const void *untilbuf)
1746 {
1747 #if ISAMB_DEBUG
1748     struct ISAMB_block *p = pp->block[pp->level];
1749     assert(p->leaf);
1750     yaz_log(YLOG_DEBUG, "isamb_pp_forward starting "
1751                    "at level %d node %d ofs=%d sz=%d u=%p sc=%d",
1752                     pp->level, p->pos, p->offset, p->size, untilbuf, scope);
1753 #endif
1754     if (untilbuf)
1755     {
1756         if (isamb_pp_forward_on_leaf(pp, buf, untilbuf))
1757         {
1758 #if ISAMB_DEBUG
1759             yaz_log(YLOG_DEBUG, "isamb_pp_forward (f) returning (A) "
1760                    "at level %d node %d ofs=%d sz=%d",
1761                     pp->level, p->pos, p->offset, p->size);
1762 #endif
1763             return 1;
1764         }
1765         if (! isamb_pp_climb_desc(pp, untilbuf))
1766         {
1767 #if ISAMB_DEBUG
1768             yaz_log(YLOG_DEBUG, "isamb_pp_forward (f) returning notfound (B) "
1769                    "at level %d node %d ofs=%d sz=%d",
1770                     pp->level, p->pos, p->offset, p->size);
1771 #endif
1772             return 0; /* could not find a leaf */
1773         }
1774         do {
1775             if (isamb_pp_forward_on_leaf(pp, buf, untilbuf))
1776             {
1777 #if ISAMB_DEBUG
1778                 yaz_log(YLOG_DEBUG, "isamb_pp_forward (f) returning (c) "
1779                         "at level %d node %d ofs=%d sz=%d",
1780                         pp->level, p->pos, p->offset, p->size);
1781 #endif
1782                 return 1;
1783             }
1784         } while (isamb_pp_find_next_leaf(pp));
1785         return 0; /* could not find at all */
1786     }
1787     else { /* no untilbuf, a straight read */
1788         /* FIXME - this should be moved
1789          * directly into the pp_read */
1790         /* keeping here now, to keep same
1791          * interface as the old fwd */
1792         if (isamb_pp_read_on_leaf(pp, buf))
1793         {
1794 #if ISAMB_DEBUG
1795             yaz_log(YLOG_DEBUG, "isamb_pp_forward (read) returning (D) "
1796                    "at level %d node %d ofs=%d sz=%d",
1797                     pp->level, p->pos, p->offset, p->size);
1798 #endif
1799             return 1;
1800         }
1801         if (isamb_pp_find_next_leaf(pp))
1802         {
1803 #if ISAMB_DEBUG
1804             yaz_log(YLOG_DEBUG, "isamb_pp_forward (read) returning (E) "
1805                    "at level %d node %d ofs=%d sz=%d",
1806                     pp->level, p->pos, p->offset, p->size);
1807 #endif
1808             return isamb_pp_read_on_leaf(pp, buf);
1809         }
1810         else
1811             return 0;
1812     }
1813 } /* isam_pp_forward (new version) */
1814
1815 void isamb_pp_pos(ISAMB_PP pp, double *current, double *total)
1816 { /* return an estimate of the current position and of the total number of */
1817   /* occureences in the isam tree, based on the current leaf */
1818     assert(total);
1819     assert(current);
1820     
1821     /* if end-of-stream PP may not be leaf */
1822
1823     *total = (double) (pp->block[0]->no_items);
1824     *current = (double) pp->returned_numbers;
1825 #if ISAMB_DEBUG
1826     yaz_log(YLOG_LOG, "isamb_pp_pos returning: cur= %0.1f tot=%0.1f rn="
1827          ZINT_FORMAT, *current, *total, pp->returned_numbers);
1828 #endif
1829 }
1830
1831 int isamb_pp_forward2(ISAMB_PP pp, void *buf, const void *untilb)
1832 {
1833     char *dst = buf;
1834     const char *src;
1835     struct ISAMB_block *p = pp->block[pp->level];
1836     ISAMB b = pp->isamb;
1837     if (!p)
1838         return 0;
1839 again:
1840     while (p->offset == p->size)
1841     {
1842         ISAM_P pos;
1843 #if INT_ENCODE
1844         const char *src_0;
1845         void *c1;
1846         char file_item_buf[DST_ITEM_MAX];
1847         char *file_item = file_item_buf;
1848 #else
1849         zint item_len;
1850 #endif
1851         while (p->offset == p->size)
1852         {
1853             if (pp->level == 0)
1854                 return 0;
1855             close_block (pp->isamb, pp->block[pp->level]);
1856             pp->block[pp->level] = 0;
1857             (pp->level)--;
1858             p = pp->block[pp->level];
1859             assert (!p->leaf);  
1860         }
1861
1862         assert(!p->leaf);
1863         src = p->bytes + p->offset;
1864        
1865 #if INT_ENCODE
1866         c1 = (*b->method->codec.start)();
1867         (*b->method->codec.decode)(c1, &file_item, &src);
1868 #else
1869         decode_ptr (&src, &item_len);
1870         src += item_len;
1871 #endif        
1872         decode_ptr (&src, &pos);
1873         p->offset = src - (char*) p->bytes;
1874
1875         src = p->bytes + p->offset;
1876
1877         while(1)
1878         {
1879             if (!untilb || p->offset == p->size)
1880                 break;
1881             assert(p->offset < p->size);
1882 #if INT_ENCODE
1883             src_0 = src;
1884             file_item = file_item_buf;
1885             (*b->method->codec.reset)(c1);
1886             (*b->method->codec.decode)(c1, &file_item, &src);
1887             if ((*b->method->compare_item)(untilb, file_item_buf) <= 1)
1888             {
1889                 src = src_0;
1890                 break;
1891             }
1892 #else
1893             decode_item_len(&src, &item_len);
1894             if ((*b->method->compare_item)(untilb, src) <= 1)
1895                 break;
1896             src += item_len;
1897 #endif
1898             decode_ptr (&src, &pos);
1899             p->offset = src - (char*) p->bytes;
1900         }
1901
1902         pp->level++;
1903
1904         while (1)
1905         {
1906             pp->block[pp->level] = p = open_block (pp->isamb, pos);
1907
1908             pp->total_size += p->size;
1909             pp->no_blocks++;
1910             
1911             if (p->leaf) 
1912             {
1913                 break;
1914             }
1915             
1916             src = p->bytes + p->offset;
1917             while(1)
1918             {
1919                 decode_ptr (&src, &pos);
1920                 p->offset = src - (char*) p->bytes;
1921                 
1922                 if (!untilb || p->offset == p->size)
1923                     break;
1924                 assert(p->offset < p->size);
1925 #if INT_ENCODE
1926                 src_0 = src;
1927                 file_item = file_item_buf;
1928                 (*b->method->codec.reset)(c1);
1929                 (*b->method->codec.decode)(c1, &file_item, &src);
1930                 if ((*b->method->compare_item)(untilb, file_item_buf) <= 1)
1931                 {
1932                     src = src_0;
1933                     break;
1934                 }
1935 #else
1936                 decode_ptr (&src, &item_len);
1937                 if ((*b->method->compare_item)(untilb, src) <= 1)
1938                     break;
1939                 src += item_len;
1940 #endif
1941             }
1942             pp->level++;
1943         }
1944 #if INT_ENCODE
1945         (*b->method->codec.stop)(c1);
1946 #endif
1947     }
1948     assert (p->offset < p->size);
1949     assert (p->leaf);
1950     while(1)
1951     {
1952         char *dst0 = dst;
1953         src = p->bytes + p->offset;
1954         (*pp->isamb->method->codec.decode)(p->decodeClientData, &dst, &src);
1955         p->offset = src - (char*) p->bytes;
1956         if (!untilb || (*pp->isamb->method->compare_item)(untilb, dst0) <= 1)
1957             break;
1958         dst = dst0;
1959         if (p->offset == p->size) goto again;
1960     }
1961     return 1;
1962 }
1963 /*
1964  * Local variables:
1965  * c-basic-offset: 4
1966  * indent-tabs-mode: nil
1967  * End:
1968  * vim: shiftwidth=4 tabstop=8 expandtab
1969  */
1970