Log freelist info changes
[idzebra-moved-to-github.git] / isamb / isamb.c
1 /* This file is part of the Zebra server.
2    Copyright (C) 1994-2009 Index Data
3
4 Zebra is free software; you can redistribute it and/or modify it under
5 the terms of the GNU General Public License as published by the Free
6 Software Foundation; either version 2, or (at your option) any later
7 version.
8
9 Zebra is distributed in the hope that it will be useful, but WITHOUT ANY
10 WARRANTY; without even the implied warranty of MERCHANTABILITY or
11 FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
12 for more details.
13
14 You should have received a copy of the GNU General Public License
15 along with this program; if not, write to the Free Software
16 Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA  02110-1301  USA
17
18 */
19
20 #include <stdlib.h>
21 #include <string.h>
22 #include <yaz/log.h>
23 #include <yaz/xmalloc.h>
24 #include <idzebra/isamb.h>
25 #include <assert.h>
26
27 #ifndef ISAMB_DEBUG
28 #define ISAMB_DEBUG 0
29 #endif
30
31
32 #define ISAMB_MAJOR_VERSION 3
33 #define ISAMB_MINOR_VERSION_NO_ROOT 0
34 #define ISAMB_MINOR_VERSION_WITH_ROOT 1
35
36 struct ISAMB_head {
37     zint first_block;
38     zint last_block;
39     zint free_list;
40     zint no_items;
41     int block_size;
42     int block_max;
43     int block_offset;
44 };
45
46 /* if 1, upper nodes items are encoded; 0 if not encoded */
47 #define INT_ENCODE 1
48
49 /* maximum size of encoded buffer */
50 #define DST_ITEM_MAX 5000
51
52 /* max page size for _any_ isamb use */
53 #define ISAMB_MAX_PAGE 32768
54
55 #define ISAMB_MAX_LEVEL 10
56 /* approx 2*max page + max size of item */
57 #define DST_BUF_SIZE (2*ISAMB_MAX_PAGE+DST_ITEM_MAX+100)
58
59 /* should be maximum block size of multiple thereof */
60 #define ISAMB_CACHE_ENTRY_SIZE ISAMB_MAX_PAGE
61
62 /* CAT_MAX: _must_ be power of 2 */
63 #define CAT_MAX 4
64 #define CAT_MASK (CAT_MAX-1)
65 /* CAT_NO: <= CAT_MAX */
66 #define CAT_NO 4
67
68 /* Smallest block size */
69 #define ISAMB_MIN_SIZE 32
70 /* Size factor */
71 #define ISAMB_FAC_SIZE 4
72
73 /* ISAMB_PTR_CODEC = 1 var, =0 fixed */
74 #define ISAMB_PTR_CODEC 1
75
76 struct ISAMB_cache_entry {
77     ISAM_P pos;
78     unsigned char *buf;
79     int dirty;
80     int hits;
81     struct ISAMB_cache_entry *next;
82 };
83
84 struct ISAMB_file {
85     BFile bf;
86     int head_dirty;
87     struct ISAMB_head head;
88     struct ISAMB_cache_entry *cache_entries;
89 };
90
91 struct ISAMB_s {
92     BFiles bfs;
93     ISAMC_M *method;
94
95     struct ISAMB_file *file;
96     int no_cat;
97     int cache; /* 0 = no cache, 1 = use cache, -1 = dummy isam (for testing only) */
98     int log_io;        /* log level for bf_read/bf_write calls */
99     int log_freelist;  /* log level for freelist handling */
100     zint skipped_numbers; /* on a leaf node */
101     zint returned_numbers; 
102     zint skipped_nodes[ISAMB_MAX_LEVEL]; /* [0]=skipped leaves, 1 = higher etc */
103     zint accessed_nodes[ISAMB_MAX_LEVEL]; /* nodes we did not skip */
104     zint number_of_int_splits;
105     zint number_of_leaf_splits;
106     int enable_int_count; /* whether we count nodes (or not) */
107     int cache_size; /* size of blocks to cache (if cache=1) */
108     int minor_version;
109     zint root_ptr;
110 };
111
112 struct ISAMB_block {
113     ISAM_P pos;
114     int cat;
115     int size;
116     int leaf;
117     int dirty;
118     int deleted;
119     int offset;
120     zint no_items;  /* number of nodes in this + children */
121     char *bytes;
122     char *cbuf;
123     unsigned char *buf;
124     void *decodeClientData;
125     int log_rw;
126 };
127
128 struct ISAMB_PP_s {
129     ISAMB isamb;
130     ISAM_P pos;
131     int level;
132     int maxlevel; /* total depth */
133     zint total_size;
134     zint no_blocks;
135     zint skipped_numbers; /* on a leaf node */
136     zint returned_numbers; 
137     zint skipped_nodes[ISAMB_MAX_LEVEL]; /* [0]=skipped leaves, 1 = higher etc */
138     zint accessed_nodes[ISAMB_MAX_LEVEL]; /* nodes we did not skip */
139     struct ISAMB_block **block;
140     int scope;  /* on what level we forward */
141 };
142
143
144 #define encode_item_len encode_ptr
145 #if ISAMB_PTR_CODEC
146 static void encode_ptr(char **dst, zint pos)
147 {
148     unsigned char *bp = (unsigned char*) *dst;
149
150     while (pos > 127)
151     {
152         *bp++ = (unsigned char) (128 | (pos & 127));
153         pos = pos >> 7;
154     }
155     *bp++ = (unsigned char) pos;
156     *dst = (char *) bp;
157 }
158 #else
159 static void encode_ptr(char **dst, zint pos)
160 {
161     memcpy(*dst, &pos, sizeof(pos));
162     (*dst) += sizeof(pos);
163 }
164 #endif
165
166 #define decode_item_len decode_ptr
167 #if ISAMB_PTR_CODEC
168 static void decode_ptr(const char **src, zint *pos)
169 {
170     zint d = 0;
171     unsigned char c;
172     unsigned r = 0;
173
174     while (((c = *(const unsigned char *)((*src)++)) & 128))
175     {
176         d += ((zint) (c & 127) << r);
177         r += 7;
178     }
179     d += ((zint) c << r);
180     *pos = d;
181 }
182 #else
183 static void decode_ptr(const char **src, zint *pos)
184 {
185     memcpy(pos, *src, sizeof(*pos));
186     (*src) += sizeof(*pos);
187 }
188 #endif
189
190
191 void isamb_set_int_count(ISAMB b, int v)
192 {
193     b->enable_int_count = v;
194 }
195
196 void isamb_set_cache_size(ISAMB b, int v)
197 {
198     b->cache_size = v;
199 }
200
201 ISAMB isamb_open2(BFiles bfs, const char *name, int writeflag, ISAMC_M *method,
202                   int cache, int no_cat, int *sizes, int use_root_ptr)
203 {
204     ISAMB isamb = xmalloc(sizeof(*isamb));
205     int i;
206
207     assert(no_cat <= CAT_MAX);
208
209     isamb->bfs = bfs;
210     isamb->method = (ISAMC_M *) xmalloc(sizeof(*method));
211     memcpy(isamb->method, method, sizeof(*method));
212     isamb->no_cat = no_cat;
213     isamb->log_io = 0;
214     isamb->log_freelist = 0;
215     isamb->cache = cache;
216     isamb->skipped_numbers = 0;
217     isamb->returned_numbers = 0;
218     isamb->number_of_int_splits = 0;
219     isamb->number_of_leaf_splits = 0;
220     isamb->enable_int_count = 1;
221     isamb->cache_size = 40;
222
223     if (use_root_ptr)
224         isamb->minor_version = ISAMB_MINOR_VERSION_WITH_ROOT;
225     else
226         isamb->minor_version = ISAMB_MINOR_VERSION_NO_ROOT;
227
228     isamb->root_ptr = 0;
229
230     for (i = 0; i<ISAMB_MAX_LEVEL; i++)
231         isamb->skipped_nodes[i] = isamb->accessed_nodes[i] = 0;
232
233     if (cache == -1)
234     {
235         yaz_log(YLOG_WARN, "isamb_open %s. Degraded TEST mode", name);
236     }
237     else
238     {
239         assert(cache == 0 || cache == 1);
240     }
241     isamb->file = xmalloc(sizeof(*isamb->file) * isamb->no_cat);
242
243     for (i = 0; i < isamb->no_cat; i++)
244     {
245         isamb->file[i].bf = 0;
246         isamb->file[i].head_dirty = 0;
247         isamb->file[i].cache_entries = 0;
248     }
249
250     for (i = 0; i < isamb->no_cat; i++)
251     {
252         char fname[DST_BUF_SIZE];
253         char hbuf[DST_BUF_SIZE];
254
255         sprintf(fname, "%s%c", name, i+'A');
256         if (cache)
257             isamb->file[i].bf = bf_open(bfs, fname, ISAMB_CACHE_ENTRY_SIZE,
258                                         writeflag);
259         else
260             isamb->file[i].bf = bf_open(bfs, fname, sizes[i], writeflag);
261
262         if (!isamb->file[i].bf)
263         {
264             isamb_close(isamb);
265             return 0;
266         }
267
268         /* fill-in default values (for empty isamb) */
269         isamb->file[i].head.first_block = ISAMB_CACHE_ENTRY_SIZE/sizes[i]+1;
270         isamb->file[i].head.last_block = isamb->file[i].head.first_block;
271         isamb->file[i].head.block_size = sizes[i];
272         assert(sizes[i] <= ISAMB_CACHE_ENTRY_SIZE);
273 #if ISAMB_PTR_CODEC
274         if (i == isamb->no_cat-1 || sizes[i] > 128)
275             isamb->file[i].head.block_offset = 8;
276         else
277             isamb->file[i].head.block_offset = 4;
278 #else
279         isamb->file[i].head.block_offset = 11;
280 #endif
281         isamb->file[i].head.block_max =
282             sizes[i] - isamb->file[i].head.block_offset;
283         isamb->file[i].head.free_list = 0;
284         if (bf_read(isamb->file[i].bf, 0, 0, 0, hbuf))
285         {
286             /* got header assume "isamb"major minor len can fit in 16 bytes */
287             zint zint_tmp;
288             int major, minor, len, pos = 0;
289             int left;
290             const char *src = 0;
291             if (memcmp(hbuf, "isamb", 5))
292             {
293                 yaz_log(YLOG_WARN, "bad isamb header for file %s", fname);
294                 isamb_close(isamb);
295                 return 0;
296             }
297             if (sscanf(hbuf+5, "%d %d %d", &major, &minor, &len) != 3)
298             {
299                 yaz_log(YLOG_WARN, "bad isamb header for file %s", fname);
300                 isamb_close(isamb);
301                 return 0;
302             }
303             if (major != ISAMB_MAJOR_VERSION)
304             {
305                 yaz_log(YLOG_WARN, "bad major version for file %s %d, must be %d",
306                         fname, major, ISAMB_MAJOR_VERSION);
307                 isamb_close(isamb);
308                 return 0;
309             }
310             for (left = len - sizes[i]; left > 0; left = left - sizes[i])
311             {
312                 pos++;
313                 if (!bf_read(isamb->file[i].bf, pos, 0, 0, hbuf + pos*sizes[i]))
314                 {
315                     yaz_log(YLOG_WARN, "truncated isamb header for " 
316                             "file=%s len=%d pos=%d",
317                             fname, len, pos);
318                     isamb_close(isamb);
319                     return 0;
320                 }
321             }
322             src = hbuf + 16;
323             decode_ptr(&src, &isamb->file[i].head.first_block);
324             decode_ptr(&src, &isamb->file[i].head.last_block);
325             decode_ptr(&src, &zint_tmp);
326             isamb->file[i].head.block_size = (int) zint_tmp;
327             decode_ptr(&src, &zint_tmp);
328             isamb->file[i].head.block_max = (int) zint_tmp;
329             decode_ptr(&src, &isamb->file[i].head.free_list);
330             if (isamb->minor_version >= ISAMB_MINOR_VERSION_WITH_ROOT)
331                 decode_ptr(&src, &isamb->root_ptr);
332         }
333         assert(isamb->file[i].head.block_size >= isamb->file[i].head.block_offset);
334         /* must rewrite the header if root ptr is in use (bug #1017) */
335         if (use_root_ptr && writeflag)
336             isamb->file[i].head_dirty = 1;
337         else
338             isamb->file[i].head_dirty = 0;
339         assert(isamb->file[i].head.block_size == sizes[i]);
340     }
341 #if ISAMB_DEBUG
342     yaz_log(YLOG_WARN, "isamb debug enabled. Things will be slower than usual");
343 #endif
344     return isamb;
345 }
346
347 ISAMB isamb_open(BFiles bfs, const char *name, int writeflag, ISAMC_M *method,
348                  int cache)
349 {
350     int sizes[CAT_NO];
351     int i, b_size = ISAMB_MIN_SIZE;
352     
353     for (i = 0; i<CAT_NO; i++)
354     {
355         sizes[i] = b_size;
356         b_size = b_size * ISAMB_FAC_SIZE;
357     }
358     return isamb_open2(bfs, name, writeflag, method, cache,
359                        CAT_NO, sizes, 0);
360 }
361
362 static void flush_blocks(ISAMB b, int cat)
363 {
364     while (b->file[cat].cache_entries)
365     {
366         struct ISAMB_cache_entry *ce_this = b->file[cat].cache_entries;
367         b->file[cat].cache_entries = ce_this->next;
368
369         if (ce_this->dirty)
370         {
371             yaz_log(b->log_io, "bf_write: flush_blocks");
372             bf_write(b->file[cat].bf, ce_this->pos, 0, 0, ce_this->buf);
373         }
374         xfree(ce_this->buf);
375         xfree(ce_this);
376     }
377 }
378
379 static int cache_block(ISAMB b, ISAM_P pos, unsigned char *userbuf, int wr)
380 {
381     int cat = (int) (pos&CAT_MASK);
382     int off = (int) (((pos/CAT_MAX) & 
383                       (ISAMB_CACHE_ENTRY_SIZE / b->file[cat].head.block_size - 1))
384                      * b->file[cat].head.block_size);
385     zint norm = pos / (CAT_MASK*ISAMB_CACHE_ENTRY_SIZE / b->file[cat].head.block_size);
386     int no = 0;
387     struct ISAMB_cache_entry **ce, *ce_this = 0, **ce_last = 0;
388
389     if (!b->cache)
390         return 0;
391
392     assert(ISAMB_CACHE_ENTRY_SIZE >= b->file[cat].head.block_size);
393     for (ce = &b->file[cat].cache_entries; *ce; ce = &(*ce)->next, no++)
394     {
395         ce_last = ce;
396         if ((*ce)->pos == norm)
397         {
398             ce_this = *ce;
399             *ce = (*ce)->next;   /* remove from list */
400             
401             ce_this->next = b->file[cat].cache_entries;  /* move to front */
402             b->file[cat].cache_entries = ce_this;
403             
404             if (wr)
405             {
406                 memcpy(ce_this->buf + off, userbuf, 
407                        b->file[cat].head.block_size);
408                 ce_this->dirty = 1;
409             }
410             else
411                 memcpy(userbuf, ce_this->buf + off,
412                        b->file[cat].head.block_size);
413             return 1;
414         }
415     }
416     if (no >= b->cache_size)
417     {
418         assert(ce_last && *ce_last);
419         ce_this = *ce_last;
420         *ce_last = 0;  /* remove the last entry from list */
421         if (ce_this->dirty)
422         {
423             yaz_log(b->log_io, "bf_write: cache_block");
424             bf_write(b->file[cat].bf, ce_this->pos, 0, 0, ce_this->buf);
425         }
426         xfree(ce_this->buf);
427         xfree(ce_this);
428     }
429     ce_this = xmalloc(sizeof(*ce_this));
430     ce_this->next = b->file[cat].cache_entries;
431     b->file[cat].cache_entries = ce_this;
432     ce_this->buf = xmalloc(ISAMB_CACHE_ENTRY_SIZE);
433     ce_this->pos = norm;
434     yaz_log(b->log_io, "bf_read: cache_block");
435     if (!bf_read(b->file[cat].bf, norm, 0, 0, ce_this->buf))
436         memset(ce_this->buf, 0, ISAMB_CACHE_ENTRY_SIZE);
437     if (wr)
438     {
439         memcpy(ce_this->buf + off, userbuf, b->file[cat].head.block_size);
440         ce_this->dirty = 1;
441     }
442     else
443     {
444         ce_this->dirty = 0;
445         memcpy(userbuf, ce_this->buf + off, b->file[cat].head.block_size);
446     }
447     return 1;
448 }
449
450
451 void isamb_close(ISAMB isamb)
452 {
453     int i;
454     for (i = 0; isamb->accessed_nodes[i]; i++)
455         yaz_log(YLOG_DEBUG, "isamb_close  level leaf-%d: "ZINT_FORMAT" read, "
456                 ZINT_FORMAT" skipped",
457                 i, isamb->accessed_nodes[i], isamb->skipped_nodes[i]);
458     yaz_log(YLOG_DEBUG, "isamb_close returned "ZINT_FORMAT" values, "
459             "skipped "ZINT_FORMAT,
460             isamb->skipped_numbers, isamb->returned_numbers);
461
462     for (i = 0; i<isamb->no_cat; i++)
463     {
464         flush_blocks(isamb, i);
465         if (isamb->file[i].head_dirty)
466         {
467             char hbuf[DST_BUF_SIZE];
468             int major = ISAMB_MAJOR_VERSION;
469             int len = 16;
470             char *dst = hbuf + 16;
471             int pos = 0, left;
472             int b_size = isamb->file[i].head.block_size;
473
474             encode_ptr(&dst, isamb->file[i].head.first_block);
475             encode_ptr(&dst, isamb->file[i].head.last_block);
476             encode_ptr(&dst, isamb->file[i].head.block_size);
477             encode_ptr(&dst, isamb->file[i].head.block_max);
478             encode_ptr(&dst, isamb->file[i].head.free_list);
479
480             if (isamb->minor_version >= ISAMB_MINOR_VERSION_WITH_ROOT)
481                 encode_ptr(&dst, isamb->root_ptr);
482
483             memset(dst, '\0', b_size); /* ensure no random bytes are written */
484
485             len = dst - hbuf;
486
487             /* print exactly 16 bytes (including trailing 0) */
488             sprintf(hbuf, "isamb%02d %02d %02d\r\n", major,
489                     isamb->minor_version, len);
490
491             bf_write(isamb->file[i].bf, pos, 0, 0, hbuf);
492
493             for (left = len - b_size; left > 0; left = left - b_size)
494             {
495                 pos++;
496                 bf_write(isamb->file[i].bf, pos, 0, 0, hbuf + pos*b_size);
497             }
498         }
499         if (isamb->file[i].bf)
500             bf_close (isamb->file[i].bf);
501     }
502     xfree(isamb->file);
503     xfree(isamb->method);
504     xfree(isamb);
505 }
506
507 /* open_block: read one block at pos.
508    Decode leading sys bytes .. consisting of
509    Offset:Meaning
510    0: leader byte, != 0 leaf, == 0, non-leaf
511    1-2: used size of block
512    3-7*: number of items and all children
513    
514    * Reserve 5 bytes for large block sizes. 1 for small ones .. Number
515    of items. We can thus have at most 2^40 nodes. 
516 */
517 static struct ISAMB_block *open_block(ISAMB b, ISAM_P pos)
518 {
519     int cat = (int) (pos&CAT_MASK);
520     const char *src;
521     int offset = b->file[cat].head.block_offset;
522     struct ISAMB_block *p;
523     if (!pos)
524         return 0;
525     p = xmalloc(sizeof(*p));
526     p->pos = pos;
527     p->cat = (int) (pos & CAT_MASK);
528     p->buf = xmalloc(b->file[cat].head.block_size);
529     p->cbuf = 0;
530
531     if (!cache_block (b, pos, p->buf, 0))
532     {
533         yaz_log(b->log_io, "bf_read: open_block");
534         if (bf_read(b->file[cat].bf, pos/CAT_MAX, 0, 0, p->buf) != 1)
535         {
536             yaz_log(YLOG_FATAL, "isamb: read fail for pos=%ld block=%ld",
537                     (long) pos, (long) pos/CAT_MAX);
538             zebra_exit("isamb:open_block");
539         }
540     }
541     p->bytes = (char *)p->buf + offset;
542     p->leaf = p->buf[0];
543     p->size = (p->buf[1] + 256 * p->buf[2]) - offset;
544     if (p->size < 0)
545     {
546         yaz_log(YLOG_FATAL, "Bad block size %d in pos=" ZINT_FORMAT "\n",
547                 p->size, pos);
548     }
549     assert(p->size >= 0);
550     src = (char*) p->buf + 3;
551     decode_ptr(&src, &p->no_items);
552
553     p->offset = 0;
554     p->dirty = 0;
555     p->deleted = 0;
556     p->decodeClientData = (*b->method->codec.start)();
557     return p;
558 }
559
560 struct ISAMB_block *new_block(ISAMB b, int leaf, int cat)
561 {
562     struct ISAMB_block *p;
563
564     p = xmalloc(sizeof(*p));
565     p->buf = xmalloc(b->file[cat].head.block_size);
566
567     if (!b->file[cat].head.free_list)
568     {
569         zint block_no;
570         block_no = b->file[cat].head.last_block++;
571         p->pos = block_no * CAT_MAX + cat;
572         if (b->log_freelist)
573             yaz_log(b->log_freelist, "got block " 
574                     ZINT_FORMAT " from last %d:" ZINT_FORMAT, p->pos,
575                     cat, p->pos/CAT_MAX);
576     }
577     else
578     {
579         p->pos = b->file[cat].head.free_list;
580         assert((p->pos & CAT_MASK) == cat);
581         if (!cache_block(b, p->pos, p->buf, 0))
582         {
583             yaz_log(b->log_io, "bf_read: new_block");
584             if (!bf_read(b->file[cat].bf, p->pos/CAT_MAX, 0, 0, p->buf))
585             {
586                 yaz_log(YLOG_FATAL, "isamb: read fail for pos=%ld block=%ld",
587                         (long) p->pos/CAT_MAX, (long) p->pos/CAT_MAX);
588                 zebra_exit("isamb:new_block");
589             }
590         }
591         if (b->log_freelist)
592             yaz_log(b->log_freelist, "got block " 
593                     ZINT_FORMAT " from freelist %d:" ZINT_FORMAT, p->pos,
594                     cat, p->pos/CAT_MAX);
595         memcpy(&b->file[cat].head.free_list, p->buf, sizeof(zint));
596     }
597     p->cat = cat;
598     b->file[cat].head_dirty = 1;
599     memset(p->buf, 0, b->file[cat].head.block_size);
600     p->bytes = (char*)p->buf + b->file[cat].head.block_offset;
601     p->leaf = leaf;
602     p->size = 0;
603     p->dirty = 1;
604     p->deleted = 0;
605     p->offset = 0;
606     p->no_items = 0;
607     p->decodeClientData = (*b->method->codec.start)();
608     return p;
609 }
610
611 struct ISAMB_block *new_leaf(ISAMB b, int cat)
612 {
613     return new_block(b, 1, cat);
614 }
615
616
617 struct ISAMB_block *new_int(ISAMB b, int cat)
618 {
619     return new_block(b, 0, cat);
620 }
621
622 static void check_block(ISAMB b, struct ISAMB_block *p)
623 {
624     assert(b); /* mostly to make the compiler shut up about unused b */
625     if (p->leaf)
626     {
627         ;
628     }
629     else
630     {
631         /* sanity check */
632         char *startp = p->bytes;
633         const char *src = startp;
634         char *endp = p->bytes + p->size;
635         ISAM_P pos;
636         void *c1 = (*b->method->codec.start)();
637             
638         decode_ptr(&src, &pos);
639         assert((pos&CAT_MASK) == p->cat);
640         while (src != endp)
641         {
642 #if INT_ENCODE
643             char file_item_buf[DST_ITEM_MAX];
644             char *file_item = file_item_buf;
645             (*b->method->codec.reset)(c1);
646             (*b->method->codec.decode)(c1, &file_item, &src);
647 #else
648             zint item_len;
649             decode_item_len(&src, &item_len);
650             assert(item_len > 0 && item_len < 128);
651             src += item_len;
652 #endif
653             decode_ptr(&src, &pos);
654             if ((pos&CAT_MASK) != p->cat)
655             {
656                 assert((pos&CAT_MASK) == p->cat);
657             }
658         }
659         (*b->method->codec.stop)(c1);
660     }
661 }
662
663 void close_block(ISAMB b, struct ISAMB_block *p)
664 {
665     if (!p)
666         return;
667     if (p->deleted)
668     {
669         yaz_log(b->log_freelist, "release block " ZINT_FORMAT " from freelist %d:" ZINT_FORMAT,
670                 p->pos, p->cat, p->pos/CAT_MAX);
671         memcpy(p->buf, &b->file[p->cat].head.free_list, sizeof(zint));
672         b->file[p->cat].head.free_list = p->pos;
673         if (!cache_block(b, p->pos, p->buf, 1))
674         {
675             yaz_log(b->log_io, "bf_write: close_block (deleted)");
676             bf_write(b->file[p->cat].bf, p->pos/CAT_MAX, 0, 0, p->buf);
677         }
678     }
679     else if (p->dirty)
680     {
681         int offset = b->file[p->cat].head.block_offset;
682         int size = p->size + offset;
683         char *dst =  (char*)p->buf + 3;
684         assert(p->size >= 0);
685         
686         /* memset becuase encode_ptr usually does not write all bytes */
687         memset(p->buf, 0, b->file[p->cat].head.block_offset);
688         p->buf[0] = p->leaf;
689         p->buf[1] = size & 255;
690         p->buf[2] = size >> 8;
691         encode_ptr(&dst, p->no_items);
692         check_block(b, p);
693         if (!cache_block(b, p->pos, p->buf, 1))
694         {
695             yaz_log(b->log_io, "bf_write: close_block");
696             bf_write(b->file[p->cat].bf, p->pos/CAT_MAX, 0, 0, p->buf);
697         }
698     }
699     (*b->method->codec.stop)(p->decodeClientData);
700     xfree(p->buf);
701     xfree(p);
702 }
703
704 int insert_sub(ISAMB b, struct ISAMB_block **p,
705                void *new_item, int *mode,
706                ISAMC_I *stream,
707                struct ISAMB_block **sp,
708                void *sub_item, int *sub_size,
709                const void *max_item);
710
711 int insert_int(ISAMB b, struct ISAMB_block *p, void *lookahead_item,
712                int *mode,
713                ISAMC_I *stream, struct ISAMB_block **sp,
714                void *split_item, int *split_size, const void *last_max_item)
715 {
716     char *startp = p->bytes;
717     const char *src = startp;
718     char *endp = p->bytes + p->size;
719     ISAM_P pos;
720     struct ISAMB_block *sub_p1 = 0, *sub_p2 = 0;
721     char sub_item[DST_ITEM_MAX];
722     int sub_size;
723     int more = 0;
724     zint diff_terms = 0;
725     void *c1 = (*b->method->codec.start)();
726
727     *sp = 0;
728
729     assert(p->size >= 0);
730     decode_ptr(&src, &pos);
731     while (src != endp)
732     {
733         int d;
734         const char *src0 = src;
735 #if INT_ENCODE
736         char file_item_buf[DST_ITEM_MAX];
737         char *file_item = file_item_buf;
738         (*b->method->codec.reset)(c1);
739         (*b->method->codec.decode)(c1, &file_item, &src);
740         d = (*b->method->compare_item)(file_item_buf, lookahead_item);
741         if (d > 0)
742         {
743             sub_p1 = open_block(b, pos);
744             assert(sub_p1);
745             diff_terms -= sub_p1->no_items;
746             more = insert_sub(b, &sub_p1, lookahead_item, mode,
747                               stream, &sub_p2, 
748                               sub_item, &sub_size, file_item_buf);
749             diff_terms += sub_p1->no_items;
750             src = src0;
751             break;
752         }
753 #else
754         zint item_len;
755         decode_item_len(&src, &item_len);
756         d = (*b->method->compare_item)(src, lookahead_item);
757         if (d > 0)
758         {
759             sub_p1 = open_block(b, pos);
760             assert(sub_p1);
761             diff_terms -= sub_p1->no_items;
762             more = insert_sub(b, &sub_p1, lookahead_item, mode,
763                               stream, &sub_p2, 
764                               sub_item, &sub_size, src);
765             diff_terms += sub_p1->no_items;
766             src = src0;
767             break;
768         }
769         src += item_len;
770 #endif
771         decode_ptr(&src, &pos);
772     }
773     if (!sub_p1)
774     {
775         /* we reached the end. So lookahead > last item */
776         sub_p1 = open_block(b, pos);
777         assert(sub_p1);
778         diff_terms -= sub_p1->no_items;
779         more = insert_sub(b, &sub_p1, lookahead_item, mode, stream, &sub_p2, 
780                           sub_item, &sub_size, last_max_item);
781         diff_terms += sub_p1->no_items;
782     }
783     if (sub_p2)
784         diff_terms += sub_p2->no_items;
785     if (diff_terms)
786     {
787         p->dirty = 1;
788         p->no_items += diff_terms;
789     }
790     if (sub_p2)
791     {
792         /* there was a split - must insert pointer in this one */
793         char dst_buf[DST_BUF_SIZE];
794         char *dst = dst_buf;
795 #if INT_ENCODE
796         const char *sub_item_ptr = sub_item;
797 #endif
798         assert(sub_size < DST_ITEM_MAX && sub_size > 1);
799
800         memcpy(dst, startp, src - startp);
801                 
802         dst += src - startp;
803
804 #if INT_ENCODE
805         (*b->method->codec.reset)(c1);
806         (*b->method->codec.encode)(c1, &dst, &sub_item_ptr);
807 #else
808         encode_item_len(&dst, sub_size);      /* sub length and item */
809         memcpy(dst, sub_item, sub_size);
810         dst += sub_size;
811 #endif
812
813         encode_ptr(&dst, sub_p2->pos);   /* pos */
814
815         if (endp - src)                   /* remaining data */
816         {
817             memcpy(dst, src, endp - src);
818             dst += endp - src;
819         }
820         p->size = dst - dst_buf;
821         assert(p->size >= 0);
822
823         if (p->size <= b->file[p->cat].head.block_max)
824         {
825             /* it fits OK in this block */
826             memcpy(startp, dst_buf, dst - dst_buf);
827
828             close_block(b, sub_p2);
829         }
830         else
831         {
832             /* must split _this_ block as well .. */
833             struct ISAMB_block *sub_p3;
834 #if INT_ENCODE
835             char file_item_buf[DST_ITEM_MAX];
836             char *file_item = file_item_buf;
837 #else
838             zint split_size_tmp;
839 #endif
840             zint no_items_first_half = 0;
841             int p_new_size;
842             const char *half;
843             src = dst_buf;
844             endp = dst;
845             
846             b->number_of_int_splits++;
847
848             p->dirty = 1;
849             close_block(b, sub_p2);
850
851             half = src + b->file[p->cat].head.block_size/2;
852             decode_ptr(&src, &pos);
853
854             if (b->enable_int_count)
855             {
856                 /* read sub block so we can get no_items for it */
857                 sub_p3 = open_block(b, pos);
858                 no_items_first_half += sub_p3->no_items;
859                 close_block(b, sub_p3);
860             }
861
862             while (src <= half)
863             {
864 #if INT_ENCODE
865                 file_item = file_item_buf;
866                 (*b->method->codec.reset)(c1);
867                 (*b->method->codec.decode)(c1, &file_item, &src);
868 #else
869                 decode_item_len(&src, &split_size_tmp);
870                 *split_size = (int) split_size_tmp;
871                 src += *split_size;
872 #endif
873                 decode_ptr(&src, &pos);
874
875                 if (b->enable_int_count)
876                 {
877                     /* read sub block so we can get no_items for it */
878                     sub_p3 = open_block(b, pos);
879                     no_items_first_half += sub_p3->no_items;
880                     close_block(b, sub_p3);
881                 }
882             }
883             /*  p is first half */
884             p_new_size = src - dst_buf;
885             memcpy(p->bytes, dst_buf, p_new_size);
886
887 #if INT_ENCODE
888             file_item = file_item_buf;
889             (*b->method->codec.reset)(c1);
890             (*b->method->codec.decode)(c1, &file_item, &src);
891             *split_size = file_item - file_item_buf;
892             memcpy(split_item, file_item_buf, *split_size);
893 #else
894             decode_item_len(&src, &split_size_tmp);
895             *split_size = (int) split_size_tmp;
896             memcpy(split_item, src, *split_size);
897             src += *split_size;
898 #endif
899             /*  *sp is second half */
900             *sp = new_int(b, p->cat);
901             (*sp)->size = endp - src;
902             memcpy((*sp)->bytes, src, (*sp)->size);
903
904             p->size = p_new_size;
905
906             /*  adjust no_items in first&second half */
907             (*sp)->no_items = p->no_items - no_items_first_half;
908             p->no_items = no_items_first_half;
909         }
910         p->dirty = 1;
911     }
912     close_block(b, sub_p1);
913     (*b->method->codec.stop)(c1);
914     return more;
915 }
916
917 int insert_leaf(ISAMB b, struct ISAMB_block **sp1, void *lookahead_item,
918                 int *lookahead_mode, ISAMC_I *stream,
919                 struct ISAMB_block **sp2,
920                 void *sub_item, int *sub_size,
921                 const void *max_item)
922 {
923     struct ISAMB_block *p = *sp1;
924     char *endp = 0;
925     const char *src = 0;
926     char dst_buf[DST_BUF_SIZE], *dst = dst_buf;
927     int new_size;
928     void *c1 = (*b->method->codec.start)();
929     void *c2 = (*b->method->codec.start)();
930     int more = 1;
931     int quater = b->file[b->no_cat-1].head.block_max / 4;
932     char *mid_cut = dst_buf + quater * 2;
933     char *tail_cut = dst_buf + quater * 3;
934     char *maxp = dst_buf + b->file[b->no_cat-1].head.block_max;
935     char *half1 = 0;
936     char *half2 = 0;
937     char cut_item_buf[DST_ITEM_MAX];
938     int cut_item_size = 0;
939     int no_items = 0;    /* number of items (total) */
940     int no_items_1 = 0;  /* number of items (first half) */
941     int inserted_dst_bytes = 0;
942
943     if (p && p->size)
944     {
945         char file_item_buf[DST_ITEM_MAX];
946         char *file_item = file_item_buf;
947             
948         src = p->bytes;
949         endp = p->bytes + p->size;
950         (*b->method->codec.decode)(c1, &file_item, &src);
951         while (1)
952         {
953             const char *dst_item = 0; /* resulting item to be inserted */
954             char *lookahead_next;
955             char *dst_0 = dst;
956             int d = -1;
957             
958             if (lookahead_item)
959                 d = (*b->method->compare_item)(file_item_buf, lookahead_item);
960             
961             /* d now holds comparison between existing file item and
962                lookahead item 
963                d = 0: equal
964                d > 0: lookahead before file
965                d < 0: lookahead after file
966             */
967             if (d > 0)
968             {
969                 /* lookahead must be inserted */
970                 dst_item = lookahead_item;
971                 /* if this is not an insertion, it's really bad .. */
972                 if (!*lookahead_mode)
973                 {
974                     yaz_log(YLOG_WARN, "isamb: Inconsistent register (1)");
975                     assert(*lookahead_mode);
976                 }
977             }
978             else if (d == 0 && *lookahead_mode == 2)
979             {
980                 /* For mode == 2, we insert the new key anyway - even 
981                    though the comparison is 0. */
982                 dst_item = lookahead_item;
983                 p->dirty = 1;
984             }
985             else
986                 dst_item = file_item_buf;
987
988             if (d == 0 && !*lookahead_mode)
989             {
990                 /* it's a deletion and they match so there is nothing
991                    to be inserted anyway .. But mark the thing dirty
992                    (file item was part of input.. The item will not be
993                    part of output */
994                 p->dirty = 1;
995             }
996             else if (!half1 && dst > mid_cut)
997             {
998                 /* we have reached the splitting point for the first time */
999                 const char *dst_item_0 = dst_item;
1000                 half1 = dst; /* candidate for splitting */
1001
1002                 /* encode the resulting item */
1003                 (*b->method->codec.encode)(c2, &dst, &dst_item);
1004                 
1005                 cut_item_size = dst_item - dst_item_0;
1006                 assert(cut_item_size > 0);
1007                 memcpy(cut_item_buf, dst_item_0, cut_item_size);
1008                 
1009                 half2 = dst;
1010                 no_items_1 = no_items;
1011                 no_items++;
1012             }
1013             else
1014             {
1015                 /* encode the resulting item */
1016                 (*b->method->codec.encode)(c2, &dst, &dst_item);
1017                 no_items++;
1018             }
1019
1020             /* now move "pointers" .. result has been encoded .. */
1021             if (d > 0)  
1022             {
1023                 /* we must move the lookahead pointer */
1024
1025                 inserted_dst_bytes += (dst - dst_0);
1026                 if (inserted_dst_bytes >=  quater)
1027                     /* no more room. Mark lookahead as "gone".. */
1028                     lookahead_item = 0;
1029                 else
1030                 {
1031                     /* move it really.. */
1032                     lookahead_next = lookahead_item;
1033                     if (!(*stream->read_item)(stream->clientData,
1034                                               &lookahead_next,
1035                                               lookahead_mode))
1036                     {
1037                         /* end of stream reached: no "more" and no lookahead */
1038                         lookahead_item = 0;
1039                         more = 0;
1040                     }
1041                     if (lookahead_item && max_item &&
1042                         (*b->method->compare_item)(max_item, lookahead_item) <= 0)
1043                     {
1044                         /* the lookahead goes beyond what we allow in this
1045                            leaf. Mark it as "gone" */
1046                         lookahead_item = 0;
1047                     }
1048                     
1049                     p->dirty = 1;
1050                 }
1051             }
1052             else if (d == 0)
1053             {
1054                 /* exact match .. move both pointers */
1055
1056                 lookahead_next = lookahead_item;
1057                 if (!(*stream->read_item)(stream->clientData,
1058                                           &lookahead_next, lookahead_mode))
1059                 {
1060                     lookahead_item = 0;
1061                     more = 0;
1062                 }
1063                 if (src == endp)
1064                     break;  /* end of file stream reached .. */
1065                 file_item = file_item_buf; /* move file pointer */
1066                 (*b->method->codec.decode)(c1, &file_item, &src);
1067             }
1068             else
1069             {
1070                 /* file pointer must be moved */
1071                 if (src == endp)
1072                     break;
1073                 file_item = file_item_buf;
1074                 (*b->method->codec.decode)(c1, &file_item, &src);
1075             }
1076         }
1077     }
1078
1079     /* this loop runs when we are "appending" to a leaf page. That is
1080        either it's empty (new) or all file items have been read in
1081        previous loop */
1082
1083     maxp = dst_buf + b->file[b->no_cat-1].head.block_max + quater;
1084     while (lookahead_item)
1085     {
1086         char *dst_item;
1087         const char *src = lookahead_item;
1088         char *dst_0 = dst;
1089         
1090         /* if we have a lookahead item, we stop if we exceed the value of it */
1091         if (max_item &&
1092             (*b->method->compare_item)(max_item, lookahead_item) <= 0)
1093         {
1094             /* stop if we have reached the value of max item */
1095             break;
1096         }
1097         if (!*lookahead_mode)
1098         {
1099             /* this is append. So a delete is bad */
1100             yaz_log(YLOG_WARN, "isamb: Inconsistent register (2)");
1101             assert(*lookahead_mode);
1102         }
1103         else if (!half1 && dst > tail_cut)
1104         {
1105             const char *src_0 = src;
1106             half1 = dst; /* candidate for splitting */
1107             
1108             (*b->method->codec.encode)(c2, &dst, &src);
1109             
1110             cut_item_size = src - src_0;
1111             assert(cut_item_size > 0);
1112             memcpy(cut_item_buf, src_0, cut_item_size);
1113             
1114             no_items_1 = no_items;
1115             half2 = dst;
1116         }
1117         else
1118             (*b->method->codec.encode)(c2, &dst, &src);
1119
1120         if (dst > maxp)
1121         {
1122             dst = dst_0;
1123             break;
1124         }
1125         no_items++;
1126         if (p)
1127             p->dirty = 1;
1128         dst_item = lookahead_item;
1129         if (!(*stream->read_item)(stream->clientData, &dst_item,
1130                                   lookahead_mode))
1131         {
1132             lookahead_item = 0;
1133             more = 0;
1134         }
1135     }
1136     new_size = dst - dst_buf;
1137     if (p && p->cat != b->no_cat-1 && 
1138         new_size > b->file[p->cat].head.block_max)
1139     {
1140         /* non-btree block will be removed */
1141         p->deleted = 1;
1142         close_block(b, p);
1143         /* delete it too!! */
1144         p = 0; /* make a new one anyway */
1145     }
1146     if (!p)
1147     {   /* must create a new one */
1148         int i;
1149         for (i = 0; i < b->no_cat; i++)
1150             if (new_size <= b->file[i].head.block_max)
1151                 break;
1152         if (i == b->no_cat)
1153             i = b->no_cat - 1;
1154         p = new_leaf(b, i);
1155     }
1156     if (new_size > b->file[p->cat].head.block_max)
1157     {
1158         char *first_dst;
1159         const char *cut_item = cut_item_buf;
1160
1161         assert(half1);
1162         assert(half2);
1163
1164         assert(cut_item_size > 0);
1165         
1166         /* first half */
1167         p->size = half1 - dst_buf;
1168         assert(p->size <=  b->file[p->cat].head.block_max);
1169         memcpy(p->bytes, dst_buf, half1 - dst_buf);
1170         p->no_items = no_items_1;
1171
1172         /* second half */
1173         *sp2 = new_leaf(b, p->cat);
1174
1175         (*b->method->codec.reset)(c2);
1176
1177         b->number_of_leaf_splits++;
1178
1179         first_dst = (*sp2)->bytes;
1180
1181         (*b->method->codec.encode)(c2, &first_dst, &cut_item);
1182
1183         memcpy(first_dst, half2, dst - half2);
1184
1185         (*sp2)->size = (first_dst - (*sp2)->bytes) + (dst - half2);
1186         assert((*sp2)->size <=  b->file[p->cat].head.block_max);
1187         (*sp2)->no_items = no_items - no_items_1;
1188         (*sp2)->dirty = 1;
1189         p->dirty = 1;
1190         memcpy(sub_item, cut_item_buf, cut_item_size);
1191         *sub_size = cut_item_size;
1192     }
1193     else
1194     {
1195         memcpy(p->bytes, dst_buf, dst - dst_buf);
1196         p->size = new_size;
1197         p->no_items = no_items;
1198     }
1199     (*b->method->codec.stop)(c1);
1200     (*b->method->codec.stop)(c2);
1201     *sp1 = p;
1202     return more;
1203 }
1204
1205 int insert_sub(ISAMB b, struct ISAMB_block **p, void *new_item,
1206                int *mode,
1207                ISAMC_I *stream,
1208                struct ISAMB_block **sp,
1209                void *sub_item, int *sub_size,
1210                const void *max_item)
1211 {
1212     if (!*p || (*p)->leaf)
1213         return insert_leaf(b, p, new_item, mode, stream, sp, sub_item, 
1214                            sub_size, max_item);
1215     else
1216         return insert_int(b, *p, new_item, mode, stream, sp, sub_item,
1217                           sub_size, max_item);
1218 }
1219
1220 int isamb_unlink(ISAMB b, ISAM_P pos)
1221 {
1222     struct ISAMB_block *p1;
1223
1224     if (!pos)
1225         return 0;
1226     p1 = open_block(b, pos);
1227     p1->deleted = 1;
1228     if (!p1->leaf)
1229     {
1230         zint sub_p;
1231         const char *src = p1->bytes + p1->offset;
1232 #if INT_ENCODE
1233         void *c1 = (*b->method->codec.start)();
1234 #endif
1235         decode_ptr(&src, &sub_p);
1236         isamb_unlink(b, sub_p);
1237         
1238         while (src != p1->bytes + p1->size)
1239         {
1240 #if INT_ENCODE
1241             char file_item_buf[DST_ITEM_MAX];
1242             char *file_item = file_item_buf;
1243             (*b->method->codec.reset)(c1);
1244             (*b->method->codec.decode)(c1, &file_item, &src);
1245 #else
1246             zint item_len;
1247             decode_item_len(&src, &item_len);
1248             src += item_len;
1249 #endif
1250             decode_ptr(&src, &sub_p);
1251             isamb_unlink(b, sub_p);
1252         }
1253 #if INT_ENCODE
1254         (*b->method->codec.stop)(c1);
1255 #endif
1256     }
1257     close_block(b, p1);
1258     return 0;
1259 }
1260
1261 void isamb_merge(ISAMB b, ISAM_P *pos, ISAMC_I *stream)
1262 {
1263     char item_buf[DST_ITEM_MAX];
1264     char *item_ptr;
1265     int i_mode;
1266     int more;
1267     int must_delete = 0;
1268
1269     if (b->cache < 0)
1270     {
1271         int more = 1;
1272         while (more)
1273         {
1274             item_ptr = item_buf;
1275             more =
1276                 (*stream->read_item)(stream->clientData, &item_ptr, &i_mode);
1277         }
1278         *pos = 1;
1279         return;
1280     }
1281     item_ptr = item_buf;
1282     more = (*stream->read_item)(stream->clientData, &item_ptr, &i_mode);
1283     while (more)
1284     {
1285         struct ISAMB_block *p = 0, *sp = 0;
1286         char sub_item[DST_ITEM_MAX];
1287         int sub_size;
1288         
1289         if (*pos)
1290             p = open_block(b, *pos);
1291         more = insert_sub(b, &p, item_buf, &i_mode, stream, &sp,
1292                           sub_item, &sub_size, 0);
1293         if (sp)
1294         {    /* increase level of tree by one */
1295             struct ISAMB_block *p2 = new_int(b, p->cat);
1296             char *dst = p2->bytes + p2->size;
1297 #if INT_ENCODE
1298             void *c1 = (*b->method->codec.start)();
1299             const char *sub_item_ptr = sub_item;
1300 #endif
1301
1302             encode_ptr(&dst, p->pos);
1303             assert(sub_size < DST_ITEM_MAX && sub_size > 1);
1304 #if INT_ENCODE
1305             (*b->method->codec.reset)(c1);
1306             (*b->method->codec.encode)(c1, &dst, &sub_item_ptr);
1307 #else
1308             encode_item_len(&dst, sub_size);
1309             memcpy(dst, sub_item, sub_size);
1310             dst += sub_size;
1311 #endif
1312             encode_ptr(&dst, sp->pos);
1313             
1314             p2->size = dst - p2->bytes;
1315             p2->no_items = p->no_items + sp->no_items;
1316             *pos = p2->pos;  /* return new super page */
1317             close_block(b, sp);
1318             close_block(b, p2);
1319 #if INT_ENCODE
1320             (*b->method->codec.stop)(c1);
1321 #endif
1322         }
1323         else
1324         {
1325             *pos = p->pos;   /* return current one (again) */
1326         }
1327         if (p->no_items == 0)
1328             must_delete = 1;
1329         else
1330             must_delete = 0;
1331         close_block(b, p);
1332     }
1333     if (must_delete)
1334     {
1335         isamb_unlink(b, *pos);
1336         *pos = 0;
1337     }
1338 }
1339
1340 ISAMB_PP isamb_pp_open_x(ISAMB isamb, ISAM_P pos, int *level, int scope)
1341 {
1342     ISAMB_PP pp = xmalloc(sizeof(*pp));
1343     int i;
1344
1345     assert(pos);
1346
1347     pp->isamb = isamb;
1348     pp->block = xmalloc(ISAMB_MAX_LEVEL * sizeof(*pp->block));
1349
1350     pp->pos = pos;
1351     pp->level = 0;
1352     pp->maxlevel = 0;
1353     pp->total_size = 0;
1354     pp->no_blocks = 0;
1355     pp->skipped_numbers = 0;
1356     pp->returned_numbers = 0;
1357     pp->scope = scope;
1358     for (i = 0; i<ISAMB_MAX_LEVEL; i++)
1359         pp->skipped_nodes[i] = pp->accessed_nodes[i] = 0;
1360     while (1)
1361     {
1362         struct ISAMB_block *p = open_block(isamb, pos);
1363         const char *src = p->bytes + p->offset;
1364         pp->block[pp->level] = p;
1365
1366         pp->total_size += p->size;
1367         pp->no_blocks++;
1368         if (p->leaf)
1369             break;
1370         decode_ptr(&src, &pos);
1371         p->offset = src - p->bytes;
1372         pp->level++;
1373         pp->accessed_nodes[pp->level]++; 
1374     }
1375     pp->block[pp->level+1] = 0;
1376     pp->maxlevel = pp->level;
1377     if (level)
1378         *level = pp->level;
1379     return pp;
1380 }
1381
1382 ISAMB_PP isamb_pp_open(ISAMB isamb, ISAM_P pos, int scope)
1383 {
1384     return isamb_pp_open_x(isamb, pos, 0, scope);
1385 }
1386
1387 void isamb_pp_close_x(ISAMB_PP pp, zint *size, zint *blocks)
1388 {
1389     int i;
1390     if (!pp)
1391         return;
1392     yaz_log(YLOG_DEBUG, "isamb_pp_close lev=%d returned "ZINT_FORMAT" values, " 
1393             "skipped "ZINT_FORMAT,
1394             pp->maxlevel, pp->skipped_numbers, pp->returned_numbers);
1395     for (i = pp->maxlevel; i>=0; i--)
1396         if (pp->skipped_nodes[i] || pp->accessed_nodes[i])
1397             yaz_log(YLOG_DEBUG, "isamb_pp_close  level leaf-%d: "
1398                     ZINT_FORMAT" read, "ZINT_FORMAT" skipped", i,
1399                     pp->accessed_nodes[i], pp->skipped_nodes[i]);
1400     pp->isamb->skipped_numbers += pp->skipped_numbers;
1401     pp->isamb->returned_numbers += pp->returned_numbers;
1402     for (i = pp->maxlevel; i>=0; i--)
1403     {
1404         pp->isamb->accessed_nodes[i] += pp->accessed_nodes[i];
1405         pp->isamb->skipped_nodes[i] += pp->skipped_nodes[i];
1406     }
1407     if (size)
1408         *size = pp->total_size;
1409     if (blocks)
1410         *blocks = pp->no_blocks;
1411     for (i = 0; i <= pp->level; i++)
1412         close_block(pp->isamb, pp->block[i]);
1413     xfree(pp->block);
1414     xfree(pp);
1415 }
1416
1417 int isamb_block_info(ISAMB isamb, int cat)
1418 {
1419     if (cat >= 0 && cat < isamb->no_cat)
1420         return isamb->file[cat].head.block_size;
1421     return -1;
1422 }
1423
1424 void isamb_pp_close(ISAMB_PP pp)
1425 {
1426     isamb_pp_close_x(pp, 0, 0);
1427 }
1428
1429 /* simple recursive dumper .. */
1430 static void isamb_dump_r(ISAMB b, ISAM_P pos, void (*pr)(const char *str),
1431                          int level)
1432 {
1433     char buf[1024];
1434     char prefix_str[1024];
1435     if (pos)
1436     {
1437         struct ISAMB_block *p = open_block(b, pos);
1438         sprintf(prefix_str, "%*s " ZINT_FORMAT " cat=%d size=%d max=%d items="
1439                 ZINT_FORMAT, level*2, "",
1440                 pos, p->cat, p->size, b->file[p->cat].head.block_max,
1441                 p->no_items);
1442         (*pr)(prefix_str);
1443         sprintf(prefix_str, "%*s " ZINT_FORMAT, level*2, "", pos);
1444         if (p->leaf)
1445         {
1446             while (p->offset < p->size)
1447             {
1448                 const char *src = p->bytes + p->offset;
1449                 char *dst = buf;
1450                 (*b->method->codec.decode)(p->decodeClientData, &dst, &src);
1451                 (*b->method->log_item)(YLOG_DEBUG, buf, prefix_str);
1452                 p->offset = src - (char*) p->bytes;
1453             }
1454             assert(p->offset == p->size);
1455         }
1456         else
1457         {
1458             const char *src = p->bytes + p->offset;
1459             ISAM_P sub;
1460
1461             decode_ptr(&src, &sub);
1462             p->offset = src - (char*) p->bytes;
1463
1464             isamb_dump_r(b, sub, pr, level+1);
1465             
1466             while (p->offset < p->size)
1467             {
1468 #if INT_ENCODE
1469                 char file_item_buf[DST_ITEM_MAX];
1470                 char *file_item = file_item_buf;
1471                 void *c1 = (*b->method->codec.start)();
1472                 (*b->method->codec.decode)(c1, &file_item, &src);
1473                 (*b->method->codec.stop)(c1);
1474                 (*b->method->log_item)(YLOG_DEBUG, file_item_buf, prefix_str);
1475 #else
1476                 zint item_len;
1477                 decode_item_len(&src, &item_len);
1478                 (*b->method->log_item)(YLOG_DEBUG, src, prefix_str);
1479                 src += item_len;
1480 #endif
1481                 decode_ptr(&src, &sub);
1482                 
1483                 p->offset = src - (char*) p->bytes;
1484                 
1485                 isamb_dump_r(b, sub, pr, level+1);
1486             }           
1487         }
1488         close_block(b, p);
1489     }
1490 }
1491
1492 void isamb_dump(ISAMB b, ISAM_P pos, void (*pr)(const char *str))
1493 {
1494     isamb_dump_r(b, pos, pr, 0);
1495 }
1496
1497 int isamb_pp_read(ISAMB_PP pp, void *buf)
1498 {
1499     return isamb_pp_forward(pp, buf, 0);
1500 }
1501
1502
1503 void isamb_pp_pos(ISAMB_PP pp, double *current, double *total)
1504 { /* return an estimate of the current position and of the total number of */
1505   /* occureences in the isam tree, based on the current leaf */
1506     assert(total);
1507     assert(current);
1508     
1509     /* if end-of-stream PP may not be leaf */
1510
1511     *total = (double) (pp->block[0]->no_items);
1512     *current = (double) pp->returned_numbers;
1513 #if ISAMB_DEBUG
1514     yaz_log(YLOG_LOG, "isamb_pp_pos returning: cur= %0.1f tot=%0.1f rn="
1515             ZINT_FORMAT, *current, *total, pp->returned_numbers);
1516 #endif
1517 }
1518
1519 int isamb_pp_forward(ISAMB_PP pp, void *buf, const void *untilb)
1520 {
1521     char *dst = buf;
1522     const char *src;
1523     struct ISAMB_block *p = pp->block[pp->level];
1524     ISAMB b = pp->isamb;
1525     if (!p)
1526         return 0;
1527  again:
1528     while (p->offset == p->size)
1529     {
1530         ISAM_P pos;
1531 #if INT_ENCODE
1532         const char *src_0;
1533         void *c1;
1534         char file_item_buf[DST_ITEM_MAX];
1535         char *file_item = file_item_buf;
1536 #else
1537         zint item_len;
1538 #endif
1539         while (p->offset == p->size)
1540         {
1541             if (pp->level == 0)
1542                 return 0;
1543             close_block(pp->isamb, pp->block[pp->level]);
1544             pp->block[pp->level] = 0;
1545             (pp->level)--;
1546             p = pp->block[pp->level];
1547             assert(!p->leaf);  
1548         }
1549
1550         assert(!p->leaf);
1551         src = p->bytes + p->offset;
1552        
1553 #if INT_ENCODE
1554         c1 = (*b->method->codec.start)();
1555         (*b->method->codec.decode)(c1, &file_item, &src);
1556 #else
1557         decode_ptr(&src, &item_len);
1558         src += item_len;
1559 #endif        
1560         decode_ptr(&src, &pos);
1561         p->offset = src - (char*) p->bytes;
1562
1563         src = p->bytes + p->offset;
1564
1565         while(1)
1566         {
1567             if (!untilb || p->offset == p->size)
1568                 break;
1569             assert(p->offset < p->size);
1570 #if INT_ENCODE
1571             src_0 = src;
1572             file_item = file_item_buf;
1573             (*b->method->codec.reset)(c1);
1574             (*b->method->codec.decode)(c1, &file_item, &src);
1575             if ((*b->method->compare_item)(untilb, file_item_buf) < pp->scope)
1576             {
1577                 src = src_0;
1578                 break;
1579             }
1580 #else
1581             decode_item_len(&src, &item_len);
1582             if ((*b->method->compare_item)(untilb, src) < pp->scope)
1583                 break;
1584             src += item_len;
1585 #endif
1586             decode_ptr(&src, &pos);
1587             p->offset = src - (char*) p->bytes;
1588         }
1589
1590         pp->level++;
1591
1592         while (1)
1593         {
1594             pp->block[pp->level] = p = open_block(pp->isamb, pos);
1595
1596             pp->total_size += p->size;
1597             pp->no_blocks++;
1598             
1599             if (p->leaf) 
1600             {
1601                 break;
1602             }
1603             
1604             src = p->bytes + p->offset;
1605             while(1)
1606             {
1607                 decode_ptr(&src, &pos);
1608                 p->offset = src - (char*) p->bytes;
1609                 
1610                 if (!untilb || p->offset == p->size)
1611                     break;
1612                 assert(p->offset < p->size);
1613 #if INT_ENCODE
1614                 src_0 = src;
1615                 file_item = file_item_buf;
1616                 (*b->method->codec.reset)(c1);
1617                 (*b->method->codec.decode)(c1, &file_item, &src);
1618                 if ((*b->method->compare_item)(untilb, file_item_buf) < pp->scope)
1619                 {
1620                     src = src_0;
1621                     break;
1622                 }
1623 #else
1624                 decode_ptr(&src, &item_len);
1625                 if ((*b->method->compare_item)(untilb, src) <= pp->scope)
1626                     break;
1627                 src += item_len;
1628 #endif
1629             }
1630             pp->level++;
1631         }
1632 #if INT_ENCODE
1633         (*b->method->codec.stop)(c1);
1634 #endif
1635     }
1636     assert(p->offset < p->size);
1637     assert(p->leaf);
1638     while(1)
1639     {
1640         char *dst0 = dst;
1641         src = p->bytes + p->offset;
1642         (*pp->isamb->method->codec.decode)(p->decodeClientData, &dst, &src);
1643         p->offset = src - (char*) p->bytes;
1644         if (!untilb || (*pp->isamb->method->compare_item)(untilb, dst0) < pp->scope)
1645             break;
1646         dst = dst0;
1647         if (p->offset == p->size) goto again;
1648     }
1649     pp->returned_numbers++; 
1650     return 1;
1651 }
1652
1653 zint isamb_get_int_splits(ISAMB b)
1654 {
1655     return b->number_of_int_splits;
1656 }
1657
1658 zint isamb_get_leaf_splits(ISAMB b)
1659 {
1660     return b->number_of_leaf_splits;
1661 }
1662
1663 zint isamb_get_root_ptr(ISAMB b)
1664 {
1665     return b->root_ptr;
1666 }
1667
1668 void isamb_set_root_ptr(ISAMB b, zint root_ptr)
1669 {
1670     b->root_ptr = root_ptr;
1671 }
1672
1673
1674 /*
1675  * Local variables:
1676  * c-basic-offset: 4
1677  * c-file-style: "Stroustrup"
1678  * indent-tabs-mode: nil
1679  * End:
1680  * vim: shiftwidth=4 tabstop=8 expandtab
1681  */
1682