Proper cleanup (isamb_close) for bad headers
[idzebra-moved-to-github.git] / isamb / isamb.c
1 /* $Id: isamb.c,v 1.92 2007-02-24 16:46:22 adam Exp $
2    Copyright (C) 1995-2007
3    Index Data ApS
4
5 This file is part of the Zebra server.
6
7 Zebra is free software; you can redistribute it and/or modify it under
8 the terms of the GNU General Public License as published by the Free
9 Software Foundation; either version 2, or (at your option) any later
10 version.
11
12 Zebra is distributed in the hope that it will be useful, but WITHOUT ANY
13 WARRANTY; without even the implied warranty of MERCHANTABILITY or
14 FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
15 for more details.
16
17 You should have received a copy of the GNU General Public License
18 along with this program; if not, write to the Free Software
19 Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA  02110-1301  USA
20
21 */
22
23 #include <stdlib.h>
24 #include <string.h>
25 #include <yaz/log.h>
26 #include <yaz/xmalloc.h>
27 #include <idzebra/isamb.h>
28 #include <assert.h>
29
30 #ifndef ISAMB_DEBUG
31 #define ISAMB_DEBUG 0
32 #endif
33
34
35 #define ISAMB_MAJOR_VERSION 3
36 #define ISAMB_MINOR_VERSION_NO_ROOT 0
37 #define ISAMB_MINOR_VERSION_WITH_ROOT 1
38
39 struct ISAMB_head {
40     zint first_block;
41     zint last_block;
42     zint free_list;
43     zint no_items;
44     int block_size;
45     int block_max;
46     int block_offset;
47 };
48
49 /* if 1, upper nodes items are encoded; 0 if not encoded */
50 #define INT_ENCODE 1
51
52 /* maximum size of encoded buffer */
53 #define DST_ITEM_MAX 256
54
55 #define ISAMB_MAX_LEVEL 10
56 /* approx 2*max page + max size of item */
57 #define DST_BUF_SIZE (2*4096+300)
58
59 /* should be maximum block size of multiple thereof */
60 #define ISAMB_CACHE_ENTRY_SIZE 4096
61
62 /* CAT_MAX: _must_ be power of 2 */
63 #define CAT_MAX 4
64 #define CAT_MASK (CAT_MAX-1)
65 /* CAT_NO: <= CAT_MAX */
66 #define CAT_NO 4
67
68 /* Smallest block size */
69 #define ISAMB_MIN_SIZE 32
70 /* Size factor */
71 #define ISAMB_FAC_SIZE 4
72
73 /* ISAMB_PTR_CODEC = 1 var, =0 fixed */
74 #define ISAMB_PTR_CODEC 1
75
76 struct ISAMB_cache_entry {
77     ISAM_P pos;
78     unsigned char *buf;
79     int dirty;
80     int hits;
81     struct ISAMB_cache_entry *next;
82 };
83
84 struct ISAMB_file {
85     BFile bf;
86     int head_dirty;
87     struct ISAMB_head head;
88     struct ISAMB_cache_entry *cache_entries;
89 };
90
91 struct ISAMB_s {
92     BFiles bfs;
93     ISAMC_M *method;
94
95     struct ISAMB_file *file;
96     int no_cat;
97     int cache; /* 0 = no cache, 1 = use cache, -1 = dummy isam (for testing only) */
98     int log_io;        /* log level for bf_read/bf_write calls */
99     int log_freelist;  /* log level for freelist handling */
100     zint skipped_numbers; /* on a leaf node */
101     zint returned_numbers; 
102     zint skipped_nodes[ISAMB_MAX_LEVEL]; /* [0]=skipped leaves, 1 = higher etc */
103     zint accessed_nodes[ISAMB_MAX_LEVEL]; /* nodes we did not skip */
104     zint number_of_int_splits;
105     zint number_of_leaf_splits;
106     int enable_int_count; /* whether we count nodes (or not) */
107     int cache_size; /* size of blocks to cache (if cache=1) */
108     int minor_version;
109     zint root_ptr;
110 };
111
112 struct ISAMB_block {
113     ISAM_P pos;
114     int cat;
115     int size;
116     int leaf;
117     int dirty;
118     int deleted;
119     int offset;
120     zint no_items;  /* number of nodes in this + children */
121     char *bytes;
122     char *cbuf;
123     unsigned char *buf;
124     void *decodeClientData;
125     int log_rw;
126 };
127
128 struct ISAMB_PP_s {
129     ISAMB isamb;
130     ISAM_P pos;
131     int level;
132     int maxlevel; /* total depth */
133     zint total_size;
134     zint no_blocks;
135     zint skipped_numbers; /* on a leaf node */
136     zint returned_numbers; 
137     zint skipped_nodes[ISAMB_MAX_LEVEL]; /* [0]=skipped leaves, 1 = higher etc */
138     zint accessed_nodes[ISAMB_MAX_LEVEL]; /* nodes we did not skip */
139     struct ISAMB_block **block;
140     int scope;  /* on what level we forward */
141 };
142
143
144 #define encode_item_len encode_ptr
145 #if ISAMB_PTR_CODEC
146 static void encode_ptr(char **dst, zint pos)
147 {
148     unsigned char *bp = (unsigned char*) *dst;
149
150     while (pos > 127)
151     {
152          *bp++ = (unsigned char) (128 | (pos & 127));
153          pos = pos >> 7;
154     }
155     *bp++ = (unsigned char) pos;
156     *dst = (char *) bp;
157 }
158 #else
159 static void encode_ptr(char **dst, zint pos)
160 {
161     memcpy(*dst, &pos, sizeof(pos));
162     (*dst) += sizeof(pos);
163 }
164 #endif
165
166 #define decode_item_len decode_ptr
167 #if ISAMB_PTR_CODEC
168 static void decode_ptr(const char **src, zint *pos)
169 {
170     zint d = 0;
171     unsigned char c;
172     unsigned r = 0;
173
174     while (((c = *(const unsigned char *)((*src)++)) & 128))
175     {
176         d += ((zint) (c & 127) << r);
177         r += 7;
178     }
179     d += ((zint) c << r);
180     *pos = d;
181 }
182 #else
183 static void decode_ptr(const char **src, zint *pos)
184 {
185      memcpy(pos, *src, sizeof(*pos));
186      (*src) += sizeof(*pos);
187 }
188 #endif
189
190
191 void isamb_set_int_count(ISAMB b, int v)
192 {
193     b->enable_int_count = v;
194 }
195
196 void isamb_set_cache_size(ISAMB b, int v)
197 {
198     b->cache_size = v;
199 }
200
201 ISAMB isamb_open2(BFiles bfs, const char *name, int writeflag, ISAMC_M *method,
202                   int cache, int no_cat, int *sizes, int use_root_ptr)
203 {
204     ISAMB isamb = xmalloc(sizeof(*isamb));
205     int i;
206
207     assert(no_cat <= CAT_MAX);
208
209     isamb->bfs = bfs;
210     isamb->method = (ISAMC_M *) xmalloc(sizeof(*method));
211     memcpy(isamb->method, method, sizeof(*method));
212     isamb->no_cat = no_cat;
213     isamb->log_io = 0;
214     isamb->log_freelist = 0;
215     isamb->cache = cache;
216     isamb->skipped_numbers = 0;
217     isamb->returned_numbers = 0;
218     isamb->number_of_int_splits = 0;
219     isamb->number_of_leaf_splits = 0;
220     isamb->enable_int_count = 1;
221     isamb->cache_size = 40;
222
223     if (use_root_ptr)
224         isamb->minor_version = ISAMB_MINOR_VERSION_WITH_ROOT;
225     else
226         isamb->minor_version = ISAMB_MINOR_VERSION_NO_ROOT;
227
228     isamb->root_ptr = 0;
229
230     for (i = 0; i<ISAMB_MAX_LEVEL; i++)
231         isamb->skipped_nodes[i] = isamb->accessed_nodes[i] = 0;
232
233     if (cache == -1)
234     {
235         yaz_log(YLOG_WARN, "isamb_open %s. Degraded TEST mode", name);
236     }
237     else
238     {
239         assert(cache == 0 || cache == 1);
240     }
241     isamb->file = xmalloc(sizeof(*isamb->file) * isamb->no_cat);
242
243     for (i = 0; i < isamb->no_cat; i++)
244     {
245         isamb->file[i].bf = 0;
246         isamb->file[i].head_dirty = 0;
247         isamb->file[i].cache_entries = 0;
248     }
249
250     for (i = 0; i < isamb->no_cat; i++)
251     {
252         char fname[DST_BUF_SIZE];
253         char hbuf[DST_BUF_SIZE];
254
255         sprintf(fname, "%s%c", name, i+'A');
256         if (cache)
257             isamb->file[i].bf = bf_open(bfs, fname, ISAMB_CACHE_ENTRY_SIZE,
258                                          writeflag);
259         else
260             isamb->file[i].bf = bf_open(bfs, fname, sizes[i], writeflag);
261
262         if (!isamb->file[i].bf)
263         {
264             isamb_close(isamb);
265             return 0;
266         }
267
268         /* fill-in default values (for empty isamb) */
269         isamb->file[i].head.first_block = ISAMB_CACHE_ENTRY_SIZE/sizes[i]+1;
270         isamb->file[i].head.last_block = isamb->file[i].head.first_block;
271         isamb->file[i].head.block_size = sizes[i];
272         assert(sizes[i] <= ISAMB_CACHE_ENTRY_SIZE);
273 #if ISAMB_PTR_CODEC
274         if (i == isamb->no_cat-1 || sizes[i] > 128)
275             isamb->file[i].head.block_offset = 8;
276         else
277             isamb->file[i].head.block_offset = 4;
278 #else
279         isamb->file[i].head.block_offset = 11;
280 #endif
281         isamb->file[i].head.block_max =
282             sizes[i] - isamb->file[i].head.block_offset;
283         isamb->file[i].head.free_list = 0;
284         if (bf_read(isamb->file[i].bf, 0, 0, 0, hbuf))
285         {
286             /* got header assume "isamb"major minor len can fit in 16 bytes */
287             zint zint_tmp;
288             int major, minor, len, pos = 0;
289             int left;
290             const char *src = 0;
291             if (memcmp(hbuf, "isamb", 5))
292             {
293                 yaz_log(YLOG_WARN, "bad isamb header for file %s", fname);
294                 isamb_close(isamb);
295                 return 0;
296             }
297             if (sscanf(hbuf+5, "%d %d %d", &major, &minor, &len) != 3)
298             {
299                 yaz_log(YLOG_WARN, "bad isamb header for file %s", fname);
300                 isamb_close(isamb);
301                 return 0;
302             }
303             if (major != ISAMB_MAJOR_VERSION)
304             {
305                 yaz_log(YLOG_WARN, "bad major version for file %s %d, must be %d",
306                      fname, major, ISAMB_MAJOR_VERSION);
307                 isamb_close(isamb);
308                 return 0;
309             }
310             for (left = len - sizes[i]; left > 0; left = left - sizes[i])
311             {
312                 pos++;
313                 if (!bf_read(isamb->file[i].bf, pos, 0, 0, hbuf + pos*sizes[i]))
314                 {
315                     yaz_log(YLOG_WARN, "truncated isamb header for " 
316                          "file=%s len=%d pos=%d",
317                          fname, len, pos);
318                     isamb_close(isamb);
319                     return 0;
320                 }
321             }
322             src = hbuf + 16;
323             decode_ptr(&src, &isamb->file[i].head.first_block);
324             decode_ptr(&src, &isamb->file[i].head.last_block);
325             decode_ptr(&src, &zint_tmp);
326             isamb->file[i].head.block_size = (int) zint_tmp;
327             decode_ptr(&src, &zint_tmp);
328             isamb->file[i].head.block_max = (int) zint_tmp;
329             decode_ptr(&src, &isamb->file[i].head.free_list);
330             if (isamb->minor_version >= ISAMB_MINOR_VERSION_WITH_ROOT)
331                 decode_ptr(&src, &isamb->root_ptr);
332         }
333         assert (isamb->file[i].head.block_size >= isamb->file[i].head.block_offset);
334         isamb->file[i].head_dirty = 0;
335         assert(isamb->file[i].head.block_size == sizes[i]);
336     }
337 #if ISAMB_DEBUG
338     yaz_log(YLOG_WARN, "isamb debug enabled. Things will be slower than usual");
339 #endif
340     return isamb;
341 }
342
343 ISAMB isamb_open(BFiles bfs, const char *name, int writeflag, ISAMC_M *method,
344                  int cache)
345 {
346     int sizes[CAT_NO];
347     int i, b_size = ISAMB_MIN_SIZE;
348     
349     for (i = 0; i<CAT_NO; i++)
350     {
351         sizes[i] = b_size;
352         b_size = b_size * ISAMB_FAC_SIZE;
353     }
354     return isamb_open2(bfs, name, writeflag, method, cache,
355                        CAT_NO, sizes, 0);
356 }
357
358 static void flush_blocks (ISAMB b, int cat)
359 {
360     while (b->file[cat].cache_entries)
361     {
362         struct ISAMB_cache_entry *ce_this = b->file[cat].cache_entries;
363         b->file[cat].cache_entries = ce_this->next;
364
365         if (ce_this->dirty)
366         {
367             yaz_log(b->log_io, "bf_write: flush_blocks");
368             bf_write(b->file[cat].bf, ce_this->pos, 0, 0, ce_this->buf);
369         }
370         xfree(ce_this->buf);
371         xfree(ce_this);
372     }
373 }
374
375 static int cache_block (ISAMB b, ISAM_P pos, unsigned char *userbuf, int wr)
376 {
377     int cat = (int) (pos&CAT_MASK);
378     int off = (int) (((pos/CAT_MAX) & 
379                (ISAMB_CACHE_ENTRY_SIZE / b->file[cat].head.block_size - 1))
380         * b->file[cat].head.block_size);
381     zint norm = pos / (CAT_MASK*ISAMB_CACHE_ENTRY_SIZE / b->file[cat].head.block_size);
382     int no = 0;
383     struct ISAMB_cache_entry **ce, *ce_this = 0, **ce_last = 0;
384
385     if (!b->cache)
386         return 0;
387
388     assert (ISAMB_CACHE_ENTRY_SIZE >= b->file[cat].head.block_size);
389     for (ce = &b->file[cat].cache_entries; *ce; ce = &(*ce)->next, no++)
390     {
391         ce_last = ce;
392         if ((*ce)->pos == norm)
393         {
394             ce_this = *ce;
395             *ce = (*ce)->next;   /* remove from list */
396             
397             ce_this->next = b->file[cat].cache_entries;  /* move to front */
398             b->file[cat].cache_entries = ce_this;
399             
400             if (wr)
401             {
402                 memcpy (ce_this->buf + off, userbuf, 
403                         b->file[cat].head.block_size);
404                 ce_this->dirty = 1;
405             }
406             else
407                 memcpy (userbuf, ce_this->buf + off,
408                         b->file[cat].head.block_size);
409             return 1;
410         }
411     }
412     if (no >= b->cache_size)
413     {
414         assert (ce_last && *ce_last);
415         ce_this = *ce_last;
416         *ce_last = 0;  /* remove the last entry from list */
417         if (ce_this->dirty)
418         {
419             yaz_log(b->log_io, "bf_write: cache_block");
420             bf_write(b->file[cat].bf, ce_this->pos, 0, 0, ce_this->buf);
421         }
422         xfree(ce_this->buf);
423         xfree(ce_this);
424     }
425     ce_this = xmalloc(sizeof(*ce_this));
426     ce_this->next = b->file[cat].cache_entries;
427     b->file[cat].cache_entries = ce_this;
428     ce_this->buf = xmalloc(ISAMB_CACHE_ENTRY_SIZE);
429     ce_this->pos = norm;
430     yaz_log(b->log_io, "bf_read: cache_block");
431     if (!bf_read(b->file[cat].bf, norm, 0, 0, ce_this->buf))
432         memset (ce_this->buf, 0, ISAMB_CACHE_ENTRY_SIZE);
433     if (wr)
434     {
435         memcpy (ce_this->buf + off, userbuf, b->file[cat].head.block_size);
436         ce_this->dirty = 1;
437     }
438     else
439     {
440         ce_this->dirty = 0;
441         memcpy (userbuf, ce_this->buf + off, b->file[cat].head.block_size);
442     }
443     return 1;
444 }
445
446
447 void isamb_close (ISAMB isamb)
448 {
449     int i;
450     for (i = 0; isamb->accessed_nodes[i]; i++)
451         yaz_log(YLOG_DEBUG, "isamb_close  level leaf-%d: "ZINT_FORMAT" read, "
452                         ZINT_FORMAT" skipped",
453              i, isamb->accessed_nodes[i], isamb->skipped_nodes[i]);
454     yaz_log(YLOG_DEBUG, "isamb_close returned "ZINT_FORMAT" values, "
455                    "skipped "ZINT_FORMAT,
456          isamb->skipped_numbers, isamb->returned_numbers);
457     for (i = 0; i<isamb->no_cat; i++)
458     {
459         flush_blocks (isamb, i);
460         if (isamb->file[i].head_dirty)
461         {
462             char hbuf[DST_BUF_SIZE];
463             int major = ISAMB_MAJOR_VERSION;
464             int len = 16;
465             char *dst = hbuf + 16;
466             int pos = 0, left;
467             int b_size = isamb->file[i].head.block_size;
468
469             encode_ptr(&dst, isamb->file[i].head.first_block);
470             encode_ptr(&dst, isamb->file[i].head.last_block);
471             encode_ptr(&dst, isamb->file[i].head.block_size);
472             encode_ptr(&dst, isamb->file[i].head.block_max);
473             encode_ptr(&dst, isamb->file[i].head.free_list);
474
475             if (isamb->minor_version >= ISAMB_MINOR_VERSION_WITH_ROOT)
476                 encode_ptr(&dst, isamb->root_ptr);
477
478             memset(dst, '\0', b_size); /* ensure no random bytes are written */
479
480             len = dst - hbuf;
481
482             /* print exactly 16 bytes (including trailing 0) */
483             sprintf(hbuf, "isamb%02d %02d %02d\r\n", major,
484                     isamb->minor_version, len);
485
486             bf_write(isamb->file[i].bf, pos, 0, 0, hbuf);
487
488             for (left = len - b_size; left > 0; left = left - b_size)
489             {
490                 pos++;
491                 bf_write(isamb->file[i].bf, pos, 0, 0, hbuf + pos*b_size);
492             }
493         }
494         if (isamb->file[i].bf)
495             bf_close (isamb->file[i].bf);
496     }
497     xfree(isamb->file);
498     xfree(isamb->method);
499     xfree(isamb);
500 }
501
502 /* open_block: read one block at pos.
503    Decode leading sys bytes .. consisting of
504    Offset:Meaning
505    0: leader byte, != 0 leaf, == 0, non-leaf
506    1-2: used size of block
507    3-7*: number of items and all children
508    
509    * Reserve 5 bytes for large block sizes. 1 for small ones .. Number
510    of items. We can thus have at most 2^40 nodes. 
511 */
512 static struct ISAMB_block *open_block(ISAMB b, ISAM_P pos)
513 {
514     int cat = (int) (pos&CAT_MASK);
515     const char *src;
516     int offset = b->file[cat].head.block_offset;
517     struct ISAMB_block *p;
518     if (!pos)
519         return 0;
520     p = xmalloc(sizeof(*p));
521     p->pos = pos;
522     p->cat = (int) (pos & CAT_MASK);
523     p->buf = xmalloc(b->file[cat].head.block_size);
524     p->cbuf = 0;
525
526     if (!cache_block (b, pos, p->buf, 0))
527     {
528         yaz_log(b->log_io, "bf_read: open_block");
529         if (bf_read(b->file[cat].bf, pos/CAT_MAX, 0, 0, p->buf) != 1)
530         {
531             yaz_log(YLOG_FATAL, "isamb: read fail for pos=%ld block=%ld",
532                      (long) pos, (long) pos/CAT_MAX);
533             zebra_exit("isamb:open_block");
534         }
535     }
536     p->bytes = (char *)p->buf + offset;
537     p->leaf = p->buf[0];
538     p->size = (p->buf[1] + 256 * p->buf[2]) - offset;
539     if (p->size < 0)
540     {
541         yaz_log(YLOG_FATAL, "Bad block size %d in pos=" ZINT_FORMAT "\n",
542                  p->size, pos);
543     }
544     assert (p->size >= 0);
545     src = (char*) p->buf + 3;
546     decode_ptr(&src, &p->no_items);
547
548     p->offset = 0;
549     p->dirty = 0;
550     p->deleted = 0;
551     p->decodeClientData = (*b->method->codec.start)();
552     return p;
553 }
554
555 struct ISAMB_block *new_block (ISAMB b, int leaf, int cat)
556 {
557     struct ISAMB_block *p;
558
559     p = xmalloc(sizeof(*p));
560     p->buf = xmalloc(b->file[cat].head.block_size);
561
562     if (!b->file[cat].head.free_list)
563     {
564         zint block_no;
565         block_no = b->file[cat].head.last_block++;
566         p->pos = block_no * CAT_MAX + cat;
567     }
568     else
569     {
570         p->pos = b->file[cat].head.free_list;
571         assert((p->pos & CAT_MASK) == cat);
572         if (!cache_block (b, p->pos, p->buf, 0))
573         {
574             yaz_log(b->log_io, "bf_read: new_block");
575             if (!bf_read(b->file[cat].bf, p->pos/CAT_MAX, 0, 0, p->buf))
576             {
577                 yaz_log(YLOG_FATAL, "isamb: read fail for pos=%ld block=%ld",
578                          (long) p->pos/CAT_MAX, (long) p->pos/CAT_MAX);
579                 zebra_exit("isamb:new_block");
580             }
581         }
582         yaz_log(b->log_freelist, "got block " ZINT_FORMAT " from freelist %d:" ZINT_FORMAT, p->pos,
583                  cat, p->pos/CAT_MAX);
584         memcpy (&b->file[cat].head.free_list, p->buf, sizeof(zint));
585     }
586     p->cat = cat;
587     b->file[cat].head_dirty = 1;
588     memset (p->buf, 0, b->file[cat].head.block_size);
589     p->bytes = (char*)p->buf + b->file[cat].head.block_offset;
590     p->leaf = leaf;
591     p->size = 0;
592     p->dirty = 1;
593     p->deleted = 0;
594     p->offset = 0;
595     p->no_items = 0;
596     p->decodeClientData = (*b->method->codec.start)();
597     return p;
598 }
599
600 struct ISAMB_block *new_leaf (ISAMB b, int cat)
601 {
602     return new_block (b, 1, cat);
603 }
604
605
606 struct ISAMB_block *new_int (ISAMB b, int cat)
607 {
608     return new_block (b, 0, cat);
609 }
610
611 static void check_block (ISAMB b, struct ISAMB_block *p)
612 {
613     assert(b); /* mostly to make the compiler shut up about unused b */
614     if (p->leaf)
615     {
616         ;
617     }
618     else
619     {
620         /* sanity check */
621         char *startp = p->bytes;
622         const char *src = startp;
623         char *endp = p->bytes + p->size;
624         ISAM_P pos;
625         void *c1 = (*b->method->codec.start)();
626             
627         decode_ptr(&src, &pos);
628         assert ((pos&CAT_MASK) == p->cat);
629         while (src != endp)
630         {
631 #if INT_ENCODE
632             char file_item_buf[DST_ITEM_MAX];
633             char *file_item = file_item_buf;
634             (*b->method->codec.reset)(c1);
635             (*b->method->codec.decode)(c1, &file_item, &src);
636 #else
637             zint item_len;
638             decode_item_len(&src, &item_len);
639             assert (item_len > 0 && item_len < 128);
640             src += item_len;
641 #endif
642             decode_ptr(&src, &pos);
643             if ((pos&CAT_MASK) != p->cat)
644             {
645                 assert ((pos&CAT_MASK) == p->cat);
646             }
647         }
648         (*b->method->codec.stop)(c1);
649     }
650 }
651
652 void close_block(ISAMB b, struct ISAMB_block *p)
653 {
654     if (!p)
655         return;
656     if (p->deleted)
657     {
658         yaz_log(b->log_freelist, "release block " ZINT_FORMAT " from freelist %d:" ZINT_FORMAT,
659                  p->pos, p->cat, p->pos/CAT_MAX);
660         memcpy (p->buf, &b->file[p->cat].head.free_list, sizeof(zint));
661         b->file[p->cat].head.free_list = p->pos;
662         if (!cache_block (b, p->pos, p->buf, 1))
663         {
664             yaz_log(b->log_io, "bf_write: close_block (deleted)");
665             bf_write(b->file[p->cat].bf, p->pos/CAT_MAX, 0, 0, p->buf);
666         }
667     }
668     else if (p->dirty)
669     {
670         int offset = b->file[p->cat].head.block_offset;
671         int size = p->size + offset;
672         char *dst =  (char*)p->buf + 3;
673         assert (p->size >= 0);
674         
675         /* memset becuase encode_ptr usually does not write all bytes */
676         memset(p->buf, 0, b->file[p->cat].head.block_offset);
677         p->buf[0] = p->leaf;
678         p->buf[1] = size & 255;
679         p->buf[2] = size >> 8;
680         encode_ptr(&dst, p->no_items);
681         check_block(b, p);
682         if (!cache_block (b, p->pos, p->buf, 1))
683         {
684             yaz_log(b->log_io, "bf_write: close_block");
685             bf_write(b->file[p->cat].bf, p->pos/CAT_MAX, 0, 0, p->buf);
686         }
687     }
688     (*b->method->codec.stop)(p->decodeClientData);
689     xfree(p->buf);
690     xfree(p);
691 }
692
693 int insert_sub (ISAMB b, struct ISAMB_block **p,
694                 void *new_item, int *mode,
695                 ISAMC_I *stream,
696                 struct ISAMB_block **sp,
697                 void *sub_item, int *sub_size,
698                 const void *max_item);
699
700 int insert_int (ISAMB b, struct ISAMB_block *p, void *lookahead_item,
701                 int *mode,
702                 ISAMC_I *stream, struct ISAMB_block **sp,
703                 void *split_item, int *split_size, const void *last_max_item)
704 {
705     char *startp = p->bytes;
706     const char *src = startp;
707     char *endp = p->bytes + p->size;
708     ISAM_P pos;
709     struct ISAMB_block *sub_p1 = 0, *sub_p2 = 0;
710     char sub_item[DST_ITEM_MAX];
711     int sub_size;
712     int more = 0;
713     zint diff_terms = 0;
714     void *c1 = (*b->method->codec.start)();
715
716     *sp = 0;
717
718     assert(p->size >= 0);
719     decode_ptr(&src, &pos);
720     while (src != endp)
721     {
722         int d;
723         const char *src0 = src;
724 #if INT_ENCODE
725         char file_item_buf[DST_ITEM_MAX];
726         char *file_item = file_item_buf;
727         (*b->method->codec.reset)(c1);
728         (*b->method->codec.decode)(c1, &file_item, &src);
729         d = (*b->method->compare_item)(file_item_buf, lookahead_item);
730         if (d > 0)
731         {
732             sub_p1 = open_block(b, pos);
733             assert (sub_p1);
734             diff_terms -= sub_p1->no_items;
735             more = insert_sub (b, &sub_p1, lookahead_item, mode,
736                                stream, &sub_p2, 
737                                sub_item, &sub_size, file_item_buf);
738             diff_terms += sub_p1->no_items;
739             src = src0;
740             break;
741         }
742 #else
743         zint item_len;
744         decode_item_len(&src, &item_len);
745         d = (*b->method->compare_item)(src, lookahead_item);
746         if (d > 0)
747         {
748             sub_p1 = open_block(b, pos);
749             assert (sub_p1);
750             diff_terms -= sub_p1->no_items;
751             more = insert_sub (b, &sub_p1, lookahead_item, mode,
752                                stream, &sub_p2, 
753                                sub_item, &sub_size, src);
754             diff_terms += sub_p1->no_items;
755             src = src0;
756             break;
757         }
758         src += item_len;
759 #endif
760         decode_ptr(&src, &pos);
761     }
762     if (!sub_p1)
763     {
764         /* we reached the end. So lookahead > last item */
765         sub_p1 = open_block(b, pos);
766         assert (sub_p1);
767         diff_terms -= sub_p1->no_items;
768         more = insert_sub (b, &sub_p1, lookahead_item, mode, stream, &sub_p2, 
769                            sub_item, &sub_size, last_max_item);
770         diff_terms += sub_p1->no_items;
771     }
772     if (sub_p2)
773         diff_terms += sub_p2->no_items;
774     if (diff_terms)
775     {
776         p->dirty = 1;
777         p->no_items += diff_terms;
778     }
779     if (sub_p2)
780     {
781         /* there was a split - must insert pointer in this one */
782         char dst_buf[DST_BUF_SIZE];
783         char *dst = dst_buf;
784 #if INT_ENCODE
785         const char *sub_item_ptr = sub_item;
786 #endif
787         assert (sub_size < 128 && sub_size > 1);
788
789         memcpy (dst, startp, src - startp);
790                 
791         dst += src - startp;
792
793 #if INT_ENCODE
794         (*b->method->codec.reset)(c1);
795         (*b->method->codec.encode)(c1, &dst, &sub_item_ptr);
796 #else
797         encode_item_len (&dst, sub_size);      /* sub length and item */
798         memcpy (dst, sub_item, sub_size);
799         dst += sub_size;
800 #endif
801
802         encode_ptr(&dst, sub_p2->pos);   /* pos */
803
804         if (endp - src)                   /* remaining data */
805         {
806             memcpy (dst, src, endp - src);
807             dst += endp - src;
808         }
809         p->size = dst - dst_buf;
810         assert (p->size >= 0);
811
812         if (p->size <= b->file[p->cat].head.block_max)
813         {
814             /* it fits OK in this block */
815             memcpy (startp, dst_buf, dst - dst_buf);
816
817             close_block(b, sub_p2);
818         }
819         else
820         {
821             /* must split _this_ block as well .. */
822             struct ISAMB_block *sub_p3;
823 #if INT_ENCODE
824             char file_item_buf[DST_ITEM_MAX];
825             char *file_item = file_item_buf;
826 #else
827             zint split_size_tmp;
828 #endif
829             zint no_items_first_half = 0;
830             int p_new_size;
831             const char *half;
832             src = dst_buf;
833             endp = dst;
834             
835             b->number_of_int_splits++;
836
837             p->dirty = 1;
838             close_block(b, sub_p2);
839
840             half = src + b->file[p->cat].head.block_size/2;
841             decode_ptr(&src, &pos);
842
843             if (b->enable_int_count)
844             {
845                 /* read sub block so we can get no_items for it */
846                 sub_p3 = open_block(b, pos);
847                 no_items_first_half += sub_p3->no_items;
848                 close_block(b, sub_p3);
849             }
850
851             while (src <= half)
852             {
853 #if INT_ENCODE
854                 file_item = file_item_buf;
855                 (*b->method->codec.reset)(c1);
856                 (*b->method->codec.decode)(c1, &file_item, &src);
857 #else
858                 decode_item_len(&src, &split_size_tmp);
859                 *split_size = (int) split_size_tmp;
860                 src += *split_size;
861 #endif
862                 decode_ptr(&src, &pos);
863
864                 if (b->enable_int_count)
865                 {
866                     /* read sub block so we can get no_items for it */
867                     sub_p3 = open_block(b, pos);
868                     no_items_first_half += sub_p3->no_items;
869                     close_block(b, sub_p3);
870                 }
871             }
872             /*  p is first half */
873             p_new_size = src - dst_buf;
874             memcpy (p->bytes, dst_buf, p_new_size);
875
876 #if INT_ENCODE
877             file_item = file_item_buf;
878             (*b->method->codec.reset)(c1);
879             (*b->method->codec.decode)(c1, &file_item, &src);
880             *split_size = file_item - file_item_buf;
881             memcpy(split_item, file_item_buf, *split_size);
882 #else
883             decode_item_len(&src, &split_size_tmp);
884             *split_size = (int) split_size_tmp;
885             memcpy (split_item, src, *split_size);
886             src += *split_size;
887 #endif
888             /*  *sp is second half */
889             *sp = new_int (b, p->cat);
890             (*sp)->size = endp - src;
891             memcpy ((*sp)->bytes, src, (*sp)->size);
892
893             p->size = p_new_size;
894
895             /*  adjust no_items in first&second half */
896             (*sp)->no_items = p->no_items - no_items_first_half;
897             p->no_items = no_items_first_half;
898         }
899         p->dirty = 1;
900     }
901     close_block(b, sub_p1);
902     (*b->method->codec.stop)(c1);
903     return more;
904 }
905
906 int insert_leaf (ISAMB b, struct ISAMB_block **sp1, void *lookahead_item,
907                  int *lookahead_mode, ISAMC_I *stream,
908                  struct ISAMB_block **sp2,
909                  void *sub_item, int *sub_size,
910                  const void *max_item)
911 {
912     struct ISAMB_block *p = *sp1;
913     char *endp = 0;
914     const char *src = 0;
915     char dst_buf[DST_BUF_SIZE], *dst = dst_buf;
916     int new_size;
917     void *c1 = (*b->method->codec.start)();
918     void *c2 = (*b->method->codec.start)();
919     int more = 1;
920     int quater = b->file[b->no_cat-1].head.block_max / 4;
921     char *mid_cut = dst_buf + quater * 2;
922     char *tail_cut = dst_buf + quater * 3;
923     char *maxp = dst_buf + b->file[b->no_cat-1].head.block_max;
924     char *half1 = 0;
925     char *half2 = 0;
926     char cut_item_buf[DST_ITEM_MAX];
927     int cut_item_size = 0;
928     int no_items = 0;    /* number of items (total) */
929     int no_items_1 = 0;  /* number of items (first half) */
930     int inserted_dst_bytes = 0;
931
932     if (p && p->size)
933     {
934         char file_item_buf[DST_ITEM_MAX];
935         char *file_item = file_item_buf;
936             
937         src = p->bytes;
938         endp = p->bytes + p->size;
939         (*b->method->codec.decode)(c1, &file_item, &src);
940         while (1)
941         {
942             const char *dst_item = 0; /* resulting item to be inserted */
943             char *lookahead_next;
944             char *dst_0 = dst;
945             int d = -1;
946             
947             if (lookahead_item)
948                 d = (*b->method->compare_item)(file_item_buf, lookahead_item);
949             
950             /* d now holds comparison between existing file item and
951                lookahead item 
952                d = 0: equal
953                d > 0: lookahead before file
954                d < 0: lookahead after file
955             */
956             if (d > 0)
957             {
958                 /* lookahead must be inserted */
959                 dst_item = lookahead_item;
960                 /* if this is not an insertion, it's really bad .. */
961                 if (!*lookahead_mode)
962                 {
963                     yaz_log(YLOG_WARN, "isamb: Inconsistent register (1)");
964                     assert(*lookahead_mode);
965                 }
966             }
967             else
968                 dst_item = file_item_buf;
969
970             if (!*lookahead_mode && d == 0)
971             {
972                 /* it's a deletion and they match so there is nothing to be
973                    inserted anyway .. But mark the thing bad (file item
974                    was part of input.. The item will not be part of output */
975                 p->dirty = 1;
976             }
977             else if (!half1 && dst > mid_cut)
978             {
979                 /* we have reached the splitting point for the first time */
980                 const char *dst_item_0 = dst_item;
981                 half1 = dst; /* candidate for splitting */
982
983                 /* encode the resulting item */
984                 (*b->method->codec.encode)(c2, &dst, &dst_item);
985                 
986                 cut_item_size = dst_item - dst_item_0;
987                 assert(cut_item_size > 0);
988                 memcpy (cut_item_buf, dst_item_0, cut_item_size);
989                 
990                 half2 = dst;
991                 no_items_1 = no_items;
992                 no_items++;
993             }
994             else
995             {
996                 /* encode the resulting item */
997                 (*b->method->codec.encode)(c2, &dst, &dst_item);
998                 no_items++;
999             }
1000
1001             /* now move "pointers" .. result has been encoded .. */
1002             if (d > 0)  
1003             {
1004                 /* we must move the lookahead pointer */
1005
1006                 inserted_dst_bytes += (dst - dst_0);
1007                 if (inserted_dst_bytes >=  quater)
1008                     /* no more room. Mark lookahead as "gone".. */
1009                     lookahead_item = 0;
1010                 else
1011                 {
1012                     /* move it really.. */
1013                     lookahead_next = lookahead_item;
1014                     if (!(*stream->read_item)(stream->clientData,
1015                                               &lookahead_next,
1016                                               lookahead_mode))
1017                     {
1018                         /* end of stream reached: no "more" and no lookahead */
1019                         lookahead_item = 0;
1020                         more = 0;
1021                     }
1022                     if (lookahead_item && max_item &&
1023                         (*b->method->compare_item)(max_item, lookahead_item) <= 0)
1024                     {
1025                         /* the lookahead goes beyond what we allow in this
1026                            leaf. Mark it as "gone" */
1027                         lookahead_item = 0;
1028                     }
1029                     
1030                     p->dirty = 1;
1031                 }
1032             }
1033             else if (d == 0)
1034             {
1035                 /* exact match .. move both pointers */
1036
1037                 lookahead_next = lookahead_item;
1038                 if (!(*stream->read_item)(stream->clientData,
1039                                           &lookahead_next, lookahead_mode))
1040                 {
1041                     lookahead_item = 0;
1042                     more = 0;
1043                 }
1044                 if (src == endp)
1045                     break;  /* end of file stream reached .. */
1046                 file_item = file_item_buf; /* move file pointer */
1047                 (*b->method->codec.decode)(c1, &file_item, &src);
1048             }
1049             else
1050             {
1051                 /* file pointer must be moved */
1052                 if (src == endp)
1053                     break;
1054                 file_item = file_item_buf;
1055                 (*b->method->codec.decode)(c1, &file_item, &src);
1056             }
1057         }
1058     }
1059
1060     /* this loop runs when we are "appending" to a leaf page. That is
1061        either it's empty (new) or all file items have been read in
1062        previous loop */
1063
1064     maxp = dst_buf + b->file[b->no_cat-1].head.block_max + quater;
1065     while (lookahead_item)
1066     {
1067         char *dst_item;
1068         const char *src = lookahead_item;
1069         char *dst_0 = dst;
1070         
1071         /* if we have a lookahead item, we stop if we exceed the value of it */
1072         if (max_item &&
1073             (*b->method->compare_item)(max_item, lookahead_item) <= 0)
1074         {
1075             /* stop if we have reached the value of max item */
1076             break;
1077         }
1078         if (!*lookahead_mode)
1079         {
1080             /* this is append. So a delete is bad */
1081             yaz_log(YLOG_WARN, "isamb: Inconsistent register (2)");
1082             assert(*lookahead_mode);
1083         }
1084         else if (!half1 && dst > tail_cut)
1085         {
1086             const char *src_0 = src;
1087             half1 = dst; /* candidate for splitting */
1088             
1089             (*b->method->codec.encode)(c2, &dst, &src);
1090             
1091             cut_item_size = src - src_0;
1092             assert(cut_item_size > 0);
1093             memcpy (cut_item_buf, src_0, cut_item_size);
1094             
1095             no_items_1 = no_items;
1096             half2 = dst;
1097         }
1098         else
1099             (*b->method->codec.encode)(c2, &dst, &src);
1100
1101         if (dst > maxp)
1102         {
1103             dst = dst_0;
1104             break;
1105         }
1106         no_items++;
1107         if (p)
1108             p->dirty = 1;
1109         dst_item = lookahead_item;
1110         if (!(*stream->read_item)(stream->clientData, &dst_item,
1111                                   lookahead_mode))
1112         {
1113             lookahead_item = 0;
1114             more = 0;
1115         }
1116     }
1117     new_size = dst - dst_buf;
1118     if (p && p->cat != b->no_cat-1 && 
1119         new_size > b->file[p->cat].head.block_max)
1120     {
1121         /* non-btree block will be removed */
1122         p->deleted = 1;
1123         close_block(b, p);
1124         /* delete it too!! */
1125         p = 0; /* make a new one anyway */
1126     }
1127     if (!p)
1128     {   /* must create a new one */
1129         int i;
1130         for (i = 0; i < b->no_cat; i++)
1131             if (new_size <= b->file[i].head.block_max)
1132                 break;
1133         if (i == b->no_cat)
1134             i = b->no_cat - 1;
1135         p = new_leaf (b, i);
1136     }
1137     if (new_size > b->file[p->cat].head.block_max)
1138     {
1139         char *first_dst;
1140         const char *cut_item = cut_item_buf;
1141
1142         assert (half1);
1143         assert (half2);
1144
1145         assert(cut_item_size > 0);
1146         
1147         /* first half */
1148         p->size = half1 - dst_buf;
1149         assert(p->size <=  b->file[p->cat].head.block_max);
1150         memcpy (p->bytes, dst_buf, half1 - dst_buf);
1151         p->no_items = no_items_1;
1152
1153         /* second half */
1154         *sp2 = new_leaf (b, p->cat);
1155
1156         (*b->method->codec.reset)(c2);
1157
1158         b->number_of_leaf_splits++;
1159
1160         first_dst = (*sp2)->bytes;
1161
1162         (*b->method->codec.encode)(c2, &first_dst, &cut_item);
1163
1164         memcpy (first_dst, half2, dst - half2);
1165
1166         (*sp2)->size = (first_dst - (*sp2)->bytes) + (dst - half2);
1167         assert((*sp2)->size <=  b->file[p->cat].head.block_max);
1168         (*sp2)->no_items = no_items - no_items_1;
1169         (*sp2)->dirty = 1;
1170         p->dirty = 1;
1171         memcpy (sub_item, cut_item_buf, cut_item_size);
1172         *sub_size = cut_item_size;
1173     }
1174     else
1175     {
1176         memcpy (p->bytes, dst_buf, dst - dst_buf);
1177         p->size = new_size;
1178         p->no_items = no_items;
1179     }
1180     (*b->method->codec.stop)(c1);
1181     (*b->method->codec.stop)(c2);
1182     *sp1 = p;
1183     return more;
1184 }
1185
1186 int insert_sub (ISAMB b, struct ISAMB_block **p, void *new_item,
1187                 int *mode,
1188                 ISAMC_I *stream,
1189                 struct ISAMB_block **sp,
1190                 void *sub_item, int *sub_size,
1191                 const void *max_item)
1192 {
1193     if (!*p || (*p)->leaf)
1194         return insert_leaf (b, p, new_item, mode, stream, sp, sub_item, 
1195                             sub_size, max_item);
1196     else
1197         return insert_int (b, *p, new_item, mode, stream, sp, sub_item,
1198                            sub_size, max_item);
1199 }
1200
1201 int isamb_unlink (ISAMB b, ISAM_P pos)
1202 {
1203     struct ISAMB_block *p1;
1204
1205     if (!pos)
1206         return 0;
1207     p1 = open_block(b, pos);
1208     p1->deleted = 1;
1209     if (!p1->leaf)
1210     {
1211         zint sub_p;
1212         const char *src = p1->bytes + p1->offset;
1213 #if INT_ENCODE
1214         void *c1 = (*b->method->codec.start)();
1215 #endif
1216         decode_ptr(&src, &sub_p);
1217         isamb_unlink(b, sub_p);
1218         
1219         while (src != p1->bytes + p1->size)
1220         {
1221 #if INT_ENCODE
1222             char file_item_buf[DST_ITEM_MAX];
1223             char *file_item = file_item_buf;
1224             (*b->method->codec.reset)(c1);
1225             (*b->method->codec.decode)(c1, &file_item, &src);
1226 #else
1227             zint item_len;
1228             decode_item_len(&src, &item_len);
1229             src += item_len;
1230 #endif
1231             decode_ptr(&src, &sub_p);
1232             isamb_unlink(b, sub_p);
1233         }
1234 #if INT_ENCODE
1235         (*b->method->codec.stop)(c1);
1236 #endif
1237     }
1238     close_block(b, p1);
1239     return 0;
1240 }
1241
1242 void isamb_merge(ISAMB b, ISAM_P *pos, ISAMC_I *stream)
1243 {
1244     char item_buf[DST_ITEM_MAX];
1245     char *item_ptr;
1246     int i_mode;
1247     int more;
1248     int must_delete = 0;
1249
1250     if (b->cache < 0)
1251     {
1252         int more = 1;
1253         while (more)
1254         {
1255             item_ptr = item_buf;
1256             more =
1257                 (*stream->read_item)(stream->clientData, &item_ptr, &i_mode);
1258         }
1259         *pos = 1;
1260         return;
1261     }
1262     item_ptr = item_buf;
1263     more = (*stream->read_item)(stream->clientData, &item_ptr, &i_mode);
1264     while (more)
1265     {
1266         struct ISAMB_block *p = 0, *sp = 0;
1267         char sub_item[DST_ITEM_MAX];
1268         int sub_size;
1269         
1270         if (*pos)
1271             p = open_block(b, *pos);
1272         more = insert_sub (b, &p, item_buf, &i_mode, stream, &sp,
1273                            sub_item, &sub_size, 0);
1274         if (sp)
1275         {    /* increase level of tree by one */
1276             struct ISAMB_block *p2 = new_int (b, p->cat);
1277             char *dst = p2->bytes + p2->size;
1278 #if INT_ENCODE
1279             void *c1 = (*b->method->codec.start)();
1280             const char *sub_item_ptr = sub_item;
1281 #endif
1282
1283             encode_ptr(&dst, p->pos);
1284             assert (sub_size < 128 && sub_size > 1);
1285 #if INT_ENCODE
1286             (*b->method->codec.reset)(c1);
1287             (*b->method->codec.encode)(c1, &dst, &sub_item_ptr);
1288 #else
1289             encode_item_len (&dst, sub_size);
1290             memcpy (dst, sub_item, sub_size);
1291             dst += sub_size;
1292 #endif
1293             encode_ptr(&dst, sp->pos);
1294             
1295             p2->size = dst - p2->bytes;
1296             p2->no_items = p->no_items + sp->no_items;
1297             *pos = p2->pos;  /* return new super page */
1298             close_block(b, sp);
1299             close_block(b, p2);
1300 #if INT_ENCODE
1301             (*b->method->codec.stop)(c1);
1302 #endif
1303         }
1304         else
1305         {
1306             *pos = p->pos;   /* return current one (again) */
1307         }
1308         if (p->no_items == 0)
1309             must_delete = 1;
1310         else
1311             must_delete = 0;
1312         close_block(b, p);
1313     }
1314     if (must_delete)
1315     {
1316         isamb_unlink(b, *pos);
1317         *pos = 0;
1318     }
1319 }
1320
1321 ISAMB_PP isamb_pp_open_x(ISAMB isamb, ISAM_P pos, int *level, int scope)
1322 {
1323     ISAMB_PP pp = xmalloc(sizeof(*pp));
1324     int i;
1325
1326     assert(pos);
1327
1328     pp->isamb = isamb;
1329     pp->block = xmalloc(ISAMB_MAX_LEVEL * sizeof(*pp->block));
1330
1331     pp->pos = pos;
1332     pp->level = 0;
1333     pp->maxlevel = 0;
1334     pp->total_size = 0;
1335     pp->no_blocks = 0;
1336     pp->skipped_numbers = 0;
1337     pp->returned_numbers = 0;
1338     pp->scope = scope;
1339     for (i = 0; i<ISAMB_MAX_LEVEL; i++)
1340         pp->skipped_nodes[i] = pp->accessed_nodes[i] = 0;
1341     while (1)
1342     {
1343         struct ISAMB_block *p = open_block(isamb, pos);
1344         const char *src = p->bytes + p->offset;
1345         pp->block[pp->level] = p;
1346
1347         pp->total_size += p->size;
1348         pp->no_blocks++;
1349         if (p->leaf)
1350             break;
1351         decode_ptr(&src, &pos);
1352         p->offset = src - p->bytes;
1353         pp->level++;
1354         pp->accessed_nodes[pp->level]++; 
1355     }
1356     pp->block[pp->level+1] = 0;
1357     pp->maxlevel = pp->level;
1358     if (level)
1359         *level = pp->level;
1360     return pp;
1361 }
1362
1363 ISAMB_PP isamb_pp_open (ISAMB isamb, ISAM_P pos, int scope)
1364 {
1365     return isamb_pp_open_x(isamb, pos, 0, scope);
1366 }
1367
1368 void isamb_pp_close_x(ISAMB_PP pp, zint *size, zint *blocks)
1369 {
1370     int i;
1371     if (!pp)
1372         return;
1373     yaz_log(YLOG_DEBUG, "isamb_pp_close lev=%d returned "ZINT_FORMAT" values, " 
1374                     "skipped "ZINT_FORMAT,
1375         pp->maxlevel, pp->skipped_numbers, pp->returned_numbers);
1376     for (i = pp->maxlevel; i>=0; i--)
1377         if (pp->skipped_nodes[i] || pp->accessed_nodes[i])
1378             yaz_log(YLOG_DEBUG, "isamb_pp_close  level leaf-%d: "
1379                             ZINT_FORMAT" read, "ZINT_FORMAT" skipped", i,
1380                  pp->accessed_nodes[i], pp->skipped_nodes[i]);
1381     pp->isamb->skipped_numbers += pp->skipped_numbers;
1382     pp->isamb->returned_numbers += pp->returned_numbers;
1383     for (i = pp->maxlevel; i>=0; i--)
1384     {
1385         pp->isamb->accessed_nodes[i] += pp->accessed_nodes[i];
1386         pp->isamb->skipped_nodes[i] += pp->skipped_nodes[i];
1387     }
1388     if (size)
1389         *size = pp->total_size;
1390     if (blocks)
1391         *blocks = pp->no_blocks;
1392     for (i = 0; i <= pp->level; i++)
1393         close_block(pp->isamb, pp->block[i]);
1394     xfree(pp->block);
1395     xfree(pp);
1396 }
1397
1398 int isamb_block_info (ISAMB isamb, int cat)
1399 {
1400     if (cat >= 0 && cat < isamb->no_cat)
1401         return isamb->file[cat].head.block_size;
1402     return -1;
1403 }
1404
1405 void isamb_pp_close (ISAMB_PP pp)
1406 {
1407     isamb_pp_close_x(pp, 0, 0);
1408 }
1409
1410 /* simple recursive dumper .. */
1411 static void isamb_dump_r (ISAMB b, ISAM_P pos, void (*pr)(const char *str),
1412                           int level)
1413 {
1414     char buf[1024];
1415     char prefix_str[1024];
1416     if (pos)
1417     {
1418         struct ISAMB_block *p = open_block(b, pos);
1419         sprintf(prefix_str, "%*s " ZINT_FORMAT " cat=%d size=%d max=%d items="
1420                 ZINT_FORMAT, level*2, "",
1421                 pos, p->cat, p->size, b->file[p->cat].head.block_max,
1422                 p->no_items);
1423         (*pr)(prefix_str);
1424         sprintf(prefix_str, "%*s " ZINT_FORMAT, level*2, "", pos);
1425         if (p->leaf)
1426         {
1427             while (p->offset < p->size)
1428             {
1429                 const char *src = p->bytes + p->offset;
1430                 char *dst = buf;
1431                 (*b->method->codec.decode)(p->decodeClientData, &dst, &src);
1432                 (*b->method->log_item)(YLOG_DEBUG, buf, prefix_str);
1433                 p->offset = src - (char*) p->bytes;
1434             }
1435             assert(p->offset == p->size);
1436         }
1437         else
1438         {
1439             const char *src = p->bytes + p->offset;
1440             ISAM_P sub;
1441
1442             decode_ptr(&src, &sub);
1443             p->offset = src - (char*) p->bytes;
1444
1445             isamb_dump_r(b, sub, pr, level+1);
1446             
1447             while (p->offset < p->size)
1448             {
1449 #if INT_ENCODE
1450                 char file_item_buf[DST_ITEM_MAX];
1451                 char *file_item = file_item_buf;
1452                 void *c1 = (*b->method->codec.start)();
1453                 (*b->method->codec.decode)(c1, &file_item, &src);
1454                 (*b->method->codec.stop)(c1);
1455                 (*b->method->log_item)(YLOG_DEBUG, file_item_buf, prefix_str);
1456 #else
1457                 zint item_len;
1458                 decode_item_len(&src, &item_len);
1459                 (*b->method->log_item)(YLOG_DEBUG, src, prefix_str);
1460                 src += item_len;
1461 #endif
1462                 decode_ptr(&src, &sub);
1463                 
1464                 p->offset = src - (char*) p->bytes;
1465                 
1466                 isamb_dump_r(b, sub, pr, level+1);
1467             }           
1468         }
1469         close_block(b, p);
1470     }
1471 }
1472
1473 void isamb_dump(ISAMB b, ISAM_P pos, void (*pr)(const char *str))
1474 {
1475     isamb_dump_r(b, pos, pr, 0);
1476 }
1477
1478 int isamb_pp_read(ISAMB_PP pp, void *buf)
1479 {
1480     return isamb_pp_forward(pp, buf, 0);
1481 }
1482
1483
1484 static int isamb_pp_on_right_node(ISAMB_PP pp, int level, const void *untilbuf)
1485 { /* looks one node higher to see if we should be on this node at all */
1486   /* useful in backing off quickly, and in avoiding tail descends */
1487   /* call with pp->level to begin with */
1488     struct ISAMB_block *p;
1489     int cmp;
1490     const char *src;
1491     ISAMB b = pp->isamb;
1492
1493     assert(level >= 0);
1494     if (level == 0)
1495     {
1496 #if ISAMB_DEBUG
1497             yaz_log(YLOG_DEBUG, "isamb_pp_on_right returning true for root");
1498 #endif
1499         return 1; /* we can never skip the root node */
1500     }
1501     level--;
1502     p = pp->block[level];
1503     assert(p->offset <= p->size);
1504     if (p->offset < p->size)
1505     {
1506 #if INT_ENCODE
1507         char file_item_buf[DST_ITEM_MAX];
1508         char *file_item = file_item_buf;
1509         void *c1 = (*b->method->codec.start)();
1510         assert(p->offset > 0); 
1511         src = p->bytes + p->offset;
1512         (*b->method->codec.decode)(c1, &file_item, &src);
1513         (*b->method->codec.stop)(c1);
1514         cmp = (*b->method->compare_item)(untilbuf, file_item_buf);
1515 #else
1516         zint item_len;
1517         assert(p->offset > 0); 
1518         src = p->bytes + p->offset;
1519         decode_item_len(&src, &item_len);
1520 #if ISAMB_DEBUG
1521         (*b->method->codec.log_item)(YLOG_DEBUG, untilbuf, "on_leaf: until");
1522         (*b->method->codec.log_item)(YLOG_DEBUG, src, "on_leaf: value");
1523 #endif
1524         cmp = (*b->method->compare_item)(untilbuf, src);
1525 #endif
1526         if (cmp < pp->scope)
1527         {  /* cmp<2 */
1528 #if ISAMB_DEBUG
1529             yaz_log(YLOG_DEBUG, "isamb_pp_on_right returning true "
1530                             "cmp=%d lev=%d ofs=%d", cmp, level, p->offset);
1531 #endif
1532             return 1; 
1533         }
1534         else
1535         {
1536 #if ISAMB_DEBUG
1537             yaz_log(YLOG_DEBUG, "isamb_pp_on_right returning false "
1538                             "cmp=%d lev=%d ofs=%d", cmp, level, p->offset);
1539 #endif
1540             return 0; 
1541         }
1542     }
1543     else {
1544 #if ISAMB_DEBUG
1545         yaz_log(YLOG_DEBUG, "isamb_pp_on_right at tail, looking higher "
1546                         "lev=%d", level);
1547 #endif
1548         return isamb_pp_on_right_node(pp, level, untilbuf);
1549     }
1550 } /* isamb_pp_on_right_node */
1551
1552 static int isamb_pp_read_on_leaf(ISAMB_PP pp, void *buf)
1553
1554     /* reads the next item on the current leaf, returns 0 if end of leaf*/
1555     struct ISAMB_block *p = pp->block[pp->level];
1556     char *dst;
1557     const char *src;
1558     assert(pp);
1559     assert(buf);
1560     if (p->offset == p->size)
1561     {
1562 #if ISAMB_DEBUG
1563         yaz_log(YLOG_DEBUG, "isamb_pp_read_on_leaf returning 0 on "
1564                 "node %d", p->pos);
1565 #endif
1566         return 0; /* at end of leaf */
1567     }
1568     src = p->bytes + p->offset;
1569     dst = buf;
1570     (*pp->isamb->method->codec.decode)(p->decodeClientData, &dst, &src);
1571     p->offset = src - (char*) p->bytes;
1572 #if ISAMB_DEBUG
1573     (*pp->isamb->method->codec.log_item)(YLOG_DEBUG, buf,
1574                                          "read_on_leaf returning 1");
1575 #endif
1576     pp->returned_numbers++;
1577     return 1;
1578 } /* read_on_leaf */
1579
1580 static int isamb_pp_forward_on_leaf(ISAMB_PP pp, void *buf, const void *untilbuf)
1581 { /* forwards on the current leaf, returns 0 if not found */
1582     int cmp;
1583     int skips = 0;
1584     while (1)
1585     {
1586         if (!isamb_pp_read_on_leaf(pp, buf))
1587             return 0;
1588         /* FIXME - this is an extra function call, inline the read? */
1589         cmp=(*pp->isamb->method->compare_item)(untilbuf, buf);
1590         if (cmp <pp->scope)
1591         {  /* cmp<2  found a good one */
1592 #if ISAMB_DEBUG
1593             if (skips)
1594                 yaz_log(YLOG_DEBUG, "isam_pp_fwd_on_leaf skipped %d items", skips);
1595 #endif
1596             pp->returned_numbers++;
1597             return 1;
1598         }
1599         if (!skips)
1600             if (!isamb_pp_on_right_node(pp, pp->level, untilbuf))
1601                 return 0; /* never mind the rest of this leaf */
1602         pp->skipped_numbers++;
1603         skips++;
1604     }
1605 } /* forward_on_leaf */
1606
1607 static int isamb_pp_climb_level(ISAMB_PP pp, ISAM_P *pos)
1608 { /* climbs higher in the tree, until finds a level with data left */
1609   /* returns the node to (consider to) descend to in *pos) */
1610     struct ISAMB_block *p = pp->block[pp->level];
1611     const char *src;
1612 #if ISAMB_DEBUG
1613     yaz_log(YLOG_DEBUG, "isamb_pp_climb_level starting "
1614                    "at level %d node %d ofs=%d sz=%d",
1615                     pp->level, p->pos, p->offset, p->size);
1616 #endif
1617     assert(pp->level >= 0);
1618     assert(p->offset <= p->size);
1619     if (pp->level==0)
1620     {
1621 #if ISAMB_DEBUG
1622         yaz_log(YLOG_DEBUG, "isamb_pp_climb_level returning 0 at root");
1623 #endif
1624         return 0;
1625     }
1626     assert(pp->level>0); 
1627     close_block(pp->isamb, pp->block[pp->level]);
1628     pp->block[pp->level] = 0;
1629     (pp->level)--;
1630     p = pp->block[pp->level];
1631 #if ISAMB_DEBUG
1632     yaz_log(YLOG_DEBUG, "isamb_pp_climb_level climbed to level %d node %d ofs=%d",
1633                     pp->level, p->pos, p->offset);
1634 #endif
1635     assert(!p->leaf);
1636     assert(p->offset <= p->size);
1637     if (p->offset == p->size)
1638     {
1639         /* we came from the last pointer, climb on */
1640         if (!isamb_pp_climb_level(pp, pos))
1641             return 0;
1642         p = pp->block[pp->level];
1643     }
1644     else
1645     {
1646 #if INT_ENCODE
1647         char file_item_buf[DST_ITEM_MAX];
1648         char *file_item = file_item_buf;
1649         ISAMB b = pp->isamb;
1650         void *c1 = (*b->method->codec.start)();
1651 #else
1652         zint item_len;
1653 #endif
1654         /* skip the child we just came from */
1655 #if ISAMB_DEBUG
1656         yaz_log(YLOG_DEBUG, "isam_pp_climb_level: skipping lev=%d ofs=%d sz=%d", 
1657                         pp->level, p->offset, p->size);
1658 #endif
1659         assert (p->offset < p->size);
1660         src = p->bytes + p->offset;
1661 #if INT_ENCODE
1662         (*b->method->codec.decode)(c1, &file_item, &src);
1663         (*b->method->codec.stop)(c1);
1664 #else
1665         decode_item_len(&src, &item_len);
1666         src += item_len;
1667 #endif
1668         decode_ptr(&src, pos);
1669         p->offset = src - (char *)p->bytes;
1670             
1671     }
1672     return 1;
1673 } /* climb_level */
1674
1675
1676 static zint isamb_pp_forward_unode(ISAMB_PP pp, zint pos, const void *untilbuf)
1677 { /* scans a upper node until it finds a child <= untilbuf */
1678   /* pp points to the key value, as always. pos is the child read from */
1679   /* the buffer */
1680   /* if all values are too small, returns the last child in the node */
1681   /* FIXME - this can be detected, and avoided by looking at the */
1682   /* parent node, but that gets messy. Presumably the cost is */
1683   /* pretty low anyway */
1684     ISAMB b = pp->isamb;
1685     struct ISAMB_block *p = pp->block[pp->level];
1686     const char *src = p->bytes + p->offset;
1687     int cmp;
1688     zint nxtpos;
1689 #if ISAMB_DEBUG
1690     int skips = 0;
1691     yaz_log(YLOG_DEBUG, "isamb_pp_forward_unode starting "
1692                    "at level %d node %d ofs=%di sz=%d",
1693                     pp->level, p->pos, p->offset, p->size);
1694 #endif
1695     assert(!p->leaf);
1696     assert(p->offset <= p->size);
1697     if (p->offset == p->size)
1698     {
1699 #if ISAMB_DEBUG
1700             yaz_log(YLOG_DEBUG, "isamb_pp_forward_unode returning at end "
1701                    "at level %d node %d ofs=%di sz=%d",
1702                     pp->level, p->pos, p->offset, p->size);
1703 #endif
1704         return pos; /* already at the end of it */
1705     }
1706     while(p->offset < p->size)
1707     {
1708 #if INT_ENCODE
1709         char file_item_buf[DST_ITEM_MAX];
1710         char *file_item = file_item_buf;
1711         void *c1 = (*b->method->codec.start)();
1712         (*b->method->codec.decode)(c1, &file_item, &src);
1713         (*b->method->codec.stop)(c1);
1714         cmp = (*b->method->compare_item)(untilbuf, file_item_buf);
1715 #else
1716         zint item_len;
1717         decode_item_len(&src, &item_len);
1718         cmp = (*b->method->compare_item)(untilbuf, src);
1719         src += item_len;
1720 #endif
1721         decode_ptr(&src, &nxtpos);
1722         if (cmp<pp->scope) /* cmp<2 */
1723         {
1724 #if ISAMB_DEBUG
1725             yaz_log(YLOG_DEBUG, "isamb_pp_forward_unode returning a hit "
1726                    "at level %d node %d ofs=%d sz=%d",
1727                     pp->level, p->pos, p->offset, p->size);
1728 #endif
1729             return pos;
1730         } /* found one */
1731         pos = nxtpos;
1732         p->offset = src-(char*)p->bytes;
1733         (pp->skipped_nodes[pp->maxlevel - pp->level -1])++;
1734 #if ISAMB_DEBUG
1735         skips++;
1736 #endif
1737     }
1738 #if ISAMB_DEBUG
1739             yaz_log(YLOG_DEBUG, "isamb_pp_forward_unode returning at tail "
1740                    "at level %d node %d ofs=%d sz=%d skips=%d",
1741                     pp->level, p->pos, p->offset, p->size, skips);
1742 #endif
1743     return pos; /* that's the last one in the line */
1744     
1745 } /* forward_unode */
1746
1747 static void isamb_pp_descend_to_leaf(ISAMB_PP pp, ISAM_P pos,
1748                                      const void *untilbuf)
1749 { /* climbs down the tree, from pos, to the leftmost leaf */
1750     struct ISAMB_block *p = pp->block[pp->level];
1751     const char *src;
1752     assert(!p->leaf);
1753 #if ISAMB_DEBUG
1754     yaz_log(YLOG_DEBUG, "isamb_pp_descend_to_leaf "
1755                    "starting at lev %d node %d ofs=%d lf=%d u=%p", 
1756                    pp->level, p->pos, p->offset, p->leaf, untilbuf);
1757 #endif
1758     if (untilbuf)
1759         pos = isamb_pp_forward_unode(pp, pos, untilbuf);
1760     ++(pp->level);
1761     assert(pos);
1762     p = open_block(pp->isamb, pos);
1763     pp->block[pp->level] = p;
1764     ++(pp->accessed_nodes[pp->maxlevel-pp->level]);
1765     ++(pp->no_blocks);
1766 #if ISAMB_DEBUG
1767     yaz_log(YLOG_DEBUG, "isamb_pp_descend_to_leaf "
1768                    "got lev %d node %d lf=%d", 
1769                    pp->level, p->pos, p->leaf);
1770 #endif
1771     if (p->leaf)
1772         return;
1773     assert (p->offset==0);
1774     src = p->bytes + p->offset;
1775     decode_ptr(&src, &pos);
1776     p->offset = src-(char*)p->bytes;
1777     isamb_pp_descend_to_leaf(pp, pos, untilbuf);
1778 #if ISAMB_DEBUG
1779     yaz_log(YLOG_DEBUG, "isamb_pp_descend_to_leaf "
1780                    "returning at lev %d node %d ofs=%d lf=%d", 
1781                    pp->level, p->pos, p->offset, p->leaf);
1782 #endif
1783 } /* descend_to_leaf */
1784
1785 static int isamb_pp_find_next_leaf(ISAMB_PP pp)
1786 { /* finds the next leaf by climbing up and down */
1787     ISAM_P pos;
1788     if (!isamb_pp_climb_level(pp, &pos))
1789         return 0;
1790     isamb_pp_descend_to_leaf(pp, pos, 0);
1791     return 1;
1792 }
1793
1794 static int isamb_pp_climb_desc(ISAMB_PP pp,  const void *untilbuf)
1795 { /* climbs up and descends to a leaf where values >= *untilbuf are found */
1796     ISAM_P pos;
1797 #if ISAMB_DEBUG
1798     struct ISAMB_block *p = pp->block[pp->level];
1799     yaz_log(YLOG_DEBUG, "isamb_pp_climb_desc starting "
1800                    "at level %d node %d ofs=%d sz=%d",
1801                     pp->level, p->pos, p->offset, p->size);
1802 #endif
1803     if (!isamb_pp_climb_level(pp, &pos))
1804         return 0;
1805     /* see if it would pay to climb one higher */
1806     if (!isamb_pp_on_right_node(pp, pp->level, untilbuf))
1807         if (!isamb_pp_climb_level(pp, &pos))
1808             return 0;
1809     isamb_pp_descend_to_leaf(pp, pos, untilbuf);
1810 #if ISAMB_DEBUG
1811     p = pp->block[pp->level];
1812     yaz_log(YLOG_DEBUG, "isamb_pp_climb_desc done "
1813                    "at level %d node %d ofs=%d sz=%d",
1814                     pp->level, p->pos, p->offset, p->size);
1815 #endif
1816     return 1;
1817 } /* climb_desc */
1818
1819 int isamb_pp_forward (ISAMB_PP pp, void *buf, const void *untilbuf)
1820 {
1821 #if ISAMB_DEBUG
1822     struct ISAMB_block *p = pp->block[pp->level];
1823     assert(p->leaf);
1824     yaz_log(YLOG_DEBUG, "isamb_pp_forward starting "
1825                    "at level %d node %d ofs=%d sz=%d u=%p sc=%d",
1826                     pp->level, p->pos, p->offset, p->size, untilbuf, scope);
1827 #endif
1828     if (untilbuf)
1829     {
1830         if (isamb_pp_forward_on_leaf(pp, buf, untilbuf))
1831         {
1832 #if ISAMB_DEBUG
1833             yaz_log(YLOG_DEBUG, "isamb_pp_forward (f) returning (A) "
1834                    "at level %d node %d ofs=%d sz=%d",
1835                     pp->level, p->pos, p->offset, p->size);
1836 #endif
1837             return 1;
1838         }
1839         if (! isamb_pp_climb_desc(pp, untilbuf))
1840         {
1841 #if ISAMB_DEBUG
1842             yaz_log(YLOG_DEBUG, "isamb_pp_forward (f) returning notfound (B) "
1843                    "at level %d node %d ofs=%d sz=%d",
1844                     pp->level, p->pos, p->offset, p->size);
1845 #endif
1846             return 0; /* could not find a leaf */
1847         }
1848         do {
1849             if (isamb_pp_forward_on_leaf(pp, buf, untilbuf))
1850             {
1851 #if ISAMB_DEBUG
1852                 yaz_log(YLOG_DEBUG, "isamb_pp_forward (f) returning (c) "
1853                         "at level %d node %d ofs=%d sz=%d",
1854                         pp->level, p->pos, p->offset, p->size);
1855 #endif
1856                 return 1;
1857             }
1858         } while (isamb_pp_find_next_leaf(pp));
1859         return 0; /* could not find at all */
1860     }
1861     else { /* no untilbuf, a straight read */
1862         /* FIXME - this should be moved
1863          * directly into the pp_read */
1864         /* keeping here now, to keep same
1865          * interface as the old fwd */
1866         if (isamb_pp_read_on_leaf(pp, buf))
1867         {
1868 #if ISAMB_DEBUG
1869             yaz_log(YLOG_DEBUG, "isamb_pp_forward (read) returning (D) "
1870                    "at level %d node %d ofs=%d sz=%d",
1871                     pp->level, p->pos, p->offset, p->size);
1872 #endif
1873             return 1;
1874         }
1875         if (isamb_pp_find_next_leaf(pp))
1876         {
1877 #if ISAMB_DEBUG
1878             yaz_log(YLOG_DEBUG, "isamb_pp_forward (read) returning (E) "
1879                    "at level %d node %d ofs=%d sz=%d",
1880                     pp->level, p->pos, p->offset, p->size);
1881 #endif
1882             return isamb_pp_read_on_leaf(pp, buf);
1883         }
1884         else
1885             return 0;
1886     }
1887 } /* isam_pp_forward (new version) */
1888
1889 void isamb_pp_pos(ISAMB_PP pp, double *current, double *total)
1890 { /* return an estimate of the current position and of the total number of */
1891   /* occureences in the isam tree, based on the current leaf */
1892     assert(total);
1893     assert(current);
1894     
1895     /* if end-of-stream PP may not be leaf */
1896
1897     *total = (double) (pp->block[0]->no_items);
1898     *current = (double) pp->returned_numbers;
1899 #if ISAMB_DEBUG
1900     yaz_log(YLOG_LOG, "isamb_pp_pos returning: cur= %0.1f tot=%0.1f rn="
1901          ZINT_FORMAT, *current, *total, pp->returned_numbers);
1902 #endif
1903 }
1904
1905 int isamb_pp_forward2(ISAMB_PP pp, void *buf, const void *untilb)
1906 {
1907     char *dst = buf;
1908     const char *src;
1909     struct ISAMB_block *p = pp->block[pp->level];
1910     ISAMB b = pp->isamb;
1911     if (!p)
1912         return 0;
1913 again:
1914     while (p->offset == p->size)
1915     {
1916         ISAM_P pos;
1917 #if INT_ENCODE
1918         const char *src_0;
1919         void *c1;
1920         char file_item_buf[DST_ITEM_MAX];
1921         char *file_item = file_item_buf;
1922 #else
1923         zint item_len;
1924 #endif
1925         while (p->offset == p->size)
1926         {
1927             if (pp->level == 0)
1928                 return 0;
1929             close_block (pp->isamb, pp->block[pp->level]);
1930             pp->block[pp->level] = 0;
1931             (pp->level)--;
1932             p = pp->block[pp->level];
1933             assert (!p->leaf);  
1934         }
1935
1936         assert(!p->leaf);
1937         src = p->bytes + p->offset;
1938        
1939 #if INT_ENCODE
1940         c1 = (*b->method->codec.start)();
1941         (*b->method->codec.decode)(c1, &file_item, &src);
1942 #else
1943         decode_ptr (&src, &item_len);
1944         src += item_len;
1945 #endif        
1946         decode_ptr (&src, &pos);
1947         p->offset = src - (char*) p->bytes;
1948
1949         src = p->bytes + p->offset;
1950
1951         while(1)
1952         {
1953             if (!untilb || p->offset == p->size)
1954                 break;
1955             assert(p->offset < p->size);
1956 #if INT_ENCODE
1957             src_0 = src;
1958             file_item = file_item_buf;
1959             (*b->method->codec.reset)(c1);
1960             (*b->method->codec.decode)(c1, &file_item, &src);
1961             if ((*b->method->compare_item)(untilb, file_item_buf) <= 1)
1962             {
1963                 src = src_0;
1964                 break;
1965             }
1966 #else
1967             decode_item_len(&src, &item_len);
1968             if ((*b->method->compare_item)(untilb, src) <= 1)
1969                 break;
1970             src += item_len;
1971 #endif
1972             decode_ptr (&src, &pos);
1973             p->offset = src - (char*) p->bytes;
1974         }
1975
1976         pp->level++;
1977
1978         while (1)
1979         {
1980             pp->block[pp->level] = p = open_block (pp->isamb, pos);
1981
1982             pp->total_size += p->size;
1983             pp->no_blocks++;
1984             
1985             if (p->leaf) 
1986             {
1987                 break;
1988             }
1989             
1990             src = p->bytes + p->offset;
1991             while(1)
1992             {
1993                 decode_ptr (&src, &pos);
1994                 p->offset = src - (char*) p->bytes;
1995                 
1996                 if (!untilb || p->offset == p->size)
1997                     break;
1998                 assert(p->offset < p->size);
1999 #if INT_ENCODE
2000                 src_0 = src;
2001                 file_item = file_item_buf;
2002                 (*b->method->codec.reset)(c1);
2003                 (*b->method->codec.decode)(c1, &file_item, &src);
2004                 if ((*b->method->compare_item)(untilb, file_item_buf) <= 1)
2005                 {
2006                     src = src_0;
2007                     break;
2008                 }
2009 #else
2010                 decode_ptr (&src, &item_len);
2011                 if ((*b->method->compare_item)(untilb, src) <= 1)
2012                     break;
2013                 src += item_len;
2014 #endif
2015             }
2016             pp->level++;
2017         }
2018 #if INT_ENCODE
2019         (*b->method->codec.stop)(c1);
2020 #endif
2021     }
2022     assert (p->offset < p->size);
2023     assert (p->leaf);
2024     while(1)
2025     {
2026         char *dst0 = dst;
2027         src = p->bytes + p->offset;
2028         (*pp->isamb->method->codec.decode)(p->decodeClientData, &dst, &src);
2029         p->offset = src - (char*) p->bytes;
2030         if (!untilb || (*pp->isamb->method->compare_item)(untilb, dst0) <= 1)
2031             break;
2032         dst = dst0;
2033         if (p->offset == p->size) goto again;
2034     }
2035     return 1;
2036 }
2037
2038 zint isamb_get_int_splits(ISAMB b)
2039 {
2040     return b->number_of_int_splits;
2041 }
2042
2043 zint isamb_get_leaf_splits(ISAMB b)
2044 {
2045     return b->number_of_leaf_splits;
2046 }
2047
2048 zint isamb_get_root_ptr(ISAMB b)
2049 {
2050     return b->root_ptr;
2051 }
2052
2053 void isamb_set_root_ptr(ISAMB b, zint root_ptr)
2054 {
2055     b->root_ptr = root_ptr;
2056 }
2057
2058
2059 /*
2060  * Local variables:
2061  * c-basic-offset: 4
2062  * indent-tabs-mode: nil
2063  * End:
2064  * vim: shiftwidth=4 tabstop=8 expandtab
2065  */
2066