Fixed bug #1017: assert failure in isamb for delete of records. Problem
[idzebra-moved-to-github.git] / isamb / isamb.c
1 /* $Id: isamb.c,v 1.93 2007-04-03 16:54:46 adam Exp $
2    Copyright (C) 1995-2007
3    Index Data ApS
4
5 This file is part of the Zebra server.
6
7 Zebra is free software; you can redistribute it and/or modify it under
8 the terms of the GNU General Public License as published by the Free
9 Software Foundation; either version 2, or (at your option) any later
10 version.
11
12 Zebra is distributed in the hope that it will be useful, but WITHOUT ANY
13 WARRANTY; without even the implied warranty of MERCHANTABILITY or
14 FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
15 for more details.
16
17 You should have received a copy of the GNU General Public License
18 along with this program; if not, write to the Free Software
19 Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA  02110-1301  USA
20
21 */
22
23 #include <stdlib.h>
24 #include <string.h>
25 #include <yaz/log.h>
26 #include <yaz/xmalloc.h>
27 #include <idzebra/isamb.h>
28 #include <assert.h>
29
30 #ifndef ISAMB_DEBUG
31 #define ISAMB_DEBUG 0
32 #endif
33
34
35 #define ISAMB_MAJOR_VERSION 3
36 #define ISAMB_MINOR_VERSION_NO_ROOT 0
37 #define ISAMB_MINOR_VERSION_WITH_ROOT 1
38
39 struct ISAMB_head {
40     zint first_block;
41     zint last_block;
42     zint free_list;
43     zint no_items;
44     int block_size;
45     int block_max;
46     int block_offset;
47 };
48
49 /* if 1, upper nodes items are encoded; 0 if not encoded */
50 #define INT_ENCODE 1
51
52 /* maximum size of encoded buffer */
53 #define DST_ITEM_MAX 256
54
55 #define ISAMB_MAX_LEVEL 10
56 /* approx 2*max page + max size of item */
57 #define DST_BUF_SIZE (2*4096+300)
58
59 /* should be maximum block size of multiple thereof */
60 #define ISAMB_CACHE_ENTRY_SIZE 4096
61
62 /* CAT_MAX: _must_ be power of 2 */
63 #define CAT_MAX 4
64 #define CAT_MASK (CAT_MAX-1)
65 /* CAT_NO: <= CAT_MAX */
66 #define CAT_NO 4
67
68 /* Smallest block size */
69 #define ISAMB_MIN_SIZE 32
70 /* Size factor */
71 #define ISAMB_FAC_SIZE 4
72
73 /* ISAMB_PTR_CODEC = 1 var, =0 fixed */
74 #define ISAMB_PTR_CODEC 1
75
76 struct ISAMB_cache_entry {
77     ISAM_P pos;
78     unsigned char *buf;
79     int dirty;
80     int hits;
81     struct ISAMB_cache_entry *next;
82 };
83
84 struct ISAMB_file {
85     BFile bf;
86     int head_dirty;
87     struct ISAMB_head head;
88     struct ISAMB_cache_entry *cache_entries;
89 };
90
91 struct ISAMB_s {
92     BFiles bfs;
93     ISAMC_M *method;
94
95     struct ISAMB_file *file;
96     int no_cat;
97     int cache; /* 0 = no cache, 1 = use cache, -1 = dummy isam (for testing only) */
98     int log_io;        /* log level for bf_read/bf_write calls */
99     int log_freelist;  /* log level for freelist handling */
100     zint skipped_numbers; /* on a leaf node */
101     zint returned_numbers; 
102     zint skipped_nodes[ISAMB_MAX_LEVEL]; /* [0]=skipped leaves, 1 = higher etc */
103     zint accessed_nodes[ISAMB_MAX_LEVEL]; /* nodes we did not skip */
104     zint number_of_int_splits;
105     zint number_of_leaf_splits;
106     int enable_int_count; /* whether we count nodes (or not) */
107     int cache_size; /* size of blocks to cache (if cache=1) */
108     int minor_version;
109     zint root_ptr;
110 };
111
112 struct ISAMB_block {
113     ISAM_P pos;
114     int cat;
115     int size;
116     int leaf;
117     int dirty;
118     int deleted;
119     int offset;
120     zint no_items;  /* number of nodes in this + children */
121     char *bytes;
122     char *cbuf;
123     unsigned char *buf;
124     void *decodeClientData;
125     int log_rw;
126 };
127
128 struct ISAMB_PP_s {
129     ISAMB isamb;
130     ISAM_P pos;
131     int level;
132     int maxlevel; /* total depth */
133     zint total_size;
134     zint no_blocks;
135     zint skipped_numbers; /* on a leaf node */
136     zint returned_numbers; 
137     zint skipped_nodes[ISAMB_MAX_LEVEL]; /* [0]=skipped leaves, 1 = higher etc */
138     zint accessed_nodes[ISAMB_MAX_LEVEL]; /* nodes we did not skip */
139     struct ISAMB_block **block;
140     int scope;  /* on what level we forward */
141 };
142
143
144 #define encode_item_len encode_ptr
145 #if ISAMB_PTR_CODEC
146 static void encode_ptr(char **dst, zint pos)
147 {
148     unsigned char *bp = (unsigned char*) *dst;
149
150     while (pos > 127)
151     {
152          *bp++ = (unsigned char) (128 | (pos & 127));
153          pos = pos >> 7;
154     }
155     *bp++ = (unsigned char) pos;
156     *dst = (char *) bp;
157 }
158 #else
159 static void encode_ptr(char **dst, zint pos)
160 {
161     memcpy(*dst, &pos, sizeof(pos));
162     (*dst) += sizeof(pos);
163 }
164 #endif
165
166 #define decode_item_len decode_ptr
167 #if ISAMB_PTR_CODEC
168 static void decode_ptr(const char **src, zint *pos)
169 {
170     zint d = 0;
171     unsigned char c;
172     unsigned r = 0;
173
174     while (((c = *(const unsigned char *)((*src)++)) & 128))
175     {
176         d += ((zint) (c & 127) << r);
177         r += 7;
178     }
179     d += ((zint) c << r);
180     *pos = d;
181 }
182 #else
183 static void decode_ptr(const char **src, zint *pos)
184 {
185      memcpy(pos, *src, sizeof(*pos));
186      (*src) += sizeof(*pos);
187 }
188 #endif
189
190
191 void isamb_set_int_count(ISAMB b, int v)
192 {
193     b->enable_int_count = v;
194 }
195
196 void isamb_set_cache_size(ISAMB b, int v)
197 {
198     b->cache_size = v;
199 }
200
201 ISAMB isamb_open2(BFiles bfs, const char *name, int writeflag, ISAMC_M *method,
202                   int cache, int no_cat, int *sizes, int use_root_ptr)
203 {
204     ISAMB isamb = xmalloc(sizeof(*isamb));
205     int i;
206
207     assert(no_cat <= CAT_MAX);
208
209     isamb->bfs = bfs;
210     isamb->method = (ISAMC_M *) xmalloc(sizeof(*method));
211     memcpy(isamb->method, method, sizeof(*method));
212     isamb->no_cat = no_cat;
213     isamb->log_io = 0;
214     isamb->log_freelist = 0;
215     isamb->cache = cache;
216     isamb->skipped_numbers = 0;
217     isamb->returned_numbers = 0;
218     isamb->number_of_int_splits = 0;
219     isamb->number_of_leaf_splits = 0;
220     isamb->enable_int_count = 1;
221     isamb->cache_size = 40;
222
223     if (use_root_ptr)
224         isamb->minor_version = ISAMB_MINOR_VERSION_WITH_ROOT;
225     else
226         isamb->minor_version = ISAMB_MINOR_VERSION_NO_ROOT;
227
228     isamb->root_ptr = 0;
229
230     for (i = 0; i<ISAMB_MAX_LEVEL; i++)
231         isamb->skipped_nodes[i] = isamb->accessed_nodes[i] = 0;
232
233     if (cache == -1)
234     {
235         yaz_log(YLOG_WARN, "isamb_open %s. Degraded TEST mode", name);
236     }
237     else
238     {
239         assert(cache == 0 || cache == 1);
240     }
241     isamb->file = xmalloc(sizeof(*isamb->file) * isamb->no_cat);
242
243     for (i = 0; i < isamb->no_cat; i++)
244     {
245         isamb->file[i].bf = 0;
246         isamb->file[i].head_dirty = 0;
247         isamb->file[i].cache_entries = 0;
248     }
249
250     for (i = 0; i < isamb->no_cat; i++)
251     {
252         char fname[DST_BUF_SIZE];
253         char hbuf[DST_BUF_SIZE];
254
255         sprintf(fname, "%s%c", name, i+'A');
256         if (cache)
257             isamb->file[i].bf = bf_open(bfs, fname, ISAMB_CACHE_ENTRY_SIZE,
258                                          writeflag);
259         else
260             isamb->file[i].bf = bf_open(bfs, fname, sizes[i], writeflag);
261
262         if (!isamb->file[i].bf)
263         {
264             isamb_close(isamb);
265             return 0;
266         }
267
268         /* fill-in default values (for empty isamb) */
269         isamb->file[i].head.first_block = ISAMB_CACHE_ENTRY_SIZE/sizes[i]+1;
270         isamb->file[i].head.last_block = isamb->file[i].head.first_block;
271         isamb->file[i].head.block_size = sizes[i];
272         assert(sizes[i] <= ISAMB_CACHE_ENTRY_SIZE);
273 #if ISAMB_PTR_CODEC
274         if (i == isamb->no_cat-1 || sizes[i] > 128)
275             isamb->file[i].head.block_offset = 8;
276         else
277             isamb->file[i].head.block_offset = 4;
278 #else
279         isamb->file[i].head.block_offset = 11;
280 #endif
281         isamb->file[i].head.block_max =
282             sizes[i] - isamb->file[i].head.block_offset;
283         isamb->file[i].head.free_list = 0;
284         if (bf_read(isamb->file[i].bf, 0, 0, 0, hbuf))
285         {
286             /* got header assume "isamb"major minor len can fit in 16 bytes */
287             zint zint_tmp;
288             int major, minor, len, pos = 0;
289             int left;
290             const char *src = 0;
291             if (memcmp(hbuf, "isamb", 5))
292             {
293                 yaz_log(YLOG_WARN, "bad isamb header for file %s", fname);
294                 isamb_close(isamb);
295                 return 0;
296             }
297             if (sscanf(hbuf+5, "%d %d %d", &major, &minor, &len) != 3)
298             {
299                 yaz_log(YLOG_WARN, "bad isamb header for file %s", fname);
300                 isamb_close(isamb);
301                 return 0;
302             }
303             if (major != ISAMB_MAJOR_VERSION)
304             {
305                 yaz_log(YLOG_WARN, "bad major version for file %s %d, must be %d",
306                      fname, major, ISAMB_MAJOR_VERSION);
307                 isamb_close(isamb);
308                 return 0;
309             }
310             for (left = len - sizes[i]; left > 0; left = left - sizes[i])
311             {
312                 pos++;
313                 if (!bf_read(isamb->file[i].bf, pos, 0, 0, hbuf + pos*sizes[i]))
314                 {
315                     yaz_log(YLOG_WARN, "truncated isamb header for " 
316                          "file=%s len=%d pos=%d",
317                          fname, len, pos);
318                     isamb_close(isamb);
319                     return 0;
320                 }
321             }
322             src = hbuf + 16;
323             decode_ptr(&src, &isamb->file[i].head.first_block);
324             decode_ptr(&src, &isamb->file[i].head.last_block);
325             decode_ptr(&src, &zint_tmp);
326             isamb->file[i].head.block_size = (int) zint_tmp;
327             decode_ptr(&src, &zint_tmp);
328             isamb->file[i].head.block_max = (int) zint_tmp;
329             decode_ptr(&src, &isamb->file[i].head.free_list);
330             if (isamb->minor_version >= ISAMB_MINOR_VERSION_WITH_ROOT)
331                 decode_ptr(&src, &isamb->root_ptr);
332         }
333         assert (isamb->file[i].head.block_size >= isamb->file[i].head.block_offset);
334         /* must rewrite the header if root ptr is in use (bug #1017) */
335         if (use_root_ptr && writeflag)
336             isamb->file[i].head_dirty = 1;
337         else
338             isamb->file[i].head_dirty = 0;
339         assert(isamb->file[i].head.block_size == sizes[i]);
340     }
341 #if ISAMB_DEBUG
342     yaz_log(YLOG_WARN, "isamb debug enabled. Things will be slower than usual");
343 #endif
344     return isamb;
345 }
346
347 ISAMB isamb_open(BFiles bfs, const char *name, int writeflag, ISAMC_M *method,
348                  int cache)
349 {
350     int sizes[CAT_NO];
351     int i, b_size = ISAMB_MIN_SIZE;
352     
353     for (i = 0; i<CAT_NO; i++)
354     {
355         sizes[i] = b_size;
356         b_size = b_size * ISAMB_FAC_SIZE;
357     }
358     return isamb_open2(bfs, name, writeflag, method, cache,
359                        CAT_NO, sizes, 0);
360 }
361
362 static void flush_blocks (ISAMB b, int cat)
363 {
364     while (b->file[cat].cache_entries)
365     {
366         struct ISAMB_cache_entry *ce_this = b->file[cat].cache_entries;
367         b->file[cat].cache_entries = ce_this->next;
368
369         if (ce_this->dirty)
370         {
371             yaz_log(b->log_io, "bf_write: flush_blocks");
372             bf_write(b->file[cat].bf, ce_this->pos, 0, 0, ce_this->buf);
373         }
374         xfree(ce_this->buf);
375         xfree(ce_this);
376     }
377 }
378
379 static int cache_block (ISAMB b, ISAM_P pos, unsigned char *userbuf, int wr)
380 {
381     int cat = (int) (pos&CAT_MASK);
382     int off = (int) (((pos/CAT_MAX) & 
383                (ISAMB_CACHE_ENTRY_SIZE / b->file[cat].head.block_size - 1))
384         * b->file[cat].head.block_size);
385     zint norm = pos / (CAT_MASK*ISAMB_CACHE_ENTRY_SIZE / b->file[cat].head.block_size);
386     int no = 0;
387     struct ISAMB_cache_entry **ce, *ce_this = 0, **ce_last = 0;
388
389     if (!b->cache)
390         return 0;
391
392     assert (ISAMB_CACHE_ENTRY_SIZE >= b->file[cat].head.block_size);
393     for (ce = &b->file[cat].cache_entries; *ce; ce = &(*ce)->next, no++)
394     {
395         ce_last = ce;
396         if ((*ce)->pos == norm)
397         {
398             ce_this = *ce;
399             *ce = (*ce)->next;   /* remove from list */
400             
401             ce_this->next = b->file[cat].cache_entries;  /* move to front */
402             b->file[cat].cache_entries = ce_this;
403             
404             if (wr)
405             {
406                 memcpy (ce_this->buf + off, userbuf, 
407                         b->file[cat].head.block_size);
408                 ce_this->dirty = 1;
409             }
410             else
411                 memcpy (userbuf, ce_this->buf + off,
412                         b->file[cat].head.block_size);
413             return 1;
414         }
415     }
416     if (no >= b->cache_size)
417     {
418         assert (ce_last && *ce_last);
419         ce_this = *ce_last;
420         *ce_last = 0;  /* remove the last entry from list */
421         if (ce_this->dirty)
422         {
423             yaz_log(b->log_io, "bf_write: cache_block");
424             bf_write(b->file[cat].bf, ce_this->pos, 0, 0, ce_this->buf);
425         }
426         xfree(ce_this->buf);
427         xfree(ce_this);
428     }
429     ce_this = xmalloc(sizeof(*ce_this));
430     ce_this->next = b->file[cat].cache_entries;
431     b->file[cat].cache_entries = ce_this;
432     ce_this->buf = xmalloc(ISAMB_CACHE_ENTRY_SIZE);
433     ce_this->pos = norm;
434     yaz_log(b->log_io, "bf_read: cache_block");
435     if (!bf_read(b->file[cat].bf, norm, 0, 0, ce_this->buf))
436         memset (ce_this->buf, 0, ISAMB_CACHE_ENTRY_SIZE);
437     if (wr)
438     {
439         memcpy (ce_this->buf + off, userbuf, b->file[cat].head.block_size);
440         ce_this->dirty = 1;
441     }
442     else
443     {
444         ce_this->dirty = 0;
445         memcpy (userbuf, ce_this->buf + off, b->file[cat].head.block_size);
446     }
447     return 1;
448 }
449
450
451 void isamb_close (ISAMB isamb)
452 {
453     int i;
454     for (i = 0; isamb->accessed_nodes[i]; i++)
455         yaz_log(YLOG_DEBUG, "isamb_close  level leaf-%d: "ZINT_FORMAT" read, "
456                         ZINT_FORMAT" skipped",
457              i, isamb->accessed_nodes[i], isamb->skipped_nodes[i]);
458     yaz_log(YLOG_DEBUG, "isamb_close returned "ZINT_FORMAT" values, "
459                    "skipped "ZINT_FORMAT,
460          isamb->skipped_numbers, isamb->returned_numbers);
461
462     for (i = 0; i<isamb->no_cat; i++)
463     {
464         flush_blocks (isamb, i);
465         if (isamb->file[i].head_dirty)
466         {
467             char hbuf[DST_BUF_SIZE];
468             int major = ISAMB_MAJOR_VERSION;
469             int len = 16;
470             char *dst = hbuf + 16;
471             int pos = 0, left;
472             int b_size = isamb->file[i].head.block_size;
473
474             encode_ptr(&dst, isamb->file[i].head.first_block);
475             encode_ptr(&dst, isamb->file[i].head.last_block);
476             encode_ptr(&dst, isamb->file[i].head.block_size);
477             encode_ptr(&dst, isamb->file[i].head.block_max);
478             encode_ptr(&dst, isamb->file[i].head.free_list);
479
480             if (isamb->minor_version >= ISAMB_MINOR_VERSION_WITH_ROOT)
481                 encode_ptr(&dst, isamb->root_ptr);
482
483             memset(dst, '\0', b_size); /* ensure no random bytes are written */
484
485             len = dst - hbuf;
486
487             /* print exactly 16 bytes (including trailing 0) */
488             sprintf(hbuf, "isamb%02d %02d %02d\r\n", major,
489                     isamb->minor_version, len);
490
491             bf_write(isamb->file[i].bf, pos, 0, 0, hbuf);
492
493             for (left = len - b_size; left > 0; left = left - b_size)
494             {
495                 pos++;
496                 bf_write(isamb->file[i].bf, pos, 0, 0, hbuf + pos*b_size);
497             }
498         }
499         if (isamb->file[i].bf)
500             bf_close (isamb->file[i].bf);
501     }
502     xfree(isamb->file);
503     xfree(isamb->method);
504     xfree(isamb);
505 }
506
507 /* open_block: read one block at pos.
508    Decode leading sys bytes .. consisting of
509    Offset:Meaning
510    0: leader byte, != 0 leaf, == 0, non-leaf
511    1-2: used size of block
512    3-7*: number of items and all children
513    
514    * Reserve 5 bytes for large block sizes. 1 for small ones .. Number
515    of items. We can thus have at most 2^40 nodes. 
516 */
517 static struct ISAMB_block *open_block(ISAMB b, ISAM_P pos)
518 {
519     int cat = (int) (pos&CAT_MASK);
520     const char *src;
521     int offset = b->file[cat].head.block_offset;
522     struct ISAMB_block *p;
523     if (!pos)
524         return 0;
525     p = xmalloc(sizeof(*p));
526     p->pos = pos;
527     p->cat = (int) (pos & CAT_MASK);
528     p->buf = xmalloc(b->file[cat].head.block_size);
529     p->cbuf = 0;
530
531     if (!cache_block (b, pos, p->buf, 0))
532     {
533         yaz_log(b->log_io, "bf_read: open_block");
534         if (bf_read(b->file[cat].bf, pos/CAT_MAX, 0, 0, p->buf) != 1)
535         {
536             yaz_log(YLOG_FATAL, "isamb: read fail for pos=%ld block=%ld",
537                      (long) pos, (long) pos/CAT_MAX);
538             zebra_exit("isamb:open_block");
539         }
540     }
541     p->bytes = (char *)p->buf + offset;
542     p->leaf = p->buf[0];
543     p->size = (p->buf[1] + 256 * p->buf[2]) - offset;
544     if (p->size < 0)
545     {
546         yaz_log(YLOG_FATAL, "Bad block size %d in pos=" ZINT_FORMAT "\n",
547                  p->size, pos);
548     }
549     assert (p->size >= 0);
550     src = (char*) p->buf + 3;
551     decode_ptr(&src, &p->no_items);
552
553     p->offset = 0;
554     p->dirty = 0;
555     p->deleted = 0;
556     p->decodeClientData = (*b->method->codec.start)();
557     return p;
558 }
559
560 struct ISAMB_block *new_block (ISAMB b, int leaf, int cat)
561 {
562     struct ISAMB_block *p;
563
564     p = xmalloc(sizeof(*p));
565     p->buf = xmalloc(b->file[cat].head.block_size);
566
567     if (!b->file[cat].head.free_list)
568     {
569         zint block_no;
570         block_no = b->file[cat].head.last_block++;
571         p->pos = block_no * CAT_MAX + cat;
572     }
573     else
574     {
575         p->pos = b->file[cat].head.free_list;
576         assert((p->pos & CAT_MASK) == cat);
577         if (!cache_block (b, p->pos, p->buf, 0))
578         {
579             yaz_log(b->log_io, "bf_read: new_block");
580             if (!bf_read(b->file[cat].bf, p->pos/CAT_MAX, 0, 0, p->buf))
581             {
582                 yaz_log(YLOG_FATAL, "isamb: read fail for pos=%ld block=%ld",
583                          (long) p->pos/CAT_MAX, (long) p->pos/CAT_MAX);
584                 zebra_exit("isamb:new_block");
585             }
586         }
587         yaz_log(b->log_freelist, "got block " ZINT_FORMAT " from freelist %d:" ZINT_FORMAT, p->pos,
588                  cat, p->pos/CAT_MAX);
589         memcpy (&b->file[cat].head.free_list, p->buf, sizeof(zint));
590     }
591     p->cat = cat;
592     b->file[cat].head_dirty = 1;
593     memset (p->buf, 0, b->file[cat].head.block_size);
594     p->bytes = (char*)p->buf + b->file[cat].head.block_offset;
595     p->leaf = leaf;
596     p->size = 0;
597     p->dirty = 1;
598     p->deleted = 0;
599     p->offset = 0;
600     p->no_items = 0;
601     p->decodeClientData = (*b->method->codec.start)();
602     return p;
603 }
604
605 struct ISAMB_block *new_leaf (ISAMB b, int cat)
606 {
607     return new_block (b, 1, cat);
608 }
609
610
611 struct ISAMB_block *new_int (ISAMB b, int cat)
612 {
613     return new_block (b, 0, cat);
614 }
615
616 static void check_block (ISAMB b, struct ISAMB_block *p)
617 {
618     assert(b); /* mostly to make the compiler shut up about unused b */
619     if (p->leaf)
620     {
621         ;
622     }
623     else
624     {
625         /* sanity check */
626         char *startp = p->bytes;
627         const char *src = startp;
628         char *endp = p->bytes + p->size;
629         ISAM_P pos;
630         void *c1 = (*b->method->codec.start)();
631             
632         decode_ptr(&src, &pos);
633         assert ((pos&CAT_MASK) == p->cat);
634         while (src != endp)
635         {
636 #if INT_ENCODE
637             char file_item_buf[DST_ITEM_MAX];
638             char *file_item = file_item_buf;
639             (*b->method->codec.reset)(c1);
640             (*b->method->codec.decode)(c1, &file_item, &src);
641 #else
642             zint item_len;
643             decode_item_len(&src, &item_len);
644             assert (item_len > 0 && item_len < 128);
645             src += item_len;
646 #endif
647             decode_ptr(&src, &pos);
648             if ((pos&CAT_MASK) != p->cat)
649             {
650                 assert ((pos&CAT_MASK) == p->cat);
651             }
652         }
653         (*b->method->codec.stop)(c1);
654     }
655 }
656
657 void close_block(ISAMB b, struct ISAMB_block *p)
658 {
659     if (!p)
660         return;
661     if (p->deleted)
662     {
663         yaz_log(b->log_freelist, "release block " ZINT_FORMAT " from freelist %d:" ZINT_FORMAT,
664                  p->pos, p->cat, p->pos/CAT_MAX);
665         memcpy (p->buf, &b->file[p->cat].head.free_list, sizeof(zint));
666         b->file[p->cat].head.free_list = p->pos;
667         if (!cache_block (b, p->pos, p->buf, 1))
668         {
669             yaz_log(b->log_io, "bf_write: close_block (deleted)");
670             bf_write(b->file[p->cat].bf, p->pos/CAT_MAX, 0, 0, p->buf);
671         }
672     }
673     else if (p->dirty)
674     {
675         int offset = b->file[p->cat].head.block_offset;
676         int size = p->size + offset;
677         char *dst =  (char*)p->buf + 3;
678         assert (p->size >= 0);
679         
680         /* memset becuase encode_ptr usually does not write all bytes */
681         memset(p->buf, 0, b->file[p->cat].head.block_offset);
682         p->buf[0] = p->leaf;
683         p->buf[1] = size & 255;
684         p->buf[2] = size >> 8;
685         encode_ptr(&dst, p->no_items);
686         check_block(b, p);
687         if (!cache_block (b, p->pos, p->buf, 1))
688         {
689             yaz_log(b->log_io, "bf_write: close_block");
690             bf_write(b->file[p->cat].bf, p->pos/CAT_MAX, 0, 0, p->buf);
691         }
692     }
693     (*b->method->codec.stop)(p->decodeClientData);
694     xfree(p->buf);
695     xfree(p);
696 }
697
698 int insert_sub (ISAMB b, struct ISAMB_block **p,
699                 void *new_item, int *mode,
700                 ISAMC_I *stream,
701                 struct ISAMB_block **sp,
702                 void *sub_item, int *sub_size,
703                 const void *max_item);
704
705 int insert_int (ISAMB b, struct ISAMB_block *p, void *lookahead_item,
706                 int *mode,
707                 ISAMC_I *stream, struct ISAMB_block **sp,
708                 void *split_item, int *split_size, const void *last_max_item)
709 {
710     char *startp = p->bytes;
711     const char *src = startp;
712     char *endp = p->bytes + p->size;
713     ISAM_P pos;
714     struct ISAMB_block *sub_p1 = 0, *sub_p2 = 0;
715     char sub_item[DST_ITEM_MAX];
716     int sub_size;
717     int more = 0;
718     zint diff_terms = 0;
719     void *c1 = (*b->method->codec.start)();
720
721     *sp = 0;
722
723     assert(p->size >= 0);
724     decode_ptr(&src, &pos);
725     while (src != endp)
726     {
727         int d;
728         const char *src0 = src;
729 #if INT_ENCODE
730         char file_item_buf[DST_ITEM_MAX];
731         char *file_item = file_item_buf;
732         (*b->method->codec.reset)(c1);
733         (*b->method->codec.decode)(c1, &file_item, &src);
734         d = (*b->method->compare_item)(file_item_buf, lookahead_item);
735         if (d > 0)
736         {
737             sub_p1 = open_block(b, pos);
738             assert (sub_p1);
739             diff_terms -= sub_p1->no_items;
740             more = insert_sub (b, &sub_p1, lookahead_item, mode,
741                                stream, &sub_p2, 
742                                sub_item, &sub_size, file_item_buf);
743             diff_terms += sub_p1->no_items;
744             src = src0;
745             break;
746         }
747 #else
748         zint item_len;
749         decode_item_len(&src, &item_len);
750         d = (*b->method->compare_item)(src, lookahead_item);
751         if (d > 0)
752         {
753             sub_p1 = open_block(b, pos);
754             assert (sub_p1);
755             diff_terms -= sub_p1->no_items;
756             more = insert_sub (b, &sub_p1, lookahead_item, mode,
757                                stream, &sub_p2, 
758                                sub_item, &sub_size, src);
759             diff_terms += sub_p1->no_items;
760             src = src0;
761             break;
762         }
763         src += item_len;
764 #endif
765         decode_ptr(&src, &pos);
766     }
767     if (!sub_p1)
768     {
769         /* we reached the end. So lookahead > last item */
770         sub_p1 = open_block(b, pos);
771         assert (sub_p1);
772         diff_terms -= sub_p1->no_items;
773         more = insert_sub (b, &sub_p1, lookahead_item, mode, stream, &sub_p2, 
774                            sub_item, &sub_size, last_max_item);
775         diff_terms += sub_p1->no_items;
776     }
777     if (sub_p2)
778         diff_terms += sub_p2->no_items;
779     if (diff_terms)
780     {
781         p->dirty = 1;
782         p->no_items += diff_terms;
783     }
784     if (sub_p2)
785     {
786         /* there was a split - must insert pointer in this one */
787         char dst_buf[DST_BUF_SIZE];
788         char *dst = dst_buf;
789 #if INT_ENCODE
790         const char *sub_item_ptr = sub_item;
791 #endif
792         assert (sub_size < 128 && sub_size > 1);
793
794         memcpy (dst, startp, src - startp);
795                 
796         dst += src - startp;
797
798 #if INT_ENCODE
799         (*b->method->codec.reset)(c1);
800         (*b->method->codec.encode)(c1, &dst, &sub_item_ptr);
801 #else
802         encode_item_len (&dst, sub_size);      /* sub length and item */
803         memcpy (dst, sub_item, sub_size);
804         dst += sub_size;
805 #endif
806
807         encode_ptr(&dst, sub_p2->pos);   /* pos */
808
809         if (endp - src)                   /* remaining data */
810         {
811             memcpy (dst, src, endp - src);
812             dst += endp - src;
813         }
814         p->size = dst - dst_buf;
815         assert (p->size >= 0);
816
817         if (p->size <= b->file[p->cat].head.block_max)
818         {
819             /* it fits OK in this block */
820             memcpy (startp, dst_buf, dst - dst_buf);
821
822             close_block(b, sub_p2);
823         }
824         else
825         {
826             /* must split _this_ block as well .. */
827             struct ISAMB_block *sub_p3;
828 #if INT_ENCODE
829             char file_item_buf[DST_ITEM_MAX];
830             char *file_item = file_item_buf;
831 #else
832             zint split_size_tmp;
833 #endif
834             zint no_items_first_half = 0;
835             int p_new_size;
836             const char *half;
837             src = dst_buf;
838             endp = dst;
839             
840             b->number_of_int_splits++;
841
842             p->dirty = 1;
843             close_block(b, sub_p2);
844
845             half = src + b->file[p->cat].head.block_size/2;
846             decode_ptr(&src, &pos);
847
848             if (b->enable_int_count)
849             {
850                 /* read sub block so we can get no_items for it */
851                 sub_p3 = open_block(b, pos);
852                 no_items_first_half += sub_p3->no_items;
853                 close_block(b, sub_p3);
854             }
855
856             while (src <= half)
857             {
858 #if INT_ENCODE
859                 file_item = file_item_buf;
860                 (*b->method->codec.reset)(c1);
861                 (*b->method->codec.decode)(c1, &file_item, &src);
862 #else
863                 decode_item_len(&src, &split_size_tmp);
864                 *split_size = (int) split_size_tmp;
865                 src += *split_size;
866 #endif
867                 decode_ptr(&src, &pos);
868
869                 if (b->enable_int_count)
870                 {
871                     /* read sub block so we can get no_items for it */
872                     sub_p3 = open_block(b, pos);
873                     no_items_first_half += sub_p3->no_items;
874                     close_block(b, sub_p3);
875                 }
876             }
877             /*  p is first half */
878             p_new_size = src - dst_buf;
879             memcpy (p->bytes, dst_buf, p_new_size);
880
881 #if INT_ENCODE
882             file_item = file_item_buf;
883             (*b->method->codec.reset)(c1);
884             (*b->method->codec.decode)(c1, &file_item, &src);
885             *split_size = file_item - file_item_buf;
886             memcpy(split_item, file_item_buf, *split_size);
887 #else
888             decode_item_len(&src, &split_size_tmp);
889             *split_size = (int) split_size_tmp;
890             memcpy (split_item, src, *split_size);
891             src += *split_size;
892 #endif
893             /*  *sp is second half */
894             *sp = new_int (b, p->cat);
895             (*sp)->size = endp - src;
896             memcpy ((*sp)->bytes, src, (*sp)->size);
897
898             p->size = p_new_size;
899
900             /*  adjust no_items in first&second half */
901             (*sp)->no_items = p->no_items - no_items_first_half;
902             p->no_items = no_items_first_half;
903         }
904         p->dirty = 1;
905     }
906     close_block(b, sub_p1);
907     (*b->method->codec.stop)(c1);
908     return more;
909 }
910
911 int insert_leaf (ISAMB b, struct ISAMB_block **sp1, void *lookahead_item,
912                  int *lookahead_mode, ISAMC_I *stream,
913                  struct ISAMB_block **sp2,
914                  void *sub_item, int *sub_size,
915                  const void *max_item)
916 {
917     struct ISAMB_block *p = *sp1;
918     char *endp = 0;
919     const char *src = 0;
920     char dst_buf[DST_BUF_SIZE], *dst = dst_buf;
921     int new_size;
922     void *c1 = (*b->method->codec.start)();
923     void *c2 = (*b->method->codec.start)();
924     int more = 1;
925     int quater = b->file[b->no_cat-1].head.block_max / 4;
926     char *mid_cut = dst_buf + quater * 2;
927     char *tail_cut = dst_buf + quater * 3;
928     char *maxp = dst_buf + b->file[b->no_cat-1].head.block_max;
929     char *half1 = 0;
930     char *half2 = 0;
931     char cut_item_buf[DST_ITEM_MAX];
932     int cut_item_size = 0;
933     int no_items = 0;    /* number of items (total) */
934     int no_items_1 = 0;  /* number of items (first half) */
935     int inserted_dst_bytes = 0;
936
937     if (p && p->size)
938     {
939         char file_item_buf[DST_ITEM_MAX];
940         char *file_item = file_item_buf;
941             
942         src = p->bytes;
943         endp = p->bytes + p->size;
944         (*b->method->codec.decode)(c1, &file_item, &src);
945         while (1)
946         {
947             const char *dst_item = 0; /* resulting item to be inserted */
948             char *lookahead_next;
949             char *dst_0 = dst;
950             int d = -1;
951             
952             if (lookahead_item)
953                 d = (*b->method->compare_item)(file_item_buf, lookahead_item);
954             
955             /* d now holds comparison between existing file item and
956                lookahead item 
957                d = 0: equal
958                d > 0: lookahead before file
959                d < 0: lookahead after file
960             */
961             if (d > 0)
962             {
963                 /* lookahead must be inserted */
964                 dst_item = lookahead_item;
965                 /* if this is not an insertion, it's really bad .. */
966                 if (!*lookahead_mode)
967                 {
968                     yaz_log(YLOG_WARN, "isamb: Inconsistent register (1)");
969                     assert(*lookahead_mode);
970                 }
971             }
972             else
973                 dst_item = file_item_buf;
974
975             if (!*lookahead_mode && d == 0)
976             {
977                 /* it's a deletion and they match so there is nothing to be
978                    inserted anyway .. But mark the thing bad (file item
979                    was part of input.. The item will not be part of output */
980                 p->dirty = 1;
981             }
982             else if (!half1 && dst > mid_cut)
983             {
984                 /* we have reached the splitting point for the first time */
985                 const char *dst_item_0 = dst_item;
986                 half1 = dst; /* candidate for splitting */
987
988                 /* encode the resulting item */
989                 (*b->method->codec.encode)(c2, &dst, &dst_item);
990                 
991                 cut_item_size = dst_item - dst_item_0;
992                 assert(cut_item_size > 0);
993                 memcpy (cut_item_buf, dst_item_0, cut_item_size);
994                 
995                 half2 = dst;
996                 no_items_1 = no_items;
997                 no_items++;
998             }
999             else
1000             {
1001                 /* encode the resulting item */
1002                 (*b->method->codec.encode)(c2, &dst, &dst_item);
1003                 no_items++;
1004             }
1005
1006             /* now move "pointers" .. result has been encoded .. */
1007             if (d > 0)  
1008             {
1009                 /* we must move the lookahead pointer */
1010
1011                 inserted_dst_bytes += (dst - dst_0);
1012                 if (inserted_dst_bytes >=  quater)
1013                     /* no more room. Mark lookahead as "gone".. */
1014                     lookahead_item = 0;
1015                 else
1016                 {
1017                     /* move it really.. */
1018                     lookahead_next = lookahead_item;
1019                     if (!(*stream->read_item)(stream->clientData,
1020                                               &lookahead_next,
1021                                               lookahead_mode))
1022                     {
1023                         /* end of stream reached: no "more" and no lookahead */
1024                         lookahead_item = 0;
1025                         more = 0;
1026                     }
1027                     if (lookahead_item && max_item &&
1028                         (*b->method->compare_item)(max_item, lookahead_item) <= 0)
1029                     {
1030                         /* the lookahead goes beyond what we allow in this
1031                            leaf. Mark it as "gone" */
1032                         lookahead_item = 0;
1033                     }
1034                     
1035                     p->dirty = 1;
1036                 }
1037             }
1038             else if (d == 0)
1039             {
1040                 /* exact match .. move both pointers */
1041
1042                 lookahead_next = lookahead_item;
1043                 if (!(*stream->read_item)(stream->clientData,
1044                                           &lookahead_next, lookahead_mode))
1045                 {
1046                     lookahead_item = 0;
1047                     more = 0;
1048                 }
1049                 if (src == endp)
1050                     break;  /* end of file stream reached .. */
1051                 file_item = file_item_buf; /* move file pointer */
1052                 (*b->method->codec.decode)(c1, &file_item, &src);
1053             }
1054             else
1055             {
1056                 /* file pointer must be moved */
1057                 if (src == endp)
1058                     break;
1059                 file_item = file_item_buf;
1060                 (*b->method->codec.decode)(c1, &file_item, &src);
1061             }
1062         }
1063     }
1064
1065     /* this loop runs when we are "appending" to a leaf page. That is
1066        either it's empty (new) or all file items have been read in
1067        previous loop */
1068
1069     maxp = dst_buf + b->file[b->no_cat-1].head.block_max + quater;
1070     while (lookahead_item)
1071     {
1072         char *dst_item;
1073         const char *src = lookahead_item;
1074         char *dst_0 = dst;
1075         
1076         /* if we have a lookahead item, we stop if we exceed the value of it */
1077         if (max_item &&
1078             (*b->method->compare_item)(max_item, lookahead_item) <= 0)
1079         {
1080             /* stop if we have reached the value of max item */
1081             break;
1082         }
1083         if (!*lookahead_mode)
1084         {
1085             /* this is append. So a delete is bad */
1086             yaz_log(YLOG_WARN, "isamb: Inconsistent register (2)");
1087             assert(*lookahead_mode);
1088         }
1089         else if (!half1 && dst > tail_cut)
1090         {
1091             const char *src_0 = src;
1092             half1 = dst; /* candidate for splitting */
1093             
1094             (*b->method->codec.encode)(c2, &dst, &src);
1095             
1096             cut_item_size = src - src_0;
1097             assert(cut_item_size > 0);
1098             memcpy (cut_item_buf, src_0, cut_item_size);
1099             
1100             no_items_1 = no_items;
1101             half2 = dst;
1102         }
1103         else
1104             (*b->method->codec.encode)(c2, &dst, &src);
1105
1106         if (dst > maxp)
1107         {
1108             dst = dst_0;
1109             break;
1110         }
1111         no_items++;
1112         if (p)
1113             p->dirty = 1;
1114         dst_item = lookahead_item;
1115         if (!(*stream->read_item)(stream->clientData, &dst_item,
1116                                   lookahead_mode))
1117         {
1118             lookahead_item = 0;
1119             more = 0;
1120         }
1121     }
1122     new_size = dst - dst_buf;
1123     if (p && p->cat != b->no_cat-1 && 
1124         new_size > b->file[p->cat].head.block_max)
1125     {
1126         /* non-btree block will be removed */
1127         p->deleted = 1;
1128         close_block(b, p);
1129         /* delete it too!! */
1130         p = 0; /* make a new one anyway */
1131     }
1132     if (!p)
1133     {   /* must create a new one */
1134         int i;
1135         for (i = 0; i < b->no_cat; i++)
1136             if (new_size <= b->file[i].head.block_max)
1137                 break;
1138         if (i == b->no_cat)
1139             i = b->no_cat - 1;
1140         p = new_leaf (b, i);
1141     }
1142     if (new_size > b->file[p->cat].head.block_max)
1143     {
1144         char *first_dst;
1145         const char *cut_item = cut_item_buf;
1146
1147         assert (half1);
1148         assert (half2);
1149
1150         assert(cut_item_size > 0);
1151         
1152         /* first half */
1153         p->size = half1 - dst_buf;
1154         assert(p->size <=  b->file[p->cat].head.block_max);
1155         memcpy (p->bytes, dst_buf, half1 - dst_buf);
1156         p->no_items = no_items_1;
1157
1158         /* second half */
1159         *sp2 = new_leaf (b, p->cat);
1160
1161         (*b->method->codec.reset)(c2);
1162
1163         b->number_of_leaf_splits++;
1164
1165         first_dst = (*sp2)->bytes;
1166
1167         (*b->method->codec.encode)(c2, &first_dst, &cut_item);
1168
1169         memcpy (first_dst, half2, dst - half2);
1170
1171         (*sp2)->size = (first_dst - (*sp2)->bytes) + (dst - half2);
1172         assert((*sp2)->size <=  b->file[p->cat].head.block_max);
1173         (*sp2)->no_items = no_items - no_items_1;
1174         (*sp2)->dirty = 1;
1175         p->dirty = 1;
1176         memcpy (sub_item, cut_item_buf, cut_item_size);
1177         *sub_size = cut_item_size;
1178     }
1179     else
1180     {
1181         memcpy (p->bytes, dst_buf, dst - dst_buf);
1182         p->size = new_size;
1183         p->no_items = no_items;
1184     }
1185     (*b->method->codec.stop)(c1);
1186     (*b->method->codec.stop)(c2);
1187     *sp1 = p;
1188     return more;
1189 }
1190
1191 int insert_sub (ISAMB b, struct ISAMB_block **p, void *new_item,
1192                 int *mode,
1193                 ISAMC_I *stream,
1194                 struct ISAMB_block **sp,
1195                 void *sub_item, int *sub_size,
1196                 const void *max_item)
1197 {
1198     if (!*p || (*p)->leaf)
1199         return insert_leaf (b, p, new_item, mode, stream, sp, sub_item, 
1200                             sub_size, max_item);
1201     else
1202         return insert_int (b, *p, new_item, mode, stream, sp, sub_item,
1203                            sub_size, max_item);
1204 }
1205
1206 int isamb_unlink (ISAMB b, ISAM_P pos)
1207 {
1208     struct ISAMB_block *p1;
1209
1210     if (!pos)
1211         return 0;
1212     p1 = open_block(b, pos);
1213     p1->deleted = 1;
1214     if (!p1->leaf)
1215     {
1216         zint sub_p;
1217         const char *src = p1->bytes + p1->offset;
1218 #if INT_ENCODE
1219         void *c1 = (*b->method->codec.start)();
1220 #endif
1221         decode_ptr(&src, &sub_p);
1222         isamb_unlink(b, sub_p);
1223         
1224         while (src != p1->bytes + p1->size)
1225         {
1226 #if INT_ENCODE
1227             char file_item_buf[DST_ITEM_MAX];
1228             char *file_item = file_item_buf;
1229             (*b->method->codec.reset)(c1);
1230             (*b->method->codec.decode)(c1, &file_item, &src);
1231 #else
1232             zint item_len;
1233             decode_item_len(&src, &item_len);
1234             src += item_len;
1235 #endif
1236             decode_ptr(&src, &sub_p);
1237             isamb_unlink(b, sub_p);
1238         }
1239 #if INT_ENCODE
1240         (*b->method->codec.stop)(c1);
1241 #endif
1242     }
1243     close_block(b, p1);
1244     return 0;
1245 }
1246
1247 void isamb_merge(ISAMB b, ISAM_P *pos, ISAMC_I *stream)
1248 {
1249     char item_buf[DST_ITEM_MAX];
1250     char *item_ptr;
1251     int i_mode;
1252     int more;
1253     int must_delete = 0;
1254
1255     if (b->cache < 0)
1256     {
1257         int more = 1;
1258         while (more)
1259         {
1260             item_ptr = item_buf;
1261             more =
1262                 (*stream->read_item)(stream->clientData, &item_ptr, &i_mode);
1263         }
1264         *pos = 1;
1265         return;
1266     }
1267     item_ptr = item_buf;
1268     more = (*stream->read_item)(stream->clientData, &item_ptr, &i_mode);
1269     while (more)
1270     {
1271         struct ISAMB_block *p = 0, *sp = 0;
1272         char sub_item[DST_ITEM_MAX];
1273         int sub_size;
1274         
1275         if (*pos)
1276             p = open_block(b, *pos);
1277         more = insert_sub (b, &p, item_buf, &i_mode, stream, &sp,
1278                            sub_item, &sub_size, 0);
1279         if (sp)
1280         {    /* increase level of tree by one */
1281             struct ISAMB_block *p2 = new_int (b, p->cat);
1282             char *dst = p2->bytes + p2->size;
1283 #if INT_ENCODE
1284             void *c1 = (*b->method->codec.start)();
1285             const char *sub_item_ptr = sub_item;
1286 #endif
1287
1288             encode_ptr(&dst, p->pos);
1289             assert (sub_size < 128 && sub_size > 1);
1290 #if INT_ENCODE
1291             (*b->method->codec.reset)(c1);
1292             (*b->method->codec.encode)(c1, &dst, &sub_item_ptr);
1293 #else
1294             encode_item_len (&dst, sub_size);
1295             memcpy (dst, sub_item, sub_size);
1296             dst += sub_size;
1297 #endif
1298             encode_ptr(&dst, sp->pos);
1299             
1300             p2->size = dst - p2->bytes;
1301             p2->no_items = p->no_items + sp->no_items;
1302             *pos = p2->pos;  /* return new super page */
1303             close_block(b, sp);
1304             close_block(b, p2);
1305 #if INT_ENCODE
1306             (*b->method->codec.stop)(c1);
1307 #endif
1308         }
1309         else
1310         {
1311             *pos = p->pos;   /* return current one (again) */
1312         }
1313         if (p->no_items == 0)
1314             must_delete = 1;
1315         else
1316             must_delete = 0;
1317         close_block(b, p);
1318     }
1319     if (must_delete)
1320     {
1321         isamb_unlink(b, *pos);
1322         *pos = 0;
1323     }
1324 }
1325
1326 ISAMB_PP isamb_pp_open_x(ISAMB isamb, ISAM_P pos, int *level, int scope)
1327 {
1328     ISAMB_PP pp = xmalloc(sizeof(*pp));
1329     int i;
1330
1331     assert(pos);
1332
1333     pp->isamb = isamb;
1334     pp->block = xmalloc(ISAMB_MAX_LEVEL * sizeof(*pp->block));
1335
1336     pp->pos = pos;
1337     pp->level = 0;
1338     pp->maxlevel = 0;
1339     pp->total_size = 0;
1340     pp->no_blocks = 0;
1341     pp->skipped_numbers = 0;
1342     pp->returned_numbers = 0;
1343     pp->scope = scope;
1344     for (i = 0; i<ISAMB_MAX_LEVEL; i++)
1345         pp->skipped_nodes[i] = pp->accessed_nodes[i] = 0;
1346     while (1)
1347     {
1348         struct ISAMB_block *p = open_block(isamb, pos);
1349         const char *src = p->bytes + p->offset;
1350         pp->block[pp->level] = p;
1351
1352         pp->total_size += p->size;
1353         pp->no_blocks++;
1354         if (p->leaf)
1355             break;
1356         decode_ptr(&src, &pos);
1357         p->offset = src - p->bytes;
1358         pp->level++;
1359         pp->accessed_nodes[pp->level]++; 
1360     }
1361     pp->block[pp->level+1] = 0;
1362     pp->maxlevel = pp->level;
1363     if (level)
1364         *level = pp->level;
1365     return pp;
1366 }
1367
1368 ISAMB_PP isamb_pp_open (ISAMB isamb, ISAM_P pos, int scope)
1369 {
1370     return isamb_pp_open_x(isamb, pos, 0, scope);
1371 }
1372
1373 void isamb_pp_close_x(ISAMB_PP pp, zint *size, zint *blocks)
1374 {
1375     int i;
1376     if (!pp)
1377         return;
1378     yaz_log(YLOG_DEBUG, "isamb_pp_close lev=%d returned "ZINT_FORMAT" values, " 
1379                     "skipped "ZINT_FORMAT,
1380         pp->maxlevel, pp->skipped_numbers, pp->returned_numbers);
1381     for (i = pp->maxlevel; i>=0; i--)
1382         if (pp->skipped_nodes[i] || pp->accessed_nodes[i])
1383             yaz_log(YLOG_DEBUG, "isamb_pp_close  level leaf-%d: "
1384                             ZINT_FORMAT" read, "ZINT_FORMAT" skipped", i,
1385                  pp->accessed_nodes[i], pp->skipped_nodes[i]);
1386     pp->isamb->skipped_numbers += pp->skipped_numbers;
1387     pp->isamb->returned_numbers += pp->returned_numbers;
1388     for (i = pp->maxlevel; i>=0; i--)
1389     {
1390         pp->isamb->accessed_nodes[i] += pp->accessed_nodes[i];
1391         pp->isamb->skipped_nodes[i] += pp->skipped_nodes[i];
1392     }
1393     if (size)
1394         *size = pp->total_size;
1395     if (blocks)
1396         *blocks = pp->no_blocks;
1397     for (i = 0; i <= pp->level; i++)
1398         close_block(pp->isamb, pp->block[i]);
1399     xfree(pp->block);
1400     xfree(pp);
1401 }
1402
1403 int isamb_block_info (ISAMB isamb, int cat)
1404 {
1405     if (cat >= 0 && cat < isamb->no_cat)
1406         return isamb->file[cat].head.block_size;
1407     return -1;
1408 }
1409
1410 void isamb_pp_close (ISAMB_PP pp)
1411 {
1412     isamb_pp_close_x(pp, 0, 0);
1413 }
1414
1415 /* simple recursive dumper .. */
1416 static void isamb_dump_r (ISAMB b, ISAM_P pos, void (*pr)(const char *str),
1417                           int level)
1418 {
1419     char buf[1024];
1420     char prefix_str[1024];
1421     if (pos)
1422     {
1423         struct ISAMB_block *p = open_block(b, pos);
1424         sprintf(prefix_str, "%*s " ZINT_FORMAT " cat=%d size=%d max=%d items="
1425                 ZINT_FORMAT, level*2, "",
1426                 pos, p->cat, p->size, b->file[p->cat].head.block_max,
1427                 p->no_items);
1428         (*pr)(prefix_str);
1429         sprintf(prefix_str, "%*s " ZINT_FORMAT, level*2, "", pos);
1430         if (p->leaf)
1431         {
1432             while (p->offset < p->size)
1433             {
1434                 const char *src = p->bytes + p->offset;
1435                 char *dst = buf;
1436                 (*b->method->codec.decode)(p->decodeClientData, &dst, &src);
1437                 (*b->method->log_item)(YLOG_DEBUG, buf, prefix_str);
1438                 p->offset = src - (char*) p->bytes;
1439             }
1440             assert(p->offset == p->size);
1441         }
1442         else
1443         {
1444             const char *src = p->bytes + p->offset;
1445             ISAM_P sub;
1446
1447             decode_ptr(&src, &sub);
1448             p->offset = src - (char*) p->bytes;
1449
1450             isamb_dump_r(b, sub, pr, level+1);
1451             
1452             while (p->offset < p->size)
1453             {
1454 #if INT_ENCODE
1455                 char file_item_buf[DST_ITEM_MAX];
1456                 char *file_item = file_item_buf;
1457                 void *c1 = (*b->method->codec.start)();
1458                 (*b->method->codec.decode)(c1, &file_item, &src);
1459                 (*b->method->codec.stop)(c1);
1460                 (*b->method->log_item)(YLOG_DEBUG, file_item_buf, prefix_str);
1461 #else
1462                 zint item_len;
1463                 decode_item_len(&src, &item_len);
1464                 (*b->method->log_item)(YLOG_DEBUG, src, prefix_str);
1465                 src += item_len;
1466 #endif
1467                 decode_ptr(&src, &sub);
1468                 
1469                 p->offset = src - (char*) p->bytes;
1470                 
1471                 isamb_dump_r(b, sub, pr, level+1);
1472             }           
1473         }
1474         close_block(b, p);
1475     }
1476 }
1477
1478 void isamb_dump(ISAMB b, ISAM_P pos, void (*pr)(const char *str))
1479 {
1480     isamb_dump_r(b, pos, pr, 0);
1481 }
1482
1483 int isamb_pp_read(ISAMB_PP pp, void *buf)
1484 {
1485     return isamb_pp_forward(pp, buf, 0);
1486 }
1487
1488
1489 static int isamb_pp_on_right_node(ISAMB_PP pp, int level, const void *untilbuf)
1490 { /* looks one node higher to see if we should be on this node at all */
1491   /* useful in backing off quickly, and in avoiding tail descends */
1492   /* call with pp->level to begin with */
1493     struct ISAMB_block *p;
1494     int cmp;
1495     const char *src;
1496     ISAMB b = pp->isamb;
1497
1498     assert(level >= 0);
1499     if (level == 0)
1500     {
1501 #if ISAMB_DEBUG
1502             yaz_log(YLOG_DEBUG, "isamb_pp_on_right returning true for root");
1503 #endif
1504         return 1; /* we can never skip the root node */
1505     }
1506     level--;
1507     p = pp->block[level];
1508     assert(p->offset <= p->size);
1509     if (p->offset < p->size)
1510     {
1511 #if INT_ENCODE
1512         char file_item_buf[DST_ITEM_MAX];
1513         char *file_item = file_item_buf;
1514         void *c1 = (*b->method->codec.start)();
1515         assert(p->offset > 0); 
1516         src = p->bytes + p->offset;
1517         (*b->method->codec.decode)(c1, &file_item, &src);
1518         (*b->method->codec.stop)(c1);
1519         cmp = (*b->method->compare_item)(untilbuf, file_item_buf);
1520 #else
1521         zint item_len;
1522         assert(p->offset > 0); 
1523         src = p->bytes + p->offset;
1524         decode_item_len(&src, &item_len);
1525 #if ISAMB_DEBUG
1526         (*b->method->codec.log_item)(YLOG_DEBUG, untilbuf, "on_leaf: until");
1527         (*b->method->codec.log_item)(YLOG_DEBUG, src, "on_leaf: value");
1528 #endif
1529         cmp = (*b->method->compare_item)(untilbuf, src);
1530 #endif
1531         if (cmp < pp->scope)
1532         {  /* cmp<2 */
1533 #if ISAMB_DEBUG
1534             yaz_log(YLOG_DEBUG, "isamb_pp_on_right returning true "
1535                             "cmp=%d lev=%d ofs=%d", cmp, level, p->offset);
1536 #endif
1537             return 1; 
1538         }
1539         else
1540         {
1541 #if ISAMB_DEBUG
1542             yaz_log(YLOG_DEBUG, "isamb_pp_on_right returning false "
1543                             "cmp=%d lev=%d ofs=%d", cmp, level, p->offset);
1544 #endif
1545             return 0; 
1546         }
1547     }
1548     else {
1549 #if ISAMB_DEBUG
1550         yaz_log(YLOG_DEBUG, "isamb_pp_on_right at tail, looking higher "
1551                         "lev=%d", level);
1552 #endif
1553         return isamb_pp_on_right_node(pp, level, untilbuf);
1554     }
1555 } /* isamb_pp_on_right_node */
1556
1557 static int isamb_pp_read_on_leaf(ISAMB_PP pp, void *buf)
1558
1559     /* reads the next item on the current leaf, returns 0 if end of leaf*/
1560     struct ISAMB_block *p = pp->block[pp->level];
1561     char *dst;
1562     const char *src;
1563     assert(pp);
1564     assert(buf);
1565     if (p->offset == p->size)
1566     {
1567 #if ISAMB_DEBUG
1568         yaz_log(YLOG_DEBUG, "isamb_pp_read_on_leaf returning 0 on "
1569                 "node %d", p->pos);
1570 #endif
1571         return 0; /* at end of leaf */
1572     }
1573     src = p->bytes + p->offset;
1574     dst = buf;
1575     (*pp->isamb->method->codec.decode)(p->decodeClientData, &dst, &src);
1576     p->offset = src - (char*) p->bytes;
1577 #if ISAMB_DEBUG
1578     (*pp->isamb->method->codec.log_item)(YLOG_DEBUG, buf,
1579                                          "read_on_leaf returning 1");
1580 #endif
1581     pp->returned_numbers++;
1582     return 1;
1583 } /* read_on_leaf */
1584
1585 static int isamb_pp_forward_on_leaf(ISAMB_PP pp, void *buf, const void *untilbuf)
1586 { /* forwards on the current leaf, returns 0 if not found */
1587     int cmp;
1588     int skips = 0;
1589     while (1)
1590     {
1591         if (!isamb_pp_read_on_leaf(pp, buf))
1592             return 0;
1593         /* FIXME - this is an extra function call, inline the read? */
1594         cmp=(*pp->isamb->method->compare_item)(untilbuf, buf);
1595         if (cmp <pp->scope)
1596         {  /* cmp<2  found a good one */
1597 #if ISAMB_DEBUG
1598             if (skips)
1599                 yaz_log(YLOG_DEBUG, "isam_pp_fwd_on_leaf skipped %d items", skips);
1600 #endif
1601             pp->returned_numbers++;
1602             return 1;
1603         }
1604         if (!skips)
1605             if (!isamb_pp_on_right_node(pp, pp->level, untilbuf))
1606                 return 0; /* never mind the rest of this leaf */
1607         pp->skipped_numbers++;
1608         skips++;
1609     }
1610 } /* forward_on_leaf */
1611
1612 static int isamb_pp_climb_level(ISAMB_PP pp, ISAM_P *pos)
1613 { /* climbs higher in the tree, until finds a level with data left */
1614   /* returns the node to (consider to) descend to in *pos) */
1615     struct ISAMB_block *p = pp->block[pp->level];
1616     const char *src;
1617 #if ISAMB_DEBUG
1618     yaz_log(YLOG_DEBUG, "isamb_pp_climb_level starting "
1619                    "at level %d node %d ofs=%d sz=%d",
1620                     pp->level, p->pos, p->offset, p->size);
1621 #endif
1622     assert(pp->level >= 0);
1623     assert(p->offset <= p->size);
1624     if (pp->level==0)
1625     {
1626 #if ISAMB_DEBUG
1627         yaz_log(YLOG_DEBUG, "isamb_pp_climb_level returning 0 at root");
1628 #endif
1629         return 0;
1630     }
1631     assert(pp->level>0); 
1632     close_block(pp->isamb, pp->block[pp->level]);
1633     pp->block[pp->level] = 0;
1634     (pp->level)--;
1635     p = pp->block[pp->level];
1636 #if ISAMB_DEBUG
1637     yaz_log(YLOG_DEBUG, "isamb_pp_climb_level climbed to level %d node %d ofs=%d",
1638                     pp->level, p->pos, p->offset);
1639 #endif
1640     assert(!p->leaf);
1641     assert(p->offset <= p->size);
1642     if (p->offset == p->size)
1643     {
1644         /* we came from the last pointer, climb on */
1645         if (!isamb_pp_climb_level(pp, pos))
1646             return 0;
1647         p = pp->block[pp->level];
1648     }
1649     else
1650     {
1651 #if INT_ENCODE
1652         char file_item_buf[DST_ITEM_MAX];
1653         char *file_item = file_item_buf;
1654         ISAMB b = pp->isamb;
1655         void *c1 = (*b->method->codec.start)();
1656 #else
1657         zint item_len;
1658 #endif
1659         /* skip the child we just came from */
1660 #if ISAMB_DEBUG
1661         yaz_log(YLOG_DEBUG, "isam_pp_climb_level: skipping lev=%d ofs=%d sz=%d", 
1662                         pp->level, p->offset, p->size);
1663 #endif
1664         assert (p->offset < p->size);
1665         src = p->bytes + p->offset;
1666 #if INT_ENCODE
1667         (*b->method->codec.decode)(c1, &file_item, &src);
1668         (*b->method->codec.stop)(c1);
1669 #else
1670         decode_item_len(&src, &item_len);
1671         src += item_len;
1672 #endif
1673         decode_ptr(&src, pos);
1674         p->offset = src - (char *)p->bytes;
1675             
1676     }
1677     return 1;
1678 } /* climb_level */
1679
1680
1681 static zint isamb_pp_forward_unode(ISAMB_PP pp, zint pos, const void *untilbuf)
1682 { /* scans a upper node until it finds a child <= untilbuf */
1683   /* pp points to the key value, as always. pos is the child read from */
1684   /* the buffer */
1685   /* if all values are too small, returns the last child in the node */
1686   /* FIXME - this can be detected, and avoided by looking at the */
1687   /* parent node, but that gets messy. Presumably the cost is */
1688   /* pretty low anyway */
1689     ISAMB b = pp->isamb;
1690     struct ISAMB_block *p = pp->block[pp->level];
1691     const char *src = p->bytes + p->offset;
1692     int cmp;
1693     zint nxtpos;
1694 #if ISAMB_DEBUG
1695     int skips = 0;
1696     yaz_log(YLOG_DEBUG, "isamb_pp_forward_unode starting "
1697                    "at level %d node %d ofs=%di sz=%d",
1698                     pp->level, p->pos, p->offset, p->size);
1699 #endif
1700     assert(!p->leaf);
1701     assert(p->offset <= p->size);
1702     if (p->offset == p->size)
1703     {
1704 #if ISAMB_DEBUG
1705             yaz_log(YLOG_DEBUG, "isamb_pp_forward_unode returning at end "
1706                    "at level %d node %d ofs=%di sz=%d",
1707                     pp->level, p->pos, p->offset, p->size);
1708 #endif
1709         return pos; /* already at the end of it */
1710     }
1711     while(p->offset < p->size)
1712     {
1713 #if INT_ENCODE
1714         char file_item_buf[DST_ITEM_MAX];
1715         char *file_item = file_item_buf;
1716         void *c1 = (*b->method->codec.start)();
1717         (*b->method->codec.decode)(c1, &file_item, &src);
1718         (*b->method->codec.stop)(c1);
1719         cmp = (*b->method->compare_item)(untilbuf, file_item_buf);
1720 #else
1721         zint item_len;
1722         decode_item_len(&src, &item_len);
1723         cmp = (*b->method->compare_item)(untilbuf, src);
1724         src += item_len;
1725 #endif
1726         decode_ptr(&src, &nxtpos);
1727         if (cmp<pp->scope) /* cmp<2 */
1728         {
1729 #if ISAMB_DEBUG
1730             yaz_log(YLOG_DEBUG, "isamb_pp_forward_unode returning a hit "
1731                    "at level %d node %d ofs=%d sz=%d",
1732                     pp->level, p->pos, p->offset, p->size);
1733 #endif
1734             return pos;
1735         } /* found one */
1736         pos = nxtpos;
1737         p->offset = src-(char*)p->bytes;
1738         (pp->skipped_nodes[pp->maxlevel - pp->level -1])++;
1739 #if ISAMB_DEBUG
1740         skips++;
1741 #endif
1742     }
1743 #if ISAMB_DEBUG
1744             yaz_log(YLOG_DEBUG, "isamb_pp_forward_unode returning at tail "
1745                    "at level %d node %d ofs=%d sz=%d skips=%d",
1746                     pp->level, p->pos, p->offset, p->size, skips);
1747 #endif
1748     return pos; /* that's the last one in the line */
1749     
1750 } /* forward_unode */
1751
1752 static void isamb_pp_descend_to_leaf(ISAMB_PP pp, ISAM_P pos,
1753                                      const void *untilbuf)
1754 { /* climbs down the tree, from pos, to the leftmost leaf */
1755     struct ISAMB_block *p = pp->block[pp->level];
1756     const char *src;
1757     assert(!p->leaf);
1758 #if ISAMB_DEBUG
1759     yaz_log(YLOG_DEBUG, "isamb_pp_descend_to_leaf "
1760                    "starting at lev %d node %d ofs=%d lf=%d u=%p", 
1761                    pp->level, p->pos, p->offset, p->leaf, untilbuf);
1762 #endif
1763     if (untilbuf)
1764         pos = isamb_pp_forward_unode(pp, pos, untilbuf);
1765     ++(pp->level);
1766     assert(pos);
1767     p = open_block(pp->isamb, pos);
1768     pp->block[pp->level] = p;
1769     ++(pp->accessed_nodes[pp->maxlevel-pp->level]);
1770     ++(pp->no_blocks);
1771 #if ISAMB_DEBUG
1772     yaz_log(YLOG_DEBUG, "isamb_pp_descend_to_leaf "
1773                    "got lev %d node %d lf=%d", 
1774                    pp->level, p->pos, p->leaf);
1775 #endif
1776     if (p->leaf)
1777         return;
1778     assert (p->offset==0);
1779     src = p->bytes + p->offset;
1780     decode_ptr(&src, &pos);
1781     p->offset = src-(char*)p->bytes;
1782     isamb_pp_descend_to_leaf(pp, pos, untilbuf);
1783 #if ISAMB_DEBUG
1784     yaz_log(YLOG_DEBUG, "isamb_pp_descend_to_leaf "
1785                    "returning at lev %d node %d ofs=%d lf=%d", 
1786                    pp->level, p->pos, p->offset, p->leaf);
1787 #endif
1788 } /* descend_to_leaf */
1789
1790 static int isamb_pp_find_next_leaf(ISAMB_PP pp)
1791 { /* finds the next leaf by climbing up and down */
1792     ISAM_P pos;
1793     if (!isamb_pp_climb_level(pp, &pos))
1794         return 0;
1795     isamb_pp_descend_to_leaf(pp, pos, 0);
1796     return 1;
1797 }
1798
1799 static int isamb_pp_climb_desc(ISAMB_PP pp,  const void *untilbuf)
1800 { /* climbs up and descends to a leaf where values >= *untilbuf are found */
1801     ISAM_P pos;
1802 #if ISAMB_DEBUG
1803     struct ISAMB_block *p = pp->block[pp->level];
1804     yaz_log(YLOG_DEBUG, "isamb_pp_climb_desc starting "
1805                    "at level %d node %d ofs=%d sz=%d",
1806                     pp->level, p->pos, p->offset, p->size);
1807 #endif
1808     if (!isamb_pp_climb_level(pp, &pos))
1809         return 0;
1810     /* see if it would pay to climb one higher */
1811     if (!isamb_pp_on_right_node(pp, pp->level, untilbuf))
1812         if (!isamb_pp_climb_level(pp, &pos))
1813             return 0;
1814     isamb_pp_descend_to_leaf(pp, pos, untilbuf);
1815 #if ISAMB_DEBUG
1816     p = pp->block[pp->level];
1817     yaz_log(YLOG_DEBUG, "isamb_pp_climb_desc done "
1818                    "at level %d node %d ofs=%d sz=%d",
1819                     pp->level, p->pos, p->offset, p->size);
1820 #endif
1821     return 1;
1822 } /* climb_desc */
1823
1824 int isamb_pp_forward (ISAMB_PP pp, void *buf, const void *untilbuf)
1825 {
1826 #if ISAMB_DEBUG
1827     struct ISAMB_block *p = pp->block[pp->level];
1828     assert(p->leaf);
1829     yaz_log(YLOG_DEBUG, "isamb_pp_forward starting "
1830                    "at level %d node %d ofs=%d sz=%d u=%p sc=%d",
1831                     pp->level, p->pos, p->offset, p->size, untilbuf, scope);
1832 #endif
1833     if (untilbuf)
1834     {
1835         if (isamb_pp_forward_on_leaf(pp, buf, untilbuf))
1836         {
1837 #if ISAMB_DEBUG
1838             yaz_log(YLOG_DEBUG, "isamb_pp_forward (f) returning (A) "
1839                    "at level %d node %d ofs=%d sz=%d",
1840                     pp->level, p->pos, p->offset, p->size);
1841 #endif
1842             return 1;
1843         }
1844         if (! isamb_pp_climb_desc(pp, untilbuf))
1845         {
1846 #if ISAMB_DEBUG
1847             yaz_log(YLOG_DEBUG, "isamb_pp_forward (f) returning notfound (B) "
1848                    "at level %d node %d ofs=%d sz=%d",
1849                     pp->level, p->pos, p->offset, p->size);
1850 #endif
1851             return 0; /* could not find a leaf */
1852         }
1853         do {
1854             if (isamb_pp_forward_on_leaf(pp, buf, untilbuf))
1855             {
1856 #if ISAMB_DEBUG
1857                 yaz_log(YLOG_DEBUG, "isamb_pp_forward (f) returning (c) "
1858                         "at level %d node %d ofs=%d sz=%d",
1859                         pp->level, p->pos, p->offset, p->size);
1860 #endif
1861                 return 1;
1862             }
1863         } while (isamb_pp_find_next_leaf(pp));
1864         return 0; /* could not find at all */
1865     }
1866     else { /* no untilbuf, a straight read */
1867         /* FIXME - this should be moved
1868          * directly into the pp_read */
1869         /* keeping here now, to keep same
1870          * interface as the old fwd */
1871         if (isamb_pp_read_on_leaf(pp, buf))
1872         {
1873 #if ISAMB_DEBUG
1874             yaz_log(YLOG_DEBUG, "isamb_pp_forward (read) returning (D) "
1875                    "at level %d node %d ofs=%d sz=%d",
1876                     pp->level, p->pos, p->offset, p->size);
1877 #endif
1878             return 1;
1879         }
1880         if (isamb_pp_find_next_leaf(pp))
1881         {
1882 #if ISAMB_DEBUG
1883             yaz_log(YLOG_DEBUG, "isamb_pp_forward (read) returning (E) "
1884                    "at level %d node %d ofs=%d sz=%d",
1885                     pp->level, p->pos, p->offset, p->size);
1886 #endif
1887             return isamb_pp_read_on_leaf(pp, buf);
1888         }
1889         else
1890             return 0;
1891     }
1892 } /* isam_pp_forward (new version) */
1893
1894 void isamb_pp_pos(ISAMB_PP pp, double *current, double *total)
1895 { /* return an estimate of the current position and of the total number of */
1896   /* occureences in the isam tree, based on the current leaf */
1897     assert(total);
1898     assert(current);
1899     
1900     /* if end-of-stream PP may not be leaf */
1901
1902     *total = (double) (pp->block[0]->no_items);
1903     *current = (double) pp->returned_numbers;
1904 #if ISAMB_DEBUG
1905     yaz_log(YLOG_LOG, "isamb_pp_pos returning: cur= %0.1f tot=%0.1f rn="
1906          ZINT_FORMAT, *current, *total, pp->returned_numbers);
1907 #endif
1908 }
1909
1910 int isamb_pp_forward2(ISAMB_PP pp, void *buf, const void *untilb)
1911 {
1912     char *dst = buf;
1913     const char *src;
1914     struct ISAMB_block *p = pp->block[pp->level];
1915     ISAMB b = pp->isamb;
1916     if (!p)
1917         return 0;
1918 again:
1919     while (p->offset == p->size)
1920     {
1921         ISAM_P pos;
1922 #if INT_ENCODE
1923         const char *src_0;
1924         void *c1;
1925         char file_item_buf[DST_ITEM_MAX];
1926         char *file_item = file_item_buf;
1927 #else
1928         zint item_len;
1929 #endif
1930         while (p->offset == p->size)
1931         {
1932             if (pp->level == 0)
1933                 return 0;
1934             close_block (pp->isamb, pp->block[pp->level]);
1935             pp->block[pp->level] = 0;
1936             (pp->level)--;
1937             p = pp->block[pp->level];
1938             assert (!p->leaf);  
1939         }
1940
1941         assert(!p->leaf);
1942         src = p->bytes + p->offset;
1943        
1944 #if INT_ENCODE
1945         c1 = (*b->method->codec.start)();
1946         (*b->method->codec.decode)(c1, &file_item, &src);
1947 #else
1948         decode_ptr (&src, &item_len);
1949         src += item_len;
1950 #endif        
1951         decode_ptr (&src, &pos);
1952         p->offset = src - (char*) p->bytes;
1953
1954         src = p->bytes + p->offset;
1955
1956         while(1)
1957         {
1958             if (!untilb || p->offset == p->size)
1959                 break;
1960             assert(p->offset < p->size);
1961 #if INT_ENCODE
1962             src_0 = src;
1963             file_item = file_item_buf;
1964             (*b->method->codec.reset)(c1);
1965             (*b->method->codec.decode)(c1, &file_item, &src);
1966             if ((*b->method->compare_item)(untilb, file_item_buf) <= 1)
1967             {
1968                 src = src_0;
1969                 break;
1970             }
1971 #else
1972             decode_item_len(&src, &item_len);
1973             if ((*b->method->compare_item)(untilb, src) <= 1)
1974                 break;
1975             src += item_len;
1976 #endif
1977             decode_ptr (&src, &pos);
1978             p->offset = src - (char*) p->bytes;
1979         }
1980
1981         pp->level++;
1982
1983         while (1)
1984         {
1985             pp->block[pp->level] = p = open_block (pp->isamb, pos);
1986
1987             pp->total_size += p->size;
1988             pp->no_blocks++;
1989             
1990             if (p->leaf) 
1991             {
1992                 break;
1993             }
1994             
1995             src = p->bytes + p->offset;
1996             while(1)
1997             {
1998                 decode_ptr (&src, &pos);
1999                 p->offset = src - (char*) p->bytes;
2000                 
2001                 if (!untilb || p->offset == p->size)
2002                     break;
2003                 assert(p->offset < p->size);
2004 #if INT_ENCODE
2005                 src_0 = src;
2006                 file_item = file_item_buf;
2007                 (*b->method->codec.reset)(c1);
2008                 (*b->method->codec.decode)(c1, &file_item, &src);
2009                 if ((*b->method->compare_item)(untilb, file_item_buf) <= 1)
2010                 {
2011                     src = src_0;
2012                     break;
2013                 }
2014 #else
2015                 decode_ptr (&src, &item_len);
2016                 if ((*b->method->compare_item)(untilb, src) <= 1)
2017                     break;
2018                 src += item_len;
2019 #endif
2020             }
2021             pp->level++;
2022         }
2023 #if INT_ENCODE
2024         (*b->method->codec.stop)(c1);
2025 #endif
2026     }
2027     assert (p->offset < p->size);
2028     assert (p->leaf);
2029     while(1)
2030     {
2031         char *dst0 = dst;
2032         src = p->bytes + p->offset;
2033         (*pp->isamb->method->codec.decode)(p->decodeClientData, &dst, &src);
2034         p->offset = src - (char*) p->bytes;
2035         if (!untilb || (*pp->isamb->method->compare_item)(untilb, dst0) <= 1)
2036             break;
2037         dst = dst0;
2038         if (p->offset == p->size) goto again;
2039     }
2040     return 1;
2041 }
2042
2043 zint isamb_get_int_splits(ISAMB b)
2044 {
2045     return b->number_of_int_splits;
2046 }
2047
2048 zint isamb_get_leaf_splits(ISAMB b)
2049 {
2050     return b->number_of_leaf_splits;
2051 }
2052
2053 zint isamb_get_root_ptr(ISAMB b)
2054 {
2055     return b->root_ptr;
2056 }
2057
2058 void isamb_set_root_ptr(ISAMB b, zint root_ptr)
2059 {
2060     b->root_ptr = root_ptr;
2061 }
2062
2063
2064 /*
2065  * Local variables:
2066  * c-basic-offset: 4
2067  * indent-tabs-mode: nil
2068  * End:
2069  * vim: shiftwidth=4 tabstop=8 expandtab
2070  */
2071