Bump copyright year
[idzebra-moved-to-github.git] / isamb / isamb.c
1 /* This file is part of the Zebra server.
2    Copyright (C) 1994-2010 Index Data
3
4 Zebra is free software; you can redistribute it and/or modify it under
5 the terms of the GNU General Public License as published by the Free
6 Software Foundation; either version 2, or (at your option) any later
7 version.
8
9 Zebra is distributed in the hope that it will be useful, but WITHOUT ANY
10 WARRANTY; without even the implied warranty of MERCHANTABILITY or
11 FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
12 for more details.
13
14 You should have received a copy of the GNU General Public License
15 along with this program; if not, write to the Free Software
16 Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA  02110-1301  USA
17
18 */
19
20 #include <stdlib.h>
21 #include <string.h>
22 #include <yaz/log.h>
23 #include <yaz/xmalloc.h>
24 #include <idzebra/isamb.h>
25 #include <assert.h>
26
27 #ifndef ISAMB_DEBUG
28 #define ISAMB_DEBUG 0
29 #endif
30
31
32 #define ISAMB_MAJOR_VERSION 3
33 #define ISAMB_MINOR_VERSION_NO_ROOT 0
34 #define ISAMB_MINOR_VERSION_WITH_ROOT 1
35
36 struct ISAMB_head {
37     zint first_block;
38     zint last_block;
39     zint free_list;
40     zint no_items;
41     int block_size;
42     int block_max;
43     int block_offset;
44 };
45
46 /* if 1, upper nodes items are encoded; 0 if not encoded */
47 #define INT_ENCODE 1
48
49 /* maximum size of encoded buffer */
50 #define DST_ITEM_MAX 5000
51
52 /* max page size for _any_ isamb use */
53 #define ISAMB_MAX_PAGE 32768
54
55 #define ISAMB_MAX_LEVEL 10
56 /* approx 2*max page + max size of item */
57 #define DST_BUF_SIZE (2*ISAMB_MAX_PAGE+DST_ITEM_MAX+100)
58
59 /* should be maximum block size of multiple thereof */
60 #define ISAMB_CACHE_ENTRY_SIZE ISAMB_MAX_PAGE
61
62 /* CAT_MAX: _must_ be power of 2 */
63 #define CAT_MAX 4
64 #define CAT_MASK (CAT_MAX-1)
65 /* CAT_NO: <= CAT_MAX */
66 #define CAT_NO 4
67
68 /* Smallest block size */
69 #define ISAMB_MIN_SIZE 32
70 /* Size factor */
71 #define ISAMB_FAC_SIZE 4
72
73 /* ISAMB_PTR_CODEC = 1 var, =0 fixed */
74 #define ISAMB_PTR_CODEC 1
75
76 struct ISAMB_cache_entry {
77     ISAM_P pos;
78     unsigned char *buf;
79     int dirty;
80     int hits;
81     struct ISAMB_cache_entry *next;
82 };
83
84 struct ISAMB_file {
85     BFile bf;
86     int head_dirty;
87     struct ISAMB_head head;
88     struct ISAMB_cache_entry *cache_entries;
89 };
90
91 struct ISAMB_s {
92     BFiles bfs;
93     ISAMC_M *method;
94
95     struct ISAMB_file *file;
96     int no_cat;
97     int cache; /* 0 = no cache, 1 = use cache, -1 = dummy isam (for testing only) */
98     int log_io;        /* log level for bf_read/bf_write calls */
99     int log_freelist;  /* log level for freelist handling */
100     zint skipped_numbers; /* on a leaf node */
101     zint returned_numbers; 
102     zint skipped_nodes[ISAMB_MAX_LEVEL]; /* [0]=skipped leaves, 1 = higher etc */
103     zint accessed_nodes[ISAMB_MAX_LEVEL]; /* nodes we did not skip */
104     zint number_of_int_splits;
105     zint number_of_leaf_splits;
106     int enable_int_count; /* whether we count nodes (or not) */
107     int cache_size; /* size of blocks to cache (if cache=1) */
108     int minor_version;
109     zint root_ptr;
110 };
111
112 struct ISAMB_block {
113     ISAM_P pos;
114     int cat;
115     int size;
116     int leaf;
117     int dirty;
118     int deleted;
119     int offset;
120     zint no_items;  /* number of nodes in this + children */
121     char *bytes;
122     char *cbuf;
123     unsigned char *buf;
124     void *decodeClientData;
125     int log_rw;
126 };
127
128 struct ISAMB_PP_s {
129     ISAMB isamb;
130     ISAM_P pos;
131     int level;
132     int maxlevel; /* total depth */
133     zint total_size;
134     zint no_blocks;
135     zint skipped_numbers; /* on a leaf node */
136     zint returned_numbers; 
137     zint skipped_nodes[ISAMB_MAX_LEVEL]; /* [0]=skipped leaves, 1 = higher etc */
138     zint accessed_nodes[ISAMB_MAX_LEVEL]; /* nodes we did not skip */
139     struct ISAMB_block **block;
140     int scope;  /* on what level we forward */
141 };
142
143
144 #define encode_item_len encode_ptr
145 #if ISAMB_PTR_CODEC
146 static void encode_ptr(char **dst, zint pos)
147 {
148     unsigned char *bp = (unsigned char*) *dst;
149
150     while (pos > 127)
151     {
152         *bp++ = (unsigned char) (128 | (pos & 127));
153         pos = pos >> 7;
154     }
155     *bp++ = (unsigned char) pos;
156     *dst = (char *) bp;
157 }
158 #else
159 static void encode_ptr(char **dst, zint pos)
160 {
161     memcpy(*dst, &pos, sizeof(pos));
162     (*dst) += sizeof(pos);
163 }
164 #endif
165
166 #define decode_item_len decode_ptr
167 #if ISAMB_PTR_CODEC
168 static void decode_ptr(const char **src, zint *pos)
169 {
170     zint d = 0;
171     unsigned char c;
172     unsigned r = 0;
173
174     while (((c = *(const unsigned char *)((*src)++)) & 128))
175     {
176         d += ((zint) (c & 127) << r);
177         r += 7;
178     }
179     d += ((zint) c << r);
180     *pos = d;
181 }
182 #else
183 static void decode_ptr(const char **src, zint *pos)
184 {
185     memcpy(pos, *src, sizeof(*pos));
186     (*src) += sizeof(*pos);
187 }
188 #endif
189
190
191 void isamb_set_int_count(ISAMB b, int v)
192 {
193     b->enable_int_count = v;
194 }
195
196 void isamb_set_cache_size(ISAMB b, int v)
197 {
198     b->cache_size = v;
199 }
200
201 ISAMB isamb_open2(BFiles bfs, const char *name, int writeflag, ISAMC_M *method,
202                   int cache, int no_cat, int *sizes, int use_root_ptr)
203 {
204     ISAMB isamb = xmalloc(sizeof(*isamb));
205     int i;
206
207     assert(no_cat <= CAT_MAX);
208
209     isamb->bfs = bfs;
210     isamb->method = (ISAMC_M *) xmalloc(sizeof(*method));
211     memcpy(isamb->method, method, sizeof(*method));
212     isamb->no_cat = no_cat;
213     isamb->log_io = 0;
214     isamb->log_freelist = 0;
215     isamb->cache = cache;
216     isamb->skipped_numbers = 0;
217     isamb->returned_numbers = 0;
218     isamb->number_of_int_splits = 0;
219     isamb->number_of_leaf_splits = 0;
220     isamb->enable_int_count = 1;
221     isamb->cache_size = 40;
222
223     if (use_root_ptr)
224         isamb->minor_version = ISAMB_MINOR_VERSION_WITH_ROOT;
225     else
226         isamb->minor_version = ISAMB_MINOR_VERSION_NO_ROOT;
227
228     isamb->root_ptr = 0;
229
230     for (i = 0; i<ISAMB_MAX_LEVEL; i++)
231         isamb->skipped_nodes[i] = isamb->accessed_nodes[i] = 0;
232
233     if (cache == -1)
234     {
235         yaz_log(YLOG_WARN, "isamb_open %s. Degraded TEST mode", name);
236     }
237     else
238     {
239         assert(cache == 0 || cache == 1);
240     }
241     isamb->file = xmalloc(sizeof(*isamb->file) * isamb->no_cat);
242
243     for (i = 0; i < isamb->no_cat; i++)
244     {
245         isamb->file[i].bf = 0;
246         isamb->file[i].head_dirty = 0;
247         isamb->file[i].cache_entries = 0;
248     }
249
250     for (i = 0; i < isamb->no_cat; i++)
251     {
252         char fname[DST_BUF_SIZE];
253         char hbuf[DST_BUF_SIZE];
254
255         sprintf(fname, "%s%c", name, i+'A');
256         if (cache)
257             isamb->file[i].bf = bf_open(bfs, fname, ISAMB_CACHE_ENTRY_SIZE,
258                                         writeflag);
259         else
260             isamb->file[i].bf = bf_open(bfs, fname, sizes[i], writeflag);
261
262         if (!isamb->file[i].bf)
263         {
264             isamb_close(isamb);
265             return 0;
266         }
267
268         /* fill-in default values (for empty isamb) */
269         isamb->file[i].head.first_block = ISAMB_CACHE_ENTRY_SIZE/sizes[i]+1;
270         isamb->file[i].head.last_block = isamb->file[i].head.first_block;
271         isamb->file[i].head.block_size = sizes[i];
272         assert(sizes[i] <= ISAMB_CACHE_ENTRY_SIZE);
273 #if ISAMB_PTR_CODEC
274         if (i == isamb->no_cat-1 || sizes[i] > 128)
275             isamb->file[i].head.block_offset = 8;
276         else
277             isamb->file[i].head.block_offset = 4;
278 #else
279         isamb->file[i].head.block_offset = 11;
280 #endif
281         isamb->file[i].head.block_max =
282             sizes[i] - isamb->file[i].head.block_offset;
283         isamb->file[i].head.free_list = 0;
284         if (bf_read(isamb->file[i].bf, 0, 0, 0, hbuf))
285         {
286             /* got header assume "isamb"major minor len can fit in 16 bytes */
287             zint zint_tmp;
288             int major, minor, len, pos = 0;
289             int left;
290             const char *src = 0;
291             if (memcmp(hbuf, "isamb", 5))
292             {
293                 yaz_log(YLOG_WARN, "bad isamb header for file %s", fname);
294                 isamb_close(isamb);
295                 return 0;
296             }
297             if (sscanf(hbuf+5, "%d %d %d", &major, &minor, &len) != 3)
298             {
299                 yaz_log(YLOG_WARN, "bad isamb header for file %s", fname);
300                 isamb_close(isamb);
301                 return 0;
302             }
303             if (major != ISAMB_MAJOR_VERSION)
304             {
305                 yaz_log(YLOG_WARN, "bad major version for file %s %d, must be %d",
306                         fname, major, ISAMB_MAJOR_VERSION);
307                 isamb_close(isamb);
308                 return 0;
309             }
310             for (left = len - sizes[i]; left > 0; left = left - sizes[i])
311             {
312                 pos++;
313                 if (!bf_read(isamb->file[i].bf, pos, 0, 0, hbuf + pos*sizes[i]))
314                 {
315                     yaz_log(YLOG_WARN, "truncated isamb header for " 
316                             "file=%s len=%d pos=%d",
317                             fname, len, pos);
318                     isamb_close(isamb);
319                     return 0;
320                 }
321             }
322             src = hbuf + 16;
323             decode_ptr(&src, &isamb->file[i].head.first_block);
324             decode_ptr(&src, &isamb->file[i].head.last_block);
325             decode_ptr(&src, &zint_tmp);
326             isamb->file[i].head.block_size = (int) zint_tmp;
327             decode_ptr(&src, &zint_tmp);
328             isamb->file[i].head.block_max = (int) zint_tmp;
329             decode_ptr(&src, &isamb->file[i].head.free_list);
330             if (isamb->minor_version >= ISAMB_MINOR_VERSION_WITH_ROOT)
331                 decode_ptr(&src, &isamb->root_ptr);
332         }
333         assert(isamb->file[i].head.block_size >= isamb->file[i].head.block_offset);
334         /* must rewrite the header if root ptr is in use (bug #1017) */
335         if (use_root_ptr && writeflag)
336             isamb->file[i].head_dirty = 1;
337         else
338             isamb->file[i].head_dirty = 0;
339         assert(isamb->file[i].head.block_size == sizes[i]);
340     }
341 #if ISAMB_DEBUG
342     yaz_log(YLOG_WARN, "isamb debug enabled. Things will be slower than usual");
343 #endif
344     return isamb;
345 }
346
347 ISAMB isamb_open(BFiles bfs, const char *name, int writeflag, ISAMC_M *method,
348                  int cache)
349 {
350     int sizes[CAT_NO];
351     int i, b_size = ISAMB_MIN_SIZE;
352     
353     for (i = 0; i<CAT_NO; i++)
354     {
355         sizes[i] = b_size;
356         b_size = b_size * ISAMB_FAC_SIZE;
357     }
358     return isamb_open2(bfs, name, writeflag, method, cache,
359                        CAT_NO, sizes, 0);
360 }
361
362 static void flush_blocks(ISAMB b, int cat)
363 {
364     while (b->file[cat].cache_entries)
365     {
366         struct ISAMB_cache_entry *ce_this = b->file[cat].cache_entries;
367         b->file[cat].cache_entries = ce_this->next;
368
369         if (ce_this->dirty)
370         {
371             yaz_log(b->log_io, "bf_write: flush_blocks");
372             bf_write(b->file[cat].bf, ce_this->pos, 0, 0, ce_this->buf);
373         }
374         xfree(ce_this->buf);
375         xfree(ce_this);
376     }
377 }
378
379 static int cache_block(ISAMB b, ISAM_P pos, unsigned char *userbuf, int wr)
380 {
381     int cat = (int) (pos&CAT_MASK);
382     int off = (int) (((pos/CAT_MAX) & 
383                       (ISAMB_CACHE_ENTRY_SIZE / b->file[cat].head.block_size - 1))
384                      * b->file[cat].head.block_size);
385     zint norm = pos / (CAT_MASK*ISAMB_CACHE_ENTRY_SIZE / b->file[cat].head.block_size);
386     int no = 0;
387     struct ISAMB_cache_entry **ce, *ce_this = 0, **ce_last = 0;
388
389     if (!b->cache)
390         return 0;
391
392     assert(ISAMB_CACHE_ENTRY_SIZE >= b->file[cat].head.block_size);
393     for (ce = &b->file[cat].cache_entries; *ce; ce = &(*ce)->next, no++)
394     {
395         ce_last = ce;
396         if ((*ce)->pos == norm)
397         {
398             ce_this = *ce;
399             *ce = (*ce)->next;   /* remove from list */
400             
401             ce_this->next = b->file[cat].cache_entries;  /* move to front */
402             b->file[cat].cache_entries = ce_this;
403             
404             if (wr)
405             {
406                 memcpy(ce_this->buf + off, userbuf, 
407                        b->file[cat].head.block_size);
408                 ce_this->dirty = 1;
409             }
410             else
411                 memcpy(userbuf, ce_this->buf + off,
412                        b->file[cat].head.block_size);
413             return 1;
414         }
415     }
416     if (no >= b->cache_size)
417     {
418         assert(ce_last && *ce_last);
419         ce_this = *ce_last;
420         *ce_last = 0;  /* remove the last entry from list */
421         if (ce_this->dirty)
422         {
423             yaz_log(b->log_io, "bf_write: cache_block");
424             bf_write(b->file[cat].bf, ce_this->pos, 0, 0, ce_this->buf);
425         }
426         xfree(ce_this->buf);
427         xfree(ce_this);
428     }
429     ce_this = xmalloc(sizeof(*ce_this));
430     ce_this->next = b->file[cat].cache_entries;
431     b->file[cat].cache_entries = ce_this;
432     ce_this->buf = xmalloc(ISAMB_CACHE_ENTRY_SIZE);
433     ce_this->pos = norm;
434     yaz_log(b->log_io, "bf_read: cache_block");
435     if (!bf_read(b->file[cat].bf, norm, 0, 0, ce_this->buf))
436         memset(ce_this->buf, 0, ISAMB_CACHE_ENTRY_SIZE);
437     if (wr)
438     {
439         memcpy(ce_this->buf + off, userbuf, b->file[cat].head.block_size);
440         ce_this->dirty = 1;
441     }
442     else
443     {
444         ce_this->dirty = 0;
445         memcpy(userbuf, ce_this->buf + off, b->file[cat].head.block_size);
446     }
447     return 1;
448 }
449
450
451 void isamb_close(ISAMB isamb)
452 {
453     int i;
454     for (i = 0; isamb->accessed_nodes[i]; i++)
455         yaz_log(YLOG_DEBUG, "isamb_close  level leaf-%d: "ZINT_FORMAT" read, "
456                 ZINT_FORMAT" skipped",
457                 i, isamb->accessed_nodes[i], isamb->skipped_nodes[i]);
458     yaz_log(YLOG_DEBUG, "isamb_close returned "ZINT_FORMAT" values, "
459             "skipped "ZINT_FORMAT,
460             isamb->skipped_numbers, isamb->returned_numbers);
461
462     for (i = 0; i<isamb->no_cat; i++)
463     {
464         flush_blocks(isamb, i);
465         if (isamb->file[i].head_dirty)
466         {
467             char hbuf[DST_BUF_SIZE];
468             int major = ISAMB_MAJOR_VERSION;
469             int len = 16;
470             char *dst = hbuf + 16;
471             int pos = 0, left;
472             int b_size = isamb->file[i].head.block_size;
473
474             encode_ptr(&dst, isamb->file[i].head.first_block);
475             encode_ptr(&dst, isamb->file[i].head.last_block);
476             encode_ptr(&dst, isamb->file[i].head.block_size);
477             encode_ptr(&dst, isamb->file[i].head.block_max);
478             encode_ptr(&dst, isamb->file[i].head.free_list);
479
480             if (isamb->minor_version >= ISAMB_MINOR_VERSION_WITH_ROOT)
481                 encode_ptr(&dst, isamb->root_ptr);
482
483             memset(dst, '\0', b_size); /* ensure no random bytes are written */
484
485             len = dst - hbuf;
486
487             /* print exactly 16 bytes (including trailing 0) */
488             sprintf(hbuf, "isamb%02d %02d %02d\r\n", major,
489                     isamb->minor_version, len);
490
491             bf_write(isamb->file[i].bf, pos, 0, 0, hbuf);
492
493             for (left = len - b_size; left > 0; left = left - b_size)
494             {
495                 pos++;
496                 bf_write(isamb->file[i].bf, pos, 0, 0, hbuf + pos*b_size);
497             }
498         }
499         if (isamb->file[i].bf)
500             bf_close (isamb->file[i].bf);
501     }
502     xfree(isamb->file);
503     xfree(isamb->method);
504     xfree(isamb);
505 }
506
507 /* open_block: read one block at pos.
508    Decode leading sys bytes .. consisting of
509    Offset:Meaning
510    0: leader byte, != 0 leaf, == 0, non-leaf
511    1-2: used size of block
512    3-7*: number of items and all children
513    
514    * Reserve 5 bytes for large block sizes. 1 for small ones .. Number
515    of items. We can thus have at most 2^40 nodes. 
516 */
517 static struct ISAMB_block *open_block(ISAMB b, ISAM_P pos)
518 {
519     int cat = (int) (pos&CAT_MASK);
520     const char *src;
521     int offset = b->file[cat].head.block_offset;
522     struct ISAMB_block *p;
523     if (!pos)
524         return 0;
525     p = xmalloc(sizeof(*p));
526     p->pos = pos;
527     p->cat = (int) (pos & CAT_MASK);
528     p->buf = xmalloc(b->file[cat].head.block_size);
529     p->cbuf = 0;
530
531     if (!cache_block (b, pos, p->buf, 0))
532     {
533         yaz_log(b->log_io, "bf_read: open_block");
534         if (bf_read(b->file[cat].bf, pos/CAT_MAX, 0, 0, p->buf) != 1)
535         {
536             yaz_log(YLOG_FATAL, "isamb: read fail for pos=%ld block=%ld",
537                     (long) pos, (long) pos/CAT_MAX);
538             zebra_exit("isamb:open_block");
539         }
540     }
541     p->bytes = (char *)p->buf + offset;
542     p->leaf = p->buf[0];
543     p->size = (p->buf[1] + 256 * p->buf[2]) - offset;
544     if (p->size < 0)
545     {
546         yaz_log(YLOG_FATAL, "Bad block size %d in pos=" ZINT_FORMAT "\n",
547                 p->size, pos);
548     }
549     assert(p->size >= 0);
550     src = (char*) p->buf + 3;
551     decode_ptr(&src, &p->no_items);
552
553     p->offset = 0;
554     p->dirty = 0;
555     p->deleted = 0;
556     p->decodeClientData = (*b->method->codec.start)();
557     return p;
558 }
559
560 struct ISAMB_block *new_block(ISAMB b, int leaf, int cat)
561 {
562     struct ISAMB_block *p;
563
564     p = xmalloc(sizeof(*p));
565     p->buf = xmalloc(b->file[cat].head.block_size);
566
567     if (!b->file[cat].head.free_list)
568     {
569         zint block_no;
570         block_no = b->file[cat].head.last_block++;
571         p->pos = block_no * CAT_MAX + cat;
572         if (b->log_freelist)
573             yaz_log(b->log_freelist, "got block " 
574                     ZINT_FORMAT " from last %d:" ZINT_FORMAT, p->pos,
575                     cat, p->pos/CAT_MAX);
576     }
577     else
578     {
579         p->pos = b->file[cat].head.free_list;
580         assert((p->pos & CAT_MASK) == cat);
581         if (!cache_block(b, p->pos, p->buf, 0))
582         {
583             yaz_log(b->log_io, "bf_read: new_block");
584             if (!bf_read(b->file[cat].bf, p->pos/CAT_MAX, 0, 0, p->buf))
585             {
586                 yaz_log(YLOG_FATAL, "isamb: read fail for pos=%ld block=%ld",
587                         (long) p->pos/CAT_MAX, (long) p->pos/CAT_MAX);
588                 zebra_exit("isamb:new_block");
589             }
590         }
591         if (b->log_freelist)
592             yaz_log(b->log_freelist, "got block " 
593                     ZINT_FORMAT " from freelist %d:" ZINT_FORMAT, p->pos,
594                     cat, p->pos/CAT_MAX);
595         memcpy(&b->file[cat].head.free_list, p->buf, sizeof(zint));
596     }
597     p->cat = cat;
598     b->file[cat].head_dirty = 1;
599     memset(p->buf, 0, b->file[cat].head.block_size);
600     p->bytes = (char*)p->buf + b->file[cat].head.block_offset;
601     p->leaf = leaf;
602     p->size = 0;
603     p->dirty = 1;
604     p->deleted = 0;
605     p->offset = 0;
606     p->no_items = 0;
607     p->decodeClientData = (*b->method->codec.start)();
608     return p;
609 }
610
611 struct ISAMB_block *new_leaf(ISAMB b, int cat)
612 {
613     return new_block(b, 1, cat);
614 }
615
616
617 struct ISAMB_block *new_int(ISAMB b, int cat)
618 {
619     return new_block(b, 0, cat);
620 }
621
622 static void check_block(ISAMB b, struct ISAMB_block *p)
623 {
624     assert(b); /* mostly to make the compiler shut up about unused b */
625     if (p->leaf)
626     {
627         ;
628     }
629     else
630     {
631         /* sanity check */
632         char *startp = p->bytes;
633         const char *src = startp;
634         char *endp = p->bytes + p->size;
635         ISAM_P pos;
636         void *c1 = (*b->method->codec.start)();
637             
638         decode_ptr(&src, &pos);
639         assert((pos&CAT_MASK) == p->cat);
640         while (src != endp)
641         {
642 #if INT_ENCODE
643             char file_item_buf[DST_ITEM_MAX];
644             char *file_item = file_item_buf;
645             (*b->method->codec.reset)(c1);
646             (*b->method->codec.decode)(c1, &file_item, &src);
647 #else
648             zint item_len;
649             decode_item_len(&src, &item_len);
650             assert(item_len > 0 && item_len < 128);
651             src += item_len;
652 #endif
653             decode_ptr(&src, &pos);
654             if ((pos&CAT_MASK) != p->cat)
655             {
656                 assert((pos&CAT_MASK) == p->cat);
657             }
658         }
659         (*b->method->codec.stop)(c1);
660     }
661 }
662
663 void close_block(ISAMB b, struct ISAMB_block *p)
664 {
665     if (!p)
666         return;
667     if (p->deleted)
668     {
669         yaz_log(b->log_freelist, "release block " ZINT_FORMAT " from freelist %d:" ZINT_FORMAT,
670                 p->pos, p->cat, p->pos/CAT_MAX);
671         memcpy(p->buf, &b->file[p->cat].head.free_list, sizeof(zint));
672         b->file[p->cat].head.free_list = p->pos;
673         b->file[p->cat].head_dirty = 1;
674         if (!cache_block(b, p->pos, p->buf, 1))
675         {
676             yaz_log(b->log_io, "bf_write: close_block (deleted)");
677             bf_write(b->file[p->cat].bf, p->pos/CAT_MAX, 0, 0, p->buf);
678         }
679     }
680     else if (p->dirty)
681     {
682         int offset = b->file[p->cat].head.block_offset;
683         int size = p->size + offset;
684         char *dst =  (char*)p->buf + 3;
685         assert(p->size >= 0);
686         
687         /* memset becuase encode_ptr usually does not write all bytes */
688         memset(p->buf, 0, b->file[p->cat].head.block_offset);
689         p->buf[0] = p->leaf;
690         p->buf[1] = size & 255;
691         p->buf[2] = size >> 8;
692         encode_ptr(&dst, p->no_items);
693         check_block(b, p);
694         if (!cache_block(b, p->pos, p->buf, 1))
695         {
696             yaz_log(b->log_io, "bf_write: close_block");
697             bf_write(b->file[p->cat].bf, p->pos/CAT_MAX, 0, 0, p->buf);
698         }
699     }
700     (*b->method->codec.stop)(p->decodeClientData);
701     xfree(p->buf);
702     xfree(p);
703 }
704
705 int insert_sub(ISAMB b, struct ISAMB_block **p,
706                void *new_item, int *mode,
707                ISAMC_I *stream,
708                struct ISAMB_block **sp,
709                void *sub_item, int *sub_size,
710                const void *max_item);
711
712 int insert_int(ISAMB b, struct ISAMB_block *p, void *lookahead_item,
713                int *mode,
714                ISAMC_I *stream, struct ISAMB_block **sp,
715                void *split_item, int *split_size, const void *last_max_item)
716 {
717     char *startp = p->bytes;
718     const char *src = startp;
719     char *endp = p->bytes + p->size;
720     ISAM_P pos;
721     struct ISAMB_block *sub_p1 = 0, *sub_p2 = 0;
722     char sub_item[DST_ITEM_MAX];
723     int sub_size;
724     int more = 0;
725     zint diff_terms = 0;
726     void *c1 = (*b->method->codec.start)();
727
728     *sp = 0;
729
730     assert(p->size >= 0);
731     decode_ptr(&src, &pos);
732     while (src != endp)
733     {
734         int d;
735         const char *src0 = src;
736 #if INT_ENCODE
737         char file_item_buf[DST_ITEM_MAX];
738         char *file_item = file_item_buf;
739         (*b->method->codec.reset)(c1);
740         (*b->method->codec.decode)(c1, &file_item, &src);
741         d = (*b->method->compare_item)(file_item_buf, lookahead_item);
742         if (d > 0)
743         {
744             sub_p1 = open_block(b, pos);
745             assert(sub_p1);
746             diff_terms -= sub_p1->no_items;
747             more = insert_sub(b, &sub_p1, lookahead_item, mode,
748                               stream, &sub_p2, 
749                               sub_item, &sub_size, file_item_buf);
750             diff_terms += sub_p1->no_items;
751             src = src0;
752             break;
753         }
754 #else
755         zint item_len;
756         decode_item_len(&src, &item_len);
757         d = (*b->method->compare_item)(src, lookahead_item);
758         if (d > 0)
759         {
760             sub_p1 = open_block(b, pos);
761             assert(sub_p1);
762             diff_terms -= sub_p1->no_items;
763             more = insert_sub(b, &sub_p1, lookahead_item, mode,
764                               stream, &sub_p2, 
765                               sub_item, &sub_size, src);
766             diff_terms += sub_p1->no_items;
767             src = src0;
768             break;
769         }
770         src += item_len;
771 #endif
772         decode_ptr(&src, &pos);
773     }
774     if (!sub_p1)
775     {
776         /* we reached the end. So lookahead > last item */
777         sub_p1 = open_block(b, pos);
778         assert(sub_p1);
779         diff_terms -= sub_p1->no_items;
780         more = insert_sub(b, &sub_p1, lookahead_item, mode, stream, &sub_p2, 
781                           sub_item, &sub_size, last_max_item);
782         diff_terms += sub_p1->no_items;
783     }
784     if (sub_p2)
785         diff_terms += sub_p2->no_items;
786     if (diff_terms)
787     {
788         p->dirty = 1;
789         p->no_items += diff_terms;
790     }
791     if (sub_p2)
792     {
793         /* there was a split - must insert pointer in this one */
794         char dst_buf[DST_BUF_SIZE];
795         char *dst = dst_buf;
796 #if INT_ENCODE
797         const char *sub_item_ptr = sub_item;
798 #endif
799         assert(sub_size < DST_ITEM_MAX && sub_size > 1);
800
801         memcpy(dst, startp, src - startp);
802                 
803         dst += src - startp;
804
805 #if INT_ENCODE
806         (*b->method->codec.reset)(c1);
807         (*b->method->codec.encode)(c1, &dst, &sub_item_ptr);
808 #else
809         encode_item_len(&dst, sub_size);      /* sub length and item */
810         memcpy(dst, sub_item, sub_size);
811         dst += sub_size;
812 #endif
813
814         encode_ptr(&dst, sub_p2->pos);   /* pos */
815
816         if (endp - src)                   /* remaining data */
817         {
818             memcpy(dst, src, endp - src);
819             dst += endp - src;
820         }
821         p->size = dst - dst_buf;
822         assert(p->size >= 0);
823
824         if (p->size <= b->file[p->cat].head.block_max)
825         {
826             /* it fits OK in this block */
827             memcpy(startp, dst_buf, dst - dst_buf);
828
829             close_block(b, sub_p2);
830         }
831         else
832         {
833             /* must split _this_ block as well .. */
834             struct ISAMB_block *sub_p3;
835 #if INT_ENCODE
836             char file_item_buf[DST_ITEM_MAX];
837             char *file_item = file_item_buf;
838 #else
839             zint split_size_tmp;
840 #endif
841             zint no_items_first_half = 0;
842             int p_new_size;
843             const char *half;
844             src = dst_buf;
845             endp = dst;
846             
847             b->number_of_int_splits++;
848
849             p->dirty = 1;
850             close_block(b, sub_p2);
851
852             half = src + b->file[p->cat].head.block_size/2;
853             decode_ptr(&src, &pos);
854
855             if (b->enable_int_count)
856             {
857                 /* read sub block so we can get no_items for it */
858                 sub_p3 = open_block(b, pos);
859                 no_items_first_half += sub_p3->no_items;
860                 close_block(b, sub_p3);
861             }
862
863             while (src <= half)
864             {
865 #if INT_ENCODE
866                 file_item = file_item_buf;
867                 (*b->method->codec.reset)(c1);
868                 (*b->method->codec.decode)(c1, &file_item, &src);
869 #else
870                 decode_item_len(&src, &split_size_tmp);
871                 *split_size = (int) split_size_tmp;
872                 src += *split_size;
873 #endif
874                 decode_ptr(&src, &pos);
875
876                 if (b->enable_int_count)
877                 {
878                     /* read sub block so we can get no_items for it */
879                     sub_p3 = open_block(b, pos);
880                     no_items_first_half += sub_p3->no_items;
881                     close_block(b, sub_p3);
882                 }
883             }
884             /*  p is first half */
885             p_new_size = src - dst_buf;
886             memcpy(p->bytes, dst_buf, p_new_size);
887
888 #if INT_ENCODE
889             file_item = file_item_buf;
890             (*b->method->codec.reset)(c1);
891             (*b->method->codec.decode)(c1, &file_item, &src);
892             *split_size = file_item - file_item_buf;
893             memcpy(split_item, file_item_buf, *split_size);
894 #else
895             decode_item_len(&src, &split_size_tmp);
896             *split_size = (int) split_size_tmp;
897             memcpy(split_item, src, *split_size);
898             src += *split_size;
899 #endif
900             /*  *sp is second half */
901             *sp = new_int(b, p->cat);
902             (*sp)->size = endp - src;
903             memcpy((*sp)->bytes, src, (*sp)->size);
904
905             p->size = p_new_size;
906
907             /*  adjust no_items in first&second half */
908             (*sp)->no_items = p->no_items - no_items_first_half;
909             p->no_items = no_items_first_half;
910         }
911         p->dirty = 1;
912     }
913     close_block(b, sub_p1);
914     (*b->method->codec.stop)(c1);
915     return more;
916 }
917
918 int insert_leaf(ISAMB b, struct ISAMB_block **sp1, void *lookahead_item,
919                 int *lookahead_mode, ISAMC_I *stream,
920                 struct ISAMB_block **sp2,
921                 void *sub_item, int *sub_size,
922                 const void *max_item)
923 {
924     struct ISAMB_block *p = *sp1;
925     char *endp = 0;
926     const char *src = 0;
927     char dst_buf[DST_BUF_SIZE], *dst = dst_buf;
928     int new_size;
929     void *c1 = (*b->method->codec.start)();
930     void *c2 = (*b->method->codec.start)();
931     int more = 1;
932     int quater = b->file[b->no_cat-1].head.block_max / 4;
933     char *mid_cut = dst_buf + quater * 2;
934     char *tail_cut = dst_buf + quater * 3;
935     char *maxp = dst_buf + b->file[b->no_cat-1].head.block_max;
936     char *half1 = 0;
937     char *half2 = 0;
938     char cut_item_buf[DST_ITEM_MAX];
939     int cut_item_size = 0;
940     int no_items = 0;    /* number of items (total) */
941     int no_items_1 = 0;  /* number of items (first half) */
942     int inserted_dst_bytes = 0;
943
944     if (p && p->size)
945     {
946         char file_item_buf[DST_ITEM_MAX];
947         char *file_item = file_item_buf;
948             
949         src = p->bytes;
950         endp = p->bytes + p->size;
951         (*b->method->codec.decode)(c1, &file_item, &src);
952         while (1)
953         {
954             const char *dst_item = 0; /* resulting item to be inserted */
955             char *lookahead_next;
956             char *dst_0 = dst;
957             int d = -1;
958             
959             if (lookahead_item)
960                 d = (*b->method->compare_item)(file_item_buf, lookahead_item);
961             
962             /* d now holds comparison between existing file item and
963                lookahead item 
964                d = 0: equal
965                d > 0: lookahead before file
966                d < 0: lookahead after file
967             */
968             if (d > 0)
969             {
970                 /* lookahead must be inserted */
971                 dst_item = lookahead_item;
972                 /* if this is not an insertion, it's really bad .. */
973                 if (!*lookahead_mode)
974                 {
975                     yaz_log(YLOG_WARN, "isamb: Inconsistent register (1)");
976                     assert(*lookahead_mode);
977                 }
978             }
979             else if (d == 0 && *lookahead_mode == 2)
980             {
981                 /* For mode == 2, we insert the new key anyway - even 
982                    though the comparison is 0. */
983                 dst_item = lookahead_item;
984                 p->dirty = 1;
985             }
986             else
987                 dst_item = file_item_buf;
988
989             if (d == 0 && !*lookahead_mode)
990             {
991                 /* it's a deletion and they match so there is nothing
992                    to be inserted anyway .. But mark the thing dirty
993                    (file item was part of input.. The item will not be
994                    part of output */
995                 p->dirty = 1;
996             }
997             else if (!half1 && dst > mid_cut)
998             {
999                 /* we have reached the splitting point for the first time */
1000                 const char *dst_item_0 = dst_item;
1001                 half1 = dst; /* candidate for splitting */
1002
1003                 /* encode the resulting item */
1004                 (*b->method->codec.encode)(c2, &dst, &dst_item);
1005                 
1006                 cut_item_size = dst_item - dst_item_0;
1007                 assert(cut_item_size > 0);
1008                 memcpy(cut_item_buf, dst_item_0, cut_item_size);
1009                 
1010                 half2 = dst;
1011                 no_items_1 = no_items;
1012                 no_items++;
1013             }
1014             else
1015             {
1016                 /* encode the resulting item */
1017                 (*b->method->codec.encode)(c2, &dst, &dst_item);
1018                 no_items++;
1019             }
1020
1021             /* now move "pointers" .. result has been encoded .. */
1022             if (d > 0)  
1023             {
1024                 /* we must move the lookahead pointer */
1025
1026                 inserted_dst_bytes += (dst - dst_0);
1027                 if (inserted_dst_bytes >=  quater)
1028                     /* no more room. Mark lookahead as "gone".. */
1029                     lookahead_item = 0;
1030                 else
1031                 {
1032                     /* move it really.. */
1033                     lookahead_next = lookahead_item;
1034                     if (!(*stream->read_item)(stream->clientData,
1035                                               &lookahead_next,
1036                                               lookahead_mode))
1037                     {
1038                         /* end of stream reached: no "more" and no lookahead */
1039                         lookahead_item = 0;
1040                         more = 0;
1041                     }
1042                     if (lookahead_item && max_item &&
1043                         (*b->method->compare_item)(max_item, lookahead_item) <= 0)
1044                     {
1045                         /* the lookahead goes beyond what we allow in this
1046                            leaf. Mark it as "gone" */
1047                         lookahead_item = 0;
1048                     }
1049                     
1050                     p->dirty = 1;
1051                 }
1052             }
1053             else if (d == 0)
1054             {
1055                 /* exact match .. move both pointers */
1056
1057                 lookahead_next = lookahead_item;
1058                 if (!(*stream->read_item)(stream->clientData,
1059                                           &lookahead_next, lookahead_mode))
1060                 {
1061                     lookahead_item = 0;
1062                     more = 0;
1063                 }
1064                 if (src == endp)
1065                     break;  /* end of file stream reached .. */
1066                 file_item = file_item_buf; /* move file pointer */
1067                 (*b->method->codec.decode)(c1, &file_item, &src);
1068             }
1069             else
1070             {
1071                 /* file pointer must be moved */
1072                 if (src == endp)
1073                     break;
1074                 file_item = file_item_buf;
1075                 (*b->method->codec.decode)(c1, &file_item, &src);
1076             }
1077         }
1078     }
1079
1080     /* this loop runs when we are "appending" to a leaf page. That is
1081        either it's empty (new) or all file items have been read in
1082        previous loop */
1083
1084     maxp = dst_buf + b->file[b->no_cat-1].head.block_max + quater;
1085     while (lookahead_item)
1086     {
1087         char *dst_item;
1088         const char *src = lookahead_item;
1089         char *dst_0 = dst;
1090         
1091         /* if we have a lookahead item, we stop if we exceed the value of it */
1092         if (max_item &&
1093             (*b->method->compare_item)(max_item, lookahead_item) <= 0)
1094         {
1095             /* stop if we have reached the value of max item */
1096             break;
1097         }
1098         if (!*lookahead_mode)
1099         {
1100             /* this is append. So a delete is bad */
1101             yaz_log(YLOG_WARN, "isamb: Inconsistent register (2)");
1102             assert(*lookahead_mode);
1103         }
1104         else if (!half1 && dst > tail_cut)
1105         {
1106             const char *src_0 = src;
1107             half1 = dst; /* candidate for splitting */
1108             
1109             (*b->method->codec.encode)(c2, &dst, &src);
1110             
1111             cut_item_size = src - src_0;
1112             assert(cut_item_size > 0);
1113             memcpy(cut_item_buf, src_0, cut_item_size);
1114             
1115             no_items_1 = no_items;
1116             half2 = dst;
1117         }
1118         else
1119             (*b->method->codec.encode)(c2, &dst, &src);
1120
1121         if (dst > maxp)
1122         {
1123             dst = dst_0;
1124             break;
1125         }
1126         no_items++;
1127         if (p)
1128             p->dirty = 1;
1129         dst_item = lookahead_item;
1130         if (!(*stream->read_item)(stream->clientData, &dst_item,
1131                                   lookahead_mode))
1132         {
1133             lookahead_item = 0;
1134             more = 0;
1135         }
1136     }
1137     new_size = dst - dst_buf;
1138     if (p && p->cat != b->no_cat-1 && 
1139         new_size > b->file[p->cat].head.block_max)
1140     {
1141         /* non-btree block will be removed */
1142         p->deleted = 1;
1143         close_block(b, p);
1144         /* delete it too!! */
1145         p = 0; /* make a new one anyway */
1146     }
1147     if (!p)
1148     {   /* must create a new one */
1149         int i;
1150         for (i = 0; i < b->no_cat; i++)
1151             if (new_size <= b->file[i].head.block_max)
1152                 break;
1153         if (i == b->no_cat)
1154             i = b->no_cat - 1;
1155         p = new_leaf(b, i);
1156     }
1157     if (new_size > b->file[p->cat].head.block_max)
1158     {
1159         char *first_dst;
1160         const char *cut_item = cut_item_buf;
1161
1162         assert(half1);
1163         assert(half2);
1164
1165         assert(cut_item_size > 0);
1166         
1167         /* first half */
1168         p->size = half1 - dst_buf;
1169         assert(p->size <=  b->file[p->cat].head.block_max);
1170         memcpy(p->bytes, dst_buf, half1 - dst_buf);
1171         p->no_items = no_items_1;
1172
1173         /* second half */
1174         *sp2 = new_leaf(b, p->cat);
1175
1176         (*b->method->codec.reset)(c2);
1177
1178         b->number_of_leaf_splits++;
1179
1180         first_dst = (*sp2)->bytes;
1181
1182         (*b->method->codec.encode)(c2, &first_dst, &cut_item);
1183
1184         memcpy(first_dst, half2, dst - half2);
1185
1186         (*sp2)->size = (first_dst - (*sp2)->bytes) + (dst - half2);
1187         assert((*sp2)->size <=  b->file[p->cat].head.block_max);
1188         (*sp2)->no_items = no_items - no_items_1;
1189         (*sp2)->dirty = 1;
1190         p->dirty = 1;
1191         memcpy(sub_item, cut_item_buf, cut_item_size);
1192         *sub_size = cut_item_size;
1193     }
1194     else
1195     {
1196         memcpy(p->bytes, dst_buf, dst - dst_buf);
1197         p->size = new_size;
1198         p->no_items = no_items;
1199     }
1200     (*b->method->codec.stop)(c1);
1201     (*b->method->codec.stop)(c2);
1202     *sp1 = p;
1203     return more;
1204 }
1205
1206 int insert_sub(ISAMB b, struct ISAMB_block **p, void *new_item,
1207                int *mode,
1208                ISAMC_I *stream,
1209                struct ISAMB_block **sp,
1210                void *sub_item, int *sub_size,
1211                const void *max_item)
1212 {
1213     if (!*p || (*p)->leaf)
1214         return insert_leaf(b, p, new_item, mode, stream, sp, sub_item, 
1215                            sub_size, max_item);
1216     else
1217         return insert_int(b, *p, new_item, mode, stream, sp, sub_item,
1218                           sub_size, max_item);
1219 }
1220
1221 int isamb_unlink(ISAMB b, ISAM_P pos)
1222 {
1223     struct ISAMB_block *p1;
1224
1225     if (!pos)
1226         return 0;
1227     p1 = open_block(b, pos);
1228     p1->deleted = 1;
1229     if (!p1->leaf)
1230     {
1231         zint sub_p;
1232         const char *src = p1->bytes + p1->offset;
1233 #if INT_ENCODE
1234         void *c1 = (*b->method->codec.start)();
1235 #endif
1236         decode_ptr(&src, &sub_p);
1237         isamb_unlink(b, sub_p);
1238         
1239         while (src != p1->bytes + p1->size)
1240         {
1241 #if INT_ENCODE
1242             char file_item_buf[DST_ITEM_MAX];
1243             char *file_item = file_item_buf;
1244             (*b->method->codec.reset)(c1);
1245             (*b->method->codec.decode)(c1, &file_item, &src);
1246 #else
1247             zint item_len;
1248             decode_item_len(&src, &item_len);
1249             src += item_len;
1250 #endif
1251             decode_ptr(&src, &sub_p);
1252             isamb_unlink(b, sub_p);
1253         }
1254 #if INT_ENCODE
1255         (*b->method->codec.stop)(c1);
1256 #endif
1257     }
1258     close_block(b, p1);
1259     return 0;
1260 }
1261
1262 void isamb_merge(ISAMB b, ISAM_P *pos, ISAMC_I *stream)
1263 {
1264     char item_buf[DST_ITEM_MAX];
1265     char *item_ptr;
1266     int i_mode;
1267     int more;
1268     int must_delete = 0;
1269
1270     if (b->cache < 0)
1271     {
1272         int more = 1;
1273         while (more)
1274         {
1275             item_ptr = item_buf;
1276             more =
1277                 (*stream->read_item)(stream->clientData, &item_ptr, &i_mode);
1278         }
1279         *pos = 1;
1280         return;
1281     }
1282     item_ptr = item_buf;
1283     more = (*stream->read_item)(stream->clientData, &item_ptr, &i_mode);
1284     while (more)
1285     {
1286         struct ISAMB_block *p = 0, *sp = 0;
1287         char sub_item[DST_ITEM_MAX];
1288         int sub_size;
1289         
1290         if (*pos)
1291             p = open_block(b, *pos);
1292         more = insert_sub(b, &p, item_buf, &i_mode, stream, &sp,
1293                           sub_item, &sub_size, 0);
1294         if (sp)
1295         {    /* increase level of tree by one */
1296             struct ISAMB_block *p2 = new_int(b, p->cat);
1297             char *dst = p2->bytes + p2->size;
1298 #if INT_ENCODE
1299             void *c1 = (*b->method->codec.start)();
1300             const char *sub_item_ptr = sub_item;
1301 #endif
1302
1303             encode_ptr(&dst, p->pos);
1304             assert(sub_size < DST_ITEM_MAX && sub_size > 1);
1305 #if INT_ENCODE
1306             (*b->method->codec.reset)(c1);
1307             (*b->method->codec.encode)(c1, &dst, &sub_item_ptr);
1308 #else
1309             encode_item_len(&dst, sub_size);
1310             memcpy(dst, sub_item, sub_size);
1311             dst += sub_size;
1312 #endif
1313             encode_ptr(&dst, sp->pos);
1314             
1315             p2->size = dst - p2->bytes;
1316             p2->no_items = p->no_items + sp->no_items;
1317             *pos = p2->pos;  /* return new super page */
1318             close_block(b, sp);
1319             close_block(b, p2);
1320 #if INT_ENCODE
1321             (*b->method->codec.stop)(c1);
1322 #endif
1323         }
1324         else
1325         {
1326             *pos = p->pos;   /* return current one (again) */
1327         }
1328         if (p->no_items == 0)
1329             must_delete = 1;
1330         else
1331             must_delete = 0;
1332         close_block(b, p);
1333     }
1334     if (must_delete)
1335     {
1336         isamb_unlink(b, *pos);
1337         *pos = 0;
1338     }
1339 }
1340
1341 ISAMB_PP isamb_pp_open_x(ISAMB isamb, ISAM_P pos, int *level, int scope)
1342 {
1343     ISAMB_PP pp = xmalloc(sizeof(*pp));
1344     int i;
1345
1346     assert(pos);
1347
1348     pp->isamb = isamb;
1349     pp->block = xmalloc(ISAMB_MAX_LEVEL * sizeof(*pp->block));
1350
1351     pp->pos = pos;
1352     pp->level = 0;
1353     pp->maxlevel = 0;
1354     pp->total_size = 0;
1355     pp->no_blocks = 0;
1356     pp->skipped_numbers = 0;
1357     pp->returned_numbers = 0;
1358     pp->scope = scope;
1359     for (i = 0; i<ISAMB_MAX_LEVEL; i++)
1360         pp->skipped_nodes[i] = pp->accessed_nodes[i] = 0;
1361     while (1)
1362     {
1363         struct ISAMB_block *p = open_block(isamb, pos);
1364         const char *src = p->bytes + p->offset;
1365         pp->block[pp->level] = p;
1366
1367         pp->total_size += p->size;
1368         pp->no_blocks++;
1369         if (p->leaf)
1370             break;
1371         decode_ptr(&src, &pos);
1372         p->offset = src - p->bytes;
1373         pp->level++;
1374         pp->accessed_nodes[pp->level]++; 
1375     }
1376     pp->block[pp->level+1] = 0;
1377     pp->maxlevel = pp->level;
1378     if (level)
1379         *level = pp->level;
1380     return pp;
1381 }
1382
1383 ISAMB_PP isamb_pp_open(ISAMB isamb, ISAM_P pos, int scope)
1384 {
1385     return isamb_pp_open_x(isamb, pos, 0, scope);
1386 }
1387
1388 void isamb_pp_close_x(ISAMB_PP pp, zint *size, zint *blocks)
1389 {
1390     int i;
1391     if (!pp)
1392         return;
1393     yaz_log(YLOG_DEBUG, "isamb_pp_close lev=%d returned "ZINT_FORMAT" values, " 
1394             "skipped "ZINT_FORMAT,
1395             pp->maxlevel, pp->skipped_numbers, pp->returned_numbers);
1396     for (i = pp->maxlevel; i>=0; i--)
1397         if (pp->skipped_nodes[i] || pp->accessed_nodes[i])
1398             yaz_log(YLOG_DEBUG, "isamb_pp_close  level leaf-%d: "
1399                     ZINT_FORMAT" read, "ZINT_FORMAT" skipped", i,
1400                     pp->accessed_nodes[i], pp->skipped_nodes[i]);
1401     pp->isamb->skipped_numbers += pp->skipped_numbers;
1402     pp->isamb->returned_numbers += pp->returned_numbers;
1403     for (i = pp->maxlevel; i>=0; i--)
1404     {
1405         pp->isamb->accessed_nodes[i] += pp->accessed_nodes[i];
1406         pp->isamb->skipped_nodes[i] += pp->skipped_nodes[i];
1407     }
1408     if (size)
1409         *size = pp->total_size;
1410     if (blocks)
1411         *blocks = pp->no_blocks;
1412     for (i = 0; i <= pp->level; i++)
1413         close_block(pp->isamb, pp->block[i]);
1414     xfree(pp->block);
1415     xfree(pp);
1416 }
1417
1418 int isamb_block_info(ISAMB isamb, int cat)
1419 {
1420     if (cat >= 0 && cat < isamb->no_cat)
1421         return isamb->file[cat].head.block_size;
1422     return -1;
1423 }
1424
1425 void isamb_pp_close(ISAMB_PP pp)
1426 {
1427     isamb_pp_close_x(pp, 0, 0);
1428 }
1429
1430 /* simple recursive dumper .. */
1431 static void isamb_dump_r(ISAMB b, ISAM_P pos, void (*pr)(const char *str),
1432                          int level)
1433 {
1434     char buf[1024];
1435     char prefix_str[1024];
1436     if (pos)
1437     {
1438         struct ISAMB_block *p = open_block(b, pos);
1439         sprintf(prefix_str, "%*s " ZINT_FORMAT " cat=%d size=%d max=%d items="
1440                 ZINT_FORMAT, level*2, "",
1441                 pos, p->cat, p->size, b->file[p->cat].head.block_max,
1442                 p->no_items);
1443         (*pr)(prefix_str);
1444         sprintf(prefix_str, "%*s " ZINT_FORMAT, level*2, "", pos);
1445         if (p->leaf)
1446         {
1447             while (p->offset < p->size)
1448             {
1449                 const char *src = p->bytes + p->offset;
1450                 char *dst = buf;
1451                 (*b->method->codec.decode)(p->decodeClientData, &dst, &src);
1452                 (*b->method->log_item)(YLOG_DEBUG, buf, prefix_str);
1453                 p->offset = src - (char*) p->bytes;
1454             }
1455             assert(p->offset == p->size);
1456         }
1457         else
1458         {
1459             const char *src = p->bytes + p->offset;
1460             ISAM_P sub;
1461
1462             decode_ptr(&src, &sub);
1463             p->offset = src - (char*) p->bytes;
1464
1465             isamb_dump_r(b, sub, pr, level+1);
1466             
1467             while (p->offset < p->size)
1468             {
1469 #if INT_ENCODE
1470                 char file_item_buf[DST_ITEM_MAX];
1471                 char *file_item = file_item_buf;
1472                 void *c1 = (*b->method->codec.start)();
1473                 (*b->method->codec.decode)(c1, &file_item, &src);
1474                 (*b->method->codec.stop)(c1);
1475                 (*b->method->log_item)(YLOG_DEBUG, file_item_buf, prefix_str);
1476 #else
1477                 zint item_len;
1478                 decode_item_len(&src, &item_len);
1479                 (*b->method->log_item)(YLOG_DEBUG, src, prefix_str);
1480                 src += item_len;
1481 #endif
1482                 decode_ptr(&src, &sub);
1483                 
1484                 p->offset = src - (char*) p->bytes;
1485                 
1486                 isamb_dump_r(b, sub, pr, level+1);
1487             }           
1488         }
1489         close_block(b, p);
1490     }
1491 }
1492
1493 void isamb_dump(ISAMB b, ISAM_P pos, void (*pr)(const char *str))
1494 {
1495     isamb_dump_r(b, pos, pr, 0);
1496 }
1497
1498 int isamb_pp_read(ISAMB_PP pp, void *buf)
1499 {
1500     return isamb_pp_forward(pp, buf, 0);
1501 }
1502
1503
1504 void isamb_pp_pos(ISAMB_PP pp, double *current, double *total)
1505 { /* return an estimate of the current position and of the total number of */
1506   /* occureences in the isam tree, based on the current leaf */
1507     assert(total);
1508     assert(current);
1509     
1510     /* if end-of-stream PP may not be leaf */
1511
1512     *total = (double) (pp->block[0]->no_items);
1513     *current = (double) pp->returned_numbers;
1514 #if ISAMB_DEBUG
1515     yaz_log(YLOG_LOG, "isamb_pp_pos returning: cur= %0.1f tot=%0.1f rn="
1516             ZINT_FORMAT, *current, *total, pp->returned_numbers);
1517 #endif
1518 }
1519
1520 int isamb_pp_forward(ISAMB_PP pp, void *buf, const void *untilb)
1521 {
1522     char *dst = buf;
1523     const char *src;
1524     struct ISAMB_block *p = pp->block[pp->level];
1525     ISAMB b = pp->isamb;
1526     if (!p)
1527         return 0;
1528  again:
1529     while (p->offset == p->size)
1530     {
1531         ISAM_P pos;
1532 #if INT_ENCODE
1533         const char *src_0;
1534         void *c1;
1535         char file_item_buf[DST_ITEM_MAX];
1536         char *file_item = file_item_buf;
1537 #else
1538         zint item_len;
1539 #endif
1540         while (p->offset == p->size)
1541         {
1542             if (pp->level == 0)
1543                 return 0;
1544             close_block(pp->isamb, pp->block[pp->level]);
1545             pp->block[pp->level] = 0;
1546             (pp->level)--;
1547             p = pp->block[pp->level];
1548             assert(!p->leaf);  
1549         }
1550
1551         assert(!p->leaf);
1552         src = p->bytes + p->offset;
1553        
1554 #if INT_ENCODE
1555         c1 = (*b->method->codec.start)();
1556         (*b->method->codec.decode)(c1, &file_item, &src);
1557 #else
1558         decode_ptr(&src, &item_len);
1559         src += item_len;
1560 #endif        
1561         decode_ptr(&src, &pos);
1562         p->offset = src - (char*) p->bytes;
1563
1564         src = p->bytes + p->offset;
1565
1566         while(1)
1567         {
1568             if (!untilb || p->offset == p->size)
1569                 break;
1570             assert(p->offset < p->size);
1571 #if INT_ENCODE
1572             src_0 = src;
1573             file_item = file_item_buf;
1574             (*b->method->codec.reset)(c1);
1575             (*b->method->codec.decode)(c1, &file_item, &src);
1576             if ((*b->method->compare_item)(untilb, file_item_buf) < pp->scope)
1577             {
1578                 src = src_0;
1579                 break;
1580             }
1581 #else
1582             decode_item_len(&src, &item_len);
1583             if ((*b->method->compare_item)(untilb, src) < pp->scope)
1584                 break;
1585             src += item_len;
1586 #endif
1587             decode_ptr(&src, &pos);
1588             p->offset = src - (char*) p->bytes;
1589         }
1590
1591         pp->level++;
1592
1593         while (1)
1594         {
1595             pp->block[pp->level] = p = open_block(pp->isamb, pos);
1596
1597             pp->total_size += p->size;
1598             pp->no_blocks++;
1599             
1600             if (p->leaf) 
1601             {
1602                 break;
1603             }
1604             
1605             src = p->bytes + p->offset;
1606             while(1)
1607             {
1608                 decode_ptr(&src, &pos);
1609                 p->offset = src - (char*) p->bytes;
1610                 
1611                 if (!untilb || p->offset == p->size)
1612                     break;
1613                 assert(p->offset < p->size);
1614 #if INT_ENCODE
1615                 src_0 = src;
1616                 file_item = file_item_buf;
1617                 (*b->method->codec.reset)(c1);
1618                 (*b->method->codec.decode)(c1, &file_item, &src);
1619                 if ((*b->method->compare_item)(untilb, file_item_buf) < pp->scope)
1620                 {
1621                     src = src_0;
1622                     break;
1623                 }
1624 #else
1625                 decode_ptr(&src, &item_len);
1626                 if ((*b->method->compare_item)(untilb, src) <= pp->scope)
1627                     break;
1628                 src += item_len;
1629 #endif
1630             }
1631             pp->level++;
1632         }
1633 #if INT_ENCODE
1634         (*b->method->codec.stop)(c1);
1635 #endif
1636     }
1637     assert(p->offset < p->size);
1638     assert(p->leaf);
1639     while(1)
1640     {
1641         char *dst0 = dst;
1642         src = p->bytes + p->offset;
1643         (*pp->isamb->method->codec.decode)(p->decodeClientData, &dst, &src);
1644         p->offset = src - (char*) p->bytes;
1645         if (!untilb || (*pp->isamb->method->compare_item)(untilb, dst0) < pp->scope)
1646             break;
1647         dst = dst0;
1648         if (p->offset == p->size) goto again;
1649     }
1650     pp->returned_numbers++; 
1651     return 1;
1652 }
1653
1654 zint isamb_get_int_splits(ISAMB b)
1655 {
1656     return b->number_of_int_splits;
1657 }
1658
1659 zint isamb_get_leaf_splits(ISAMB b)
1660 {
1661     return b->number_of_leaf_splits;
1662 }
1663
1664 zint isamb_get_root_ptr(ISAMB b)
1665 {
1666     return b->root_ptr;
1667 }
1668
1669 void isamb_set_root_ptr(ISAMB b, zint root_ptr)
1670 {
1671     b->root_ptr = root_ptr;
1672 }
1673
1674
1675 /*
1676  * Local variables:
1677  * c-basic-offset: 4
1678  * c-file-style: "Stroustrup"
1679  * indent-tabs-mode: nil
1680  * End:
1681  * vim: shiftwidth=4 tabstop=8 expandtab
1682  */
1683