Version 2.0.58
[idzebra-moved-to-github.git] / isamb / isamb.c
1 /* This file is part of the Zebra server.
2    Copyright (C) Index Data
3
4 Zebra is free software; you can redistribute it and/or modify it under
5 the terms of the GNU General Public License as published by the Free
6 Software Foundation; either version 2, or (at your option) any later
7 version.
8
9 Zebra is distributed in the hope that it will be useful, but WITHOUT ANY
10 WARRANTY; without even the implied warranty of MERCHANTABILITY or
11 FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
12 for more details.
13
14 You should have received a copy of the GNU General Public License
15 along with this program; if not, write to the Free Software
16 Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA  02110-1301  USA
17
18 */
19
20 #if HAVE_CONFIG_H
21 #include <config.h>
22 #endif
23 #include <stdlib.h>
24 #include <string.h>
25 #include <yaz/log.h>
26 #include <yaz/xmalloc.h>
27 #include <idzebra/isamb.h>
28 #include <assert.h>
29
30 #ifndef ISAMB_DEBUG
31 #define ISAMB_DEBUG 0
32 #endif
33
34
35 #define ISAMB_MAJOR_VERSION 3
36 #define ISAMB_MINOR_VERSION_NO_ROOT 0
37 #define ISAMB_MINOR_VERSION_WITH_ROOT 1
38
39 struct ISAMB_head {
40     zint first_block;
41     zint last_block;
42     zint free_list;
43     zint no_items;
44     int block_size;
45     int block_max;
46     int block_offset;
47 };
48
49 /* if 1, upper nodes items are encoded; 0 if not encoded */
50 #define INT_ENCODE 1
51
52 /* maximum size of encoded buffer */
53 #define DST_ITEM_MAX 5000
54
55 /* max page size for _any_ isamb use */
56 #define ISAMB_MAX_PAGE 32768
57
58 #define ISAMB_MAX_LEVEL 10
59 /* approx 2*max page + max size of item */
60 #define DST_BUF_SIZE (2*ISAMB_MAX_PAGE+DST_ITEM_MAX+100)
61
62 /* should be maximum block size of multiple thereof */
63 #define ISAMB_CACHE_ENTRY_SIZE ISAMB_MAX_PAGE
64
65 /* CAT_MAX: _must_ be power of 2 */
66 #define CAT_MAX 4
67 #define CAT_MASK (CAT_MAX-1)
68 /* CAT_NO: <= CAT_MAX */
69 #define CAT_NO 4
70
71 /* Smallest block size */
72 #define ISAMB_MIN_SIZE 32
73 /* Size factor */
74 #define ISAMB_FAC_SIZE 4
75
76 /* ISAMB_PTR_CODEC = 1 var, =0 fixed */
77 #define ISAMB_PTR_CODEC 1
78
79 struct ISAMB_cache_entry {
80     ISAM_P pos;
81     unsigned char *buf;
82     int dirty;
83     int hits;
84     struct ISAMB_cache_entry *next;
85 };
86
87 struct ISAMB_file {
88     BFile bf;
89     int head_dirty;
90     struct ISAMB_head head;
91     struct ISAMB_cache_entry *cache_entries;
92 };
93
94 struct ISAMB_s {
95     BFiles bfs;
96     ISAMC_M *method;
97
98     struct ISAMB_file *file;
99     int no_cat;
100     int cache; /* 0 = no cache, 1 = use cache, -1 = dummy isam (for testing only) */
101     int log_io;        /* log level for bf_read/bf_write calls */
102     int log_freelist;  /* log level for freelist handling */
103     zint skipped_numbers; /* on a leaf node */
104     zint returned_numbers;
105     zint skipped_nodes[ISAMB_MAX_LEVEL]; /* [0]=skipped leaves, 1 = higher etc */
106     zint accessed_nodes[ISAMB_MAX_LEVEL]; /* nodes we did not skip */
107     zint number_of_int_splits;
108     zint number_of_leaf_splits;
109     int enable_int_count; /* whether we count nodes (or not) */
110     int cache_size; /* size of blocks to cache (if cache=1) */
111     int minor_version;
112     zint root_ptr;
113 };
114
115 struct ISAMB_block {
116     ISAM_P pos;
117     int cat;
118     int size;
119     int leaf;
120     int dirty;
121     int deleted;
122     int offset;
123     zint no_items;  /* number of nodes in this + children */
124     char *bytes;
125     char *cbuf;
126     unsigned char *buf;
127     void *decodeClientData;
128     int log_rw;
129 };
130
131 struct ISAMB_PP_s {
132     ISAMB isamb;
133     ISAM_P pos;
134     int level;
135     int maxlevel; /* total depth */
136     zint total_size;
137     zint no_blocks;
138     zint skipped_numbers; /* on a leaf node */
139     zint returned_numbers;
140     zint skipped_nodes[ISAMB_MAX_LEVEL]; /* [0]=skipped leaves, 1 = higher etc */
141     zint accessed_nodes[ISAMB_MAX_LEVEL]; /* nodes we did not skip */
142     struct ISAMB_block **block;
143     int scope;  /* on what level we forward */
144 };
145
146
147 #define encode_item_len encode_ptr
148 #if ISAMB_PTR_CODEC
149 static void encode_ptr(char **dst, zint pos)
150 {
151     unsigned char *bp = (unsigned char*) *dst;
152
153     while (pos > 127)
154     {
155         *bp++ = (unsigned char) (128 | (pos & 127));
156         pos = pos >> 7;
157     }
158     *bp++ = (unsigned char) pos;
159     *dst = (char *) bp;
160 }
161 #else
162 static void encode_ptr(char **dst, zint pos)
163 {
164     memcpy(*dst, &pos, sizeof(pos));
165     (*dst) += sizeof(pos);
166 }
167 #endif
168
169 #define decode_item_len decode_ptr
170 #if ISAMB_PTR_CODEC
171 static void decode_ptr(const char **src, zint *pos)
172 {
173     zint d = 0;
174     unsigned char c;
175     unsigned r = 0;
176
177     while (((c = *(const unsigned char *)((*src)++)) & 128))
178     {
179         d += ((zint) (c & 127) << r);
180         r += 7;
181     }
182     d += ((zint) c << r);
183     *pos = d;
184 }
185 #else
186 static void decode_ptr(const char **src, zint *pos)
187 {
188     memcpy(pos, *src, sizeof(*pos));
189     (*src) += sizeof(*pos);
190 }
191 #endif
192
193
194 void isamb_set_int_count(ISAMB b, int v)
195 {
196     b->enable_int_count = v;
197 }
198
199 void isamb_set_cache_size(ISAMB b, int v)
200 {
201     b->cache_size = v;
202 }
203
204 ISAMB isamb_open2(BFiles bfs, const char *name, int writeflag, ISAMC_M *method,
205                   int cache, int no_cat, int *sizes, int use_root_ptr)
206 {
207     ISAMB isamb = xmalloc(sizeof(*isamb));
208     int i;
209
210     assert(no_cat <= CAT_MAX);
211
212     isamb->bfs = bfs;
213     isamb->method = (ISAMC_M *) xmalloc(sizeof(*method));
214     memcpy(isamb->method, method, sizeof(*method));
215     isamb->no_cat = no_cat;
216     isamb->log_io = 0;
217     isamb->log_freelist = 0;
218     isamb->cache = cache;
219     isamb->skipped_numbers = 0;
220     isamb->returned_numbers = 0;
221     isamb->number_of_int_splits = 0;
222     isamb->number_of_leaf_splits = 0;
223     isamb->enable_int_count = 1;
224     isamb->cache_size = 40;
225
226     if (use_root_ptr)
227         isamb->minor_version = ISAMB_MINOR_VERSION_WITH_ROOT;
228     else
229         isamb->minor_version = ISAMB_MINOR_VERSION_NO_ROOT;
230
231     isamb->root_ptr = 0;
232
233     for (i = 0; i<ISAMB_MAX_LEVEL; i++)
234         isamb->skipped_nodes[i] = isamb->accessed_nodes[i] = 0;
235
236     if (cache == -1)
237     {
238         yaz_log(YLOG_WARN, "isamb_open %s. Degraded TEST mode", name);
239     }
240     else
241     {
242         assert(cache == 0 || cache == 1);
243     }
244     isamb->file = xmalloc(sizeof(*isamb->file) * isamb->no_cat);
245
246     for (i = 0; i < isamb->no_cat; i++)
247     {
248         isamb->file[i].bf = 0;
249         isamb->file[i].head_dirty = 0;
250         isamb->file[i].cache_entries = 0;
251     }
252
253     for (i = 0; i < isamb->no_cat; i++)
254     {
255         char fname[DST_BUF_SIZE];
256         char hbuf[DST_BUF_SIZE];
257
258         sprintf(fname, "%s%c", name, i+'A');
259         if (cache)
260             isamb->file[i].bf = bf_open(bfs, fname, ISAMB_CACHE_ENTRY_SIZE,
261                                         writeflag);
262         else
263             isamb->file[i].bf = bf_open(bfs, fname, sizes[i], writeflag);
264
265         if (!isamb->file[i].bf)
266         {
267             isamb_close(isamb);
268             return 0;
269         }
270
271         /* fill-in default values (for empty isamb) */
272         isamb->file[i].head.first_block = ISAMB_CACHE_ENTRY_SIZE/sizes[i]+1;
273         isamb->file[i].head.last_block = isamb->file[i].head.first_block;
274         isamb->file[i].head.block_size = sizes[i];
275         assert(sizes[i] <= ISAMB_CACHE_ENTRY_SIZE);
276 #if ISAMB_PTR_CODEC
277         if (i == isamb->no_cat-1 || sizes[i] > 128)
278             isamb->file[i].head.block_offset = 8;
279         else
280             isamb->file[i].head.block_offset = 4;
281 #else
282         isamb->file[i].head.block_offset = 11;
283 #endif
284         isamb->file[i].head.block_max =
285             sizes[i] - isamb->file[i].head.block_offset;
286         isamb->file[i].head.free_list = 0;
287         if (bf_read(isamb->file[i].bf, 0, 0, 0, hbuf))
288         {
289             /* got header assume "isamb"major minor len can fit in 16 bytes */
290             zint zint_tmp;
291             int major, minor, len, pos = 0;
292             int left;
293             const char *src = 0;
294             if (memcmp(hbuf, "isamb", 5))
295             {
296                 yaz_log(YLOG_WARN, "bad isamb header for file %s", fname);
297                 isamb_close(isamb);
298                 return 0;
299             }
300             if (sscanf(hbuf+5, "%d %d %d", &major, &minor, &len) != 3)
301             {
302                 yaz_log(YLOG_WARN, "bad isamb header for file %s", fname);
303                 isamb_close(isamb);
304                 return 0;
305             }
306             if (major != ISAMB_MAJOR_VERSION)
307             {
308                 yaz_log(YLOG_WARN, "bad major version for file %s %d, must be %d",
309                         fname, major, ISAMB_MAJOR_VERSION);
310                 isamb_close(isamb);
311                 return 0;
312             }
313             for (left = len - sizes[i]; left > 0; left = left - sizes[i])
314             {
315                 pos++;
316                 if (!bf_read(isamb->file[i].bf, pos, 0, 0, hbuf + pos*sizes[i]))
317                 {
318                     yaz_log(YLOG_WARN, "truncated isamb header for "
319                             "file=%s len=%d pos=%d",
320                             fname, len, pos);
321                     isamb_close(isamb);
322                     return 0;
323                 }
324             }
325             src = hbuf + 16;
326             decode_ptr(&src, &isamb->file[i].head.first_block);
327             decode_ptr(&src, &isamb->file[i].head.last_block);
328             decode_ptr(&src, &zint_tmp);
329             isamb->file[i].head.block_size = (int) zint_tmp;
330             decode_ptr(&src, &zint_tmp);
331             isamb->file[i].head.block_max = (int) zint_tmp;
332             decode_ptr(&src, &isamb->file[i].head.free_list);
333             if (isamb->minor_version >= ISAMB_MINOR_VERSION_WITH_ROOT)
334                 decode_ptr(&src, &isamb->root_ptr);
335         }
336         assert(isamb->file[i].head.block_size >= isamb->file[i].head.block_offset);
337         /* must rewrite the header if root ptr is in use (bug #1017) */
338         if (use_root_ptr && writeflag)
339             isamb->file[i].head_dirty = 1;
340         else
341             isamb->file[i].head_dirty = 0;
342         assert(isamb->file[i].head.block_size == sizes[i]);
343     }
344 #if ISAMB_DEBUG
345     yaz_log(YLOG_WARN, "isamb debug enabled. Things will be slower than usual");
346 #endif
347     return isamb;
348 }
349
350 ISAMB isamb_open(BFiles bfs, const char *name, int writeflag, ISAMC_M *method,
351                  int cache)
352 {
353     int sizes[CAT_NO];
354     int i, b_size = ISAMB_MIN_SIZE;
355
356     for (i = 0; i<CAT_NO; i++)
357     {
358         sizes[i] = b_size;
359         b_size = b_size * ISAMB_FAC_SIZE;
360     }
361     return isamb_open2(bfs, name, writeflag, method, cache,
362                        CAT_NO, sizes, 0);
363 }
364
365 static void flush_blocks(ISAMB b, int cat)
366 {
367     while (b->file[cat].cache_entries)
368     {
369         struct ISAMB_cache_entry *ce_this = b->file[cat].cache_entries;
370         b->file[cat].cache_entries = ce_this->next;
371
372         if (ce_this->dirty)
373         {
374             yaz_log(b->log_io, "bf_write: flush_blocks");
375             bf_write(b->file[cat].bf, ce_this->pos, 0, 0, ce_this->buf);
376         }
377         xfree(ce_this->buf);
378         xfree(ce_this);
379     }
380 }
381
382 static int cache_block(ISAMB b, ISAM_P pos, unsigned char *userbuf, int wr)
383 {
384     int cat = (int) (pos&CAT_MASK);
385     int off = (int) (((pos/CAT_MAX) &
386                       (ISAMB_CACHE_ENTRY_SIZE / b->file[cat].head.block_size - 1))
387                      * b->file[cat].head.block_size);
388     zint norm = pos / (CAT_MASK*ISAMB_CACHE_ENTRY_SIZE / b->file[cat].head.block_size);
389     int no = 0;
390     struct ISAMB_cache_entry **ce, *ce_this = 0, **ce_last = 0;
391
392     if (!b->cache)
393         return 0;
394
395     assert(ISAMB_CACHE_ENTRY_SIZE >= b->file[cat].head.block_size);
396     for (ce = &b->file[cat].cache_entries; *ce; ce = &(*ce)->next, no++)
397     {
398         ce_last = ce;
399         if ((*ce)->pos == norm)
400         {
401             ce_this = *ce;
402             *ce = (*ce)->next;   /* remove from list */
403
404             ce_this->next = b->file[cat].cache_entries;  /* move to front */
405             b->file[cat].cache_entries = ce_this;
406
407             if (wr)
408             {
409                 memcpy(ce_this->buf + off, userbuf,
410                        b->file[cat].head.block_size);
411                 ce_this->dirty = 1;
412             }
413             else
414                 memcpy(userbuf, ce_this->buf + off,
415                        b->file[cat].head.block_size);
416             return 1;
417         }
418     }
419     if (no >= b->cache_size)
420     {
421         assert(ce_last && *ce_last);
422         ce_this = *ce_last;
423         *ce_last = 0;  /* remove the last entry from list */
424         if (ce_this->dirty)
425         {
426             yaz_log(b->log_io, "bf_write: cache_block");
427             bf_write(b->file[cat].bf, ce_this->pos, 0, 0, ce_this->buf);
428         }
429         xfree(ce_this->buf);
430         xfree(ce_this);
431     }
432     ce_this = xmalloc(sizeof(*ce_this));
433     ce_this->next = b->file[cat].cache_entries;
434     b->file[cat].cache_entries = ce_this;
435     ce_this->buf = xmalloc(ISAMB_CACHE_ENTRY_SIZE);
436     ce_this->pos = norm;
437     yaz_log(b->log_io, "bf_read: cache_block");
438     if (!bf_read(b->file[cat].bf, norm, 0, 0, ce_this->buf))
439         memset(ce_this->buf, 0, ISAMB_CACHE_ENTRY_SIZE);
440     if (wr)
441     {
442         memcpy(ce_this->buf + off, userbuf, b->file[cat].head.block_size);
443         ce_this->dirty = 1;
444     }
445     else
446     {
447         ce_this->dirty = 0;
448         memcpy(userbuf, ce_this->buf + off, b->file[cat].head.block_size);
449     }
450     return 1;
451 }
452
453
454 void isamb_close(ISAMB isamb)
455 {
456     int i;
457     for (i = 0; isamb->accessed_nodes[i]; i++)
458         yaz_log(YLOG_DEBUG, "isamb_close  level leaf-%d: "ZINT_FORMAT" read, "
459                 ZINT_FORMAT" skipped",
460                 i, isamb->accessed_nodes[i], isamb->skipped_nodes[i]);
461     yaz_log(YLOG_DEBUG, "isamb_close returned "ZINT_FORMAT" values, "
462             "skipped "ZINT_FORMAT,
463             isamb->skipped_numbers, isamb->returned_numbers);
464
465     for (i = 0; i<isamb->no_cat; i++)
466     {
467         flush_blocks(isamb, i);
468         if (isamb->file[i].head_dirty)
469         {
470             char hbuf[DST_BUF_SIZE];
471             int major = ISAMB_MAJOR_VERSION;
472             int len = 16;
473             char *dst = hbuf + 16;
474             int pos = 0, left;
475             int b_size = isamb->file[i].head.block_size;
476
477             encode_ptr(&dst, isamb->file[i].head.first_block);
478             encode_ptr(&dst, isamb->file[i].head.last_block);
479             encode_ptr(&dst, isamb->file[i].head.block_size);
480             encode_ptr(&dst, isamb->file[i].head.block_max);
481             encode_ptr(&dst, isamb->file[i].head.free_list);
482
483             if (isamb->minor_version >= ISAMB_MINOR_VERSION_WITH_ROOT)
484                 encode_ptr(&dst, isamb->root_ptr);
485
486             memset(dst, '\0', b_size); /* ensure no random bytes are written */
487
488             len = dst - hbuf;
489
490             /* print exactly 16 bytes (including trailing 0) */
491             sprintf(hbuf, "isamb%02d %02d %02d\r\n", major,
492                     isamb->minor_version, len);
493
494             bf_write(isamb->file[i].bf, pos, 0, 0, hbuf);
495
496             for (left = len - b_size; left > 0; left = left - b_size)
497             {
498                 pos++;
499                 bf_write(isamb->file[i].bf, pos, 0, 0, hbuf + pos*b_size);
500             }
501         }
502         if (isamb->file[i].bf)
503             bf_close (isamb->file[i].bf);
504     }
505     xfree(isamb->file);
506     xfree(isamb->method);
507     xfree(isamb);
508 }
509
510 /* open_block: read one block at pos.
511    Decode leading sys bytes .. consisting of
512    Offset:Meaning
513    0: leader byte, != 0 leaf, == 0, non-leaf
514    1-2: used size of block
515    3-7*: number of items and all children
516
517    * Reserve 5 bytes for large block sizes. 1 for small ones .. Number
518    of items. We can thus have at most 2^40 nodes.
519 */
520 static struct ISAMB_block *open_block(ISAMB b, ISAM_P pos)
521 {
522     int cat = (int) (pos&CAT_MASK);
523     const char *src;
524     int offset = b->file[cat].head.block_offset;
525     struct ISAMB_block *p;
526     if (!pos)
527         return 0;
528     p = xmalloc(sizeof(*p));
529     p->pos = pos;
530     p->cat = (int) (pos & CAT_MASK);
531     p->buf = xmalloc(b->file[cat].head.block_size);
532     p->cbuf = 0;
533
534     if (!cache_block (b, pos, p->buf, 0))
535     {
536         yaz_log(b->log_io, "bf_read: open_block");
537         if (bf_read(b->file[cat].bf, pos/CAT_MAX, 0, 0, p->buf) != 1)
538         {
539             yaz_log(YLOG_FATAL, "isamb: read fail for pos=%ld block=%ld",
540                     (long) pos, (long) pos/CAT_MAX);
541             zebra_exit("isamb:open_block");
542         }
543     }
544     p->bytes = (char *)p->buf + offset;
545     p->leaf = p->buf[0];
546     p->size = (p->buf[1] + 256 * p->buf[2]) - offset;
547     if (p->size < 0)
548     {
549         yaz_log(YLOG_FATAL, "Bad block size %d in pos=" ZINT_FORMAT "\n",
550                 p->size, pos);
551     }
552     assert(p->size >= 0);
553     src = (char*) p->buf + 3;
554     decode_ptr(&src, &p->no_items);
555
556     p->offset = 0;
557     p->dirty = 0;
558     p->deleted = 0;
559     p->decodeClientData = (*b->method->codec.start)();
560     return p;
561 }
562
563 struct ISAMB_block *new_block(ISAMB b, int leaf, int cat)
564 {
565     struct ISAMB_block *p;
566
567     p = xmalloc(sizeof(*p));
568     p->buf = xmalloc(b->file[cat].head.block_size);
569
570     if (!b->file[cat].head.free_list)
571     {
572         zint block_no;
573         block_no = b->file[cat].head.last_block++;
574         p->pos = block_no * CAT_MAX + cat;
575         if (b->log_freelist)
576             yaz_log(b->log_freelist, "got block "
577                     ZINT_FORMAT " from last %d:" ZINT_FORMAT, p->pos,
578                     cat, p->pos/CAT_MAX);
579     }
580     else
581     {
582         p->pos = b->file[cat].head.free_list;
583         assert((p->pos & CAT_MASK) == cat);
584         if (!cache_block(b, p->pos, p->buf, 0))
585         {
586             yaz_log(b->log_io, "bf_read: new_block");
587             if (!bf_read(b->file[cat].bf, p->pos/CAT_MAX, 0, 0, p->buf))
588             {
589                 yaz_log(YLOG_FATAL, "isamb: read fail for pos=%ld block=%ld",
590                         (long) p->pos/CAT_MAX, (long) p->pos/CAT_MAX);
591                 zebra_exit("isamb:new_block");
592             }
593         }
594         if (b->log_freelist)
595             yaz_log(b->log_freelist, "got block "
596                     ZINT_FORMAT " from freelist %d:" ZINT_FORMAT, p->pos,
597                     cat, p->pos/CAT_MAX);
598         memcpy(&b->file[cat].head.free_list, p->buf, sizeof(zint));
599     }
600     p->cat = cat;
601     b->file[cat].head_dirty = 1;
602     memset(p->buf, 0, b->file[cat].head.block_size);
603     p->bytes = (char*)p->buf + b->file[cat].head.block_offset;
604     p->leaf = leaf;
605     p->size = 0;
606     p->dirty = 1;
607     p->deleted = 0;
608     p->offset = 0;
609     p->no_items = 0;
610     p->decodeClientData = (*b->method->codec.start)();
611     return p;
612 }
613
614 struct ISAMB_block *new_leaf(ISAMB b, int cat)
615 {
616     return new_block(b, 1, cat);
617 }
618
619
620 struct ISAMB_block *new_int(ISAMB b, int cat)
621 {
622     return new_block(b, 0, cat);
623 }
624
625 static void check_block(ISAMB b, struct ISAMB_block *p)
626 {
627     assert(b); /* mostly to make the compiler shut up about unused b */
628     if (p->leaf)
629     {
630         ;
631     }
632     else
633     {
634         /* sanity check */
635         char *startp = p->bytes;
636         const char *src = startp;
637         char *endp = p->bytes + p->size;
638         ISAM_P pos;
639         void *c1 = (*b->method->codec.start)();
640
641         decode_ptr(&src, &pos);
642         assert((pos&CAT_MASK) == p->cat);
643         while (src != endp)
644         {
645 #if INT_ENCODE
646             char file_item_buf[DST_ITEM_MAX];
647             char *file_item = file_item_buf;
648             (*b->method->codec.reset)(c1);
649             (*b->method->codec.decode)(c1, &file_item, &src);
650 #else
651             zint item_len;
652             decode_item_len(&src, &item_len);
653             assert(item_len > 0 && item_len < 128);
654             src += item_len;
655 #endif
656             decode_ptr(&src, &pos);
657             if ((pos&CAT_MASK) != p->cat)
658             {
659                 assert((pos&CAT_MASK) == p->cat);
660             }
661         }
662         (*b->method->codec.stop)(c1);
663     }
664 }
665
666 void close_block(ISAMB b, struct ISAMB_block *p)
667 {
668     if (!p)
669         return;
670     if (p->deleted)
671     {
672         yaz_log(b->log_freelist, "release block " ZINT_FORMAT " from freelist %d:" ZINT_FORMAT,
673                 p->pos, p->cat, p->pos/CAT_MAX);
674         memcpy(p->buf, &b->file[p->cat].head.free_list, sizeof(zint));
675         b->file[p->cat].head.free_list = p->pos;
676         b->file[p->cat].head_dirty = 1;
677         if (!cache_block(b, p->pos, p->buf, 1))
678         {
679             yaz_log(b->log_io, "bf_write: close_block (deleted)");
680             bf_write(b->file[p->cat].bf, p->pos/CAT_MAX, 0, 0, p->buf);
681         }
682     }
683     else if (p->dirty)
684     {
685         int offset = b->file[p->cat].head.block_offset;
686         int size = p->size + offset;
687         char *dst =  (char*)p->buf + 3;
688         assert(p->size >= 0);
689
690         /* memset becuase encode_ptr usually does not write all bytes */
691         memset(p->buf, 0, b->file[p->cat].head.block_offset);
692         p->buf[0] = p->leaf;
693         p->buf[1] = size & 255;
694         p->buf[2] = size >> 8;
695         encode_ptr(&dst, p->no_items);
696         check_block(b, p);
697         if (!cache_block(b, p->pos, p->buf, 1))
698         {
699             yaz_log(b->log_io, "bf_write: close_block");
700             bf_write(b->file[p->cat].bf, p->pos/CAT_MAX, 0, 0, p->buf);
701         }
702     }
703     (*b->method->codec.stop)(p->decodeClientData);
704     xfree(p->buf);
705     xfree(p);
706 }
707
708 int insert_sub(ISAMB b, struct ISAMB_block **p,
709                void *new_item, int *mode,
710                ISAMC_I *stream,
711                struct ISAMB_block **sp,
712                void *sub_item, int *sub_size,
713                const void *max_item);
714
715 int insert_int(ISAMB b, struct ISAMB_block *p, void *lookahead_item,
716                int *mode,
717                ISAMC_I *stream, struct ISAMB_block **sp,
718                void *split_item, int *split_size, const void *last_max_item)
719 {
720     char *startp = p->bytes;
721     const char *src = startp;
722     char *endp = p->bytes + p->size;
723     ISAM_P pos;
724     struct ISAMB_block *sub_p1 = 0, *sub_p2 = 0;
725     char sub_item[DST_ITEM_MAX];
726     int sub_size;
727     int more = 0;
728     zint diff_terms = 0;
729     void *c1 = (*b->method->codec.start)();
730
731     *sp = 0;
732
733     assert(p->size >= 0);
734     decode_ptr(&src, &pos);
735     while (src != endp)
736     {
737         int d;
738         const char *src0 = src;
739 #if INT_ENCODE
740         char file_item_buf[DST_ITEM_MAX];
741         char *file_item = file_item_buf;
742         (*b->method->codec.reset)(c1);
743         (*b->method->codec.decode)(c1, &file_item, &src);
744         d = (*b->method->compare_item)(file_item_buf, lookahead_item);
745         if (d > 0)
746         {
747             sub_p1 = open_block(b, pos);
748             assert(sub_p1);
749             diff_terms -= sub_p1->no_items;
750             more = insert_sub(b, &sub_p1, lookahead_item, mode,
751                               stream, &sub_p2,
752                               sub_item, &sub_size, file_item_buf);
753             diff_terms += sub_p1->no_items;
754             src = src0;
755             break;
756         }
757 #else
758         zint item_len;
759         decode_item_len(&src, &item_len);
760         d = (*b->method->compare_item)(src, lookahead_item);
761         if (d > 0)
762         {
763             sub_p1 = open_block(b, pos);
764             assert(sub_p1);
765             diff_terms -= sub_p1->no_items;
766             more = insert_sub(b, &sub_p1, lookahead_item, mode,
767                               stream, &sub_p2,
768                               sub_item, &sub_size, src);
769             diff_terms += sub_p1->no_items;
770             src = src0;
771             break;
772         }
773         src += item_len;
774 #endif
775         decode_ptr(&src, &pos);
776     }
777     if (!sub_p1)
778     {
779         /* we reached the end. So lookahead > last item */
780         sub_p1 = open_block(b, pos);
781         assert(sub_p1);
782         diff_terms -= sub_p1->no_items;
783         more = insert_sub(b, &sub_p1, lookahead_item, mode, stream, &sub_p2,
784                           sub_item, &sub_size, last_max_item);
785         diff_terms += sub_p1->no_items;
786     }
787     if (sub_p2)
788         diff_terms += sub_p2->no_items;
789     if (diff_terms)
790     {
791         p->dirty = 1;
792         p->no_items += diff_terms;
793     }
794     if (sub_p2)
795     {
796         /* there was a split - must insert pointer in this one */
797         char dst_buf[DST_BUF_SIZE];
798         char *dst = dst_buf;
799 #if INT_ENCODE
800         const char *sub_item_ptr = sub_item;
801 #endif
802         assert(sub_size < DST_ITEM_MAX && sub_size > 1);
803
804         memcpy(dst, startp, src - startp);
805
806         dst += src - startp;
807
808 #if INT_ENCODE
809         (*b->method->codec.reset)(c1);
810         (*b->method->codec.encode)(c1, &dst, &sub_item_ptr);
811 #else
812         encode_item_len(&dst, sub_size);      /* sub length and item */
813         memcpy(dst, sub_item, sub_size);
814         dst += sub_size;
815 #endif
816
817         encode_ptr(&dst, sub_p2->pos);   /* pos */
818
819         if (endp - src)                   /* remaining data */
820         {
821             memcpy(dst, src, endp - src);
822             dst += endp - src;
823         }
824         p->size = dst - dst_buf;
825         assert(p->size >= 0);
826
827         if (p->size <= b->file[p->cat].head.block_max)
828         {
829             /* it fits OK in this block */
830             memcpy(startp, dst_buf, dst - dst_buf);
831
832             close_block(b, sub_p2);
833         }
834         else
835         {
836             /* must split _this_ block as well .. */
837             struct ISAMB_block *sub_p3;
838 #if INT_ENCODE
839             char file_item_buf[DST_ITEM_MAX];
840             char *file_item = file_item_buf;
841 #else
842             zint split_size_tmp;
843 #endif
844             zint no_items_first_half = 0;
845             int p_new_size;
846             const char *half;
847             src = dst_buf;
848             endp = dst;
849
850             b->number_of_int_splits++;
851
852             p->dirty = 1;
853             close_block(b, sub_p2);
854
855             half = src + b->file[p->cat].head.block_size/2;
856             decode_ptr(&src, &pos);
857
858             if (b->enable_int_count)
859             {
860                 /* read sub block so we can get no_items for it */
861                 sub_p3 = open_block(b, pos);
862                 no_items_first_half += sub_p3->no_items;
863                 close_block(b, sub_p3);
864             }
865
866             while (src <= half)
867             {
868 #if INT_ENCODE
869                 file_item = file_item_buf;
870                 (*b->method->codec.reset)(c1);
871                 (*b->method->codec.decode)(c1, &file_item, &src);
872 #else
873                 decode_item_len(&src, &split_size_tmp);
874                 *split_size = (int) split_size_tmp;
875                 src += *split_size;
876 #endif
877                 decode_ptr(&src, &pos);
878
879                 if (b->enable_int_count)
880                 {
881                     /* read sub block so we can get no_items for it */
882                     sub_p3 = open_block(b, pos);
883                     no_items_first_half += sub_p3->no_items;
884                     close_block(b, sub_p3);
885                 }
886             }
887             /*  p is first half */
888             p_new_size = src - dst_buf;
889             memcpy(p->bytes, dst_buf, p_new_size);
890
891 #if INT_ENCODE
892             file_item = file_item_buf;
893             (*b->method->codec.reset)(c1);
894             (*b->method->codec.decode)(c1, &file_item, &src);
895             *split_size = file_item - file_item_buf;
896             memcpy(split_item, file_item_buf, *split_size);
897 #else
898             decode_item_len(&src, &split_size_tmp);
899             *split_size = (int) split_size_tmp;
900             memcpy(split_item, src, *split_size);
901             src += *split_size;
902 #endif
903             /*  *sp is second half */
904             *sp = new_int(b, p->cat);
905             (*sp)->size = endp - src;
906             memcpy((*sp)->bytes, src, (*sp)->size);
907
908             p->size = p_new_size;
909
910             /*  adjust no_items in first&second half */
911             (*sp)->no_items = p->no_items - no_items_first_half;
912             p->no_items = no_items_first_half;
913         }
914         p->dirty = 1;
915     }
916     close_block(b, sub_p1);
917     (*b->method->codec.stop)(c1);
918     return more;
919 }
920
921 int insert_leaf(ISAMB b, struct ISAMB_block **sp1, void *lookahead_item,
922                 int *lookahead_mode, ISAMC_I *stream,
923                 struct ISAMB_block **sp2,
924                 void *sub_item, int *sub_size,
925                 const void *max_item)
926 {
927     struct ISAMB_block *p = *sp1;
928     char *endp = 0;
929     const char *src = 0;
930     char dst_buf[DST_BUF_SIZE], *dst = dst_buf;
931     int new_size;
932     void *c1 = (*b->method->codec.start)();
933     void *c2 = (*b->method->codec.start)();
934     int more = 1;
935     int quater = b->file[b->no_cat-1].head.block_max / 4;
936     char *mid_cut = dst_buf + quater * 2;
937     char *tail_cut = dst_buf + quater * 3;
938     char *maxp = dst_buf + b->file[b->no_cat-1].head.block_max;
939     char *half1 = 0;
940     char *half2 = 0;
941     char cut_item_buf[DST_ITEM_MAX];
942     int cut_item_size = 0;
943     int no_items = 0;    /* number of items (total) */
944     int no_items_1 = 0;  /* number of items (first half) */
945     int inserted_dst_bytes = 0;
946
947     if (p && p->size)
948     {
949         char file_item_buf[DST_ITEM_MAX];
950         char *file_item = file_item_buf;
951
952         src = p->bytes;
953         endp = p->bytes + p->size;
954         (*b->method->codec.decode)(c1, &file_item, &src);
955         while (1)
956         {
957             const char *dst_item = 0; /* resulting item to be inserted */
958             char *lookahead_next;
959             char *dst_0 = dst;
960             int d = -1;
961
962             if (lookahead_item)
963                 d = (*b->method->compare_item)(file_item_buf, lookahead_item);
964
965             /* d now holds comparison between existing file item and
966                lookahead item
967                d = 0: equal
968                d > 0: lookahead before file
969                d < 0: lookahead after file
970             */
971             if (d > 0)
972             {
973                 /* lookahead must be inserted */
974                 dst_item = lookahead_item;
975                 /* if this is not an insertion, it's really bad .. */
976                 if (!*lookahead_mode)
977                 {
978                     yaz_log(YLOG_WARN, "isamb: Inconsistent register (1)");
979                     assert(*lookahead_mode);
980                 }
981             }
982             else if (d == 0 && *lookahead_mode == 2)
983             {
984                 /* For mode == 2, we insert the new key anyway - even
985                    though the comparison is 0. */
986                 dst_item = lookahead_item;
987                 p->dirty = 1;
988             }
989             else
990                 dst_item = file_item_buf;
991
992             if (d == 0 && !*lookahead_mode)
993             {
994                 /* it's a deletion and they match so there is nothing
995                    to be inserted anyway .. But mark the thing dirty
996                    (file item was part of input.. The item will not be
997                    part of output */
998                 p->dirty = 1;
999             }
1000             else if (!half1 && dst > mid_cut)
1001             {
1002                 /* we have reached the splitting point for the first time */
1003                 const char *dst_item_0 = dst_item;
1004                 half1 = dst; /* candidate for splitting */
1005
1006                 /* encode the resulting item */
1007                 (*b->method->codec.encode)(c2, &dst, &dst_item);
1008
1009                 cut_item_size = dst_item - dst_item_0;
1010                 assert(cut_item_size > 0);
1011                 memcpy(cut_item_buf, dst_item_0, cut_item_size);
1012
1013                 half2 = dst;
1014                 no_items_1 = no_items;
1015                 no_items++;
1016             }
1017             else
1018             {
1019                 /* encode the resulting item */
1020                 (*b->method->codec.encode)(c2, &dst, &dst_item);
1021                 no_items++;
1022             }
1023
1024             /* now move "pointers" .. result has been encoded .. */
1025             if (d > 0)
1026             {
1027                 /* we must move the lookahead pointer */
1028
1029                 inserted_dst_bytes += (dst - dst_0);
1030                 if (inserted_dst_bytes >=  quater)
1031                     /* no more room. Mark lookahead as "gone".. */
1032                     lookahead_item = 0;
1033                 else
1034                 {
1035                     /* move it really.. */
1036                     lookahead_next = lookahead_item;
1037                     if (!(*stream->read_item)(stream->clientData,
1038                                               &lookahead_next,
1039                                               lookahead_mode))
1040                     {
1041                         /* end of stream reached: no "more" and no lookahead */
1042                         lookahead_item = 0;
1043                         more = 0;
1044                     }
1045                     if (lookahead_item && max_item &&
1046                         (*b->method->compare_item)(max_item, lookahead_item) <= 0)
1047                     {
1048                         /* the lookahead goes beyond what we allow in this
1049                            leaf. Mark it as "gone" */
1050                         lookahead_item = 0;
1051                     }
1052
1053                     p->dirty = 1;
1054                 }
1055             }
1056             else if (d == 0)
1057             {
1058                 /* exact match .. move both pointers */
1059
1060                 lookahead_next = lookahead_item;
1061                 if (!(*stream->read_item)(stream->clientData,
1062                                           &lookahead_next, lookahead_mode))
1063                 {
1064                     lookahead_item = 0;
1065                     more = 0;
1066                 }
1067                 if (src == endp)
1068                     break;  /* end of file stream reached .. */
1069                 file_item = file_item_buf; /* move file pointer */
1070                 (*b->method->codec.decode)(c1, &file_item, &src);
1071             }
1072             else
1073             {
1074                 /* file pointer must be moved */
1075                 if (src == endp)
1076                     break;
1077                 file_item = file_item_buf;
1078                 (*b->method->codec.decode)(c1, &file_item, &src);
1079             }
1080         }
1081     }
1082
1083     /* this loop runs when we are "appending" to a leaf page. That is
1084        either it's empty (new) or all file items have been read in
1085        previous loop */
1086
1087     maxp = dst_buf + b->file[b->no_cat-1].head.block_max + quater;
1088     while (lookahead_item)
1089     {
1090         char *dst_item;
1091         const char *src = lookahead_item;
1092         char *dst_0 = dst;
1093
1094         /* if we have a lookahead item, we stop if we exceed the value of it */
1095         if (max_item &&
1096             (*b->method->compare_item)(max_item, lookahead_item) <= 0)
1097         {
1098             /* stop if we have reached the value of max item */
1099             break;
1100         }
1101         if (!*lookahead_mode)
1102         {
1103             /* this is append. So a delete is bad */
1104             yaz_log(YLOG_WARN, "isamb: Inconsistent register (2)");
1105             assert(*lookahead_mode);
1106         }
1107         else if (!half1 && dst > tail_cut)
1108         {
1109             const char *src_0 = src;
1110             half1 = dst; /* candidate for splitting */
1111
1112             (*b->method->codec.encode)(c2, &dst, &src);
1113
1114             cut_item_size = src - src_0;
1115             assert(cut_item_size > 0);
1116             memcpy(cut_item_buf, src_0, cut_item_size);
1117
1118             no_items_1 = no_items;
1119             half2 = dst;
1120         }
1121         else
1122             (*b->method->codec.encode)(c2, &dst, &src);
1123
1124         if (dst > maxp)
1125         {
1126             dst = dst_0;
1127             break;
1128         }
1129         no_items++;
1130         if (p)
1131             p->dirty = 1;
1132         dst_item = lookahead_item;
1133         if (!(*stream->read_item)(stream->clientData, &dst_item,
1134                                   lookahead_mode))
1135         {
1136             lookahead_item = 0;
1137             more = 0;
1138         }
1139     }
1140     new_size = dst - dst_buf;
1141     if (p && p->cat != b->no_cat-1 &&
1142         new_size > b->file[p->cat].head.block_max)
1143     {
1144         /* non-btree block will be removed */
1145         p->deleted = 1;
1146         close_block(b, p);
1147         /* delete it too!! */
1148         p = 0; /* make a new one anyway */
1149     }
1150     if (!p)
1151     {   /* must create a new one */
1152         int i;
1153         for (i = 0; i < b->no_cat; i++)
1154             if (new_size <= b->file[i].head.block_max)
1155                 break;
1156         if (i == b->no_cat)
1157             i = b->no_cat - 1;
1158         p = new_leaf(b, i);
1159     }
1160     if (new_size > b->file[p->cat].head.block_max)
1161     {
1162         char *first_dst;
1163         const char *cut_item = cut_item_buf;
1164
1165         assert(half1);
1166         assert(half2);
1167
1168         assert(cut_item_size > 0);
1169
1170         /* first half */
1171         p->size = half1 - dst_buf;
1172         assert(p->size <=  b->file[p->cat].head.block_max);
1173         memcpy(p->bytes, dst_buf, half1 - dst_buf);
1174         p->no_items = no_items_1;
1175
1176         /* second half */
1177         *sp2 = new_leaf(b, p->cat);
1178
1179         (*b->method->codec.reset)(c2);
1180
1181         b->number_of_leaf_splits++;
1182
1183         first_dst = (*sp2)->bytes;
1184
1185         (*b->method->codec.encode)(c2, &first_dst, &cut_item);
1186
1187         memcpy(first_dst, half2, dst - half2);
1188
1189         (*sp2)->size = (first_dst - (*sp2)->bytes) + (dst - half2);
1190         assert((*sp2)->size <=  b->file[p->cat].head.block_max);
1191         (*sp2)->no_items = no_items - no_items_1;
1192         (*sp2)->dirty = 1;
1193         p->dirty = 1;
1194         memcpy(sub_item, cut_item_buf, cut_item_size);
1195         *sub_size = cut_item_size;
1196     }
1197     else
1198     {
1199         memcpy(p->bytes, dst_buf, dst - dst_buf);
1200         p->size = new_size;
1201         p->no_items = no_items;
1202     }
1203     (*b->method->codec.stop)(c1);
1204     (*b->method->codec.stop)(c2);
1205     *sp1 = p;
1206     return more;
1207 }
1208
1209 int insert_sub(ISAMB b, struct ISAMB_block **p, void *new_item,
1210                int *mode,
1211                ISAMC_I *stream,
1212                struct ISAMB_block **sp,
1213                void *sub_item, int *sub_size,
1214                const void *max_item)
1215 {
1216     if (!*p || (*p)->leaf)
1217         return insert_leaf(b, p, new_item, mode, stream, sp, sub_item,
1218                            sub_size, max_item);
1219     else
1220         return insert_int(b, *p, new_item, mode, stream, sp, sub_item,
1221                           sub_size, max_item);
1222 }
1223
1224 int isamb_unlink(ISAMB b, ISAM_P pos)
1225 {
1226     struct ISAMB_block *p1;
1227
1228     if (!pos)
1229         return 0;
1230     p1 = open_block(b, pos);
1231     p1->deleted = 1;
1232     if (!p1->leaf)
1233     {
1234         zint sub_p;
1235         const char *src = p1->bytes + p1->offset;
1236 #if INT_ENCODE
1237         void *c1 = (*b->method->codec.start)();
1238 #endif
1239         decode_ptr(&src, &sub_p);
1240         isamb_unlink(b, sub_p);
1241
1242         while (src != p1->bytes + p1->size)
1243         {
1244 #if INT_ENCODE
1245             char file_item_buf[DST_ITEM_MAX];
1246             char *file_item = file_item_buf;
1247             (*b->method->codec.reset)(c1);
1248             (*b->method->codec.decode)(c1, &file_item, &src);
1249 #else
1250             zint item_len;
1251             decode_item_len(&src, &item_len);
1252             src += item_len;
1253 #endif
1254             decode_ptr(&src, &sub_p);
1255             isamb_unlink(b, sub_p);
1256         }
1257 #if INT_ENCODE
1258         (*b->method->codec.stop)(c1);
1259 #endif
1260     }
1261     close_block(b, p1);
1262     return 0;
1263 }
1264
1265 void isamb_merge(ISAMB b, ISAM_P *pos, ISAMC_I *stream)
1266 {
1267     char item_buf[DST_ITEM_MAX];
1268     char *item_ptr;
1269     int i_mode;
1270     int more;
1271     int must_delete = 0;
1272
1273     if (b->cache < 0)
1274     {
1275         int more = 1;
1276         while (more)
1277         {
1278             item_ptr = item_buf;
1279             more =
1280                 (*stream->read_item)(stream->clientData, &item_ptr, &i_mode);
1281         }
1282         *pos = 1;
1283         return;
1284     }
1285     item_ptr = item_buf;
1286     more = (*stream->read_item)(stream->clientData, &item_ptr, &i_mode);
1287     while (more)
1288     {
1289         struct ISAMB_block *p = 0, *sp = 0;
1290         char sub_item[DST_ITEM_MAX];
1291         int sub_size;
1292
1293         if (*pos)
1294             p = open_block(b, *pos);
1295         more = insert_sub(b, &p, item_buf, &i_mode, stream, &sp,
1296                           sub_item, &sub_size, 0);
1297         if (sp)
1298         {    /* increase level of tree by one */
1299             struct ISAMB_block *p2 = new_int(b, p->cat);
1300             char *dst = p2->bytes + p2->size;
1301 #if INT_ENCODE
1302             void *c1 = (*b->method->codec.start)();
1303             const char *sub_item_ptr = sub_item;
1304 #endif
1305
1306             encode_ptr(&dst, p->pos);
1307             assert(sub_size < DST_ITEM_MAX && sub_size > 1);
1308 #if INT_ENCODE
1309             (*b->method->codec.reset)(c1);
1310             (*b->method->codec.encode)(c1, &dst, &sub_item_ptr);
1311 #else
1312             encode_item_len(&dst, sub_size);
1313             memcpy(dst, sub_item, sub_size);
1314             dst += sub_size;
1315 #endif
1316             encode_ptr(&dst, sp->pos);
1317
1318             p2->size = dst - p2->bytes;
1319             p2->no_items = p->no_items + sp->no_items;
1320             *pos = p2->pos;  /* return new super page */
1321             close_block(b, sp);
1322             close_block(b, p2);
1323 #if INT_ENCODE
1324             (*b->method->codec.stop)(c1);
1325 #endif
1326         }
1327         else
1328         {
1329             *pos = p->pos;   /* return current one (again) */
1330         }
1331         if (p->no_items == 0)
1332             must_delete = 1;
1333         else
1334             must_delete = 0;
1335         close_block(b, p);
1336     }
1337     if (must_delete)
1338     {
1339         isamb_unlink(b, *pos);
1340         *pos = 0;
1341     }
1342 }
1343
1344 ISAMB_PP isamb_pp_open_x(ISAMB isamb, ISAM_P pos, int *level, int scope)
1345 {
1346     ISAMB_PP pp = xmalloc(sizeof(*pp));
1347     int i;
1348
1349     assert(pos);
1350
1351     pp->isamb = isamb;
1352     pp->block = xmalloc(ISAMB_MAX_LEVEL * sizeof(*pp->block));
1353
1354     pp->pos = pos;
1355     pp->level = 0;
1356     pp->maxlevel = 0;
1357     pp->total_size = 0;
1358     pp->no_blocks = 0;
1359     pp->skipped_numbers = 0;
1360     pp->returned_numbers = 0;
1361     pp->scope = scope;
1362     for (i = 0; i<ISAMB_MAX_LEVEL; i++)
1363         pp->skipped_nodes[i] = pp->accessed_nodes[i] = 0;
1364     while (1)
1365     {
1366         struct ISAMB_block *p = open_block(isamb, pos);
1367         const char *src = p->bytes + p->offset;
1368         pp->block[pp->level] = p;
1369
1370         pp->total_size += p->size;
1371         pp->no_blocks++;
1372         if (p->leaf)
1373             break;
1374         decode_ptr(&src, &pos);
1375         p->offset = src - p->bytes;
1376         pp->level++;
1377         pp->accessed_nodes[pp->level]++;
1378     }
1379     pp->block[pp->level+1] = 0;
1380     pp->maxlevel = pp->level;
1381     if (level)
1382         *level = pp->level;
1383     return pp;
1384 }
1385
1386 ISAMB_PP isamb_pp_open(ISAMB isamb, ISAM_P pos, int scope)
1387 {
1388     return isamb_pp_open_x(isamb, pos, 0, scope);
1389 }
1390
1391 void isamb_pp_close_x(ISAMB_PP pp, zint *size, zint *blocks)
1392 {
1393     int i;
1394     if (!pp)
1395         return;
1396     yaz_log(YLOG_DEBUG, "isamb_pp_close lev=%d returned "ZINT_FORMAT" values, "
1397             "skipped "ZINT_FORMAT,
1398             pp->maxlevel, pp->skipped_numbers, pp->returned_numbers);
1399     for (i = pp->maxlevel; i>=0; i--)
1400         if (pp->skipped_nodes[i] || pp->accessed_nodes[i])
1401             yaz_log(YLOG_DEBUG, "isamb_pp_close  level leaf-%d: "
1402                     ZINT_FORMAT" read, "ZINT_FORMAT" skipped", i,
1403                     pp->accessed_nodes[i], pp->skipped_nodes[i]);
1404     pp->isamb->skipped_numbers += pp->skipped_numbers;
1405     pp->isamb->returned_numbers += pp->returned_numbers;
1406     for (i = pp->maxlevel; i>=0; i--)
1407     {
1408         pp->isamb->accessed_nodes[i] += pp->accessed_nodes[i];
1409         pp->isamb->skipped_nodes[i] += pp->skipped_nodes[i];
1410     }
1411     if (size)
1412         *size = pp->total_size;
1413     if (blocks)
1414         *blocks = pp->no_blocks;
1415     for (i = 0; i <= pp->level; i++)
1416         close_block(pp->isamb, pp->block[i]);
1417     xfree(pp->block);
1418     xfree(pp);
1419 }
1420
1421 int isamb_block_info(ISAMB isamb, int cat)
1422 {
1423     if (cat >= 0 && cat < isamb->no_cat)
1424         return isamb->file[cat].head.block_size;
1425     return -1;
1426 }
1427
1428 void isamb_pp_close(ISAMB_PP pp)
1429 {
1430     isamb_pp_close_x(pp, 0, 0);
1431 }
1432
1433 /* simple recursive dumper .. */
1434 static void isamb_dump_r(ISAMB b, ISAM_P pos, void (*pr)(const char *str),
1435                          int level)
1436 {
1437     char buf[1024];
1438     char prefix_str[1024];
1439     if (pos)
1440     {
1441         struct ISAMB_block *p = open_block(b, pos);
1442         sprintf(prefix_str, "%*s " ZINT_FORMAT " cat=%d size=%d max=%d items="
1443                 ZINT_FORMAT, level*2, "",
1444                 pos, p->cat, p->size, b->file[p->cat].head.block_max,
1445                 p->no_items);
1446         (*pr)(prefix_str);
1447         sprintf(prefix_str, "%*s " ZINT_FORMAT, level*2, "", pos);
1448         if (p->leaf)
1449         {
1450             while (p->offset < p->size)
1451             {
1452                 const char *src = p->bytes + p->offset;
1453                 char *dst = buf;
1454                 (*b->method->codec.decode)(p->decodeClientData, &dst, &src);
1455                 (*b->method->log_item)(YLOG_DEBUG, buf, prefix_str);
1456                 p->offset = src - (char*) p->bytes;
1457             }
1458             assert(p->offset == p->size);
1459         }
1460         else
1461         {
1462             const char *src = p->bytes + p->offset;
1463             ISAM_P sub;
1464
1465             decode_ptr(&src, &sub);
1466             p->offset = src - (char*) p->bytes;
1467
1468             isamb_dump_r(b, sub, pr, level+1);
1469
1470             while (p->offset < p->size)
1471             {
1472 #if INT_ENCODE
1473                 char file_item_buf[DST_ITEM_MAX];
1474                 char *file_item = file_item_buf;
1475                 void *c1 = (*b->method->codec.start)();
1476                 (*b->method->codec.decode)(c1, &file_item, &src);
1477                 (*b->method->codec.stop)(c1);
1478                 (*b->method->log_item)(YLOG_DEBUG, file_item_buf, prefix_str);
1479 #else
1480                 zint item_len;
1481                 decode_item_len(&src, &item_len);
1482                 (*b->method->log_item)(YLOG_DEBUG, src, prefix_str);
1483                 src += item_len;
1484 #endif
1485                 decode_ptr(&src, &sub);
1486
1487                 p->offset = src - (char*) p->bytes;
1488
1489                 isamb_dump_r(b, sub, pr, level+1);
1490             }
1491         }
1492         close_block(b, p);
1493     }
1494 }
1495
1496 void isamb_dump(ISAMB b, ISAM_P pos, void (*pr)(const char *str))
1497 {
1498     isamb_dump_r(b, pos, pr, 0);
1499 }
1500
1501 int isamb_pp_read(ISAMB_PP pp, void *buf)
1502 {
1503     return isamb_pp_forward(pp, buf, 0);
1504 }
1505
1506
1507 void isamb_pp_pos(ISAMB_PP pp, double *current, double *total)
1508 { /* return an estimate of the current position and of the total number of */
1509   /* occureences in the isam tree, based on the current leaf */
1510     assert(total);
1511     assert(current);
1512
1513     /* if end-of-stream PP may not be leaf */
1514
1515     *total = (double) (pp->block[0]->no_items);
1516     *current = (double) pp->returned_numbers;
1517 #if ISAMB_DEBUG
1518     yaz_log(YLOG_LOG, "isamb_pp_pos returning: cur= %0.1f tot=%0.1f rn="
1519             ZINT_FORMAT, *current, *total, pp->returned_numbers);
1520 #endif
1521 }
1522
1523 int isamb_pp_forward(ISAMB_PP pp, void *buf, const void *untilb)
1524 {
1525     char *dst = buf;
1526     const char *src;
1527     struct ISAMB_block *p = pp->block[pp->level];
1528     ISAMB b = pp->isamb;
1529     if (!p)
1530         return 0;
1531  again:
1532     while (p->offset == p->size)
1533     {
1534         ISAM_P pos;
1535 #if INT_ENCODE
1536         const char *src_0;
1537         void *c1;
1538         char file_item_buf[DST_ITEM_MAX];
1539         char *file_item = file_item_buf;
1540 #else
1541         zint item_len;
1542 #endif
1543         while (p->offset == p->size)
1544         {
1545             if (pp->level == 0)
1546                 return 0;
1547             close_block(pp->isamb, pp->block[pp->level]);
1548             pp->block[pp->level] = 0;
1549             (pp->level)--;
1550             p = pp->block[pp->level];
1551             assert(!p->leaf);
1552         }
1553
1554         assert(!p->leaf);
1555         src = p->bytes + p->offset;
1556
1557 #if INT_ENCODE
1558         c1 = (*b->method->codec.start)();
1559         (*b->method->codec.decode)(c1, &file_item, &src);
1560 #else
1561         decode_ptr(&src, &item_len);
1562         src += item_len;
1563 #endif
1564         decode_ptr(&src, &pos);
1565         p->offset = src - (char*) p->bytes;
1566
1567         src = p->bytes + p->offset;
1568
1569         while(1)
1570         {
1571             if (!untilb || p->offset == p->size)
1572                 break;
1573             assert(p->offset < p->size);
1574 #if INT_ENCODE
1575             src_0 = src;
1576             file_item = file_item_buf;
1577             (*b->method->codec.reset)(c1);
1578             (*b->method->codec.decode)(c1, &file_item, &src);
1579             if ((*b->method->compare_item)(untilb, file_item_buf) < pp->scope)
1580             {
1581                 src = src_0;
1582                 break;
1583             }
1584 #else
1585             decode_item_len(&src, &item_len);
1586             if ((*b->method->compare_item)(untilb, src) < pp->scope)
1587                 break;
1588             src += item_len;
1589 #endif
1590             decode_ptr(&src, &pos);
1591             p->offset = src - (char*) p->bytes;
1592         }
1593
1594         pp->level++;
1595
1596         while (1)
1597         {
1598             pp->block[pp->level] = p = open_block(pp->isamb, pos);
1599
1600             pp->total_size += p->size;
1601             pp->no_blocks++;
1602
1603             if (p->leaf)
1604             {
1605                 break;
1606             }
1607
1608             src = p->bytes + p->offset;
1609             while(1)
1610             {
1611                 decode_ptr(&src, &pos);
1612                 p->offset = src - (char*) p->bytes;
1613
1614                 if (!untilb || p->offset == p->size)
1615                     break;
1616                 assert(p->offset < p->size);
1617 #if INT_ENCODE
1618                 src_0 = src;
1619                 file_item = file_item_buf;
1620                 (*b->method->codec.reset)(c1);
1621                 (*b->method->codec.decode)(c1, &file_item, &src);
1622                 if ((*b->method->compare_item)(untilb, file_item_buf) < pp->scope)
1623                 {
1624                     src = src_0;
1625                     break;
1626                 }
1627 #else
1628                 decode_ptr(&src, &item_len);
1629                 if ((*b->method->compare_item)(untilb, src) <= pp->scope)
1630                     break;
1631                 src += item_len;
1632 #endif
1633             }
1634             pp->level++;
1635         }
1636 #if INT_ENCODE
1637         (*b->method->codec.stop)(c1);
1638 #endif
1639     }
1640     assert(p->offset < p->size);
1641     assert(p->leaf);
1642     while(1)
1643     {
1644         char *dst0 = dst;
1645         src = p->bytes + p->offset;
1646         (*pp->isamb->method->codec.decode)(p->decodeClientData, &dst, &src);
1647         p->offset = src - (char*) p->bytes;
1648         if (!untilb || (*pp->isamb->method->compare_item)(untilb, dst0) < pp->scope)
1649             break;
1650         dst = dst0;
1651         if (p->offset == p->size) goto again;
1652     }
1653     pp->returned_numbers++;
1654     return 1;
1655 }
1656
1657 zint isamb_get_int_splits(ISAMB b)
1658 {
1659     return b->number_of_int_splits;
1660 }
1661
1662 zint isamb_get_leaf_splits(ISAMB b)
1663 {
1664     return b->number_of_leaf_splits;
1665 }
1666
1667 zint isamb_get_root_ptr(ISAMB b)
1668 {
1669     return b->root_ptr;
1670 }
1671
1672 void isamb_set_root_ptr(ISAMB b, zint root_ptr)
1673 {
1674     b->root_ptr = root_ptr;
1675 }
1676
1677
1678 /*
1679  * Local variables:
1680  * c-basic-offset: 4
1681  * c-file-style: "Stroustrup"
1682  * indent-tabs-mode: nil
1683  * End:
1684  * vim: shiftwidth=4 tabstop=8 expandtab
1685  */
1686