Fix a bug where second-half of block in leaf split could become
[idzebra-moved-to-github.git] / isamb / isamb.c
1 /* $Id: isamb.c,v 1.74 2005-03-18 12:05:11 adam Exp $
2    Copyright (C) 1995-2005
3    Index Data ApS
4
5 This file is part of the Zebra server.
6
7 Zebra is free software; you can redistribute it and/or modify it under
8 the terms of the GNU General Public License as published by the Free
9 Software Foundation; either version 2, or (at your option) any later
10 version.
11
12 Zebra is distributed in the hope that it will be useful, but WITHOUT ANY
13 WARRANTY; without even the implied warranty of MERCHANTABILITY or
14 FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
15 for more details.
16
17 You should have received a copy of the GNU General Public License
18 along with Zebra; see the file LICENSE.zebra.  If not, write to the
19 Free Software Foundation, 59 Temple Place - Suite 330, Boston, MA
20 02111-1307, USA.
21 */
22
23 #include <stdlib.h>
24 #include <string.h>
25 #include <yaz/log.h>
26 #include <yaz/xmalloc.h>
27 #include <idzebra/isamb.h>
28 #include <assert.h>
29
30 #ifndef ISAMB_DEBUG
31 #define ISAMB_DEBUG 0
32 #endif
33
34
35 #define ISAMB_MAJOR_VERSION 3
36 #define ISAMB_MINOR_VERSION 0
37
38 struct ISAMB_head {
39     zint first_block;
40     zint last_block;
41     zint free_list;
42     zint no_items;
43     int block_size;
44     int block_max;
45     int block_offset;
46 };
47
48 /* if 1, upper nodes items are encoded; 0 if not encoded */
49 #define INT_ENCODE 1
50
51 /* maximum size of encoded buffer */
52 #define DST_ITEM_MAX 256
53
54 #define ISAMB_MAX_LEVEL 10
55 /* approx 2*max page + max size of item */
56 #define DST_BUF_SIZE 16840
57
58 #define ISAMB_CACHE_ENTRY_SIZE 4096
59
60 /* CAT_MAX: _must_ be power of 2 */
61 #define CAT_MAX 4
62 #define CAT_MASK (CAT_MAX-1)
63 /* CAT_NO: <= CAT_MAX */
64 #define CAT_NO 4
65
66 /* Smallest block size */
67 #define ISAMB_MIN_SIZE 32
68 /* Size factor */
69 #define ISAMB_FAC_SIZE 4
70
71 /* ISAMB_PTR_CODEC = 1 var, =0 fixed */
72 #define ISAMB_PTR_CODEC 1
73
74 struct ISAMB_cache_entry {
75     ISAMB_P pos;
76     unsigned char *buf;
77     int dirty;
78     int hits;
79     struct ISAMB_cache_entry *next;
80 };
81
82 struct ISAMB_file {
83     BFile bf;
84     int head_dirty;
85     struct ISAMB_head head;
86     struct ISAMB_cache_entry *cache_entries;
87 };
88
89 struct ISAMB_s {
90     BFiles bfs;
91     ISAMC_M *method;
92
93     struct ISAMB_file *file;
94     int no_cat;
95     int cache; /* 0 = no cache, 1 = use cache, -1 = dummy isam (for testing only) */
96     int log_io;        /* log level for bf_read/bf_write calls */
97     int log_freelist;  /* log level for freelist handling */
98     zint skipped_numbers; /* on a leaf node */
99     zint returned_numbers; 
100     zint skipped_nodes[ISAMB_MAX_LEVEL]; /* [0]=skipped leaves, 1 = higher etc */
101     zint accessed_nodes[ISAMB_MAX_LEVEL]; /* nodes we did not skip */
102 };
103
104 struct ISAMB_block {
105     ISAMB_P pos;
106     int cat;
107     int size;
108     int leaf;
109     int dirty;
110     int deleted;
111     int offset;
112     zint no_items;  /* number of nodes in this + children */
113     char *bytes;
114     char *cbuf;
115     unsigned char *buf;
116     void *decodeClientData;
117     int log_rw;
118 };
119
120 struct ISAMB_PP_s {
121     ISAMB isamb;
122     ISAMB_P pos;
123     int level;
124     int maxlevel; /* total depth */
125     zint total_size;
126     zint no_blocks;
127     zint skipped_numbers; /* on a leaf node */
128     zint returned_numbers; 
129     zint skipped_nodes[ISAMB_MAX_LEVEL]; /* [0]=skipped leaves, 1 = higher etc */
130     zint accessed_nodes[ISAMB_MAX_LEVEL]; /* nodes we did not skip */
131     struct ISAMB_block **block;
132     int scope;  /* on what level we forward */
133 };
134
135
136 #define encode_item_len encode_ptr
137 #if ISAMB_PTR_CODEC
138 static void encode_ptr(char **dst, zint pos)
139 {
140     unsigned char *bp = (unsigned char*) *dst;
141
142     while (pos > 127)
143     {
144          *bp++ = (unsigned char) (128 | (pos & 127));
145          pos = pos >> 7;
146     }
147     *bp++ = (unsigned char) pos;
148     *dst = (char *) bp;
149 }
150 #else
151 static void encode_ptr(char **dst, zint pos)
152 {
153     memcpy(*dst, &pos, sizeof(pos));
154     (*dst) += sizeof(pos);
155 }
156 #endif
157
158 #define decode_item_len decode_ptr
159 #if ISAMB_PTR_CODEC
160 static void decode_ptr(const char **src1, zint *pos)
161 {
162     const unsigned char **src = (const unsigned char **) src1;
163     zint d = 0;
164     unsigned char c;
165     unsigned r = 0;
166
167     while (((c = *(*src)++) & 128))
168     {
169         d += ((zint) (c & 127) << r);
170         r += 7;
171     }
172     d += ((zint) c << r);
173     *pos = d;
174 }
175 #else
176 static void decode_ptr(const char **src, zint *pos)
177 {
178      memcpy(pos, *src, sizeof(*pos));
179      (*src) += sizeof(*pos);
180 }
181 #endif
182
183 ISAMB isamb_open(BFiles bfs, const char *name, int writeflag, ISAMC_M *method,
184                   int cache)
185 {
186     ISAMB isamb = xmalloc(sizeof(*isamb));
187     int i, b_size = ISAMB_MIN_SIZE;
188
189     isamb->bfs = bfs;
190     isamb->method = (ISAMC_M *) xmalloc(sizeof(*method));
191     memcpy(isamb->method, method, sizeof(*method));
192     isamb->no_cat = CAT_NO;
193     isamb->log_io = 0;
194     isamb->log_freelist = 0;
195     isamb->cache = cache;
196     isamb->skipped_numbers = 0;
197     isamb->returned_numbers = 0;
198     for (i = 0; i<ISAMB_MAX_LEVEL; i++)
199       isamb->skipped_nodes[i] = isamb->accessed_nodes[i] = 0;
200
201     assert(cache == 0);
202     isamb->file = xmalloc(sizeof(*isamb->file) * isamb->no_cat);
203     for (i = 0; i < isamb->no_cat; i++)
204     {
205         char fname[DST_BUF_SIZE];
206         char hbuf[DST_BUF_SIZE];
207         isamb->file[i].cache_entries = 0;
208         isamb->file[i].head_dirty = 0;
209         sprintf(fname, "%s%c", name, i+'A');
210         if (cache)
211             isamb->file[i].bf = bf_open(bfs, fname, ISAMB_CACHE_ENTRY_SIZE,
212                                          writeflag);
213         else
214             isamb->file[i].bf = bf_open(bfs, fname, b_size, writeflag);
215
216         /* fill-in default values (for empty isamb) */
217         isamb->file[i].head.first_block = ISAMB_CACHE_ENTRY_SIZE/b_size+1;
218         isamb->file[i].head.last_block = isamb->file[i].head.first_block;
219         isamb->file[i].head.block_size = b_size;
220 #if ISAMB_PTR_CODEC
221         if (i == isamb->no_cat-1 || b_size > 128)
222             isamb->file[i].head.block_offset = 8;
223         else
224             isamb->file[i].head.block_offset = 4;
225 #else
226         isamb->file[i].head.block_offset = 11;
227 #endif
228         isamb->file[i].head.block_max =
229             b_size - isamb->file[i].head.block_offset;
230         isamb->file[i].head.free_list = 0;
231         if (bf_read(isamb->file[i].bf, 0, 0, 0, hbuf))
232         {
233             /* got header assume "isamb"major minor len can fit in 16 bytes */
234             zint zint_tmp;
235             int major, minor, len, pos = 0;
236             int left;
237             const char *src = 0;
238             if (memcmp(hbuf, "isamb", 5))
239             {
240                 yaz_log(YLOG_WARN, "bad isamb header for file %s", fname);
241                 return 0;
242             }
243             if (sscanf(hbuf+5, "%d %d %d", &major, &minor, &len) != 3)
244             {
245                 yaz_log(YLOG_WARN, "bad isamb header for file %s", fname);
246                 return 0;
247             }
248             if (major != ISAMB_MAJOR_VERSION)
249             {
250                 yaz_log(YLOG_WARN, "bad major version for file %s %d, must be %d",
251                      fname, major, ISAMB_MAJOR_VERSION);
252                 return 0;
253             }
254             for (left = len - b_size; left > 0; left = left - b_size)
255             {
256                 pos++;
257                 if (!bf_read(isamb->file[i].bf, pos, 0, 0, hbuf + pos*b_size))
258                 {
259                     yaz_log(YLOG_WARN, "truncated isamb header for " 
260                          "file=%s len=%d pos=%d",
261                          fname, len, pos);
262                     return 0;
263                 }
264             }
265             src = hbuf + 16;
266             decode_ptr(&src, &isamb->file[i].head.first_block);
267             decode_ptr(&src, &isamb->file[i].head.last_block);
268             decode_ptr(&src, &zint_tmp);
269             isamb->file[i].head.block_size = (int) zint_tmp;
270             decode_ptr(&src, &zint_tmp);
271             isamb->file[i].head.block_max = (int) zint_tmp;
272             decode_ptr(&src, &isamb->file[i].head.free_list);
273         }
274         assert (isamb->file[i].head.block_size >= isamb->file[i].head.block_offset);
275         isamb->file[i].head_dirty = 0;
276         assert(isamb->file[i].head.block_size == b_size);
277         b_size = b_size * ISAMB_FAC_SIZE;
278     }
279 #if ISAMB_DEBUG
280     yaz_log(YLOG_WARN, "isamb debug enabled. Things will be slower than usual");
281 #endif
282     return isamb;
283 }
284
285 static void flush_blocks (ISAMB b, int cat)
286 {
287     while (b->file[cat].cache_entries)
288     {
289         struct ISAMB_cache_entry *ce_this = b->file[cat].cache_entries;
290         b->file[cat].cache_entries = ce_this->next;
291
292         if (ce_this->dirty)
293         {
294             yaz_log(b->log_io, "bf_write: flush_blocks");
295             bf_write(b->file[cat].bf, ce_this->pos, 0, 0, ce_this->buf);
296         }
297         xfree(ce_this->buf);
298         xfree(ce_this);
299     }
300 }
301
302 static int cache_block (ISAMB b, ISAMC_P pos, char *userbuf, int wr)
303 {
304     int cat = (int) (pos&CAT_MASK);
305     int off = (int) (((pos/CAT_MAX) & 
306                (ISAMB_CACHE_ENTRY_SIZE / b->file[cat].head.block_size - 1))
307         * b->file[cat].head.block_size);
308     zint norm = pos / (CAT_MASK*ISAMB_CACHE_ENTRY_SIZE / b->file[cat].head.block_size);
309     int no = 0;
310     struct ISAMB_cache_entry **ce, *ce_this = 0, **ce_last = 0;
311
312     if (!b->cache)
313         return 0;
314
315     assert (ISAMB_CACHE_ENTRY_SIZE >= b->file[cat].head.block_size);
316     for (ce = &b->file[cat].cache_entries; *ce; ce = &(*ce)->next, no++)
317     {
318         ce_last = ce;
319         if ((*ce)->pos == norm)
320         {
321             ce_this = *ce;
322             *ce = (*ce)->next;   /* remove from list */
323             
324             ce_this->next = b->file[cat].cache_entries;  /* move to front */
325             b->file[cat].cache_entries = ce_this;
326             
327             if (wr)
328             {
329                 memcpy (ce_this->buf + off, userbuf, 
330                         b->file[cat].head.block_size);
331                 ce_this->dirty = 1;
332             }
333             else
334                 memcpy (userbuf, ce_this->buf + off,
335                         b->file[cat].head.block_size);
336             return 1;
337         }
338     }
339     if (no >= 40)
340     {
341         assert (no == 40);
342         assert (ce_last && *ce_last);
343         ce_this = *ce_last;
344         *ce_last = 0;  /* remove the last entry from list */
345         if (ce_this->dirty)
346         {
347             yaz_log(b->log_io, "bf_write: cache_block");
348             bf_write(b->file[cat].bf, ce_this->pos, 0, 0, ce_this->buf);
349         }
350         xfree(ce_this->buf);
351         xfree(ce_this);
352     }
353     ce_this = xmalloc(sizeof(*ce_this));
354     ce_this->next = b->file[cat].cache_entries;
355     b->file[cat].cache_entries = ce_this;
356     ce_this->buf = xmalloc(ISAMB_CACHE_ENTRY_SIZE);
357     ce_this->pos = norm;
358     yaz_log(b->log_io, "bf_read: cache_block");
359     if (!bf_read(b->file[cat].bf, norm, 0, 0, ce_this->buf))
360         memset (ce_this->buf, 0, ISAMB_CACHE_ENTRY_SIZE);
361     if (wr)
362     {
363         memcpy (ce_this->buf + off, userbuf, b->file[cat].head.block_size);
364         ce_this->dirty = 1;
365     }
366     else
367     {
368         ce_this->dirty = 0;
369         memcpy (userbuf, ce_this->buf + off, b->file[cat].head.block_size);
370     }
371     return 1;
372 }
373
374
375 void isamb_close (ISAMB isamb)
376 {
377     int i;
378     for (i = 0; isamb->accessed_nodes[i]; i++)
379         yaz_log(YLOG_DEBUG, "isamb_close  level leaf-%d: "ZINT_FORMAT" read, "
380                         ZINT_FORMAT" skipped",
381              i, isamb->accessed_nodes[i], isamb->skipped_nodes[i]);
382     yaz_log(YLOG_DEBUG, "isamb_close returned "ZINT_FORMAT" values, "
383                    "skipped "ZINT_FORMAT,
384          isamb->skipped_numbers, isamb->returned_numbers);
385     for (i = 0; i<isamb->no_cat; i++)
386     {
387         flush_blocks (isamb, i);
388         if (isamb->file[i].head_dirty)
389         {
390             char hbuf[DST_BUF_SIZE];
391             int major = ISAMB_MAJOR_VERSION;
392             int minor = ISAMB_MINOR_VERSION;
393             int len = 16;
394             char *dst = hbuf + 16;
395             int pos = 0, left;
396             int b_size = isamb->file[i].head.block_size;
397
398             encode_ptr(&dst, isamb->file[i].head.first_block);
399             encode_ptr(&dst, isamb->file[i].head.last_block);
400             encode_ptr(&dst, isamb->file[i].head.block_size);
401             encode_ptr(&dst, isamb->file[i].head.block_max);
402             encode_ptr(&dst, isamb->file[i].head.free_list);
403             memset(dst, '\0', b_size); /* ensure no random bytes are written */
404
405             len = dst - hbuf;
406
407             /* print exactly 16 bytes (including trailing 0) */
408             sprintf(hbuf, "isamb%02d %02d %02d\r\n", major, minor, len);
409
410             bf_write(isamb->file[i].bf, pos, 0, 0, hbuf);
411
412             for (left = len - b_size; left > 0; left = left - b_size)
413             {
414                 pos++;
415                 bf_write(isamb->file[i].bf, pos, 0, 0, hbuf + pos*b_size);
416             }
417         }
418         bf_close (isamb->file[i].bf);
419     }
420     xfree(isamb->file);
421     xfree(isamb->method);
422     xfree(isamb);
423 }
424
425 /* open_block: read one block at pos.
426    Decode leading sys bytes .. consisting of
427    Offset:Meaning
428    0: leader byte, != 0 leaf, == 0, non-leaf
429    1-2: used size of block
430    3-7*: number of items and all children
431    
432    * Reserve 5 bytes for large block sizes. 1 for small ones .. Number
433    of items. We can thus have at most 2^40 nodes. 
434 */
435 static struct ISAMB_block *open_block(ISAMB b, ISAMC_P pos)
436 {
437     int cat = (int) (pos&CAT_MASK);
438     const char *src;
439     int offset = b->file[cat].head.block_offset;
440     struct ISAMB_block *p;
441     if (!pos)
442         return 0;
443     p = xmalloc(sizeof(*p));
444     p->pos = pos;
445     p->cat = (int) (pos & CAT_MASK);
446     p->buf = xmalloc(b->file[cat].head.block_size);
447     p->cbuf = 0;
448
449     if (!cache_block (b, pos, p->buf, 0))
450     {
451         yaz_log(b->log_io, "bf_read: open_block");
452         if (!bf_read(b->file[cat].bf, pos/CAT_MAX, 0, 0, p->buf))
453         {
454             yaz_log(YLOG_FATAL, "isamb: read fail for pos=%ld block=%ld",
455                      (long) pos, (long) pos/CAT_MAX);
456             abort();
457         }
458     }
459     p->bytes = p->buf + offset;
460     p->leaf = p->buf[0];
461     p->size = (p->buf[1] + 256 * p->buf[2]) - offset;
462     if (p->size < 0)
463     {
464         yaz_log(YLOG_FATAL, "Bad block size %d in pos=" ZINT_FORMAT "\n",
465                  p->size, pos);
466     }
467     assert (p->size >= 0);
468     src = p->buf + 3;
469     decode_ptr(&src, &p->no_items);
470
471     p->offset = 0;
472     p->dirty = 0;
473     p->deleted = 0;
474     p->decodeClientData = (*b->method->codec.start)();
475     return p;
476 }
477
478 struct ISAMB_block *new_block (ISAMB b, int leaf, int cat)
479 {
480     struct ISAMB_block *p;
481
482     p = xmalloc(sizeof(*p));
483     p->buf = xmalloc(b->file[cat].head.block_size);
484
485     if (!b->file[cat].head.free_list)
486     {
487         zint block_no;
488         block_no = b->file[cat].head.last_block++;
489         p->pos = block_no * CAT_MAX + cat;
490     }
491     else
492     {
493         p->pos = b->file[cat].head.free_list;
494         assert((p->pos & CAT_MASK) == cat);
495         if (!cache_block (b, p->pos, p->buf, 0))
496         {
497             yaz_log(b->log_io, "bf_read: new_block");
498             if (!bf_read(b->file[cat].bf, p->pos/CAT_MAX, 0, 0, p->buf))
499             {
500                 yaz_log(YLOG_FATAL, "isamb: read fail for pos=%ld block=%ld",
501                          (long) p->pos/CAT_MAX, (long) p->pos/CAT_MAX);
502                 abort ();
503             }
504         }
505         yaz_log(b->log_freelist, "got block " ZINT_FORMAT " from freelist %d:" ZINT_FORMAT, p->pos,
506                  cat, p->pos/CAT_MAX);
507         memcpy (&b->file[cat].head.free_list, p->buf, sizeof(zint));
508     }
509     p->cat = cat;
510     b->file[cat].head_dirty = 1;
511     memset (p->buf, 0, b->file[cat].head.block_size);
512     p->bytes = p->buf + b->file[cat].head.block_offset;
513     p->leaf = leaf;
514     p->size = 0;
515     p->dirty = 1;
516     p->deleted = 0;
517     p->offset = 0;
518     p->no_items = 0;
519     p->decodeClientData = (*b->method->codec.start)();
520     return p;
521 }
522
523 struct ISAMB_block *new_leaf (ISAMB b, int cat)
524 {
525     return new_block (b, 1, cat);
526 }
527
528
529 struct ISAMB_block *new_int (ISAMB b, int cat)
530 {
531     return new_block (b, 0, cat);
532 }
533
534 static void check_block (ISAMB b, struct ISAMB_block *p)
535 {
536     assert(b); /* mostly to make the compiler shut up about unused b */
537     if (p->leaf)
538     {
539         ;
540     }
541     else
542     {
543         /* sanity check */
544         char *startp = p->bytes;
545         const char *src = startp;
546         char *endp = p->bytes + p->size;
547         ISAMB_P pos;
548         void *c1 = (*b->method->codec.start)();
549             
550         decode_ptr(&src, &pos);
551         assert ((pos&CAT_MASK) == p->cat);
552         while (src != endp)
553         {
554 #if INT_ENCODE
555             char file_item_buf[DST_ITEM_MAX];
556             char *file_item = file_item_buf;
557             (*b->method->codec.reset)(c1);
558             (*b->method->codec.decode)(c1, &file_item, &src);
559 #else
560             zint item_len;
561             decode_item_len(&src, &item_len);
562             assert (item_len > 0 && item_len < 80);
563             src += item_len;
564 #endif
565             decode_ptr(&src, &pos);
566             if ((pos&CAT_MASK) != p->cat)
567             {
568                 assert ((pos&CAT_MASK) == p->cat);
569             }
570         }
571         (*b->method->codec.stop)(c1);
572     }
573 }
574
575 void close_block(ISAMB b, struct ISAMB_block *p)
576 {
577     if (!p)
578         return;
579     if (p->deleted)
580     {
581         yaz_log(b->log_freelist, "release block " ZINT_FORMAT " from freelist %d:" ZINT_FORMAT,
582                  p->pos, p->cat, p->pos/CAT_MAX);
583         memcpy (p->buf, &b->file[p->cat].head.free_list, sizeof(zint));
584         b->file[p->cat].head.free_list = p->pos;
585         if (!cache_block (b, p->pos, p->buf, 1))
586         {
587             yaz_log(b->log_io, "bf_write: close_block (deleted)");
588             bf_write(b->file[p->cat].bf, p->pos/CAT_MAX, 0, 0, p->buf);
589         }
590     }
591     else if (p->dirty)
592     {
593         int offset = b->file[p->cat].head.block_offset;
594         int size = p->size + offset;
595         char *dst =  p->buf + 3;
596         assert (p->size >= 0);
597         
598         /* memset becuase encode_ptr usually does not write all bytes */
599         memset(p->buf, 0, b->file[p->cat].head.block_offset);
600         p->buf[0] = p->leaf;
601         p->buf[1] = size & 255;
602         p->buf[2] = size >> 8;
603         encode_ptr(&dst, p->no_items);
604         check_block(b, p);
605         if (!cache_block (b, p->pos, p->buf, 1))
606         {
607             yaz_log(b->log_io, "bf_write: close_block");
608             bf_write(b->file[p->cat].bf, p->pos/CAT_MAX, 0, 0, p->buf);
609         }
610     }
611     (*b->method->codec.stop)(p->decodeClientData);
612     xfree(p->buf);
613     xfree(p);
614 }
615
616 int insert_sub (ISAMB b, struct ISAMB_block **p,
617                 void *new_item, int *mode,
618                 ISAMC_I *stream,
619                 struct ISAMB_block **sp,
620                 void *sub_item, int *sub_size,
621                 const void *max_item);
622
623 int insert_int (ISAMB b, struct ISAMB_block *p, void *lookahead_item,
624                 int *mode,
625                 ISAMC_I *stream, struct ISAMB_block **sp,
626                 void *split_item, int *split_size, const void *last_max_item)
627 {
628     char *startp = p->bytes;
629     const char *src = startp;
630     char *endp = p->bytes + p->size;
631     ISAMB_P pos;
632     struct ISAMB_block *sub_p1 = 0, *sub_p2 = 0;
633     char sub_item[DST_ITEM_MAX];
634     int sub_size;
635     int more = 0;
636     zint diff_terms = 0;
637     void *c1 = (*b->method->codec.start)();
638
639     *sp = 0;
640
641     assert(p->size >= 0);
642     decode_ptr(&src, &pos);
643     while (src != endp)
644     {
645         int d;
646         const char *src0 = src;
647 #if INT_ENCODE
648         char file_item_buf[DST_ITEM_MAX];
649         char *file_item = file_item_buf;
650         (*b->method->codec.reset)(c1);
651         (*b->method->codec.decode)(c1, &file_item, &src);
652         d = (*b->method->compare_item)(file_item_buf, lookahead_item);
653         if (d > 0)
654         {
655             sub_p1 = open_block(b, pos);
656             assert (sub_p1);
657             diff_terms -= sub_p1->no_items;
658             more = insert_sub (b, &sub_p1, lookahead_item, mode,
659                                stream, &sub_p2, 
660                                sub_item, &sub_size, file_item_buf);
661             diff_terms += sub_p1->no_items;
662             src = src0;
663             break;
664         }
665 #else
666         zint item_len;
667         decode_item_len(&src, &item_len);
668         d = (*b->method->compare_item)(src, lookahead_item);
669         if (d > 0)
670         {
671             sub_p1 = open_block(b, pos);
672             assert (sub_p1);
673             diff_terms -= sub_p1->no_items;
674             more = insert_sub (b, &sub_p1, lookahead_item, mode,
675                                stream, &sub_p2, 
676                                sub_item, &sub_size, src);
677             diff_terms += sub_p1->no_items;
678             src = src0;
679             break;
680         }
681         src += item_len;
682 #endif
683         decode_ptr(&src, &pos);
684     }
685     if (!sub_p1)
686     {
687         /* we reached the end. So lookahead > last item */
688         sub_p1 = open_block(b, pos);
689         assert (sub_p1);
690         diff_terms -= sub_p1->no_items;
691         more = insert_sub (b, &sub_p1, lookahead_item, mode, stream, &sub_p2, 
692                            sub_item, &sub_size, last_max_item);
693         diff_terms += sub_p1->no_items;
694     }
695     if (sub_p2)
696         diff_terms += sub_p2->no_items;
697     if (diff_terms)
698     {
699         p->dirty = 1;
700         p->no_items += diff_terms;
701     }
702     if (sub_p2)
703     {
704         /* there was a split - must insert pointer in this one */
705         char dst_buf[DST_BUF_SIZE];
706         char *dst = dst_buf;
707 #if INT_ENCODE
708         const char *sub_item_ptr = sub_item;
709 #endif
710         assert (sub_size < 80 && sub_size > 1);
711
712         memcpy (dst, startp, src - startp);
713                 
714         dst += src - startp;
715
716 #if INT_ENCODE
717         (*b->method->codec.reset)(c1);
718         (*b->method->codec.encode)(c1, &dst, &sub_item_ptr);
719 #else
720         encode_item_len (&dst, sub_size);      /* sub length and item */
721         memcpy (dst, sub_item, sub_size);
722         dst += sub_size;
723 #endif
724
725         encode_ptr(&dst, sub_p2->pos);   /* pos */
726
727         if (endp - src)                   /* remaining data */
728         {
729             memcpy (dst, src, endp - src);
730             dst += endp - src;
731         }
732         p->size = dst - dst_buf;
733         assert (p->size >= 0);
734
735
736         if (p->size <= b->file[p->cat].head.block_max)
737         {
738             /* it fits OK in this block */
739             memcpy (startp, dst_buf, dst - dst_buf);
740         }
741         else
742         {
743             /* must split _this_ block as well .. */
744             struct ISAMB_block *sub_p3;
745 #if INT_ENCODE
746             char file_item_buf[DST_ITEM_MAX];
747             char *file_item = file_item_buf;
748 #else
749             zint split_size_tmp;
750 #endif
751             zint no_items_first_half = 0;
752             int p_new_size;
753             const char *half;
754             src = dst_buf;
755             endp = dst;
756
757             half = src + b->file[p->cat].head.block_size/2;
758             decode_ptr(&src, &pos);
759
760             /* read sub block so we can get no_items for it */
761             sub_p3 = open_block(b, pos);
762             no_items_first_half += sub_p3->no_items;
763             close_block(b, sub_p3);
764
765             while (src <= half)
766             {
767 #if INT_ENCODE
768                 file_item = file_item_buf;
769                 (*b->method->codec.reset)(c1);
770                 (*b->method->codec.decode)(c1, &file_item, &src);
771 #else
772                 decode_item_len(&src, &split_size_tmp);
773                 *split_size = (int) split_size_tmp;
774                 src += *split_size;
775 #endif
776                 decode_ptr(&src, &pos);
777
778                 /* read sub block so we can get no_items for it */
779                 sub_p3 = open_block(b, pos);
780                 no_items_first_half += sub_p3->no_items;
781                 close_block(b, sub_p3);
782             }
783             /*  p is first half */
784             p_new_size = src - dst_buf;
785             memcpy (p->bytes, dst_buf, p_new_size);
786
787 #if INT_ENCODE
788             file_item = file_item_buf;
789             (*b->method->codec.reset)(c1);
790             (*b->method->codec.decode)(c1, &file_item, &src);
791             *split_size = file_item - file_item_buf;
792             memcpy(split_item, file_item_buf, *split_size);
793 #else
794             decode_item_len(&src, &split_size_tmp);
795             *split_size = (int) split_size_tmp;
796             memcpy (split_item, src, *split_size);
797             src += *split_size;
798 #endif
799             /*  *sp is second half */
800             *sp = new_int (b, p->cat);
801             (*sp)->size = endp - src;
802             memcpy ((*sp)->bytes, src, (*sp)->size);
803
804             p->size = p_new_size;
805
806             /*  adjust no_items in first&second half */
807             (*sp)->no_items = p->no_items - no_items_first_half;
808             p->no_items = no_items_first_half;
809         }
810         p->dirty = 1;
811         close_block(b, sub_p2);
812     }
813     close_block(b, sub_p1);
814     (*b->method->codec.stop)(c1);
815     return more;
816 }
817
818 int insert_leaf (ISAMB b, struct ISAMB_block **sp1, void *lookahead_item,
819                  int *lookahead_mode, ISAMC_I *stream,
820                  struct ISAMB_block **sp2,
821                  void *sub_item, int *sub_size,
822                  const void *max_item)
823 {
824     struct ISAMB_block *p = *sp1;
825     char *endp = 0;
826     const char *src = 0;
827     char dst_buf[DST_BUF_SIZE], *dst = dst_buf;
828     int new_size;
829     void *c1 = (*b->method->codec.start)();
830     void *c2 = (*b->method->codec.start)();
831     int more = 1;
832     int quater = b->file[b->no_cat-1].head.block_max / 4;
833     char *mid_cut = dst_buf + quater * 2;
834     char *tail_cut = dst_buf + quater * 3;
835     char *maxp = dst_buf + b->file[b->no_cat-1].head.block_max;
836     char *half1 = 0;
837     char *half2 = 0;
838     char cut_item_buf[DST_ITEM_MAX];
839     int cut_item_size = 0;
840     int no_items = 0;    /* number of items (total) */
841     int no_items_1 = 0;  /* number of items (first half) */
842
843     if (p && p->size)
844     {
845         char file_item_buf[DST_ITEM_MAX];
846         char *file_item = file_item_buf;
847             
848         src = p->bytes;
849         endp = p->bytes + p->size;
850         (*b->method->codec.decode)(c1, &file_item, &src);
851         while (1)
852         {
853             const char *dst_item = 0; /* resulting item to be inserted */
854             char *lookahead_next;
855             int d = -1;
856             
857             if (lookahead_item)
858                 d = (*b->method->compare_item)(file_item_buf, lookahead_item);
859             
860             /* d now holds comparison between existing file item and
861                lookahead item 
862                d = 0: equal
863                d > 0: lookahead before file
864                d < 0: lookahead after file
865             */
866             if (d > 0)
867             {
868                 /* lookahead must be inserted */
869                 dst_item = lookahead_item;
870                 /* if this is not an insertion, it's really bad .. */
871                 if (!*lookahead_mode)
872                 {
873                     yaz_log(YLOG_WARN, "isamb: Inconsistent register (1)");
874                     assert (*lookahead_mode);
875                 }
876             }
877             else
878                 dst_item = file_item_buf;
879
880                
881             if (!*lookahead_mode && d == 0)
882             {
883                 /* it's a deletion and they match so there is nothing to be
884                    inserted anyway .. But mark the thing bad (file item
885                    was part of input.. The item will not be part of output */
886                 p->dirty = 1;
887             }
888             else if (!half1 && dst > mid_cut)
889             {
890                 /* we have reached the splitting point for the first time */
891                 const char *dst_item_0 = dst_item;
892                 half1 = dst; /* candidate for splitting */
893
894                 /* encode the resulting item */
895                 (*b->method->codec.encode)(c2, &dst, &dst_item);
896                 
897                 cut_item_size = dst_item - dst_item_0;
898                 assert(cut_item_size > 0);
899                 memcpy (cut_item_buf, dst_item_0, cut_item_size);
900                 
901                 half2 = dst;
902                 no_items_1 = no_items;
903                 no_items++;
904             }
905             else
906             {
907                 /* encode the resulting item */
908                 (*b->method->codec.encode)(c2, &dst, &dst_item);
909                 no_items++;
910             }
911
912             /* now move "pointers" .. result has been encoded .. */
913             if (d > 0)  
914             {
915                 /* we must move the lookahead pointer */
916
917                 if (dst > maxp)
918                     /* no more room. Mark lookahead as "gone".. */
919                     lookahead_item = 0;
920                 else
921                 {
922                     /* move it really.. */
923                     lookahead_next = lookahead_item;
924                     if (!(*stream->read_item)(stream->clientData,
925                                               &lookahead_next,
926                                               lookahead_mode))
927                     {
928                         /* end of stream reached: no "more" and no lookahead */
929                         lookahead_item = 0;
930                         more = 0;
931                     }
932                     if (lookahead_item && max_item &&
933                         (*b->method->compare_item)(max_item, lookahead_item) <= 0)
934                     {
935                         /* the lookahead goes beyond what we allow in this
936                            leaf. Mark it as "gone" */
937                         lookahead_item = 0;
938                     }
939                     
940                     p->dirty = 1;
941                 }
942             }
943             else if (d == 0)
944             {
945                 /* exact match .. move both pointers */
946
947                 lookahead_next = lookahead_item;
948                 if (!(*stream->read_item)(stream->clientData,
949                                           &lookahead_next, lookahead_mode))
950                 {
951                     lookahead_item = 0;
952                     more = 0;
953                 }
954                 if (src == endp)
955                     break;  /* end of file stream reached .. */
956                 file_item = file_item_buf; /* move file pointer */
957                 (*b->method->codec.decode)(c1, &file_item, &src);
958             }
959             else
960             {
961                 /* file pointer must be moved */
962                 if (src == endp)
963                     break;
964                 file_item = file_item_buf;
965                 (*b->method->codec.decode)(c1, &file_item, &src);
966             }
967         }
968     }
969
970     /* this loop runs when we are "appending" to a leaf page. That is
971        either it's empty (new) or all file items have been read in
972        previous loop */
973
974     /* determine maximum ptr for tail */
975     if (half2)
976     {
977         /* split already. No more splits - fill up to one full (Extra) block */
978         maxp = half2 + b->file[b->no_cat-1].head.block_max;
979     }
980     else
981     {
982         /* no split. Fill up to 1+1/4 block */
983         maxp = dst_buf + b->file[b->no_cat-1].head.block_max + quater;
984     }
985     while (lookahead_item)
986     {
987         char *dst_item;
988         const char *src = lookahead_item;
989         char *dst_0 = dst;
990         
991         /* if we have a lookahead item, we stop if we exceed the value of it */
992         if (max_item &&
993             (*b->method->compare_item)(max_item, lookahead_item) <= 0)
994         {
995             /* stop if we have reached the value of max item */
996             break;
997         }
998         if (!*lookahead_mode)
999         {
1000             /* this is append. So a delete is bad */
1001             yaz_log(YLOG_WARN, "isamb: Inconsistent register (2)");
1002             abort();
1003         }
1004         else if (!half1 && dst > tail_cut)
1005         {
1006             const char *src_0 = src;
1007             half1 = dst; /* candidate for splitting */
1008             
1009             (*b->method->codec.encode)(c2, &dst, &src);
1010             
1011             cut_item_size = src - src_0;
1012             assert(cut_item_size > 0);
1013             memcpy (cut_item_buf, src_0, cut_item_size);
1014             
1015             no_items_1 = no_items;
1016             half2 = dst;
1017         }
1018         else
1019             (*b->method->codec.encode)(c2, &dst, &src);
1020
1021         if (dst > maxp)
1022         {
1023             dst = dst_0;
1024             break;
1025         }
1026         no_items++;
1027         if (p)
1028             p->dirty = 1;
1029         dst_item = lookahead_item;
1030         if (!(*stream->read_item)(stream->clientData, &dst_item,
1031                                   lookahead_mode))
1032         {
1033             lookahead_item = 0;
1034             more = 0;
1035         }
1036     }
1037     new_size = dst - dst_buf;
1038     if (p && p->cat != b->no_cat-1 && 
1039         new_size > b->file[p->cat].head.block_max)
1040     {
1041         /* non-btree block will be removed */
1042         p->deleted = 1;
1043         close_block(b, p);
1044         /* delete it too!! */
1045         p = 0; /* make a new one anyway */
1046     }
1047     if (!p)
1048     {   /* must create a new one */
1049         int i;
1050         for (i = 0; i < b->no_cat; i++)
1051             if (new_size <= b->file[i].head.block_max)
1052                 break;
1053         if (i == b->no_cat)
1054             i = b->no_cat - 1;
1055         p = new_leaf (b, i);
1056     }
1057     if (new_size > b->file[p->cat].head.block_max)
1058     {
1059         char *first_dst;
1060         const char *cut_item = cut_item_buf;
1061
1062         assert (half1);
1063         assert (half2);
1064
1065         assert(cut_item_size > 0);
1066         
1067         /* first half */
1068         p->size = half1 - dst_buf;
1069         memcpy (p->bytes, dst_buf, half1 - dst_buf);
1070         p->no_items = no_items_1;
1071
1072         /* second half */
1073         *sp2 = new_leaf (b, p->cat);
1074
1075         (*b->method->codec.reset)(c2);
1076
1077         first_dst = (*sp2)->bytes;
1078
1079         (*b->method->codec.encode)(c2, &first_dst, &cut_item);
1080
1081         memcpy (first_dst, half2, dst - half2);
1082
1083         (*sp2)->size = (first_dst - (*sp2)->bytes) + (dst - half2);
1084         (*sp2)->no_items = no_items - no_items_1;
1085         (*sp2)->dirty = 1;
1086         p->dirty = 1;
1087         memcpy (sub_item, cut_item_buf, cut_item_size);
1088         *sub_size = cut_item_size;
1089     }
1090     else
1091     {
1092         memcpy (p->bytes, dst_buf, dst - dst_buf);
1093         p->size = new_size;
1094         p->no_items = no_items;
1095     }
1096     (*b->method->codec.stop)(c1);
1097     (*b->method->codec.stop)(c2);
1098     *sp1 = p;
1099     return more;
1100 }
1101
1102 int insert_sub (ISAMB b, struct ISAMB_block **p, void *new_item,
1103                 int *mode,
1104                 ISAMC_I *stream,
1105                 struct ISAMB_block **sp,
1106                 void *sub_item, int *sub_size,
1107                 const void *max_item)
1108 {
1109     if (!*p || (*p)->leaf)
1110         return insert_leaf (b, p, new_item, mode, stream, sp, sub_item, 
1111                             sub_size, max_item);
1112     else
1113         return insert_int (b, *p, new_item, mode, stream, sp, sub_item,
1114                            sub_size, max_item);
1115 }
1116
1117 int isamb_unlink (ISAMB b, ISAMC_P pos)
1118 {
1119     struct ISAMB_block *p1;
1120
1121     if (!pos)
1122         return 0;
1123     p1 = open_block(b, pos);
1124     p1->deleted = 1;
1125     if (!p1->leaf)
1126     {
1127         zint sub_p;
1128         const char *src = p1->bytes + p1->offset;
1129 #if INT_ENCODE
1130         void *c1 = (*b->method->codec.start)();
1131 #endif
1132         decode_ptr(&src, &sub_p);
1133         isamb_unlink(b, sub_p);
1134         
1135         while (src != p1->bytes + p1->size)
1136         {
1137 #if INT_ENCODE
1138             char file_item_buf[DST_ITEM_MAX];
1139             char *file_item = file_item_buf;
1140             (*b->method->codec.reset)(c1);
1141             (*b->method->codec.decode)(c1, &file_item, &src);
1142 #else
1143             zint item_len;
1144             decode_item_len(&src, &item_len);
1145             src += item_len;
1146 #endif
1147             decode_ptr(&src, &sub_p);
1148             isamb_unlink(b, sub_p);
1149         }
1150 #if INT_ENCODE
1151         (*b->method->codec.stop)(c1);
1152 #endif
1153     }
1154     close_block(b, p1);
1155     return 0;
1156 }
1157
1158 ISAMB_P isamb_merge (ISAMB b, ISAMC_P pos, ISAMC_I *stream)
1159 {
1160     char item_buf[DST_ITEM_MAX];
1161     char *item_ptr;
1162     int i_mode;
1163     int more;
1164     int must_delete = 0;
1165
1166     if (b->cache < 0)
1167     {
1168         int more = 1;
1169         while (more)
1170         {
1171             item_ptr = item_buf;
1172             more =
1173                 (*stream->read_item)(stream->clientData, &item_ptr, &i_mode);
1174         }
1175         return 1;
1176     }
1177     item_ptr = item_buf;
1178     more = (*stream->read_item)(stream->clientData, &item_ptr, &i_mode);
1179     while (more)
1180     {
1181         struct ISAMB_block *p = 0, *sp = 0;
1182         char sub_item[DST_ITEM_MAX];
1183         int sub_size;
1184         
1185         if (pos)
1186             p = open_block(b, pos);
1187         more = insert_sub (b, &p, item_buf, &i_mode, stream, &sp,
1188                            sub_item, &sub_size, 0);
1189         if (sp)
1190         {    /* increase level of tree by one */
1191             struct ISAMB_block *p2 = new_int (b, p->cat);
1192             char *dst = p2->bytes + p2->size;
1193 #if INT_ENCODE
1194             void *c1 = (*b->method->codec.start)();
1195             const char *sub_item_ptr = sub_item;
1196 #endif
1197
1198             encode_ptr(&dst, p->pos);
1199             assert (sub_size < 80 && sub_size > 1);
1200 #if INT_ENCODE
1201             (*b->method->codec.reset)(c1);
1202             (*b->method->codec.encode)(c1, &dst, &sub_item_ptr);
1203 #else
1204             encode_item_len (&dst, sub_size);
1205             memcpy (dst, sub_item, sub_size);
1206             dst += sub_size;
1207 #endif
1208             encode_ptr(&dst, sp->pos);
1209             
1210             p2->size = dst - p2->bytes;
1211             p2->no_items = p->no_items + sp->no_items;
1212             pos = p2->pos;  /* return new super page */
1213             close_block(b, sp);
1214             close_block(b, p2);
1215 #if INT_ENCODE
1216             (*b->method->codec.stop)(c1);
1217 #endif
1218         }
1219         else
1220         {
1221             pos = p->pos;   /* return current one (again) */
1222         }
1223         if (p->no_items == 0)
1224             must_delete = 1;
1225         else
1226             must_delete = 0;
1227         close_block(b, p);
1228     }
1229     if (must_delete)
1230     {
1231         isamb_unlink(b, pos);
1232         return 0;
1233     }
1234     return pos;
1235 }
1236
1237 ISAMB_PP isamb_pp_open_x(ISAMB isamb, ISAMB_P pos, int *level, int scope)
1238 {
1239     ISAMB_PP pp = xmalloc(sizeof(*pp));
1240     int i;
1241
1242     assert(pos);
1243
1244     pp->isamb = isamb;
1245     pp->block = xmalloc(ISAMB_MAX_LEVEL * sizeof(*pp->block));
1246
1247     pp->pos = pos;
1248     pp->level = 0;
1249     pp->maxlevel = 0;
1250     pp->total_size = 0;
1251     pp->no_blocks = 0;
1252     pp->skipped_numbers = 0;
1253     pp->returned_numbers = 0;
1254     pp->scope = scope;
1255     for (i = 0; i<ISAMB_MAX_LEVEL; i++)
1256         pp->skipped_nodes[i] = pp->accessed_nodes[i] = 0;
1257     while (1)
1258     {
1259         struct ISAMB_block *p = open_block(isamb, pos);
1260         const char *src = p->bytes + p->offset;
1261         pp->block[pp->level] = p;
1262
1263         pp->total_size += p->size;
1264         pp->no_blocks++;
1265         if (p->leaf)
1266             break;
1267         decode_ptr(&src, &pos);
1268         p->offset = src - p->bytes;
1269         pp->level++;
1270         pp->accessed_nodes[pp->level]++; 
1271     }
1272     pp->block[pp->level+1] = 0;
1273     pp->maxlevel = pp->level;
1274     if (level)
1275         *level = pp->level;
1276     return pp;
1277 }
1278
1279 ISAMB_PP isamb_pp_open (ISAMB isamb, ISAMB_P pos, int scope)
1280 {
1281     return isamb_pp_open_x(isamb, pos, 0, scope);
1282 }
1283
1284 void isamb_pp_close_x(ISAMB_PP pp, zint *size, zint *blocks)
1285 {
1286     int i;
1287     if (!pp)
1288         return;
1289     yaz_log(YLOG_DEBUG, "isamb_pp_close lev=%d returned "ZINT_FORMAT" values, " 
1290                     "skipped "ZINT_FORMAT,
1291         pp->maxlevel, pp->skipped_numbers, pp->returned_numbers);
1292     for (i = pp->maxlevel; i>=0; i--)
1293         if (pp->skipped_nodes[i] || pp->accessed_nodes[i])
1294             yaz_log(YLOG_DEBUG, "isamb_pp_close  level leaf-%d: "
1295                             ZINT_FORMAT" read, "ZINT_FORMAT" skipped", i,
1296                  pp->accessed_nodes[i], pp->skipped_nodes[i]);
1297     pp->isamb->skipped_numbers += pp->skipped_numbers;
1298     pp->isamb->returned_numbers += pp->returned_numbers;
1299     for (i = pp->maxlevel; i>=0; i--)
1300     {
1301         pp->isamb->accessed_nodes[i] += pp->accessed_nodes[i];
1302         pp->isamb->skipped_nodes[i] += pp->skipped_nodes[i];
1303     }
1304     if (size)
1305         *size = pp->total_size;
1306     if (blocks)
1307         *blocks = pp->no_blocks;
1308     for (i = 0; i <= pp->level; i++)
1309         close_block(pp->isamb, pp->block[i]);
1310     xfree(pp->block);
1311     xfree(pp);
1312 }
1313
1314 int isamb_block_info (ISAMB isamb, int cat)
1315 {
1316     if (cat >= 0 && cat < isamb->no_cat)
1317         return isamb->file[cat].head.block_size;
1318     return -1;
1319 }
1320
1321 void isamb_pp_close (ISAMB_PP pp)
1322 {
1323     isamb_pp_close_x(pp, 0, 0);
1324 }
1325
1326 /* simple recursive dumper .. */
1327 static void isamb_dump_r (ISAMB b, ISAMB_P pos, void (*pr)(const char *str),
1328                           int level)
1329 {
1330     char buf[1024];
1331     char prefix_str[1024];
1332     if (pos)
1333     {
1334         struct ISAMB_block *p = open_block(b, pos);
1335         sprintf(prefix_str, "%*s " ZINT_FORMAT " cat=%d size=%d max=%d items="
1336                 ZINT_FORMAT, level*2, "",
1337                 pos, p->cat, p->size, b->file[p->cat].head.block_max,
1338                 p->no_items);
1339         (*pr)(prefix_str);
1340         sprintf(prefix_str, "%*s " ZINT_FORMAT, level*2, "", pos);
1341         if (p->leaf)
1342         {
1343             while (p->offset < p->size)
1344             {
1345                 const char *src = p->bytes + p->offset;
1346                 char *dst = buf;
1347                 (*b->method->codec.decode)(p->decodeClientData, &dst, &src);
1348                 (*b->method->log_item)(YLOG_DEBUG, buf, prefix_str);
1349                 p->offset = src - (char*) p->bytes;
1350             }
1351             assert(p->offset == p->size);
1352         }
1353         else
1354         {
1355             const char *src = p->bytes + p->offset;
1356             ISAMB_P sub;
1357
1358             decode_ptr(&src, &sub);
1359             p->offset = src - (char*) p->bytes;
1360
1361             isamb_dump_r(b, sub, pr, level+1);
1362             
1363             while (p->offset < p->size)
1364             {
1365 #if INT_ENCODE
1366                 char file_item_buf[DST_ITEM_MAX];
1367                 char *file_item = file_item_buf;
1368                 void *c1 = (*b->method->codec.start)();
1369                 (*b->method->codec.decode)(c1, &file_item, &src);
1370                 (*b->method->codec.stop)(c1);
1371                 (*b->method->log_item)(YLOG_DEBUG, file_item_buf, prefix_str);
1372 #else
1373                 zint item_len;
1374                 decode_item_len(&src, &item_len);
1375                 (*b->method->log_item)(YLOG_DEBUG, src, prefix_str);
1376                 src += item_len;
1377 #endif
1378                 decode_ptr(&src, &sub);
1379                 
1380                 p->offset = src - (char*) p->bytes;
1381                 
1382                 isamb_dump_r(b, sub, pr, level+1);
1383             }           
1384         }
1385         close_block(b, p);
1386     }
1387 }
1388
1389 void isamb_dump(ISAMB b, ISAMB_P pos, void (*pr)(const char *str))
1390 {
1391     isamb_dump_r(b, pos, pr, 0);
1392 }
1393
1394 int isamb_pp_read(ISAMB_PP pp, void *buf)
1395 {
1396     return isamb_pp_forward(pp, buf, 0);
1397 }
1398
1399
1400 static int isamb_pp_on_right_node(ISAMB_PP pp, int level, const void *untilbuf)
1401 { /* looks one node higher to see if we should be on this node at all */
1402   /* useful in backing off quickly, and in avoiding tail descends */
1403   /* call with pp->level to begin with */
1404     struct ISAMB_block *p;
1405     int cmp;
1406     const char *src;
1407     ISAMB b = pp->isamb;
1408
1409     assert(level >= 0);
1410     if (level == 0)
1411     {
1412 #if ISAMB_DEBUG
1413             yaz_log(YLOG_DEBUG, "isamb_pp_on_right returning true for root");
1414 #endif
1415         return 1; /* we can never skip the root node */
1416     }
1417     level--;
1418     p = pp->block[level];
1419     assert(p->offset <= p->size);
1420     if (p->offset < p->size)
1421     {
1422 #if INT_ENCODE
1423         char file_item_buf[DST_ITEM_MAX];
1424         char *file_item = file_item_buf;
1425         void *c1 = (*b->method->codec.start)();
1426         assert(p->offset > 0); 
1427         src = p->bytes + p->offset;
1428         (*b->method->codec.decode)(c1, &file_item, &src);
1429         (*b->method->codec.stop)(c1);
1430         cmp = (*b->method->compare_item)(untilbuf, file_item_buf);
1431 #else
1432         zint item_len;
1433         assert(p->offset > 0); 
1434         src = p->bytes + p->offset;
1435         decode_item_len(&src, &item_len);
1436 #if ISAMB_DEBUG
1437         (*b->method->codec.log_item)(YLOG_DEBUG, untilbuf, "on_leaf: until");
1438         (*b->method->codec.log_item)(YLOG_DEBUG, src, "on_leaf: value");
1439 #endif
1440         cmp = (*b->method->compare_item)(untilbuf, src);
1441 #endif
1442         if (cmp < pp->scope)
1443         {  /* cmp<2 */
1444 #if ISAMB_DEBUG
1445             yaz_log(YLOG_DEBUG, "isamb_pp_on_right returning true "
1446                             "cmp=%d lev=%d ofs=%d", cmp, level, p->offset);
1447 #endif
1448             return 1; 
1449         }
1450         else
1451         {
1452 #if ISAMB_DEBUG
1453             yaz_log(YLOG_DEBUG, "isamb_pp_on_right returning false "
1454                             "cmp=%d lev=%d ofs=%d", cmp, level, p->offset);
1455 #endif
1456             return 0; 
1457         }
1458     }
1459     else {
1460 #if ISAMB_DEBUG
1461         yaz_log(YLOG_DEBUG, "isamb_pp_on_right at tail, looking higher "
1462                         "lev=%d", level);
1463 #endif
1464         return isamb_pp_on_right_node(pp, level, untilbuf);
1465     }
1466 } /* isamb_pp_on_right_node */
1467
1468 static int isamb_pp_read_on_leaf(ISAMB_PP pp, void *buf)
1469
1470     /* reads the next item on the current leaf, returns 0 if end of leaf*/
1471     struct ISAMB_block *p = pp->block[pp->level];
1472     char *dst;
1473     const char *src;
1474     assert(pp);
1475     assert(buf);
1476     if (p->offset == p->size)
1477     {
1478 #if ISAMB_DEBUG
1479         yaz_log(YLOG_DEBUG, "isamb_pp_read_on_leaf returning 0 on "
1480                 "node %d", p->pos);
1481 #endif
1482         return 0; /* at end of leaf */
1483     }
1484     src = p->bytes + p->offset;
1485     dst = buf;
1486     (*pp->isamb->method->codec.decode)(p->decodeClientData, &dst, &src);
1487     p->offset = src - (char*) p->bytes;
1488 #if ISAMB_DEBUG
1489     (*pp->isamb->method->codec.log_item)(YLOG_DEBUG, buf,
1490                                          "read_on_leaf returning 1");
1491 #endif
1492     pp->returned_numbers++;
1493     return 1;
1494 } /* read_on_leaf */
1495
1496 static int isamb_pp_forward_on_leaf(ISAMB_PP pp, void *buf, const void *untilbuf)
1497 { /* forwards on the current leaf, returns 0 if not found */
1498     int cmp;
1499     int skips = 0;
1500     while (1)
1501     {
1502         if (!isamb_pp_read_on_leaf(pp, buf))
1503             return 0;
1504         /* FIXME - this is an extra function call, inline the read? */
1505         cmp=(*pp->isamb->method->compare_item)(untilbuf, buf);
1506         if (cmp <pp->scope)
1507         {  /* cmp<2  found a good one */
1508 #if ISAMB_DEBUG
1509             if (skips)
1510                 yaz_log(YLOG_DEBUG, "isam_pp_fwd_on_leaf skipped %d items", skips);
1511 #endif
1512             pp->returned_numbers++;
1513             return 1;
1514         }
1515         if (!skips)
1516             if (!isamb_pp_on_right_node(pp, pp->level, untilbuf))
1517                 return 0; /* never mind the rest of this leaf */
1518         pp->skipped_numbers++;
1519         skips++;
1520     }
1521 } /* forward_on_leaf */
1522
1523 static int isamb_pp_climb_level(ISAMB_PP pp, ISAMB_P *pos)
1524 { /* climbs higher in the tree, until finds a level with data left */
1525   /* returns the node to (consider to) descend to in *pos) */
1526     struct ISAMB_block *p = pp->block[pp->level];
1527     const char *src;
1528 #if ISAMB_DEBUG
1529     yaz_log(YLOG_DEBUG, "isamb_pp_climb_level starting "
1530                    "at level %d node %d ofs=%d sz=%d",
1531                     pp->level, p->pos, p->offset, p->size);
1532 #endif
1533     assert(pp->level >= 0);
1534     assert(p->offset <= p->size);
1535     if (pp->level==0)
1536     {
1537 #if ISAMB_DEBUG
1538         yaz_log(YLOG_DEBUG, "isamb_pp_climb_level returning 0 at root");
1539 #endif
1540         return 0;
1541     }
1542     assert(pp->level>0); 
1543     close_block(pp->isamb, pp->block[pp->level]);
1544     pp->block[pp->level] = 0;
1545     (pp->level)--;
1546     p = pp->block[pp->level];
1547 #if ISAMB_DEBUG
1548     yaz_log(YLOG_DEBUG, "isamb_pp_climb_level climbed to level %d node %d ofs=%d",
1549                     pp->level, p->pos, p->offset);
1550 #endif
1551     assert(!p->leaf);
1552     assert(p->offset <= p->size);
1553     if (p->offset == p->size)
1554     {
1555         /* we came from the last pointer, climb on */
1556         if (!isamb_pp_climb_level(pp, pos))
1557             return 0;
1558         p = pp->block[pp->level];
1559     }
1560     else
1561     {
1562 #if INT_ENCODE
1563         char file_item_buf[DST_ITEM_MAX];
1564         char *file_item = file_item_buf;
1565         ISAMB b = pp->isamb;
1566         void *c1 = (*b->method->codec.start)();
1567 #else
1568         zint item_len;
1569 #endif
1570         /* skip the child we just came from */
1571 #if ISAMB_DEBUG
1572         yaz_log(YLOG_DEBUG, "isam_pp_climb_level: skipping lev=%d ofs=%d sz=%d", 
1573                         pp->level, p->offset, p->size);
1574 #endif
1575         assert (p->offset < p->size);
1576         src = p->bytes + p->offset;
1577 #if INT_ENCODE
1578         (*b->method->codec.decode)(c1, &file_item, &src);
1579         (*b->method->codec.stop)(c1);
1580 #else
1581         decode_item_len(&src, &item_len);
1582         src += item_len;
1583 #endif
1584         decode_ptr(&src, pos);
1585         p->offset = src - (char *)p->bytes;
1586             
1587     }
1588     return 1;
1589 } /* climb_level */
1590
1591
1592 static zint isamb_pp_forward_unode(ISAMB_PP pp, zint pos, const void *untilbuf)
1593 { /* scans a upper node until it finds a child <= untilbuf */
1594   /* pp points to the key value, as always. pos is the child read from */
1595   /* the buffer */
1596   /* if all values are too small, returns the last child in the node */
1597   /* FIXME - this can be detected, and avoided by looking at the */
1598   /* parent node, but that gets messy. Presumably the cost is */
1599   /* pretty low anyway */
1600     ISAMB b = pp->isamb;
1601     struct ISAMB_block *p = pp->block[pp->level];
1602     const char *src = p->bytes + p->offset;
1603     int cmp;
1604     zint nxtpos;
1605 #if ISAMB_DEBUG
1606     int skips = 0;
1607     yaz_log(YLOG_DEBUG, "isamb_pp_forward_unode starting "
1608                    "at level %d node %d ofs=%di sz=%d",
1609                     pp->level, p->pos, p->offset, p->size);
1610 #endif
1611     assert(!p->leaf);
1612     assert(p->offset <= p->size);
1613     if (p->offset == p->size)
1614     {
1615 #if ISAMB_DEBUG
1616             yaz_log(YLOG_DEBUG, "isamb_pp_forward_unode returning at end "
1617                    "at level %d node %d ofs=%di sz=%d",
1618                     pp->level, p->pos, p->offset, p->size);
1619 #endif
1620         return pos; /* already at the end of it */
1621     }
1622     while(p->offset < p->size)
1623     {
1624 #if INT_ENCODE
1625         char file_item_buf[DST_ITEM_MAX];
1626         char *file_item = file_item_buf;
1627         void *c1 = (*b->method->codec.start)();
1628         (*b->method->codec.decode)(c1, &file_item, &src);
1629         (*b->method->codec.stop)(c1);
1630         cmp = (*b->method->compare_item)(untilbuf, file_item_buf);
1631 #else
1632         zint item_len;
1633         decode_item_len(&src, &item_len);
1634         cmp = (*b->method->compare_item)(untilbuf, src);
1635         src += item_len;
1636 #endif
1637         decode_ptr(&src, &nxtpos);
1638         if (cmp<pp->scope) /* cmp<2 */
1639         {
1640 #if ISAMB_DEBUG
1641             yaz_log(YLOG_DEBUG, "isamb_pp_forward_unode returning a hit "
1642                    "at level %d node %d ofs=%d sz=%d",
1643                     pp->level, p->pos, p->offset, p->size);
1644 #endif
1645             return pos;
1646         } /* found one */
1647         pos = nxtpos;
1648         p->offset = src-(char*)p->bytes;
1649         (pp->skipped_nodes[pp->maxlevel - pp->level -1])++;
1650 #if ISAMB_DEBUG
1651         skips++;
1652 #endif
1653     }
1654 #if ISAMB_DEBUG
1655             yaz_log(YLOG_DEBUG, "isamb_pp_forward_unode returning at tail "
1656                    "at level %d node %d ofs=%d sz=%d skips=%d",
1657                     pp->level, p->pos, p->offset, p->size, skips);
1658 #endif
1659     return pos; /* that's the last one in the line */
1660     
1661 } /* forward_unode */
1662
1663 static void isamb_pp_descend_to_leaf(ISAMB_PP pp, ISAMB_P pos,
1664                                      const void *untilbuf)
1665 { /* climbs down the tree, from pos, to the leftmost leaf */
1666     struct ISAMB_block *p = pp->block[pp->level];
1667     const char *src;
1668     assert(!p->leaf);
1669 #if ISAMB_DEBUG
1670     yaz_log(YLOG_DEBUG, "isamb_pp_descend_to_leaf "
1671                    "starting at lev %d node %d ofs=%d lf=%d u=%p", 
1672                    pp->level, p->pos, p->offset, p->leaf, untilbuf);
1673 #endif
1674     if (untilbuf)
1675         pos = isamb_pp_forward_unode(pp, pos, untilbuf);
1676     ++(pp->level);
1677     assert(pos);
1678     p = open_block(pp->isamb, pos);
1679     pp->block[pp->level] = p;
1680     ++(pp->accessed_nodes[pp->maxlevel-pp->level]);
1681     ++(pp->no_blocks);
1682 #if ISAMB_DEBUG
1683     yaz_log(YLOG_DEBUG, "isamb_pp_descend_to_leaf "
1684                    "got lev %d node %d lf=%d", 
1685                    pp->level, p->pos, p->leaf);
1686 #endif
1687     if (p->leaf)
1688         return;
1689     assert (p->offset==0);
1690     src = p->bytes + p->offset;
1691     decode_ptr(&src, &pos);
1692     p->offset = src-(char*)p->bytes;
1693     isamb_pp_descend_to_leaf(pp, pos, untilbuf);
1694 #if ISAMB_DEBUG
1695     yaz_log(YLOG_DEBUG, "isamb_pp_descend_to_leaf "
1696                    "returning at lev %d node %d ofs=%d lf=%d", 
1697                    pp->level, p->pos, p->offset, p->leaf);
1698 #endif
1699 } /* descend_to_leaf */
1700
1701 static int isamb_pp_find_next_leaf(ISAMB_PP pp)
1702 { /* finds the next leaf by climbing up and down */
1703     ISAMB_P pos;
1704     if (!isamb_pp_climb_level(pp, &pos))
1705         return 0;
1706     isamb_pp_descend_to_leaf(pp, pos, 0);
1707     return 1;
1708 }
1709
1710 static int isamb_pp_climb_desc(ISAMB_PP pp,  const void *untilbuf)
1711 { /* climbs up and descends to a leaf where values >= *untilbuf are found */
1712     ISAMB_P pos;
1713 #if ISAMB_DEBUG
1714     struct ISAMB_block *p = pp->block[pp->level];
1715     yaz_log(YLOG_DEBUG, "isamb_pp_climb_desc starting "
1716                    "at level %d node %d ofs=%d sz=%d",
1717                     pp->level, p->pos, p->offset, p->size);
1718 #endif
1719     if (!isamb_pp_climb_level(pp, &pos))
1720         return 0;
1721     /* see if it would pay to climb one higher */
1722     if (!isamb_pp_on_right_node(pp, pp->level, untilbuf))
1723         if (!isamb_pp_climb_level(pp, &pos))
1724             return 0;
1725     isamb_pp_descend_to_leaf(pp, pos, untilbuf);
1726 #if ISAMB_DEBUG
1727     p = pp->block[pp->level];
1728     yaz_log(YLOG_DEBUG, "isamb_pp_climb_desc done "
1729                    "at level %d node %d ofs=%d sz=%d",
1730                     pp->level, p->pos, p->offset, p->size);
1731 #endif
1732     return 1;
1733 } /* climb_desc */
1734
1735 int isamb_pp_forward (ISAMB_PP pp, void *buf, const void *untilbuf)
1736 {
1737 #if ISAMB_DEBUG
1738     struct ISAMB_block *p = pp->block[pp->level];
1739     assert(p->leaf);
1740     yaz_log(YLOG_DEBUG, "isamb_pp_forward starting "
1741                    "at level %d node %d ofs=%d sz=%d u=%p sc=%d",
1742                     pp->level, p->pos, p->offset, p->size, untilbuf, scope);
1743 #endif
1744     if (untilbuf)
1745     {
1746         if (isamb_pp_forward_on_leaf(pp, buf, untilbuf))
1747         {
1748 #if ISAMB_DEBUG
1749             yaz_log(YLOG_DEBUG, "isamb_pp_forward (f) returning (A) "
1750                    "at level %d node %d ofs=%d sz=%d",
1751                     pp->level, p->pos, p->offset, p->size);
1752 #endif
1753             return 1;
1754         }
1755         if (! isamb_pp_climb_desc(pp, untilbuf))
1756         {
1757 #if ISAMB_DEBUG
1758             yaz_log(YLOG_DEBUG, "isamb_pp_forward (f) returning notfound (B) "
1759                    "at level %d node %d ofs=%d sz=%d",
1760                     pp->level, p->pos, p->offset, p->size);
1761 #endif
1762             return 0; /* could not find a leaf */
1763         }
1764         do {
1765             if (isamb_pp_forward_on_leaf(pp, buf, untilbuf))
1766             {
1767 #if ISAMB_DEBUG
1768                 yaz_log(YLOG_DEBUG, "isamb_pp_forward (f) returning (c) "
1769                         "at level %d node %d ofs=%d sz=%d",
1770                         pp->level, p->pos, p->offset, p->size);
1771 #endif
1772                 return 1;
1773             }
1774         } while (isamb_pp_find_next_leaf(pp));
1775         return 0; /* could not find at all */
1776     }
1777     else { /* no untilbuf, a straight read */
1778         /* FIXME - this should be moved
1779          * directly into the pp_read */
1780         /* keeping here now, to keep same
1781          * interface as the old fwd */
1782         if (isamb_pp_read_on_leaf(pp, buf))
1783         {
1784 #if ISAMB_DEBUG
1785             yaz_log(YLOG_DEBUG, "isamb_pp_forward (read) returning (D) "
1786                    "at level %d node %d ofs=%d sz=%d",
1787                     pp->level, p->pos, p->offset, p->size);
1788 #endif
1789             return 1;
1790         }
1791         if (isamb_pp_find_next_leaf(pp))
1792         {
1793 #if ISAMB_DEBUG
1794             yaz_log(YLOG_DEBUG, "isamb_pp_forward (read) returning (E) "
1795                    "at level %d node %d ofs=%d sz=%d",
1796                     pp->level, p->pos, p->offset, p->size);
1797 #endif
1798             return isamb_pp_read_on_leaf(pp, buf);
1799         }
1800         else
1801             return 0;
1802     }
1803 } /* isam_pp_forward (new version) */
1804
1805 void isamb_pp_pos(ISAMB_PP pp, double *current, double *total)
1806 { /* return an estimate of the current position and of the total number of */
1807   /* occureences in the isam tree, based on the current leaf */
1808     struct ISAMB_block *p = pp->block[pp->level];
1809     assert(total);
1810     assert(current);
1811     assert(p->leaf);
1812
1813     *total = (double) (pp->block[0]->no_items);
1814     *current = (double) pp->returned_numbers;
1815 #if ISAMB_DEBUG
1816     yaz_log(YLOG_LOG, "isamb_pp_pos returning: cur= %0.1f tot=%0.1f rn="
1817          ZINT_FORMAT, *current, *total, pp->returned_numbers);
1818 #endif
1819 }
1820
1821 int isamb_pp_forward2(ISAMB_PP pp, void *buf, const void *untilb)
1822 {
1823     char *dst = buf;
1824     const char *src;
1825     struct ISAMB_block *p = pp->block[pp->level];
1826     ISAMB b = pp->isamb;
1827     if (!p)
1828         return 0;
1829 again:
1830     while (p->offset == p->size)
1831     {
1832         ISAMB_P pos;
1833 #if INT_ENCODE
1834         const char *src_0;
1835         void *c1;
1836         char file_item_buf[DST_ITEM_MAX];
1837         char *file_item = file_item_buf;
1838 #else
1839         zint item_len;
1840 #endif
1841         while (p->offset == p->size)
1842         {
1843             if (pp->level == 0)
1844                 return 0;
1845             close_block (pp->isamb, pp->block[pp->level]);
1846             pp->block[pp->level] = 0;
1847             (pp->level)--;
1848             p = pp->block[pp->level];
1849             assert (!p->leaf);  
1850         }
1851
1852         assert(!p->leaf);
1853         src = p->bytes + p->offset;
1854        
1855 #if INT_ENCODE
1856         c1 = (*b->method->codec.start)();
1857         (*b->method->codec.decode)(c1, &file_item, &src);
1858 #else
1859         decode_ptr (&src, &item_len);
1860         src += item_len;
1861 #endif        
1862         decode_ptr (&src, &pos);
1863         p->offset = src - (char*) p->bytes;
1864
1865         src = p->bytes + p->offset;
1866
1867         while(1)
1868         {
1869             if (!untilb || p->offset == p->size)
1870                 break;
1871             assert(p->offset < p->size);
1872 #if INT_ENCODE
1873             src_0 = src;
1874             file_item = file_item_buf;
1875             (*b->method->codec.reset)(c1);
1876             (*b->method->codec.decode)(c1, &file_item, &src);
1877             if ((*b->method->compare_item)(untilb, file_item_buf) <= 1)
1878             {
1879                 src = src_0;
1880                 break;
1881             }
1882 #else
1883             decode_item_len(&src, &item_len);
1884             if ((*b->method->compare_item)(untilb, src) <= 1)
1885                 break;
1886             src += item_len;
1887 #endif
1888             decode_ptr (&src, &pos);
1889             p->offset = src - (char*) p->bytes;
1890         }
1891
1892         pp->level++;
1893
1894         while (1)
1895         {
1896             pp->block[pp->level] = p = open_block (pp->isamb, pos);
1897
1898             pp->total_size += p->size;
1899             pp->no_blocks++;
1900             
1901             if (p->leaf) 
1902             {
1903                 break;
1904             }
1905             
1906             src = p->bytes + p->offset;
1907             while(1)
1908             {
1909                 decode_ptr (&src, &pos);
1910                 p->offset = src - (char*) p->bytes;
1911                 
1912                 if (!untilb || p->offset == p->size)
1913                     break;
1914                 assert(p->offset < p->size);
1915 #if INT_ENCODE
1916                 src_0 = src;
1917                 file_item = file_item_buf;
1918                 (*b->method->codec.reset)(c1);
1919                 (*b->method->codec.decode)(c1, &file_item, &src);
1920                 if ((*b->method->compare_item)(untilb, file_item_buf) <= 1)
1921                 {
1922                     src = src_0;
1923                     break;
1924                 }
1925 #else
1926                 decode_ptr (&src, &item_len);
1927                 if ((*b->method->compare_item)(untilb, src) <= 1)
1928                     break;
1929                 src += item_len;
1930 #endif
1931             }
1932             pp->level++;
1933         }
1934 #if INT_ENCODE
1935         (*b->method->codec.stop)(c1);
1936 #endif
1937     }
1938     assert (p->offset < p->size);
1939     assert (p->leaf);
1940     while(1)
1941     {
1942         char *dst0 = dst;
1943         src = p->bytes + p->offset;
1944         (*pp->isamb->method->codec.decode)(p->decodeClientData, &dst, &src);
1945         p->offset = src - (char*) p->bytes;
1946         if (!untilb || (*pp->isamb->method->compare_item)(untilb, dst0) <= 1)
1947             break;
1948         dst = dst0;
1949         if (p->offset == p->size) goto again;
1950     }
1951     return 1;
1952 }