Omit CVS Id. Update copyright year.
[idzebra-moved-to-github.git] / isamb / isamb.c
1 /* This file is part of the Zebra server.
2    Copyright (C) 1995-2008 Index Data
3
4 Zebra is free software; you can redistribute it and/or modify it under
5 the terms of the GNU General Public License as published by the Free
6 Software Foundation; either version 2, or (at your option) any later
7 version.
8
9 Zebra is distributed in the hope that it will be useful, but WITHOUT ANY
10 WARRANTY; without even the implied warranty of MERCHANTABILITY or
11 FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
12 for more details.
13
14 You should have received a copy of the GNU General Public License
15 along with this program; if not, write to the Free Software
16 Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA  02110-1301  USA
17
18 */
19
20 #include <stdlib.h>
21 #include <string.h>
22 #include <yaz/log.h>
23 #include <yaz/xmalloc.h>
24 #include <idzebra/isamb.h>
25 #include <assert.h>
26
27 #ifndef ISAMB_DEBUG
28 #define ISAMB_DEBUG 0
29 #endif
30
31
32 #define ISAMB_MAJOR_VERSION 3
33 #define ISAMB_MINOR_VERSION_NO_ROOT 0
34 #define ISAMB_MINOR_VERSION_WITH_ROOT 1
35
36 struct ISAMB_head {
37     zint first_block;
38     zint last_block;
39     zint free_list;
40     zint no_items;
41     int block_size;
42     int block_max;
43     int block_offset;
44 };
45
46 /* if 1, upper nodes items are encoded; 0 if not encoded */
47 #define INT_ENCODE 1
48
49 /* maximum size of encoded buffer */
50 #define DST_ITEM_MAX 256
51
52 #define ISAMB_MAX_LEVEL 10
53 /* approx 2*max page + max size of item */
54 #define DST_BUF_SIZE (2*4096+300)
55
56 /* should be maximum block size of multiple thereof */
57 #define ISAMB_CACHE_ENTRY_SIZE 4096
58
59 /* CAT_MAX: _must_ be power of 2 */
60 #define CAT_MAX 4
61 #define CAT_MASK (CAT_MAX-1)
62 /* CAT_NO: <= CAT_MAX */
63 #define CAT_NO 4
64
65 /* Smallest block size */
66 #define ISAMB_MIN_SIZE 32
67 /* Size factor */
68 #define ISAMB_FAC_SIZE 4
69
70 /* ISAMB_PTR_CODEC = 1 var, =0 fixed */
71 #define ISAMB_PTR_CODEC 1
72
73 struct ISAMB_cache_entry {
74     ISAM_P pos;
75     unsigned char *buf;
76     int dirty;
77     int hits;
78     struct ISAMB_cache_entry *next;
79 };
80
81 struct ISAMB_file {
82     BFile bf;
83     int head_dirty;
84     struct ISAMB_head head;
85     struct ISAMB_cache_entry *cache_entries;
86 };
87
88 struct ISAMB_s {
89     BFiles bfs;
90     ISAMC_M *method;
91
92     struct ISAMB_file *file;
93     int no_cat;
94     int cache; /* 0 = no cache, 1 = use cache, -1 = dummy isam (for testing only) */
95     int log_io;        /* log level for bf_read/bf_write calls */
96     int log_freelist;  /* log level for freelist handling */
97     zint skipped_numbers; /* on a leaf node */
98     zint returned_numbers; 
99     zint skipped_nodes[ISAMB_MAX_LEVEL]; /* [0]=skipped leaves, 1 = higher etc */
100     zint accessed_nodes[ISAMB_MAX_LEVEL]; /* nodes we did not skip */
101     zint number_of_int_splits;
102     zint number_of_leaf_splits;
103     int enable_int_count; /* whether we count nodes (or not) */
104     int cache_size; /* size of blocks to cache (if cache=1) */
105     int minor_version;
106     zint root_ptr;
107 };
108
109 struct ISAMB_block {
110     ISAM_P pos;
111     int cat;
112     int size;
113     int leaf;
114     int dirty;
115     int deleted;
116     int offset;
117     zint no_items;  /* number of nodes in this + children */
118     char *bytes;
119     char *cbuf;
120     unsigned char *buf;
121     void *decodeClientData;
122     int log_rw;
123 };
124
125 struct ISAMB_PP_s {
126     ISAMB isamb;
127     ISAM_P pos;
128     int level;
129     int maxlevel; /* total depth */
130     zint total_size;
131     zint no_blocks;
132     zint skipped_numbers; /* on a leaf node */
133     zint returned_numbers; 
134     zint skipped_nodes[ISAMB_MAX_LEVEL]; /* [0]=skipped leaves, 1 = higher etc */
135     zint accessed_nodes[ISAMB_MAX_LEVEL]; /* nodes we did not skip */
136     struct ISAMB_block **block;
137     int scope;  /* on what level we forward */
138 };
139
140
141 #define encode_item_len encode_ptr
142 #if ISAMB_PTR_CODEC
143 static void encode_ptr(char **dst, zint pos)
144 {
145     unsigned char *bp = (unsigned char*) *dst;
146
147     while (pos > 127)
148     {
149         *bp++ = (unsigned char) (128 | (pos & 127));
150         pos = pos >> 7;
151     }
152     *bp++ = (unsigned char) pos;
153     *dst = (char *) bp;
154 }
155 #else
156 static void encode_ptr(char **dst, zint pos)
157 {
158     memcpy(*dst, &pos, sizeof(pos));
159     (*dst) += sizeof(pos);
160 }
161 #endif
162
163 #define decode_item_len decode_ptr
164 #if ISAMB_PTR_CODEC
165 static void decode_ptr(const char **src, zint *pos)
166 {
167     zint d = 0;
168     unsigned char c;
169     unsigned r = 0;
170
171     while (((c = *(const unsigned char *)((*src)++)) & 128))
172     {
173         d += ((zint) (c & 127) << r);
174         r += 7;
175     }
176     d += ((zint) c << r);
177     *pos = d;
178 }
179 #else
180 static void decode_ptr(const char **src, zint *pos)
181 {
182     memcpy(pos, *src, sizeof(*pos));
183     (*src) += sizeof(*pos);
184 }
185 #endif
186
187
188 void isamb_set_int_count(ISAMB b, int v)
189 {
190     b->enable_int_count = v;
191 }
192
193 void isamb_set_cache_size(ISAMB b, int v)
194 {
195     b->cache_size = v;
196 }
197
198 ISAMB isamb_open2(BFiles bfs, const char *name, int writeflag, ISAMC_M *method,
199                   int cache, int no_cat, int *sizes, int use_root_ptr)
200 {
201     ISAMB isamb = xmalloc(sizeof(*isamb));
202     int i;
203
204     assert(no_cat <= CAT_MAX);
205
206     isamb->bfs = bfs;
207     isamb->method = (ISAMC_M *) xmalloc(sizeof(*method));
208     memcpy(isamb->method, method, sizeof(*method));
209     isamb->no_cat = no_cat;
210     isamb->log_io = 0;
211     isamb->log_freelist = 0;
212     isamb->cache = cache;
213     isamb->skipped_numbers = 0;
214     isamb->returned_numbers = 0;
215     isamb->number_of_int_splits = 0;
216     isamb->number_of_leaf_splits = 0;
217     isamb->enable_int_count = 1;
218     isamb->cache_size = 40;
219
220     if (use_root_ptr)
221         isamb->minor_version = ISAMB_MINOR_VERSION_WITH_ROOT;
222     else
223         isamb->minor_version = ISAMB_MINOR_VERSION_NO_ROOT;
224
225     isamb->root_ptr = 0;
226
227     for (i = 0; i<ISAMB_MAX_LEVEL; i++)
228         isamb->skipped_nodes[i] = isamb->accessed_nodes[i] = 0;
229
230     if (cache == -1)
231     {
232         yaz_log(YLOG_WARN, "isamb_open %s. Degraded TEST mode", name);
233     }
234     else
235     {
236         assert(cache == 0 || cache == 1);
237     }
238     isamb->file = xmalloc(sizeof(*isamb->file) * isamb->no_cat);
239
240     for (i = 0; i < isamb->no_cat; i++)
241     {
242         isamb->file[i].bf = 0;
243         isamb->file[i].head_dirty = 0;
244         isamb->file[i].cache_entries = 0;
245     }
246
247     for (i = 0; i < isamb->no_cat; i++)
248     {
249         char fname[DST_BUF_SIZE];
250         char hbuf[DST_BUF_SIZE];
251
252         sprintf(fname, "%s%c", name, i+'A');
253         if (cache)
254             isamb->file[i].bf = bf_open(bfs, fname, ISAMB_CACHE_ENTRY_SIZE,
255                                         writeflag);
256         else
257             isamb->file[i].bf = bf_open(bfs, fname, sizes[i], writeflag);
258
259         if (!isamb->file[i].bf)
260         {
261             isamb_close(isamb);
262             return 0;
263         }
264
265         /* fill-in default values (for empty isamb) */
266         isamb->file[i].head.first_block = ISAMB_CACHE_ENTRY_SIZE/sizes[i]+1;
267         isamb->file[i].head.last_block = isamb->file[i].head.first_block;
268         isamb->file[i].head.block_size = sizes[i];
269         assert(sizes[i] <= ISAMB_CACHE_ENTRY_SIZE);
270 #if ISAMB_PTR_CODEC
271         if (i == isamb->no_cat-1 || sizes[i] > 128)
272             isamb->file[i].head.block_offset = 8;
273         else
274             isamb->file[i].head.block_offset = 4;
275 #else
276         isamb->file[i].head.block_offset = 11;
277 #endif
278         isamb->file[i].head.block_max =
279             sizes[i] - isamb->file[i].head.block_offset;
280         isamb->file[i].head.free_list = 0;
281         if (bf_read(isamb->file[i].bf, 0, 0, 0, hbuf))
282         {
283             /* got header assume "isamb"major minor len can fit in 16 bytes */
284             zint zint_tmp;
285             int major, minor, len, pos = 0;
286             int left;
287             const char *src = 0;
288             if (memcmp(hbuf, "isamb", 5))
289             {
290                 yaz_log(YLOG_WARN, "bad isamb header for file %s", fname);
291                 isamb_close(isamb);
292                 return 0;
293             }
294             if (sscanf(hbuf+5, "%d %d %d", &major, &minor, &len) != 3)
295             {
296                 yaz_log(YLOG_WARN, "bad isamb header for file %s", fname);
297                 isamb_close(isamb);
298                 return 0;
299             }
300             if (major != ISAMB_MAJOR_VERSION)
301             {
302                 yaz_log(YLOG_WARN, "bad major version for file %s %d, must be %d",
303                         fname, major, ISAMB_MAJOR_VERSION);
304                 isamb_close(isamb);
305                 return 0;
306             }
307             for (left = len - sizes[i]; left > 0; left = left - sizes[i])
308             {
309                 pos++;
310                 if (!bf_read(isamb->file[i].bf, pos, 0, 0, hbuf + pos*sizes[i]))
311                 {
312                     yaz_log(YLOG_WARN, "truncated isamb header for " 
313                             "file=%s len=%d pos=%d",
314                             fname, len, pos);
315                     isamb_close(isamb);
316                     return 0;
317                 }
318             }
319             src = hbuf + 16;
320             decode_ptr(&src, &isamb->file[i].head.first_block);
321             decode_ptr(&src, &isamb->file[i].head.last_block);
322             decode_ptr(&src, &zint_tmp);
323             isamb->file[i].head.block_size = (int) zint_tmp;
324             decode_ptr(&src, &zint_tmp);
325             isamb->file[i].head.block_max = (int) zint_tmp;
326             decode_ptr(&src, &isamb->file[i].head.free_list);
327             if (isamb->minor_version >= ISAMB_MINOR_VERSION_WITH_ROOT)
328                 decode_ptr(&src, &isamb->root_ptr);
329         }
330         assert(isamb->file[i].head.block_size >= isamb->file[i].head.block_offset);
331         /* must rewrite the header if root ptr is in use (bug #1017) */
332         if (use_root_ptr && writeflag)
333             isamb->file[i].head_dirty = 1;
334         else
335             isamb->file[i].head_dirty = 0;
336         assert(isamb->file[i].head.block_size == sizes[i]);
337     }
338 #if ISAMB_DEBUG
339     yaz_log(YLOG_WARN, "isamb debug enabled. Things will be slower than usual");
340 #endif
341     return isamb;
342 }
343
344 ISAMB isamb_open(BFiles bfs, const char *name, int writeflag, ISAMC_M *method,
345                  int cache)
346 {
347     int sizes[CAT_NO];
348     int i, b_size = ISAMB_MIN_SIZE;
349     
350     for (i = 0; i<CAT_NO; i++)
351     {
352         sizes[i] = b_size;
353         b_size = b_size * ISAMB_FAC_SIZE;
354     }
355     return isamb_open2(bfs, name, writeflag, method, cache,
356                        CAT_NO, sizes, 0);
357 }
358
359 static void flush_blocks(ISAMB b, int cat)
360 {
361     while (b->file[cat].cache_entries)
362     {
363         struct ISAMB_cache_entry *ce_this = b->file[cat].cache_entries;
364         b->file[cat].cache_entries = ce_this->next;
365
366         if (ce_this->dirty)
367         {
368             yaz_log(b->log_io, "bf_write: flush_blocks");
369             bf_write(b->file[cat].bf, ce_this->pos, 0, 0, ce_this->buf);
370         }
371         xfree(ce_this->buf);
372         xfree(ce_this);
373     }
374 }
375
376 static int cache_block(ISAMB b, ISAM_P pos, unsigned char *userbuf, int wr)
377 {
378     int cat = (int) (pos&CAT_MASK);
379     int off = (int) (((pos/CAT_MAX) & 
380                       (ISAMB_CACHE_ENTRY_SIZE / b->file[cat].head.block_size - 1))
381                      * b->file[cat].head.block_size);
382     zint norm = pos / (CAT_MASK*ISAMB_CACHE_ENTRY_SIZE / b->file[cat].head.block_size);
383     int no = 0;
384     struct ISAMB_cache_entry **ce, *ce_this = 0, **ce_last = 0;
385
386     if (!b->cache)
387         return 0;
388
389     assert(ISAMB_CACHE_ENTRY_SIZE >= b->file[cat].head.block_size);
390     for (ce = &b->file[cat].cache_entries; *ce; ce = &(*ce)->next, no++)
391     {
392         ce_last = ce;
393         if ((*ce)->pos == norm)
394         {
395             ce_this = *ce;
396             *ce = (*ce)->next;   /* remove from list */
397             
398             ce_this->next = b->file[cat].cache_entries;  /* move to front */
399             b->file[cat].cache_entries = ce_this;
400             
401             if (wr)
402             {
403                 memcpy(ce_this->buf + off, userbuf, 
404                        b->file[cat].head.block_size);
405                 ce_this->dirty = 1;
406             }
407             else
408                 memcpy(userbuf, ce_this->buf + off,
409                        b->file[cat].head.block_size);
410             return 1;
411         }
412     }
413     if (no >= b->cache_size)
414     {
415         assert(ce_last && *ce_last);
416         ce_this = *ce_last;
417         *ce_last = 0;  /* remove the last entry from list */
418         if (ce_this->dirty)
419         {
420             yaz_log(b->log_io, "bf_write: cache_block");
421             bf_write(b->file[cat].bf, ce_this->pos, 0, 0, ce_this->buf);
422         }
423         xfree(ce_this->buf);
424         xfree(ce_this);
425     }
426     ce_this = xmalloc(sizeof(*ce_this));
427     ce_this->next = b->file[cat].cache_entries;
428     b->file[cat].cache_entries = ce_this;
429     ce_this->buf = xmalloc(ISAMB_CACHE_ENTRY_SIZE);
430     ce_this->pos = norm;
431     yaz_log(b->log_io, "bf_read: cache_block");
432     if (!bf_read(b->file[cat].bf, norm, 0, 0, ce_this->buf))
433         memset(ce_this->buf, 0, ISAMB_CACHE_ENTRY_SIZE);
434     if (wr)
435     {
436         memcpy(ce_this->buf + off, userbuf, b->file[cat].head.block_size);
437         ce_this->dirty = 1;
438     }
439     else
440     {
441         ce_this->dirty = 0;
442         memcpy(userbuf, ce_this->buf + off, b->file[cat].head.block_size);
443     }
444     return 1;
445 }
446
447
448 void isamb_close(ISAMB isamb)
449 {
450     int i;
451     for (i = 0; isamb->accessed_nodes[i]; i++)
452         yaz_log(YLOG_DEBUG, "isamb_close  level leaf-%d: "ZINT_FORMAT" read, "
453                 ZINT_FORMAT" skipped",
454                 i, isamb->accessed_nodes[i], isamb->skipped_nodes[i]);
455     yaz_log(YLOG_DEBUG, "isamb_close returned "ZINT_FORMAT" values, "
456             "skipped "ZINT_FORMAT,
457             isamb->skipped_numbers, isamb->returned_numbers);
458
459     for (i = 0; i<isamb->no_cat; i++)
460     {
461         flush_blocks(isamb, i);
462         if (isamb->file[i].head_dirty)
463         {
464             char hbuf[DST_BUF_SIZE];
465             int major = ISAMB_MAJOR_VERSION;
466             int len = 16;
467             char *dst = hbuf + 16;
468             int pos = 0, left;
469             int b_size = isamb->file[i].head.block_size;
470
471             encode_ptr(&dst, isamb->file[i].head.first_block);
472             encode_ptr(&dst, isamb->file[i].head.last_block);
473             encode_ptr(&dst, isamb->file[i].head.block_size);
474             encode_ptr(&dst, isamb->file[i].head.block_max);
475             encode_ptr(&dst, isamb->file[i].head.free_list);
476
477             if (isamb->minor_version >= ISAMB_MINOR_VERSION_WITH_ROOT)
478                 encode_ptr(&dst, isamb->root_ptr);
479
480             memset(dst, '\0', b_size); /* ensure no random bytes are written */
481
482             len = dst - hbuf;
483
484             /* print exactly 16 bytes (including trailing 0) */
485             sprintf(hbuf, "isamb%02d %02d %02d\r\n", major,
486                     isamb->minor_version, len);
487
488             bf_write(isamb->file[i].bf, pos, 0, 0, hbuf);
489
490             for (left = len - b_size; left > 0; left = left - b_size)
491             {
492                 pos++;
493                 bf_write(isamb->file[i].bf, pos, 0, 0, hbuf + pos*b_size);
494             }
495         }
496         if (isamb->file[i].bf)
497             bf_close (isamb->file[i].bf);
498     }
499     xfree(isamb->file);
500     xfree(isamb->method);
501     xfree(isamb);
502 }
503
504 /* open_block: read one block at pos.
505    Decode leading sys bytes .. consisting of
506    Offset:Meaning
507    0: leader byte, != 0 leaf, == 0, non-leaf
508    1-2: used size of block
509    3-7*: number of items and all children
510    
511    * Reserve 5 bytes for large block sizes. 1 for small ones .. Number
512    of items. We can thus have at most 2^40 nodes. 
513 */
514 static struct ISAMB_block *open_block(ISAMB b, ISAM_P pos)
515 {
516     int cat = (int) (pos&CAT_MASK);
517     const char *src;
518     int offset = b->file[cat].head.block_offset;
519     struct ISAMB_block *p;
520     if (!pos)
521         return 0;
522     p = xmalloc(sizeof(*p));
523     p->pos = pos;
524     p->cat = (int) (pos & CAT_MASK);
525     p->buf = xmalloc(b->file[cat].head.block_size);
526     p->cbuf = 0;
527
528     if (!cache_block (b, pos, p->buf, 0))
529     {
530         yaz_log(b->log_io, "bf_read: open_block");
531         if (bf_read(b->file[cat].bf, pos/CAT_MAX, 0, 0, p->buf) != 1)
532         {
533             yaz_log(YLOG_FATAL, "isamb: read fail for pos=%ld block=%ld",
534                     (long) pos, (long) pos/CAT_MAX);
535             zebra_exit("isamb:open_block");
536         }
537     }
538     p->bytes = (char *)p->buf + offset;
539     p->leaf = p->buf[0];
540     p->size = (p->buf[1] + 256 * p->buf[2]) - offset;
541     if (p->size < 0)
542     {
543         yaz_log(YLOG_FATAL, "Bad block size %d in pos=" ZINT_FORMAT "\n",
544                 p->size, pos);
545     }
546     assert(p->size >= 0);
547     src = (char*) p->buf + 3;
548     decode_ptr(&src, &p->no_items);
549
550     p->offset = 0;
551     p->dirty = 0;
552     p->deleted = 0;
553     p->decodeClientData = (*b->method->codec.start)();
554     return p;
555 }
556
557 struct ISAMB_block *new_block(ISAMB b, int leaf, int cat)
558 {
559     struct ISAMB_block *p;
560
561     p = xmalloc(sizeof(*p));
562     p->buf = xmalloc(b->file[cat].head.block_size);
563
564     if (!b->file[cat].head.free_list)
565     {
566         zint block_no;
567         block_no = b->file[cat].head.last_block++;
568         p->pos = block_no * CAT_MAX + cat;
569     }
570     else
571     {
572         p->pos = b->file[cat].head.free_list;
573         assert((p->pos & CAT_MASK) == cat);
574         if (!cache_block(b, p->pos, p->buf, 0))
575         {
576             yaz_log(b->log_io, "bf_read: new_block");
577             if (!bf_read(b->file[cat].bf, p->pos/CAT_MAX, 0, 0, p->buf))
578             {
579                 yaz_log(YLOG_FATAL, "isamb: read fail for pos=%ld block=%ld",
580                         (long) p->pos/CAT_MAX, (long) p->pos/CAT_MAX);
581                 zebra_exit("isamb:new_block");
582             }
583         }
584         yaz_log(b->log_freelist, "got block " ZINT_FORMAT " from freelist %d:" ZINT_FORMAT, p->pos,
585                 cat, p->pos/CAT_MAX);
586         memcpy(&b->file[cat].head.free_list, p->buf, sizeof(zint));
587     }
588     p->cat = cat;
589     b->file[cat].head_dirty = 1;
590     memset(p->buf, 0, b->file[cat].head.block_size);
591     p->bytes = (char*)p->buf + b->file[cat].head.block_offset;
592     p->leaf = leaf;
593     p->size = 0;
594     p->dirty = 1;
595     p->deleted = 0;
596     p->offset = 0;
597     p->no_items = 0;
598     p->decodeClientData = (*b->method->codec.start)();
599     return p;
600 }
601
602 struct ISAMB_block *new_leaf(ISAMB b, int cat)
603 {
604     return new_block(b, 1, cat);
605 }
606
607
608 struct ISAMB_block *new_int(ISAMB b, int cat)
609 {
610     return new_block(b, 0, cat);
611 }
612
613 static void check_block(ISAMB b, struct ISAMB_block *p)
614 {
615     assert(b); /* mostly to make the compiler shut up about unused b */
616     if (p->leaf)
617     {
618         ;
619     }
620     else
621     {
622         /* sanity check */
623         char *startp = p->bytes;
624         const char *src = startp;
625         char *endp = p->bytes + p->size;
626         ISAM_P pos;
627         void *c1 = (*b->method->codec.start)();
628             
629         decode_ptr(&src, &pos);
630         assert((pos&CAT_MASK) == p->cat);
631         while (src != endp)
632         {
633 #if INT_ENCODE
634             char file_item_buf[DST_ITEM_MAX];
635             char *file_item = file_item_buf;
636             (*b->method->codec.reset)(c1);
637             (*b->method->codec.decode)(c1, &file_item, &src);
638 #else
639             zint item_len;
640             decode_item_len(&src, &item_len);
641             assert(item_len > 0 && item_len < 128);
642             src += item_len;
643 #endif
644             decode_ptr(&src, &pos);
645             if ((pos&CAT_MASK) != p->cat)
646             {
647                 assert((pos&CAT_MASK) == p->cat);
648             }
649         }
650         (*b->method->codec.stop)(c1);
651     }
652 }
653
654 void close_block(ISAMB b, struct ISAMB_block *p)
655 {
656     if (!p)
657         return;
658     if (p->deleted)
659     {
660         yaz_log(b->log_freelist, "release block " ZINT_FORMAT " from freelist %d:" ZINT_FORMAT,
661                 p->pos, p->cat, p->pos/CAT_MAX);
662         memcpy(p->buf, &b->file[p->cat].head.free_list, sizeof(zint));
663         b->file[p->cat].head.free_list = p->pos;
664         if (!cache_block(b, p->pos, p->buf, 1))
665         {
666             yaz_log(b->log_io, "bf_write: close_block (deleted)");
667             bf_write(b->file[p->cat].bf, p->pos/CAT_MAX, 0, 0, p->buf);
668         }
669     }
670     else if (p->dirty)
671     {
672         int offset = b->file[p->cat].head.block_offset;
673         int size = p->size + offset;
674         char *dst =  (char*)p->buf + 3;
675         assert(p->size >= 0);
676         
677         /* memset becuase encode_ptr usually does not write all bytes */
678         memset(p->buf, 0, b->file[p->cat].head.block_offset);
679         p->buf[0] = p->leaf;
680         p->buf[1] = size & 255;
681         p->buf[2] = size >> 8;
682         encode_ptr(&dst, p->no_items);
683         check_block(b, p);
684         if (!cache_block(b, p->pos, p->buf, 1))
685         {
686             yaz_log(b->log_io, "bf_write: close_block");
687             bf_write(b->file[p->cat].bf, p->pos/CAT_MAX, 0, 0, p->buf);
688         }
689     }
690     (*b->method->codec.stop)(p->decodeClientData);
691     xfree(p->buf);
692     xfree(p);
693 }
694
695 int insert_sub(ISAMB b, struct ISAMB_block **p,
696                void *new_item, int *mode,
697                ISAMC_I *stream,
698                struct ISAMB_block **sp,
699                void *sub_item, int *sub_size,
700                const void *max_item);
701
702 int insert_int(ISAMB b, struct ISAMB_block *p, void *lookahead_item,
703                int *mode,
704                ISAMC_I *stream, struct ISAMB_block **sp,
705                void *split_item, int *split_size, const void *last_max_item)
706 {
707     char *startp = p->bytes;
708     const char *src = startp;
709     char *endp = p->bytes + p->size;
710     ISAM_P pos;
711     struct ISAMB_block *sub_p1 = 0, *sub_p2 = 0;
712     char sub_item[DST_ITEM_MAX];
713     int sub_size;
714     int more = 0;
715     zint diff_terms = 0;
716     void *c1 = (*b->method->codec.start)();
717
718     *sp = 0;
719
720     assert(p->size >= 0);
721     decode_ptr(&src, &pos);
722     while (src != endp)
723     {
724         int d;
725         const char *src0 = src;
726 #if INT_ENCODE
727         char file_item_buf[DST_ITEM_MAX];
728         char *file_item = file_item_buf;
729         (*b->method->codec.reset)(c1);
730         (*b->method->codec.decode)(c1, &file_item, &src);
731         d = (*b->method->compare_item)(file_item_buf, lookahead_item);
732         if (d > 0)
733         {
734             sub_p1 = open_block(b, pos);
735             assert(sub_p1);
736             diff_terms -= sub_p1->no_items;
737             more = insert_sub(b, &sub_p1, lookahead_item, mode,
738                               stream, &sub_p2, 
739                               sub_item, &sub_size, file_item_buf);
740             diff_terms += sub_p1->no_items;
741             src = src0;
742             break;
743         }
744 #else
745         zint item_len;
746         decode_item_len(&src, &item_len);
747         d = (*b->method->compare_item)(src, lookahead_item);
748         if (d > 0)
749         {
750             sub_p1 = open_block(b, pos);
751             assert(sub_p1);
752             diff_terms -= sub_p1->no_items;
753             more = insert_sub(b, &sub_p1, lookahead_item, mode,
754                               stream, &sub_p2, 
755                               sub_item, &sub_size, src);
756             diff_terms += sub_p1->no_items;
757             src = src0;
758             break;
759         }
760         src += item_len;
761 #endif
762         decode_ptr(&src, &pos);
763     }
764     if (!sub_p1)
765     {
766         /* we reached the end. So lookahead > last item */
767         sub_p1 = open_block(b, pos);
768         assert(sub_p1);
769         diff_terms -= sub_p1->no_items;
770         more = insert_sub(b, &sub_p1, lookahead_item, mode, stream, &sub_p2, 
771                           sub_item, &sub_size, last_max_item);
772         diff_terms += sub_p1->no_items;
773     }
774     if (sub_p2)
775         diff_terms += sub_p2->no_items;
776     if (diff_terms)
777     {
778         p->dirty = 1;
779         p->no_items += diff_terms;
780     }
781     if (sub_p2)
782     {
783         /* there was a split - must insert pointer in this one */
784         char dst_buf[DST_BUF_SIZE];
785         char *dst = dst_buf;
786 #if INT_ENCODE
787         const char *sub_item_ptr = sub_item;
788 #endif
789         assert(sub_size < 128 && sub_size > 1);
790
791         memcpy(dst, startp, src - startp);
792                 
793         dst += src - startp;
794
795 #if INT_ENCODE
796         (*b->method->codec.reset)(c1);
797         (*b->method->codec.encode)(c1, &dst, &sub_item_ptr);
798 #else
799         encode_item_len(&dst, sub_size);      /* sub length and item */
800         memcpy(dst, sub_item, sub_size);
801         dst += sub_size;
802 #endif
803
804         encode_ptr(&dst, sub_p2->pos);   /* pos */
805
806         if (endp - src)                   /* remaining data */
807         {
808             memcpy(dst, src, endp - src);
809             dst += endp - src;
810         }
811         p->size = dst - dst_buf;
812         assert(p->size >= 0);
813
814         if (p->size <= b->file[p->cat].head.block_max)
815         {
816             /* it fits OK in this block */
817             memcpy(startp, dst_buf, dst - dst_buf);
818
819             close_block(b, sub_p2);
820         }
821         else
822         {
823             /* must split _this_ block as well .. */
824             struct ISAMB_block *sub_p3;
825 #if INT_ENCODE
826             char file_item_buf[DST_ITEM_MAX];
827             char *file_item = file_item_buf;
828 #else
829             zint split_size_tmp;
830 #endif
831             zint no_items_first_half = 0;
832             int p_new_size;
833             const char *half;
834             src = dst_buf;
835             endp = dst;
836             
837             b->number_of_int_splits++;
838
839             p->dirty = 1;
840             close_block(b, sub_p2);
841
842             half = src + b->file[p->cat].head.block_size/2;
843             decode_ptr(&src, &pos);
844
845             if (b->enable_int_count)
846             {
847                 /* read sub block so we can get no_items for it */
848                 sub_p3 = open_block(b, pos);
849                 no_items_first_half += sub_p3->no_items;
850                 close_block(b, sub_p3);
851             }
852
853             while (src <= half)
854             {
855 #if INT_ENCODE
856                 file_item = file_item_buf;
857                 (*b->method->codec.reset)(c1);
858                 (*b->method->codec.decode)(c1, &file_item, &src);
859 #else
860                 decode_item_len(&src, &split_size_tmp);
861                 *split_size = (int) split_size_tmp;
862                 src += *split_size;
863 #endif
864                 decode_ptr(&src, &pos);
865
866                 if (b->enable_int_count)
867                 {
868                     /* read sub block so we can get no_items for it */
869                     sub_p3 = open_block(b, pos);
870                     no_items_first_half += sub_p3->no_items;
871                     close_block(b, sub_p3);
872                 }
873             }
874             /*  p is first half */
875             p_new_size = src - dst_buf;
876             memcpy(p->bytes, dst_buf, p_new_size);
877
878 #if INT_ENCODE
879             file_item = file_item_buf;
880             (*b->method->codec.reset)(c1);
881             (*b->method->codec.decode)(c1, &file_item, &src);
882             *split_size = file_item - file_item_buf;
883             memcpy(split_item, file_item_buf, *split_size);
884 #else
885             decode_item_len(&src, &split_size_tmp);
886             *split_size = (int) split_size_tmp;
887             memcpy(split_item, src, *split_size);
888             src += *split_size;
889 #endif
890             /*  *sp is second half */
891             *sp = new_int(b, p->cat);
892             (*sp)->size = endp - src;
893             memcpy((*sp)->bytes, src, (*sp)->size);
894
895             p->size = p_new_size;
896
897             /*  adjust no_items in first&second half */
898             (*sp)->no_items = p->no_items - no_items_first_half;
899             p->no_items = no_items_first_half;
900         }
901         p->dirty = 1;
902     }
903     close_block(b, sub_p1);
904     (*b->method->codec.stop)(c1);
905     return more;
906 }
907
908 int insert_leaf(ISAMB b, struct ISAMB_block **sp1, void *lookahead_item,
909                 int *lookahead_mode, ISAMC_I *stream,
910                 struct ISAMB_block **sp2,
911                 void *sub_item, int *sub_size,
912                 const void *max_item)
913 {
914     struct ISAMB_block *p = *sp1;
915     char *endp = 0;
916     const char *src = 0;
917     char dst_buf[DST_BUF_SIZE], *dst = dst_buf;
918     int new_size;
919     void *c1 = (*b->method->codec.start)();
920     void *c2 = (*b->method->codec.start)();
921     int more = 1;
922     int quater = b->file[b->no_cat-1].head.block_max / 4;
923     char *mid_cut = dst_buf + quater * 2;
924     char *tail_cut = dst_buf + quater * 3;
925     char *maxp = dst_buf + b->file[b->no_cat-1].head.block_max;
926     char *half1 = 0;
927     char *half2 = 0;
928     char cut_item_buf[DST_ITEM_MAX];
929     int cut_item_size = 0;
930     int no_items = 0;    /* number of items (total) */
931     int no_items_1 = 0;  /* number of items (first half) */
932     int inserted_dst_bytes = 0;
933
934     if (p && p->size)
935     {
936         char file_item_buf[DST_ITEM_MAX];
937         char *file_item = file_item_buf;
938             
939         src = p->bytes;
940         endp = p->bytes + p->size;
941         (*b->method->codec.decode)(c1, &file_item, &src);
942         while (1)
943         {
944             const char *dst_item = 0; /* resulting item to be inserted */
945             char *lookahead_next;
946             char *dst_0 = dst;
947             int d = -1;
948             
949             if (lookahead_item)
950                 d = (*b->method->compare_item)(file_item_buf, lookahead_item);
951             
952             /* d now holds comparison between existing file item and
953                lookahead item 
954                d = 0: equal
955                d > 0: lookahead before file
956                d < 0: lookahead after file
957             */
958             if (d > 0)
959             {
960                 /* lookahead must be inserted */
961                 dst_item = lookahead_item;
962                 /* if this is not an insertion, it's really bad .. */
963                 if (!*lookahead_mode)
964                 {
965                     yaz_log(YLOG_WARN, "isamb: Inconsistent register (1)");
966                     assert(*lookahead_mode);
967                 }
968             }
969             else if (d == 0 && *lookahead_mode == 2)
970             {
971                 /* For mode == 2, we insert the new key anyway - even 
972                    though the comparison is 0. */
973                 dst_item = lookahead_item;
974                 p->dirty = 1;
975             }
976             else
977                 dst_item = file_item_buf;
978
979             if (d == 0 && !*lookahead_mode)
980             {
981                 /* it's a deletion and they match so there is nothing
982                    to be inserted anyway .. But mark the thing dirty
983                    (file item was part of input.. The item will not be
984                    part of output */
985                 p->dirty = 1;
986             }
987             else if (!half1 && dst > mid_cut)
988             {
989                 /* we have reached the splitting point for the first time */
990                 const char *dst_item_0 = dst_item;
991                 half1 = dst; /* candidate for splitting */
992
993                 /* encode the resulting item */
994                 (*b->method->codec.encode)(c2, &dst, &dst_item);
995                 
996                 cut_item_size = dst_item - dst_item_0;
997                 assert(cut_item_size > 0);
998                 memcpy(cut_item_buf, dst_item_0, cut_item_size);
999                 
1000                 half2 = dst;
1001                 no_items_1 = no_items;
1002                 no_items++;
1003             }
1004             else
1005             {
1006                 /* encode the resulting item */
1007                 (*b->method->codec.encode)(c2, &dst, &dst_item);
1008                 no_items++;
1009             }
1010
1011             /* now move "pointers" .. result has been encoded .. */
1012             if (d > 0)  
1013             {
1014                 /* we must move the lookahead pointer */
1015
1016                 inserted_dst_bytes += (dst - dst_0);
1017                 if (inserted_dst_bytes >=  quater)
1018                     /* no more room. Mark lookahead as "gone".. */
1019                     lookahead_item = 0;
1020                 else
1021                 {
1022                     /* move it really.. */
1023                     lookahead_next = lookahead_item;
1024                     if (!(*stream->read_item)(stream->clientData,
1025                                               &lookahead_next,
1026                                               lookahead_mode))
1027                     {
1028                         /* end of stream reached: no "more" and no lookahead */
1029                         lookahead_item = 0;
1030                         more = 0;
1031                     }
1032                     if (lookahead_item && max_item &&
1033                         (*b->method->compare_item)(max_item, lookahead_item) <= 0)
1034                     {
1035                         /* the lookahead goes beyond what we allow in this
1036                            leaf. Mark it as "gone" */
1037                         lookahead_item = 0;
1038                     }
1039                     
1040                     p->dirty = 1;
1041                 }
1042             }
1043             else if (d == 0)
1044             {
1045                 /* exact match .. move both pointers */
1046
1047                 lookahead_next = lookahead_item;
1048                 if (!(*stream->read_item)(stream->clientData,
1049                                           &lookahead_next, lookahead_mode))
1050                 {
1051                     lookahead_item = 0;
1052                     more = 0;
1053                 }
1054                 if (src == endp)
1055                     break;  /* end of file stream reached .. */
1056                 file_item = file_item_buf; /* move file pointer */
1057                 (*b->method->codec.decode)(c1, &file_item, &src);
1058             }
1059             else
1060             {
1061                 /* file pointer must be moved */
1062                 if (src == endp)
1063                     break;
1064                 file_item = file_item_buf;
1065                 (*b->method->codec.decode)(c1, &file_item, &src);
1066             }
1067         }
1068     }
1069
1070     /* this loop runs when we are "appending" to a leaf page. That is
1071        either it's empty (new) or all file items have been read in
1072        previous loop */
1073
1074     maxp = dst_buf + b->file[b->no_cat-1].head.block_max + quater;
1075     while (lookahead_item)
1076     {
1077         char *dst_item;
1078         const char *src = lookahead_item;
1079         char *dst_0 = dst;
1080         
1081         /* if we have a lookahead item, we stop if we exceed the value of it */
1082         if (max_item &&
1083             (*b->method->compare_item)(max_item, lookahead_item) <= 0)
1084         {
1085             /* stop if we have reached the value of max item */
1086             break;
1087         }
1088         if (!*lookahead_mode)
1089         {
1090             /* this is append. So a delete is bad */
1091             yaz_log(YLOG_WARN, "isamb: Inconsistent register (2)");
1092             assert(*lookahead_mode);
1093         }
1094         else if (!half1 && dst > tail_cut)
1095         {
1096             const char *src_0 = src;
1097             half1 = dst; /* candidate for splitting */
1098             
1099             (*b->method->codec.encode)(c2, &dst, &src);
1100             
1101             cut_item_size = src - src_0;
1102             assert(cut_item_size > 0);
1103             memcpy(cut_item_buf, src_0, cut_item_size);
1104             
1105             no_items_1 = no_items;
1106             half2 = dst;
1107         }
1108         else
1109             (*b->method->codec.encode)(c2, &dst, &src);
1110
1111         if (dst > maxp)
1112         {
1113             dst = dst_0;
1114             break;
1115         }
1116         no_items++;
1117         if (p)
1118             p->dirty = 1;
1119         dst_item = lookahead_item;
1120         if (!(*stream->read_item)(stream->clientData, &dst_item,
1121                                   lookahead_mode))
1122         {
1123             lookahead_item = 0;
1124             more = 0;
1125         }
1126     }
1127     new_size = dst - dst_buf;
1128     if (p && p->cat != b->no_cat-1 && 
1129         new_size > b->file[p->cat].head.block_max)
1130     {
1131         /* non-btree block will be removed */
1132         p->deleted = 1;
1133         close_block(b, p);
1134         /* delete it too!! */
1135         p = 0; /* make a new one anyway */
1136     }
1137     if (!p)
1138     {   /* must create a new one */
1139         int i;
1140         for (i = 0; i < b->no_cat; i++)
1141             if (new_size <= b->file[i].head.block_max)
1142                 break;
1143         if (i == b->no_cat)
1144             i = b->no_cat - 1;
1145         p = new_leaf(b, i);
1146     }
1147     if (new_size > b->file[p->cat].head.block_max)
1148     {
1149         char *first_dst;
1150         const char *cut_item = cut_item_buf;
1151
1152         assert(half1);
1153         assert(half2);
1154
1155         assert(cut_item_size > 0);
1156         
1157         /* first half */
1158         p->size = half1 - dst_buf;
1159         assert(p->size <=  b->file[p->cat].head.block_max);
1160         memcpy(p->bytes, dst_buf, half1 - dst_buf);
1161         p->no_items = no_items_1;
1162
1163         /* second half */
1164         *sp2 = new_leaf(b, p->cat);
1165
1166         (*b->method->codec.reset)(c2);
1167
1168         b->number_of_leaf_splits++;
1169
1170         first_dst = (*sp2)->bytes;
1171
1172         (*b->method->codec.encode)(c2, &first_dst, &cut_item);
1173
1174         memcpy(first_dst, half2, dst - half2);
1175
1176         (*sp2)->size = (first_dst - (*sp2)->bytes) + (dst - half2);
1177         assert((*sp2)->size <=  b->file[p->cat].head.block_max);
1178         (*sp2)->no_items = no_items - no_items_1;
1179         (*sp2)->dirty = 1;
1180         p->dirty = 1;
1181         memcpy(sub_item, cut_item_buf, cut_item_size);
1182         *sub_size = cut_item_size;
1183     }
1184     else
1185     {
1186         memcpy(p->bytes, dst_buf, dst - dst_buf);
1187         p->size = new_size;
1188         p->no_items = no_items;
1189     }
1190     (*b->method->codec.stop)(c1);
1191     (*b->method->codec.stop)(c2);
1192     *sp1 = p;
1193     return more;
1194 }
1195
1196 int insert_sub(ISAMB b, struct ISAMB_block **p, void *new_item,
1197                int *mode,
1198                ISAMC_I *stream,
1199                struct ISAMB_block **sp,
1200                void *sub_item, int *sub_size,
1201                const void *max_item)
1202 {
1203     if (!*p || (*p)->leaf)
1204         return insert_leaf(b, p, new_item, mode, stream, sp, sub_item, 
1205                            sub_size, max_item);
1206     else
1207         return insert_int(b, *p, new_item, mode, stream, sp, sub_item,
1208                           sub_size, max_item);
1209 }
1210
1211 int isamb_unlink(ISAMB b, ISAM_P pos)
1212 {
1213     struct ISAMB_block *p1;
1214
1215     if (!pos)
1216         return 0;
1217     p1 = open_block(b, pos);
1218     p1->deleted = 1;
1219     if (!p1->leaf)
1220     {
1221         zint sub_p;
1222         const char *src = p1->bytes + p1->offset;
1223 #if INT_ENCODE
1224         void *c1 = (*b->method->codec.start)();
1225 #endif
1226         decode_ptr(&src, &sub_p);
1227         isamb_unlink(b, sub_p);
1228         
1229         while (src != p1->bytes + p1->size)
1230         {
1231 #if INT_ENCODE
1232             char file_item_buf[DST_ITEM_MAX];
1233             char *file_item = file_item_buf;
1234             (*b->method->codec.reset)(c1);
1235             (*b->method->codec.decode)(c1, &file_item, &src);
1236 #else
1237             zint item_len;
1238             decode_item_len(&src, &item_len);
1239             src += item_len;
1240 #endif
1241             decode_ptr(&src, &sub_p);
1242             isamb_unlink(b, sub_p);
1243         }
1244 #if INT_ENCODE
1245         (*b->method->codec.stop)(c1);
1246 #endif
1247     }
1248     close_block(b, p1);
1249     return 0;
1250 }
1251
1252 void isamb_merge(ISAMB b, ISAM_P *pos, ISAMC_I *stream)
1253 {
1254     char item_buf[DST_ITEM_MAX];
1255     char *item_ptr;
1256     int i_mode;
1257     int more;
1258     int must_delete = 0;
1259
1260     if (b->cache < 0)
1261     {
1262         int more = 1;
1263         while (more)
1264         {
1265             item_ptr = item_buf;
1266             more =
1267                 (*stream->read_item)(stream->clientData, &item_ptr, &i_mode);
1268         }
1269         *pos = 1;
1270         return;
1271     }
1272     item_ptr = item_buf;
1273     more = (*stream->read_item)(stream->clientData, &item_ptr, &i_mode);
1274     while (more)
1275     {
1276         struct ISAMB_block *p = 0, *sp = 0;
1277         char sub_item[DST_ITEM_MAX];
1278         int sub_size;
1279         
1280         if (*pos)
1281             p = open_block(b, *pos);
1282         more = insert_sub(b, &p, item_buf, &i_mode, stream, &sp,
1283                           sub_item, &sub_size, 0);
1284         if (sp)
1285         {    /* increase level of tree by one */
1286             struct ISAMB_block *p2 = new_int(b, p->cat);
1287             char *dst = p2->bytes + p2->size;
1288 #if INT_ENCODE
1289             void *c1 = (*b->method->codec.start)();
1290             const char *sub_item_ptr = sub_item;
1291 #endif
1292
1293             encode_ptr(&dst, p->pos);
1294             assert(sub_size < 128 && sub_size > 1);
1295 #if INT_ENCODE
1296             (*b->method->codec.reset)(c1);
1297             (*b->method->codec.encode)(c1, &dst, &sub_item_ptr);
1298 #else
1299             encode_item_len(&dst, sub_size);
1300             memcpy(dst, sub_item, sub_size);
1301             dst += sub_size;
1302 #endif
1303             encode_ptr(&dst, sp->pos);
1304             
1305             p2->size = dst - p2->bytes;
1306             p2->no_items = p->no_items + sp->no_items;
1307             *pos = p2->pos;  /* return new super page */
1308             close_block(b, sp);
1309             close_block(b, p2);
1310 #if INT_ENCODE
1311             (*b->method->codec.stop)(c1);
1312 #endif
1313         }
1314         else
1315         {
1316             *pos = p->pos;   /* return current one (again) */
1317         }
1318         if (p->no_items == 0)
1319             must_delete = 1;
1320         else
1321             must_delete = 0;
1322         close_block(b, p);
1323     }
1324     if (must_delete)
1325     {
1326         isamb_unlink(b, *pos);
1327         *pos = 0;
1328     }
1329 }
1330
1331 ISAMB_PP isamb_pp_open_x(ISAMB isamb, ISAM_P pos, int *level, int scope)
1332 {
1333     ISAMB_PP pp = xmalloc(sizeof(*pp));
1334     int i;
1335
1336     assert(pos);
1337
1338     pp->isamb = isamb;
1339     pp->block = xmalloc(ISAMB_MAX_LEVEL * sizeof(*pp->block));
1340
1341     pp->pos = pos;
1342     pp->level = 0;
1343     pp->maxlevel = 0;
1344     pp->total_size = 0;
1345     pp->no_blocks = 0;
1346     pp->skipped_numbers = 0;
1347     pp->returned_numbers = 0;
1348     pp->scope = scope;
1349     for (i = 0; i<ISAMB_MAX_LEVEL; i++)
1350         pp->skipped_nodes[i] = pp->accessed_nodes[i] = 0;
1351     while (1)
1352     {
1353         struct ISAMB_block *p = open_block(isamb, pos);
1354         const char *src = p->bytes + p->offset;
1355         pp->block[pp->level] = p;
1356
1357         pp->total_size += p->size;
1358         pp->no_blocks++;
1359         if (p->leaf)
1360             break;
1361         decode_ptr(&src, &pos);
1362         p->offset = src - p->bytes;
1363         pp->level++;
1364         pp->accessed_nodes[pp->level]++; 
1365     }
1366     pp->block[pp->level+1] = 0;
1367     pp->maxlevel = pp->level;
1368     if (level)
1369         *level = pp->level;
1370     return pp;
1371 }
1372
1373 ISAMB_PP isamb_pp_open(ISAMB isamb, ISAM_P pos, int scope)
1374 {
1375     return isamb_pp_open_x(isamb, pos, 0, scope);
1376 }
1377
1378 void isamb_pp_close_x(ISAMB_PP pp, zint *size, zint *blocks)
1379 {
1380     int i;
1381     if (!pp)
1382         return;
1383     yaz_log(YLOG_DEBUG, "isamb_pp_close lev=%d returned "ZINT_FORMAT" values, " 
1384             "skipped "ZINT_FORMAT,
1385             pp->maxlevel, pp->skipped_numbers, pp->returned_numbers);
1386     for (i = pp->maxlevel; i>=0; i--)
1387         if (pp->skipped_nodes[i] || pp->accessed_nodes[i])
1388             yaz_log(YLOG_DEBUG, "isamb_pp_close  level leaf-%d: "
1389                     ZINT_FORMAT" read, "ZINT_FORMAT" skipped", i,
1390                     pp->accessed_nodes[i], pp->skipped_nodes[i]);
1391     pp->isamb->skipped_numbers += pp->skipped_numbers;
1392     pp->isamb->returned_numbers += pp->returned_numbers;
1393     for (i = pp->maxlevel; i>=0; i--)
1394     {
1395         pp->isamb->accessed_nodes[i] += pp->accessed_nodes[i];
1396         pp->isamb->skipped_nodes[i] += pp->skipped_nodes[i];
1397     }
1398     if (size)
1399         *size = pp->total_size;
1400     if (blocks)
1401         *blocks = pp->no_blocks;
1402     for (i = 0; i <= pp->level; i++)
1403         close_block(pp->isamb, pp->block[i]);
1404     xfree(pp->block);
1405     xfree(pp);
1406 }
1407
1408 int isamb_block_info(ISAMB isamb, int cat)
1409 {
1410     if (cat >= 0 && cat < isamb->no_cat)
1411         return isamb->file[cat].head.block_size;
1412     return -1;
1413 }
1414
1415 void isamb_pp_close(ISAMB_PP pp)
1416 {
1417     isamb_pp_close_x(pp, 0, 0);
1418 }
1419
1420 /* simple recursive dumper .. */
1421 static void isamb_dump_r(ISAMB b, ISAM_P pos, void (*pr)(const char *str),
1422                          int level)
1423 {
1424     char buf[1024];
1425     char prefix_str[1024];
1426     if (pos)
1427     {
1428         struct ISAMB_block *p = open_block(b, pos);
1429         sprintf(prefix_str, "%*s " ZINT_FORMAT " cat=%d size=%d max=%d items="
1430                 ZINT_FORMAT, level*2, "",
1431                 pos, p->cat, p->size, b->file[p->cat].head.block_max,
1432                 p->no_items);
1433         (*pr)(prefix_str);
1434         sprintf(prefix_str, "%*s " ZINT_FORMAT, level*2, "", pos);
1435         if (p->leaf)
1436         {
1437             while (p->offset < p->size)
1438             {
1439                 const char *src = p->bytes + p->offset;
1440                 char *dst = buf;
1441                 (*b->method->codec.decode)(p->decodeClientData, &dst, &src);
1442                 (*b->method->log_item)(YLOG_DEBUG, buf, prefix_str);
1443                 p->offset = src - (char*) p->bytes;
1444             }
1445             assert(p->offset == p->size);
1446         }
1447         else
1448         {
1449             const char *src = p->bytes + p->offset;
1450             ISAM_P sub;
1451
1452             decode_ptr(&src, &sub);
1453             p->offset = src - (char*) p->bytes;
1454
1455             isamb_dump_r(b, sub, pr, level+1);
1456             
1457             while (p->offset < p->size)
1458             {
1459 #if INT_ENCODE
1460                 char file_item_buf[DST_ITEM_MAX];
1461                 char *file_item = file_item_buf;
1462                 void *c1 = (*b->method->codec.start)();
1463                 (*b->method->codec.decode)(c1, &file_item, &src);
1464                 (*b->method->codec.stop)(c1);
1465                 (*b->method->log_item)(YLOG_DEBUG, file_item_buf, prefix_str);
1466 #else
1467                 zint item_len;
1468                 decode_item_len(&src, &item_len);
1469                 (*b->method->log_item)(YLOG_DEBUG, src, prefix_str);
1470                 src += item_len;
1471 #endif
1472                 decode_ptr(&src, &sub);
1473                 
1474                 p->offset = src - (char*) p->bytes;
1475                 
1476                 isamb_dump_r(b, sub, pr, level+1);
1477             }           
1478         }
1479         close_block(b, p);
1480     }
1481 }
1482
1483 void isamb_dump(ISAMB b, ISAM_P pos, void (*pr)(const char *str))
1484 {
1485     isamb_dump_r(b, pos, pr, 0);
1486 }
1487
1488 int isamb_pp_read(ISAMB_PP pp, void *buf)
1489 {
1490     return isamb_pp_forward(pp, buf, 0);
1491 }
1492
1493
1494 void isamb_pp_pos(ISAMB_PP pp, double *current, double *total)
1495 { /* return an estimate of the current position and of the total number of */
1496   /* occureences in the isam tree, based on the current leaf */
1497     assert(total);
1498     assert(current);
1499     
1500     /* if end-of-stream PP may not be leaf */
1501
1502     *total = (double) (pp->block[0]->no_items);
1503     *current = (double) pp->returned_numbers;
1504 #if ISAMB_DEBUG
1505     yaz_log(YLOG_LOG, "isamb_pp_pos returning: cur= %0.1f tot=%0.1f rn="
1506             ZINT_FORMAT, *current, *total, pp->returned_numbers);
1507 #endif
1508 }
1509
1510 int isamb_pp_forward(ISAMB_PP pp, void *buf, const void *untilb)
1511 {
1512     char *dst = buf;
1513     const char *src;
1514     struct ISAMB_block *p = pp->block[pp->level];
1515     ISAMB b = pp->isamb;
1516     if (!p)
1517         return 0;
1518  again:
1519     while (p->offset == p->size)
1520     {
1521         ISAM_P pos;
1522 #if INT_ENCODE
1523         const char *src_0;
1524         void *c1;
1525         char file_item_buf[DST_ITEM_MAX];
1526         char *file_item = file_item_buf;
1527 #else
1528         zint item_len;
1529 #endif
1530         while (p->offset == p->size)
1531         {
1532             if (pp->level == 0)
1533                 return 0;
1534             close_block(pp->isamb, pp->block[pp->level]);
1535             pp->block[pp->level] = 0;
1536             (pp->level)--;
1537             p = pp->block[pp->level];
1538             assert(!p->leaf);  
1539         }
1540
1541         assert(!p->leaf);
1542         src = p->bytes + p->offset;
1543        
1544 #if INT_ENCODE
1545         c1 = (*b->method->codec.start)();
1546         (*b->method->codec.decode)(c1, &file_item, &src);
1547 #else
1548         decode_ptr(&src, &item_len);
1549         src += item_len;
1550 #endif        
1551         decode_ptr(&src, &pos);
1552         p->offset = src - (char*) p->bytes;
1553
1554         src = p->bytes + p->offset;
1555
1556         while(1)
1557         {
1558             if (!untilb || p->offset == p->size)
1559                 break;
1560             assert(p->offset < p->size);
1561 #if INT_ENCODE
1562             src_0 = src;
1563             file_item = file_item_buf;
1564             (*b->method->codec.reset)(c1);
1565             (*b->method->codec.decode)(c1, &file_item, &src);
1566             if ((*b->method->compare_item)(untilb, file_item_buf) < pp->scope)
1567             {
1568                 src = src_0;
1569                 break;
1570             }
1571 #else
1572             decode_item_len(&src, &item_len);
1573             if ((*b->method->compare_item)(untilb, src) < pp->scope)
1574                 break;
1575             src += item_len;
1576 #endif
1577             decode_ptr(&src, &pos);
1578             p->offset = src - (char*) p->bytes;
1579         }
1580
1581         pp->level++;
1582
1583         while (1)
1584         {
1585             pp->block[pp->level] = p = open_block(pp->isamb, pos);
1586
1587             pp->total_size += p->size;
1588             pp->no_blocks++;
1589             
1590             if (p->leaf) 
1591             {
1592                 break;
1593             }
1594             
1595             src = p->bytes + p->offset;
1596             while(1)
1597             {
1598                 decode_ptr(&src, &pos);
1599                 p->offset = src - (char*) p->bytes;
1600                 
1601                 if (!untilb || p->offset == p->size)
1602                     break;
1603                 assert(p->offset < p->size);
1604 #if INT_ENCODE
1605                 src_0 = src;
1606                 file_item = file_item_buf;
1607                 (*b->method->codec.reset)(c1);
1608                 (*b->method->codec.decode)(c1, &file_item, &src);
1609                 if ((*b->method->compare_item)(untilb, file_item_buf) < pp->scope)
1610                 {
1611                     src = src_0;
1612                     break;
1613                 }
1614 #else
1615                 decode_ptr(&src, &item_len);
1616                 if ((*b->method->compare_item)(untilb, src) <= pp->scope)
1617                     break;
1618                 src += item_len;
1619 #endif
1620             }
1621             pp->level++;
1622         }
1623 #if INT_ENCODE
1624         (*b->method->codec.stop)(c1);
1625 #endif
1626     }
1627     assert(p->offset < p->size);
1628     assert(p->leaf);
1629     while(1)
1630     {
1631         char *dst0 = dst;
1632         src = p->bytes + p->offset;
1633         (*pp->isamb->method->codec.decode)(p->decodeClientData, &dst, &src);
1634         p->offset = src - (char*) p->bytes;
1635         if (!untilb || (*pp->isamb->method->compare_item)(untilb, dst0) < pp->scope)
1636             break;
1637         dst = dst0;
1638         if (p->offset == p->size) goto again;
1639     }
1640     pp->returned_numbers++; 
1641     return 1;
1642 }
1643
1644 zint isamb_get_int_splits(ISAMB b)
1645 {
1646     return b->number_of_int_splits;
1647 }
1648
1649 zint isamb_get_leaf_splits(ISAMB b)
1650 {
1651     return b->number_of_leaf_splits;
1652 }
1653
1654 zint isamb_get_root_ptr(ISAMB b)
1655 {
1656     return b->root_ptr;
1657 }
1658
1659 void isamb_set_root_ptr(ISAMB b, zint root_ptr)
1660 {
1661     b->root_ptr = root_ptr;
1662 }
1663
1664
1665 /*
1666  * Local variables:
1667  * c-basic-offset: 4
1668  * indent-tabs-mode: nil
1669  * End:
1670  * vim: shiftwidth=4 tabstop=8 expandtab
1671  */
1672