Added a function to estimate total size and current position in isamb
[idzebra-moved-to-github.git] / isamb / isamb.c
1 /* $Id: isamb.c,v 1.44 2004-06-04 13:54:56 heikki Exp $
2    Copyright (C) 1995,1996,1997,1998,1999,2000,2001,2002,2003,2004
3    Index Data Aps
4
5 This file is part of the Zebra server.
6
7 Zebra is free software; you can redistribute it and/or modify it under
8 the terms of the GNU General Public License as published by the Free
9 Software Foundation; either version 2, or (at your option) any later
10 version.
11
12 Zebra is distributed in the hope that it will be useful, but WITHOUT ANY
13 WARRANTY; without even the implied warranty of MERCHANTABILITY or
14 FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
15 for more details.
16
17 You should have received a copy of the GNU General Public License
18 along with Zebra; see the file LICENSE.zebra.  If not, write to the
19 Free Software Foundation, 59 Temple Place - Suite 330, Boston, MA
20 02111-1307, USA.
21 */
22
23 #include <string.h>
24 #include <yaz/xmalloc.h>
25 #include <yaz/log.h>
26 #include <isamb.h>
27 #include <assert.h>
28
29 #ifndef ISAMB_DEBUG
30 #define ISAMB_DEBUG 0
31 #endif
32
33 struct ISAMB_head {
34     int first_block;
35     int last_block;
36     int block_size;
37     int block_max;
38     int free_list;
39 };
40
41 #define ISAMB_DATA_OFFSET 3
42
43 /* maximum size of encoded buffer */
44 #define DST_ITEM_MAX 256
45
46 #define ISAMB_MAX_LEVEL 10
47 /* approx 2*max page + max size of item */
48 #define DST_BUF_SIZE 16840
49
50 #define ISAMB_CACHE_ENTRY_SIZE 4096
51
52 /* CAT_MAX: _must_ be power of 2 */
53 #define CAT_MAX 4
54 #define CAT_MASK (CAT_MAX-1)
55 /* CAT_NO: <= CAT_MAX */
56 #define CAT_NO 4
57
58 /* ISAMB_PTR_CODEC=1 var, =0 fixed */
59 #define ISAMB_PTR_CODEC  1
60
61 struct ISAMB_cache_entry {
62     ISAMB_P pos;
63     unsigned char *buf;
64     int dirty;
65     int hits;
66     struct ISAMB_cache_entry *next;
67 };
68
69 struct ISAMB_file {
70     BFile bf;
71     int head_dirty;
72     struct ISAMB_head head;
73     struct ISAMB_cache_entry *cache_entries;
74 };
75
76 struct ISAMB_s {
77     BFiles bfs;
78     ISAMC_M *method;
79
80     struct ISAMB_file *file;
81     int no_cat;
82     int cache; /* 0=no cache, 1=use cache, -1=dummy isam (for testing only) */
83     int log_io;        /* log level for bf_read/bf_write calls */
84     int log_freelist;  /* log level for freelist handling */
85     int skipped_numbers; /* on a leaf node */
86     int returned_numbers; 
87     int skipped_nodes[ISAMB_MAX_LEVEL]; /* [0]=skipped leaves, 1=higher etc */
88     int accessed_nodes[ISAMB_MAX_LEVEL]; /* nodes we did not skip */
89 };
90
91 struct ISAMB_block {
92     ISAMB_P pos;
93     int cat;
94     int size;
95     int leaf;
96     int dirty;
97     int deleted;
98     int offset;
99     char *bytes;
100     char *cbuf;
101     unsigned char *buf;
102     void *decodeClientData;
103     int log_rw;
104 };
105
106 struct ISAMB_PP_s {
107     ISAMB isamb;
108     ISAMB_P pos;
109     int level;
110     int maxlevel; /* total depth */
111     int total_size;
112     int no_blocks;
113     int skipped_numbers; /* on a leaf node */
114     int returned_numbers; 
115     int skipped_nodes[ISAMB_MAX_LEVEL]; /* [0]=skipped leaves, 1=higher etc */
116     int accessed_nodes[ISAMB_MAX_LEVEL]; /* nodes we did not skip */
117     struct ISAMB_block **block;
118 };
119
120 void isamb_pp_pos( ISAMB_PP pp, int *current, int *total );
121          /* FIXME - this should be in a header file */
122
123 #if ISAMB_PTR_CODEC
124 static void encode_ptr (char **dst, unsigned pos)
125 {
126     unsigned char *bp = (unsigned char*) *dst;
127
128     while (pos > 127)
129     {
130          *bp++ = 128 | (pos & 127);
131          pos = pos >> 7;
132     }
133     *bp++ = pos;
134     *dst = (char *) bp;
135 }
136 #else
137 static void encode_ptr (char **dst, unsigned pos)
138 {
139     memcpy(*dst, &pos, sizeof(pos));
140     (*dst) += sizeof(pos);
141 }
142 #endif
143
144 #if ISAMB_PTR_CODEC
145 static void decode_ptr (char **src1, int *pos)
146 {
147     unsigned char **src = (unsigned char **) src1;
148     unsigned d = 0;
149     unsigned char c;
150     unsigned r = 0;
151
152     while (((c = *(*src)++) & 128))
153     {
154         d += ((c & 127) << r);
155         r += 7;
156     }
157     d += (c << r);
158     *pos = d;
159 }
160 #else
161 static void decode_ptr (char **src, int *pos)
162 {
163      memcpy (pos, *src, sizeof(*pos));
164      (*src) += sizeof(*pos);
165 }
166 #endif
167
168 ISAMB isamb_open (BFiles bfs, const char *name, int writeflag, ISAMC_M *method,
169                   int cache)
170 {
171     ISAMB isamb = xmalloc (sizeof(*isamb));
172     int i, b_size = 32;
173
174     isamb->bfs = bfs;
175     isamb->method = (ISAMC_M *) xmalloc (sizeof(*method));
176     memcpy (isamb->method, method, sizeof(*method));
177     isamb->no_cat = CAT_NO;
178     isamb->log_io = 0;
179     isamb->log_freelist = 0;
180     isamb->cache = cache;
181     isamb->skipped_numbers=0;
182     isamb->returned_numbers=0;
183     for (i=0;i<ISAMB_MAX_LEVEL;i++)
184       isamb->skipped_nodes[i]= isamb->accessed_nodes[i]=0;
185
186     assert (cache == 0);
187     isamb->file = xmalloc (sizeof(*isamb->file) * isamb->no_cat);
188     for (i = 0; i<isamb->no_cat; i++)
189     {
190         char fname[DST_BUF_SIZE];
191         isamb->file[i].cache_entries = 0;
192         isamb->file[i].head_dirty = 0;
193         sprintf (fname, "%s%c", name, i+'A');
194         if (cache)
195             isamb->file[i].bf = bf_open (bfs, fname, ISAMB_CACHE_ENTRY_SIZE,
196                                          writeflag);
197         else
198             isamb->file[i].bf = bf_open (bfs, fname, b_size, writeflag);
199
200         
201         if (!bf_read (isamb->file[i].bf, 0, 0, sizeof(struct ISAMB_head),
202                       &isamb->file[i].head))
203         {
204             isamb->file[i].head.first_block = ISAMB_CACHE_ENTRY_SIZE/b_size+1;
205             isamb->file[i].head.last_block = isamb->file[i].head.first_block;
206             isamb->file[i].head.block_size = b_size;
207             isamb->file[i].head.block_max = b_size - ISAMB_DATA_OFFSET;
208             isamb->file[i].head.free_list = 0;
209         }
210         assert (isamb->file[i].head.block_size >= ISAMB_DATA_OFFSET);
211         isamb->file[i].head_dirty = 0;
212         assert(isamb->file[i].head.block_size == b_size);
213         b_size = b_size * 4;
214     }
215 #if ISAMB_DEBUG
216     logf(LOG_WARN, "isamb debug enabled. Things will be slower than usual");
217 #endif
218     return isamb;
219 }
220
221 static void flush_blocks (ISAMB b, int cat)
222 {
223     while (b->file[cat].cache_entries)
224     {
225         struct ISAMB_cache_entry *ce_this = b->file[cat].cache_entries;
226         b->file[cat].cache_entries = ce_this->next;
227
228         if (ce_this->dirty)
229         {
230             yaz_log (b->log_io, "bf_write: flush_blocks");
231             bf_write (b->file[cat].bf, ce_this->pos, 0, 0, ce_this->buf);
232         }
233         xfree (ce_this->buf);
234         xfree (ce_this);
235     }
236 }
237
238 static int get_block (ISAMB b, ISAMC_P pos, char *userbuf, int wr)
239 {
240     int cat = pos&CAT_MASK;
241     int off = ((pos/CAT_MAX) & 
242                (ISAMB_CACHE_ENTRY_SIZE / b->file[cat].head.block_size - 1))
243         * b->file[cat].head.block_size;
244     int norm = pos / (CAT_MASK*ISAMB_CACHE_ENTRY_SIZE / b->file[cat].head.block_size);
245     int no = 0;
246     struct ISAMB_cache_entry **ce, *ce_this = 0, **ce_last = 0;
247
248     if (!b->cache)
249         return 0;
250
251     assert (ISAMB_CACHE_ENTRY_SIZE >= b->file[cat].head.block_size);
252     for (ce = &b->file[cat].cache_entries; *ce; ce = &(*ce)->next, no++)
253     {
254         ce_last = ce;
255         if ((*ce)->pos == norm)
256         {
257             ce_this = *ce;
258             *ce = (*ce)->next;   /* remove from list */
259             
260             ce_this->next = b->file[cat].cache_entries;  /* move to front */
261             b->file[cat].cache_entries = ce_this;
262             
263             if (wr)
264             {
265                 memcpy (ce_this->buf + off, userbuf, 
266                         b->file[cat].head.block_size);
267                 ce_this->dirty = 1;
268             }
269             else
270                 memcpy (userbuf, ce_this->buf + off,
271                         b->file[cat].head.block_size);
272             return 1;
273         }
274     }
275     if (no >= 40)
276     {
277         assert (no == 40);
278         assert (ce_last && *ce_last);
279         ce_this = *ce_last;
280         *ce_last = 0;  /* remove the last entry from list */
281         if (ce_this->dirty)
282         {
283             yaz_log (b->log_io, "bf_write: get_block");
284             bf_write (b->file[cat].bf, ce_this->pos, 0, 0, ce_this->buf);
285         }
286         xfree (ce_this->buf);
287         xfree (ce_this);
288     }
289     ce_this = xmalloc (sizeof(*ce_this));
290     ce_this->next = b->file[cat].cache_entries;
291     b->file[cat].cache_entries = ce_this;
292     ce_this->buf = xmalloc (ISAMB_CACHE_ENTRY_SIZE);
293     ce_this->pos = norm;
294     yaz_log (b->log_io, "bf_read: get_block");
295     if (!bf_read (b->file[cat].bf, norm, 0, 0, ce_this->buf))
296         memset (ce_this->buf, 0, ISAMB_CACHE_ENTRY_SIZE);
297     if (wr)
298     {
299         memcpy (ce_this->buf + off, userbuf, b->file[cat].head.block_size);
300         ce_this->dirty = 1;
301     }
302     else
303     {
304         ce_this->dirty = 0;
305         memcpy (userbuf, ce_this->buf + off, b->file[cat].head.block_size);
306     }
307     return 1;
308 }
309
310
311 void isamb_close (ISAMB isamb)
312 {
313     int i;
314     for (i=0;isamb->accessed_nodes[i];i++)
315         logf(LOG_DEBUG,"isamb_close  level leaf-%d: %d read, %d skipped",
316              i, isamb->accessed_nodes[i], isamb->skipped_nodes[i]);
317     logf(LOG_DEBUG,"isamb_close returned %d values, skipped %d",
318          isamb->skipped_numbers, isamb->returned_numbers);
319     for (i = 0; i<isamb->no_cat; i++)
320     {
321         flush_blocks (isamb, i);
322         if (isamb->file[i].head_dirty)
323             bf_write (isamb->file[i].bf, 0, 0,
324                       sizeof(struct ISAMB_head), &isamb->file[i].head);
325         
326         bf_close (isamb->file[i].bf);
327     }
328     xfree (isamb->file);
329     xfree (isamb->method);
330     xfree (isamb);
331 }
332
333 static struct ISAMB_block *open_block (ISAMB b, ISAMC_P pos)
334 {
335     int cat = pos&CAT_MASK;
336     struct ISAMB_block *p;
337     if (!pos)
338         return 0;
339     p = xmalloc (sizeof(*p));
340     p->pos = pos;
341     p->cat = pos & CAT_MASK;
342     p->buf = xmalloc (b->file[cat].head.block_size);
343     p->cbuf = 0;
344
345     if (!get_block (b, pos, p->buf, 0))
346     {
347         yaz_log (b->log_io, "bf_read: open_block");
348         if (!bf_read (b->file[cat].bf, pos/CAT_MAX, 0, 0, p->buf))
349         {
350             yaz_log (LOG_FATAL, "isamb: read fail for pos=%ld block=%ld",
351                      (long) pos, (long) pos/CAT_MAX);
352             abort();
353         }
354     }
355     p->bytes = p->buf + ISAMB_DATA_OFFSET;
356     p->leaf = p->buf[0];
357     p->size = (p->buf[1] + 256 * p->buf[2]) - ISAMB_DATA_OFFSET;
358     if (p->size < 0)
359     {
360         yaz_log (LOG_FATAL, "Bad block size %d in pos=%d\n", p->size, pos);
361     }
362     assert (p->size >= 0);
363     p->offset = 0;
364     p->dirty = 0;
365     p->deleted = 0;
366     p->decodeClientData = (*b->method->code_start)(ISAMC_DECODE);
367     yaz_log (LOG_DEBUG, "isamb_open_block: Opened block %d ofs=%d",pos, p->offset);
368     return p;
369 }
370
371 struct ISAMB_block *new_block (ISAMB b, int leaf, int cat)
372 {
373     struct ISAMB_block *p;
374
375     p = xmalloc (sizeof(*p));
376     p->buf = xmalloc (b->file[cat].head.block_size);
377
378     if (!b->file[cat].head.free_list)
379     {
380         int block_no;
381         block_no = b->file[cat].head.last_block++;
382         p->pos = block_no * CAT_MAX + cat;
383     }
384     else
385     {
386         p->pos = b->file[cat].head.free_list;
387         assert((p->pos & CAT_MASK) == cat);
388         if (!get_block (b, p->pos, p->buf, 0))
389         {
390             yaz_log (b->log_io, "bf_read: new_block");
391             if (!bf_read (b->file[cat].bf, p->pos/CAT_MAX, 0, 0, p->buf))
392             {
393                 yaz_log (LOG_FATAL, "isamb: read fail for pos=%ld block=%ld",
394                          (long) p->pos/CAT_MAX, (long) p->pos/CAT_MAX);
395                 abort ();
396             }
397         }
398         yaz_log (b->log_freelist, "got block %d from freelist %d:%d", p->pos,
399                  cat, p->pos/CAT_MAX);
400         memcpy (&b->file[cat].head.free_list, p->buf, sizeof(int));
401     }
402     p->cat = cat;
403     b->file[cat].head_dirty = 1;
404     memset (p->buf, 0, b->file[cat].head.block_size);
405     p->bytes = p->buf + ISAMB_DATA_OFFSET;
406     p->leaf = leaf;
407     p->size = 0;
408     p->dirty = 1;
409     p->deleted = 0;
410     p->offset = 0;
411     p->decodeClientData = (*b->method->code_start)(ISAMC_DECODE);
412     return p;
413 }
414
415 struct ISAMB_block *new_leaf (ISAMB b, int cat)
416 {
417     return new_block (b, 1, cat);
418 }
419
420
421 struct ISAMB_block *new_int (ISAMB b, int cat)
422 {
423     return new_block (b, 0, cat);
424 }
425
426 static void check_block (ISAMB b, struct ISAMB_block *p)
427 {
428     if (p->leaf)
429     {
430         ;
431     }
432     else
433     {
434         /* sanity check */
435         char *startp = p->bytes;
436         char *src = startp;
437         char *endp = p->bytes + p->size;
438         int pos;
439             
440         decode_ptr (&src, &pos);
441         assert ((pos&CAT_MASK) == p->cat);
442         while (src != endp)
443         {
444             int item_len;
445             decode_ptr (&src, &item_len);
446             assert (item_len > 0 && item_len < 30);
447             src += item_len;
448             decode_ptr (&src, &pos);
449             assert ((pos&CAT_MASK) == p->cat);
450         }
451     }
452 }
453
454 void close_block (ISAMB b, struct ISAMB_block *p)
455 {
456     if (!p)
457         return;
458     if (p->deleted)
459     {
460         yaz_log (b->log_freelist, "release block %d from freelist %d:%d",
461                  p->pos, p->cat, p->pos/CAT_MAX);
462         memcpy (p->buf, &b->file[p->cat].head.free_list, sizeof(int));
463         b->file[p->cat].head.free_list = p->pos;
464         if (!get_block (b, p->pos, p->buf, 1))
465         {
466             yaz_log (b->log_io, "bf_write: close_block (deleted)");
467             bf_write (b->file[p->cat].bf, p->pos/CAT_MAX, 0, 0, p->buf);
468         }
469     }
470     else if (p->dirty)
471     {
472         int size = p->size + ISAMB_DATA_OFFSET;
473         assert (p->size >= 0);
474         p->buf[0] = p->leaf;
475         p->buf[1] = size & 255;
476         p->buf[2] = size >> 8;
477         check_block(b, p);
478         if (!get_block (b, p->pos, p->buf, 1))
479         {
480             yaz_log (b->log_io, "bf_write: close_block");
481             bf_write (b->file[p->cat].bf, p->pos/CAT_MAX, 0, 0, p->buf);
482         }
483     }
484     (*b->method->code_stop)(ISAMC_DECODE, p->decodeClientData);
485     xfree (p->buf);
486     xfree (p);
487 }
488
489 int insert_sub (ISAMB b, struct ISAMB_block **p,
490                 void *new_item, int *mode,
491                 ISAMC_I *stream,
492                 struct ISAMB_block **sp,
493                 void *sub_item, int *sub_size,
494                 void *max_item);
495
496 int insert_int (ISAMB b, struct ISAMB_block *p, void *lookahead_item,
497                 int *mode,
498                 ISAMC_I *stream, struct ISAMB_block **sp,
499                 void *split_item, int *split_size, void *last_max_item)
500 {
501     char *startp = p->bytes;
502     char *src = startp;
503     char *endp = p->bytes + p->size;
504     int pos;
505     struct ISAMB_block *sub_p1 = 0, *sub_p2 = 0;
506     char sub_item[DST_ITEM_MAX];
507     int sub_size;
508     int more;
509
510     *sp = 0;
511
512     assert(p->size >= 0);
513     decode_ptr (&src, &pos);
514     while (src != endp)
515     {
516         int item_len;
517         int d;
518         char *src0 = src;
519         decode_ptr (&src, &item_len);
520         d = (*b->method->compare_item)(src, lookahead_item);
521         if (d > 0)
522         {
523             sub_p1 = open_block (b, pos);
524             assert (sub_p1);
525             more = insert_sub (b, &sub_p1, lookahead_item, mode,
526                                stream, &sub_p2, 
527                                sub_item, &sub_size, src);
528             src = src0;
529             break;
530         }
531         src += item_len;
532         decode_ptr (&src, &pos);
533     }
534     if (!sub_p1)
535     {
536         sub_p1 = open_block (b, pos);
537         assert (sub_p1);
538         more = insert_sub (b, &sub_p1, lookahead_item, mode, stream, &sub_p2, 
539                            sub_item, &sub_size, last_max_item);
540     }
541     if (sub_p2)
542     {
543         /* there was a split - must insert pointer in this one */
544         char dst_buf[DST_BUF_SIZE];
545         char *dst = dst_buf;
546
547         assert (sub_size < 30 && sub_size > 1);
548
549         memcpy (dst, startp, src - startp);
550                 
551         dst += src - startp;
552
553         encode_ptr (&dst, sub_size);      /* sub length and item */
554         memcpy (dst, sub_item, sub_size);
555         dst += sub_size;
556
557         encode_ptr (&dst, sub_p2->pos);   /* pos */
558
559         if (endp - src)                   /* remaining data */
560         {
561             memcpy (dst, src, endp - src);
562             dst += endp - src;
563         }
564         p->size = dst - dst_buf;
565         assert (p->size >= 0);
566         if (p->size <= b->file[p->cat].head.block_max)
567         {
568             memcpy (startp, dst_buf, dst - dst_buf);
569         }
570         else
571         {
572             int p_new_size;
573             char *half;
574             src = dst_buf;
575             endp = dst;
576
577             half = src + b->file[p->cat].head.block_size/2;
578             decode_ptr (&src, &pos);
579             while (src <= half)
580             {
581                 decode_ptr (&src, split_size);
582                 src += *split_size;
583                 decode_ptr (&src, &pos);
584             }
585             p_new_size = src - dst_buf;
586             memcpy (p->bytes, dst_buf, p_new_size);
587
588             decode_ptr (&src, split_size);
589             memcpy (split_item, src, *split_size);
590             src += *split_size;
591
592             *sp = new_int (b, p->cat);
593             (*sp)->size = endp - src;
594             memcpy ((*sp)->bytes, src, (*sp)->size);
595
596             p->size = p_new_size;
597         }
598         p->dirty = 1;
599         close_block (b, sub_p2);
600     }
601     close_block (b, sub_p1);
602     return more;
603 }
604
605
606 int insert_leaf (ISAMB b, struct ISAMB_block **sp1, void *lookahead_item,
607                  int *lookahead_mode, ISAMC_I *stream,
608                  struct ISAMB_block **sp2,
609                  void *sub_item, int *sub_size,
610                  void *max_item)
611 {
612     struct ISAMB_block *p = *sp1;
613     char *src = 0, *endp = 0;
614     char dst_buf[DST_BUF_SIZE], *dst = dst_buf;
615     int new_size;
616     void *c1 = (*b->method->code_start)(ISAMC_DECODE);
617     void *c2 = (*b->method->code_start)(ISAMC_ENCODE);
618     int more = 1;
619     int quater = b->file[b->no_cat-1].head.block_max / CAT_MAX;
620     char *cut = dst_buf + quater * 2;
621     char *maxp = dst_buf + b->file[b->no_cat-1].head.block_max;
622     char *half1 = 0;
623     char *half2 = 0;
624     char cut_item_buf[DST_ITEM_MAX];
625     int cut_item_size = 0;
626
627     if (p && p->size)
628     {
629         char file_item_buf[DST_ITEM_MAX];
630         char *file_item = file_item_buf;
631             
632         src = p->bytes;
633         endp = p->bytes + p->size;
634         (*b->method->code_item)(ISAMC_DECODE, c1, &file_item, &src);
635         while (1)
636         {
637             char *dst_item = 0;
638             char *dst_0 = dst;
639             char *lookahead_next;
640             int d = -1;
641             
642             if (lookahead_item)
643                 d = (*b->method->compare_item)(file_item_buf, lookahead_item);
644             
645             if (d > 0)
646             {
647                 dst_item = lookahead_item;
648                 if (!*lookahead_mode)
649                 {
650                     yaz_log (LOG_WARN, "isamb: Inconsistent register (1)");
651                     assert (*lookahead_mode);
652                 }
653             }
654             else
655                 dst_item = file_item_buf;
656             if (!*lookahead_mode && d == 0)
657             {
658                 p->dirty = 1;
659             }
660             else if (!half1 && dst > cut)
661             {
662                 char *dst_item_0 = dst_item;
663                 half1 = dst; /* candidate for splitting */
664                 
665                 (*b->method->code_item)(ISAMC_ENCODE, c2, &dst, &dst_item);
666                 
667                 cut_item_size = dst_item - dst_item_0;
668                 memcpy (cut_item_buf, dst_item_0, cut_item_size);
669                 
670                 half2 = dst;
671             }
672             else
673                 (*b->method->code_item)(ISAMC_ENCODE, c2, &dst, &dst_item);
674             if (d > 0)  
675             {
676                 if (dst > maxp)
677                 {
678                     dst = dst_0;
679                     lookahead_item = 0;
680                 }
681                 else
682                 {
683                     lookahead_next = lookahead_item;
684                     if (!(*stream->read_item)(stream->clientData,
685                                               &lookahead_next,
686                                               lookahead_mode))
687                     {
688                         lookahead_item = 0;
689                         more = 0;
690                     }
691                     if (lookahead_item && max_item &&
692                         (*b->method->compare_item)(max_item, lookahead_item) <= 0)
693                     {
694                         /* max_item 1 */
695                         lookahead_item = 0;
696                     }
697                     
698                     p->dirty = 1;
699                 }
700             }
701             else if (d == 0)
702             {
703                 lookahead_next = lookahead_item;
704                 if (!(*stream->read_item)(stream->clientData,
705                                           &lookahead_next, lookahead_mode))
706                 {
707                     lookahead_item = 0;
708                     more = 0;
709                 }
710                 if (src == endp)
711                     break;
712                 file_item = file_item_buf;
713                 (*b->method->code_item)(ISAMC_DECODE, c1, &file_item, &src);
714             }
715             else
716             {
717                 if (src == endp)
718                     break;
719                 file_item = file_item_buf;
720                 (*b->method->code_item)(ISAMC_DECODE, c1, &file_item, &src);
721             }
722         }
723     }
724     maxp = dst_buf + b->file[b->no_cat-1].head.block_max + quater;
725     while (lookahead_item)
726     {
727         char *dst_item = lookahead_item;
728         char *dst_0 = dst;
729         
730         if (max_item &&
731             (*b->method->compare_item)(max_item, lookahead_item) <= 0)
732         {
733             /* max_item 2 */
734             break;
735         }
736         if (!*lookahead_mode)
737         {
738             yaz_log (LOG_WARN, "isamb: Inconsistent register (2)");
739             abort();
740         }
741         else if (!half1 && dst > cut)   
742         {
743             char *dst_item_0 = dst_item;
744             half1 = dst; /* candidate for splitting */
745             
746             (*b->method->code_item)(ISAMC_ENCODE, c2, &dst, &dst_item);
747             
748             cut_item_size = dst_item - dst_item_0;
749             memcpy (cut_item_buf, dst_item_0, cut_item_size);
750             
751             half2 = dst;
752         }
753         else
754             (*b->method->code_item)(ISAMC_ENCODE, c2, &dst, &dst_item);
755
756         if (dst > maxp)
757         {
758             dst = dst_0;
759             break;
760         }
761         if (p)
762             p->dirty = 1;
763         dst_item = lookahead_item;
764         if (!(*stream->read_item)(stream->clientData, &dst_item,
765                                   lookahead_mode))
766         {
767             lookahead_item = 0;
768             more = 0;
769         }
770     }
771     new_size = dst - dst_buf;
772     if (p && p->cat != b->no_cat-1 && 
773         new_size > b->file[p->cat].head.block_max)
774     {
775         /* non-btree block will be removed */
776         p->deleted = 1;
777         close_block (b, p);
778         /* delete it too!! */
779         p = 0; /* make a new one anyway */
780     }
781     if (!p)
782     {   /* must create a new one */
783         int i;
784         for (i = 0; i < b->no_cat; i++)
785             if (new_size <= b->file[i].head.block_max)
786                 break;
787         if (i == b->no_cat)
788             i = b->no_cat - 1;
789         p = new_leaf (b, i);
790     }
791     if (new_size > b->file[p->cat].head.block_max)
792     {
793         char *first_dst;
794         char *cut_item = cut_item_buf;
795
796         assert (half1);
797         assert (half2);
798
799        /* first half */
800         p->size = half1 - dst_buf;
801         memcpy (p->bytes, dst_buf, half1 - dst_buf);
802
803         /* second half */
804         *sp2 = new_leaf (b, p->cat);
805
806         (*b->method->code_reset)(c2);
807
808         first_dst = (*sp2)->bytes;
809
810         (*b->method->code_item)(ISAMC_ENCODE, c2, &first_dst, &cut_item);
811
812         memcpy (first_dst, half2, dst - half2);
813
814         (*sp2)->size = (first_dst - (*sp2)->bytes) + (dst - half2);
815         (*sp2)->dirty = 1;
816         p->dirty = 1;
817         memcpy (sub_item, cut_item_buf, cut_item_size);
818         *sub_size = cut_item_size;
819     }
820     else
821     {
822         memcpy (p->bytes, dst_buf, dst - dst_buf);
823         p->size = new_size;
824     }
825     (*b->method->code_stop)(ISAMC_DECODE, c1);
826     (*b->method->code_stop)(ISAMC_ENCODE, c2);
827     *sp1 = p;
828     return more;
829 }
830
831 int insert_sub (ISAMB b, struct ISAMB_block **p, void *new_item,
832                 int *mode,
833                 ISAMC_I *stream,
834                 struct ISAMB_block **sp,
835                 void *sub_item, int *sub_size,
836                 void *max_item)
837 {
838     if (!*p || (*p)->leaf)
839         return insert_leaf (b, p, new_item, mode, stream, sp, sub_item, 
840                             sub_size, max_item);
841     else
842         return insert_int (b, *p, new_item, mode, stream, sp, sub_item,
843                            sub_size, max_item);
844 }
845
846 int isamb_unlink (ISAMB b, ISAMC_P pos)
847 {
848     struct ISAMB_block *p1;
849
850     if (!pos)
851         return 0;
852     p1 = open_block(b, pos);
853     p1->deleted = 1;
854     if (!p1->leaf)
855     {
856         int sub_p;
857         int item_len;
858         char *src = p1->bytes + p1->offset;
859
860         decode_ptr(&src, &sub_p);
861         isamb_unlink(b, sub_p);
862         
863         while (src != p1->bytes + p1->size)
864         {
865             decode_ptr(&src, &item_len);
866             src += item_len;
867             decode_ptr(&src, &sub_p);
868             isamb_unlink(b, sub_p);
869         }
870     }
871     close_block(b, p1);
872     return 0;
873 }
874
875 int isamb_merge (ISAMB b, ISAMC_P pos, ISAMC_I *stream)
876 {
877     char item_buf[DST_ITEM_MAX];
878     char *item_ptr;
879     int i_mode;
880     int more;
881
882     if (b->cache < 0)
883     {
884         int more = 1;
885         while (more)
886         {
887             item_ptr = item_buf;
888             more =
889                 (*stream->read_item)(stream->clientData, &item_ptr, &i_mode);
890         }
891         return 1;
892     }
893     item_ptr = item_buf;
894     more = (*stream->read_item)(stream->clientData, &item_ptr, &i_mode);
895     while (more)
896     {
897         struct ISAMB_block *p = 0, *sp = 0;
898         char sub_item[DST_ITEM_MAX];
899         int sub_size;
900         
901         if (pos)
902             p = open_block (b, pos);
903         more = insert_sub (b, &p, item_buf, &i_mode, stream, &sp,
904                             sub_item, &sub_size, 0);
905         if (sp)
906         {    /* increase level of tree by one */
907             struct ISAMB_block *p2 = new_int (b, p->cat);
908             char *dst = p2->bytes + p2->size;
909             
910             encode_ptr (&dst, p->pos);
911             assert (sub_size < 20);
912             encode_ptr (&dst, sub_size);
913             memcpy (dst, sub_item, sub_size);
914             dst += sub_size;
915             encode_ptr (&dst, sp->pos);
916             
917             p2->size = dst - p2->bytes;
918             pos = p2->pos;  /* return new super page */
919             close_block (b, sp);
920             close_block (b, p2);
921         }
922         else
923             pos = p->pos;   /* return current one (again) */
924         close_block (b, p);
925     }
926     return pos;
927 }
928
929 ISAMB_PP isamb_pp_open_x (ISAMB isamb, ISAMB_P pos, int *level)
930 {
931     ISAMB_PP pp = xmalloc (sizeof(*pp));
932     int i;
933
934     pp->isamb = isamb;
935     pp->block = xmalloc (ISAMB_MAX_LEVEL * sizeof(*pp->block));
936
937     pp->pos = pos;
938     pp->level = 0;
939     pp->maxlevel=0;
940     pp->total_size = 0;
941     pp->no_blocks = 0;
942     pp->skipped_numbers=0;
943     pp->returned_numbers=0;
944     for (i=0;i<ISAMB_MAX_LEVEL;i++)
945         pp->skipped_nodes[i] = pp->accessed_nodes[i]=0;
946     while (1)
947     {
948         struct ISAMB_block *p = open_block (isamb, pos);
949         char *src = p->bytes + p->offset;
950         pp->block[pp->level] = p;
951
952         pp->total_size += p->size;
953         pp->no_blocks++;
954         if (p->leaf)
955             break;
956
957                                         
958         decode_ptr (&src, &pos);
959         p->offset = src - p->bytes;
960         pp->level++;
961         pp->accessed_nodes[pp->level]++; 
962     }
963     pp->block[pp->level+1] = 0;
964     pp->maxlevel=pp->level;
965     if (level)
966         *level = pp->level;
967     return pp;
968 }
969
970 ISAMB_PP isamb_pp_open (ISAMB isamb, ISAMB_P pos)
971 {
972     return isamb_pp_open_x (isamb, pos, 0);
973 }
974
975 void isamb_pp_close_x (ISAMB_PP pp, int *size, int *blocks)
976 {
977     int i;
978     if (!pp)
979         return;
980     logf(LOG_DEBUG,"isamb_pp_close lev=%d returned %d values, skipped %d",
981         pp->maxlevel, pp->skipped_numbers, pp->returned_numbers);
982     for (i=pp->maxlevel;i>=0;i--)
983         if ( pp->skipped_nodes[i] || pp->accessed_nodes[i])
984             logf(LOG_DEBUG,"isamb_pp_close  level leaf-%d: %d read, %d skipped", i,
985                  pp->accessed_nodes[i], pp->skipped_nodes[i]);
986     pp->isamb->skipped_numbers += pp->skipped_numbers;
987     pp->isamb->returned_numbers += pp->returned_numbers;
988     for (i=pp->maxlevel;i>=0;i--)
989     {
990         pp->isamb->accessed_nodes[i] += pp->accessed_nodes[i];
991         pp->isamb->skipped_nodes[i] += pp->skipped_nodes[i];
992     }
993     if (size)
994         *size = pp->total_size;
995     if (blocks)
996         *blocks = pp->no_blocks;
997     for (i = 0; i <= pp->level; i++)
998         close_block (pp->isamb, pp->block[i]);
999     xfree (pp->block);
1000     xfree (pp);
1001 }
1002
1003 int isamb_block_info (ISAMB isamb, int cat)
1004 {
1005     if (cat >= 0 && cat < isamb->no_cat)
1006         return isamb->file[cat].head.block_size;
1007     return -1;
1008 }
1009
1010 void isamb_pp_close (ISAMB_PP pp)
1011 {
1012     isamb_pp_close_x (pp, 0, 0);
1013 }
1014
1015 /* simple recursive dumper .. */
1016 static void isamb_dump_r (ISAMB b, ISAMB_P pos, void (*pr)(const char *str),
1017                           int level)
1018 {
1019     char buf[1024];
1020     char prefix_str[1024];
1021     if (pos)
1022     {
1023         struct ISAMB_block *p = open_block (b, pos);
1024         sprintf(prefix_str, "%*s %d cat=%d size=%d max=%d", level*2, "",
1025                 pos, p->cat, p->size, b->file[p->cat].head.block_max);
1026         (*pr)(prefix_str);
1027         sprintf(prefix_str, "%*s %d", level*2, "", pos);
1028         if (p->leaf)
1029         {
1030             while (p->offset < p->size)
1031             {
1032                 char *src = p->bytes + p->offset;
1033                 char *dst = buf;
1034                 (*b->method->code_item)(ISAMC_DECODE, p->decodeClientData,
1035                                         &dst, &src);
1036                 (*b->method->log_item)(LOG_DEBUG, buf, prefix_str);
1037                 p->offset = src - (char*) p->bytes;
1038             }
1039             assert(p->offset == p->size);
1040         }
1041         else
1042         {
1043             char *src = p->bytes + p->offset;
1044             int sub;
1045             int item_len;
1046
1047             decode_ptr (&src, &sub);
1048             p->offset = src - (char*) p->bytes;
1049
1050             isamb_dump_r(b, sub, pr, level+1);
1051             
1052             while (p->offset < p->size)
1053             {
1054                 decode_ptr (&src, &item_len);
1055                 (*b->method->log_item)(LOG_DEBUG, src, prefix_str);
1056                 src += item_len;
1057                 decode_ptr (&src, &sub);
1058                 
1059                 p->offset = src - (char*) p->bytes;
1060                 
1061                 isamb_dump_r(b, sub, pr, level+1);
1062             }           
1063         }
1064         close_block(b,p);
1065     }
1066 }
1067
1068 void isamb_dump (ISAMB b, ISAMB_P pos, void (*pr)(const char *str))
1069 {
1070     return isamb_dump_r(b, pos, pr, 0);
1071 }
1072
1073 #if 0
1074 /* Old isamb_pp_read that Adam wrote, kept as a reference in case we need to
1075    debug the more complex pp_read that also forwards. May be deleted near end
1076    of 2004, if it has not shown to be useful */
1077
1078
1079 int isamb_pp_read (ISAMB_PP pp, void *buf)
1080 {
1081     char *dst = buf;
1082     char *src;
1083     struct ISAMB_block *p = pp->block[pp->level];
1084     if (!p)
1085         return 0;
1086
1087     while (p->offset == p->size)
1088     {
1089         int pos, item_len;
1090         while (p->offset == p->size)
1091         {
1092             if (pp->level == 0)
1093                 return 0;
1094             close_block (pp->isamb, pp->block[pp->level]);
1095             pp->block[pp->level] = 0;
1096             (pp->level)--;
1097             p = pp->block[pp->level];
1098             assert (!p->leaf);  
1099         }
1100         src = p->bytes + p->offset;
1101         
1102         decode_ptr (&src, &item_len);
1103         src += item_len;
1104         decode_ptr (&src, &pos);
1105         
1106         p->offset = src - (char*) p->bytes;
1107
1108         ++(pp->level);
1109         
1110         while (1)
1111         {
1112             pp->block[pp->level] = p = open_block (pp->isamb, pos);
1113
1114             pp->total_size += p->size;
1115             pp->no_blocks++;
1116             
1117             if (p->leaf) 
1118             {
1119                 break;
1120             }
1121             src = p->bytes + p->offset;
1122             decode_ptr (&src, &pos);
1123             p->offset = src - (char*) p->bytes;
1124             pp->level++;
1125         }
1126     }
1127     assert (p->offset < p->size);
1128     assert (p->leaf);
1129     src = p->bytes + p->offset;
1130     (*pp->isamb->method->code_item)(ISAMC_DECODE, p->decodeClientData,
1131                                     &dst, &src);
1132     p->offset = src - (char*) p->bytes;
1133     /* key_logdump_txt(LOG_DEBUG,buf, "isamb_pp_read returning 1"); */
1134     return 1;
1135 }
1136
1137 #else
1138 int isamb_pp_read (ISAMB_PP pp, void *buf)
1139 {
1140     return isamb_pp_forward(pp,buf,0);
1141 }
1142 #endif
1143
1144 #define NEW_FORWARD 1
1145
1146 #if NEW_FORWARD == 1
1147
1148 /*
1149 #undef ISAMB_DEBUB
1150 #define ISAMB_DEBUG 1
1151 */
1152 static int isamb_pp_on_right_node(ISAMB_PP pp, int level, const void *untilbuf)
1153 { /* looks one node higher to see if we should be on this node at all */
1154   /* useful in backing off quickly, and in avoiding tail descends */
1155   /* call with pp->level to begin with */
1156     struct ISAMB_block *p;
1157     int cmp;
1158     char *src;
1159     int item_len;
1160     assert(level>=0);
1161     if ( level == 0) {
1162 #if ISAMB_DEBUG
1163             logf(LOG_DEBUG,"isamb_pp_on_right returning true for root");
1164 #endif
1165         return 1; /* we can never skip the root node */
1166     }
1167     level--;
1168     p=pp->block[level];
1169     assert(p->offset <= p->size);
1170     if (p->offset < p->size )
1171     {
1172         assert(p->offset>0); 
1173         src=p->bytes + p->offset;
1174         decode_ptr(&src,&item_len);
1175 #if ISAMB_DEBUG
1176         (*pp->isamb->method->log_item)(LOG_DEBUG,untilbuf,"on_leaf: until");
1177         (*pp->isamb->method->log_item)(LOG_DEBUG,src,"on_leaf: value");
1178 #endif
1179         cmp=(*pp->isamb->method->compare_item)(untilbuf,src);
1180         if (cmp<2) {
1181 #if ISAMB_DEBUG
1182             logf(LOG_DEBUG,"isamb_pp_on_right returning true "
1183                             "cmp=%d lev=%d ofs=%d",cmp,level,p->offset);
1184 #endif
1185             return 1; 
1186         }
1187         else {
1188 #if ISAMB_DEBUG
1189             logf(LOG_DEBUG,"isamb_pp_on_right returning false "
1190                             "cmp=%d lev=%d ofs=%d",cmp,level,p->offset);
1191 #endif
1192             return 0; 
1193         }
1194     }
1195     else {
1196 #if ISAMB_DEBUG
1197         logf(LOG_DEBUG,"isamb_pp_on_right at tail, looking higher "
1198                         "lev=%d",level);
1199 #endif
1200         return isamb_pp_on_right_node(pp, level, untilbuf);
1201     }
1202 } /* isamb_pp_on_right_node */
1203
1204 static int isamb_pp_read_on_leaf(ISAMB_PP pp, void *buf)
1205 { /* reads the next item on the current leaf, returns 0 if end of leaf*/
1206     struct ISAMB_block *p = pp->block[pp->level];
1207     char *dst;
1208     char *src;
1209     assert(pp);
1210     assert(buf);
1211     if (p->offset == p->size) {
1212 #if ISAMB_DEBUG
1213         logf(LOG_DEBUG,"isamb_pp_read_on_leaf returning 0 on node %d",p->pos);
1214 #endif
1215         return 0; /* at end of leaf */
1216     }
1217     src=p->bytes + p->offset;
1218     dst=buf;
1219     (*pp->isamb->method->code_item)
1220            (ISAMC_DECODE, p->decodeClientData,&dst, &src);
1221     p->offset = src - (char*) p->bytes;
1222     /*
1223 #if ISAMB_DEBUG
1224     (*pp->isamb->method->log_item)(LOG_DEBUG, buf, "read_on_leaf returning 1");
1225 #endif
1226 */
1227     return 1;
1228 } /* read_on_leaf */
1229
1230 static int isamb_pp_forward_on_leaf(ISAMB_PP pp, void *buf, const void *untilbuf)
1231 { /* forwards on the current leaf, returns 0 if not found */
1232     int cmp;
1233     int skips=0;
1234     while (1){
1235         if (!isamb_pp_read_on_leaf(pp,buf))
1236             return 0;
1237         /* FIXME - this is an extra function call, inline the read? */
1238         cmp=(*pp->isamb->method->compare_item)(untilbuf,buf);
1239         if (cmp <2){  /* found a good one */
1240 #if ISAMB_DEBUG
1241             if (skips)
1242                 logf(LOG_DEBUG, "isam_pp_fwd_on_leaf skipped %d items",skips);
1243 #endif
1244             pp->returned_numbers++;
1245             return 1;
1246         }
1247         if (!skips)
1248             if (!isamb_pp_on_right_node(pp, pp->level, untilbuf))
1249                 return 0; /* never mind the rest of this leaf */
1250         pp->skipped_numbers++;
1251         skips++;
1252     }
1253 } /* forward_on_leaf */
1254
1255 static int isamb_pp_climb_level(ISAMB_PP pp, int *pos)
1256 { /* climbs higher in the tree, until finds a level with data left */
1257   /* returns the node to (consider to) descend to in *pos) */
1258     struct ISAMB_block *p = pp->block[pp->level];
1259     char *src;
1260     int item_len;
1261 #if ISAMB_DEBUG
1262     logf(LOG_DEBUG,"isamb_pp_climb_level starting "
1263                    "at level %d node %d ofs=%d sz=%d",
1264                     pp->level, p->pos, p->offset, p->size);
1265 #endif
1266     assert(pp->level >= 0);
1267     assert(p->offset <= p->size);
1268     if (pp->level==0)
1269     {
1270 #if ISAMB_DEBUG
1271         logf(LOG_DEBUG,"isamb_pp_climb_level returning 0 at root");
1272 #endif
1273         return 0;
1274     }
1275     assert(pp->level>0); 
1276     close_block(pp->isamb, pp->block[pp->level]);
1277     pp->block[pp->level]=0;
1278     (pp->level)--;
1279     p=pp->block[pp->level];
1280 #if ISAMB_DEBUG
1281     logf(LOG_DEBUG,"isamb_pp_climb_level climbed to level %d node %d ofs=%d",
1282                     pp->level, p->pos, p->offset);
1283 #endif
1284     assert(!p->leaf);
1285     assert(p->offset <= p->size);
1286     if (p->offset == p->size ) {
1287         /* we came from the last pointer, climb on */
1288         if (!isamb_pp_climb_level(pp,pos))
1289             return 0;
1290         p=pp->block[pp->level];
1291     }
1292     else
1293     {
1294         /* skip the child we just came from */
1295 #if ISAMB_DEBUG
1296         logf(LOG_DEBUG,"isam_pp_climb_level: skipping lev=%d ofs=%d sz=%d", 
1297                         pp->level, p->offset, p->size);
1298 #endif
1299         assert (p->offset < p->size );
1300         src=p->bytes + p->offset;
1301         decode_ptr(&src, &item_len);
1302         src += item_len;
1303         decode_ptr(&src, pos);
1304         p->offset=src - (char *)p->bytes;
1305             
1306     }
1307     return 1;
1308 } /* climb_level */
1309
1310
1311 static int isamb_pp_forward_unode(ISAMB_PP pp, int pos, const void *untilbuf)
1312 { /* scans a upper node until it finds a child <= untilbuf */
1313   /* pp points to the key value, as always. pos is the child read from */
1314   /* the buffer */
1315   /* if all values are too small, returns the last child in the node */
1316   /* FIXME - this can be detected, and avoided by looking at the */
1317   /* parent node, but that gets messy. Presumably the cost is */
1318   /* pretty low anyway */
1319     struct ISAMB_block *p = pp->block[pp->level];
1320     char *src=p->bytes + p->offset;
1321     int item_len;
1322     int cmp;
1323     int nxtpos;
1324 #if ISAMB_DEBUG
1325     int skips=0;
1326     logf(LOG_DEBUG,"isamb_pp_forward_unode starting "
1327                    "at level %d node %d ofs=%di sz=%d",
1328                     pp->level, p->pos, p->offset, p->size);
1329 #endif
1330     assert(!p->leaf);
1331     assert(p->offset <= p->size);
1332     if (p->offset == p->size) {
1333 #if ISAMB_DEBUG
1334             logf(LOG_DEBUG,"isamb_pp_forward_unode returning at end "
1335                    "at level %d node %d ofs=%di sz=%d",
1336                     pp->level, p->pos, p->offset, p->size);
1337 #endif
1338         return pos; /* already at the end of it */
1339     }
1340     while(p->offset < p->size) {
1341         decode_ptr(&src,&item_len);
1342         cmp=(*pp->isamb->method->compare_item)(untilbuf,src);
1343         src+=item_len;
1344         decode_ptr(&src,&nxtpos);
1345         if (cmp<2)
1346         {
1347 #if ISAMB_DEBUG
1348             logf(LOG_DEBUG,"isamb_pp_forward_unode returning a hit "
1349                    "at level %d node %d ofs=%d sz=%d",
1350                     pp->level, p->pos, p->offset, p->size);
1351 #endif
1352             return pos;
1353         } /* found one */
1354         pos=nxtpos;
1355         p->offset=src-(char*)p->bytes;
1356         (pp->skipped_nodes[pp->maxlevel - pp->level -1])++;
1357 #if ISAMB_DEBUG
1358         skips++;
1359 #endif
1360     }
1361 #if ISAMB_DEBUG
1362             logf(LOG_DEBUG,"isamb_pp_forward_unode returning at tail "
1363                    "at level %d node %d ofs=%d sz=%d skips=%d",
1364                     pp->level, p->pos, p->offset, p->size, skips);
1365 #endif
1366     return pos; /* that's the last one in the line */
1367     
1368 } /* forward_unode */
1369
1370 static void isamb_pp_descend_to_leaf(ISAMB_PP pp, int pos, const void *untilbuf)
1371 { /* climbs down the tree, from pos, to the leftmost leaf */
1372     struct ISAMB_block *p = pp->block[pp->level];
1373     char *src;
1374     assert(!p->leaf);
1375 #if ISAMB_DEBUG
1376     logf(LOG_DEBUG,"isamb_pp_descend_to_leaf "
1377                    "starting at lev %d node %d ofs=%d lf=%d u=%p", 
1378                    pp->level, p->pos, p->offset, p->leaf, untilbuf);
1379 #endif
1380     if (untilbuf)
1381         pos=isamb_pp_forward_unode(pp,pos,untilbuf);
1382     ++(pp->level);
1383     assert(pos);
1384     p=open_block(pp->isamb, pos);
1385     pp->block[pp->level]=p;
1386     ++(pp->accessed_nodes[pp->maxlevel-pp->level]);
1387     ++(pp->no_blocks);
1388 #if ISAMB_DEBUG
1389     logf(LOG_DEBUG,"isamb_pp_descend_to_leaf "
1390                    "got lev %d node %d lf=%d", 
1391                    pp->level, p->pos, p->leaf);
1392 #endif
1393     if (p->leaf)
1394         return;
1395     assert (p->offset==0 );
1396     src=p->bytes + p->offset;
1397     decode_ptr(&src, &pos);
1398     p->offset=src-(char*)p->bytes;
1399     isamb_pp_descend_to_leaf(pp,pos,untilbuf);
1400 #if ISAMB_DEBUG
1401     logf(LOG_DEBUG,"isamb_pp_descend_to_leaf "
1402                    "returning at lev %d node %d ofs=%d lf=%d", 
1403                    pp->level, p->pos, p->offset, p->leaf);
1404 #endif
1405 } /* descend_to_leaf */
1406
1407 static int isamb_pp_find_next_leaf(ISAMB_PP pp)
1408 { /* finds the next leaf by climbing up and down */
1409     int pos;
1410     if (!isamb_pp_climb_level(pp,&pos))
1411         return 0;
1412     isamb_pp_descend_to_leaf(pp, pos,0);
1413     return 1;
1414 }
1415
1416 static int isamb_pp_climb_desc(ISAMB_PP pp, void *buf, const void *untilbuf)
1417 { /* climbs up and descends to a leaf where values >= *untilbuf are found */
1418     int pos;
1419 #if ISAMB_DEBUG
1420     struct ISAMB_block *p = pp->block[pp->level];
1421     logf(LOG_DEBUG,"isamb_pp_climb_desc starting "
1422                    "at level %d node %d ofs=%d sz=%d",
1423                     pp->level, p->pos, p->offset, p->size);
1424 #endif
1425     if (!isamb_pp_climb_level(pp,&pos))
1426         return 0;
1427     /* see if it would pay to climb one higher */
1428     if (!isamb_pp_on_right_node(pp, pp->level, untilbuf))
1429         if (!isamb_pp_climb_level(pp,&pos))
1430             return 0;
1431     isamb_pp_descend_to_leaf(pp, pos,untilbuf);
1432 #if ISAMB_DEBUG
1433     p = pp->block[pp->level];
1434     logf(LOG_DEBUG,"isamb_pp_climb_desc done "
1435                    "at level %d node %d ofs=%d sz=%d",
1436                     pp->level, p->pos, p->offset, p->size);
1437 #endif
1438     return 1;
1439 } /* climb_desc */
1440
1441 int isamb_pp_forward (ISAMB_PP pp, void *buf, const void *untilbuf)
1442 {
1443 #if ISAMB_DEBUG
1444     struct ISAMB_block *p = pp->block[pp->level];
1445     assert(p->leaf);
1446     logf(LOG_DEBUG,"isamb_pp_forward starting "
1447                    "at level %d node %d ofs=%d sz=%d u=%p",
1448                     pp->level, p->pos, p->offset, p->size,untilbuf);
1449 #endif
1450     if (untilbuf) {
1451         if (isamb_pp_forward_on_leaf( pp, buf, untilbuf)) {
1452 #if ISAMB_DEBUG
1453             logf(LOG_DEBUG,"isamb_pp_forward (f) returning (A) "
1454                    "at level %d node %d ofs=%d sz=%d",
1455                     pp->level, p->pos, p->offset, p->size);
1456 #endif
1457             return 1;
1458         }
1459         if (! isamb_pp_climb_desc( pp, buf, untilbuf)) {
1460 #if ISAMB_DEBUG
1461             logf(LOG_DEBUG,"isamb_pp_forward (f) returning notfound (B) "
1462                    "at level %d node %d ofs=%d sz=%d",
1463                     pp->level, p->pos, p->offset, p->size);
1464 #endif
1465             return 0; /* could not find a leaf */
1466         }
1467         do{
1468             if (isamb_pp_forward_on_leaf( pp, buf, untilbuf)) {
1469 #if ISAMB_DEBUG
1470             logf(LOG_DEBUG,"isamb_pp_forward (f) returning (C) "
1471                    "at level %d node %d ofs=%d sz=%d",
1472                     pp->level, p->pos, p->offset, p->size);
1473 #endif
1474                 return 1;
1475             }
1476         }while ( isamb_pp_find_next_leaf(pp));
1477         return 0; /* could not find at all */
1478     }
1479     else { /* no untilbuf, a straight read */
1480         /* FIXME - this should be moved
1481          * directly into the pp_read */
1482         /* keeping here now, to keep same
1483          * interface as the old fwd */
1484         if (isamb_pp_read_on_leaf( pp, buf)) {
1485 #if ISAMB_DEBUG
1486             logf(LOG_DEBUG,"isamb_pp_forward (read) returning (D) "
1487                    "at level %d node %d ofs=%d sz=%d",
1488                     pp->level, p->pos, p->offset, p->size);
1489 #endif
1490             return 1;
1491         }
1492         if (isamb_pp_find_next_leaf(pp)) {
1493 #if ISAMB_DEBUG
1494             logf(LOG_DEBUG,"isamb_pp_forward (read) returning (E) "
1495                    "at level %d node %d ofs=%d sz=%d",
1496                     pp->level, p->pos, p->offset, p->size);
1497 #endif
1498             return isamb_pp_read_on_leaf(pp, buf);
1499         }
1500         else
1501             return 0;
1502     }
1503 } /* isam_pp_forward (new version) */
1504
1505 #elif NEW_FORWARD == 0
1506
1507 int isamb_pp_forward (ISAMB_PP pp, void *buf, const void *untilbuf)
1508 {
1509     /* pseudocode:
1510      *   while 1
1511      *     while at end of node
1512      *       climb higher. If out, return 0
1513      *     while not on a leaf (and not at its end)
1514      *       decode next
1515      *       if cmp
1516      *         descend to node
1517      *     decode next
1518      *     if cmp
1519      *       return 1
1520      */
1521      /* 
1522       * The upper nodes consist of a sequence of nodenumbers and keys
1523       * When opening a block,  the first node number is read in, and
1524       * offset points to the first key, which is the upper limit of keys
1525       * in the node just read.
1526       */
1527     char *dst = buf;
1528     char *src;
1529     struct ISAMB_block *p = pp->block[pp->level];
1530     int cmp;
1531     int item_len;
1532     int pos;
1533     int nxtpos;
1534     int descending=0; /* used to prevent a border condition error */
1535     if (!p)
1536         return 0;
1537 #if ISAMB_DEBUG
1538     logf(LOG_DEBUG,"isamb_pp_forward starting [%p] p=%d",pp,p->pos);
1539     
1540     (*pp->isamb->method->log_item)(LOG_DEBUG, untilbuf, "until");
1541     (*pp->isamb->method->log_item)(LOG_DEBUG, buf, "buf");
1542 #endif
1543
1544     while (1)
1545     {
1546         while ( (p->offset == p->size) && !descending )
1547         {  /* end of this block - climb higher */
1548             assert (p->offset <= p->size);
1549 #if ISAMB_DEBUG
1550             logf(LOG_DEBUG,"isamb_pp_forward climbing from l=%d",
1551                             pp->level);
1552 #endif
1553             if (pp->level == 0)
1554             {
1555 #if ISAMB_DEBUG
1556                 logf(LOG_DEBUG,"isamb_pp_forward returning 0 at root");
1557 #endif
1558                 return 0; /* at end of the root, nothing left */
1559             }
1560             close_block(pp->isamb, pp->block[pp->level]);
1561             pp->block[pp->level]=0;
1562             (pp->level)--;
1563             p=pp->block[pp->level];
1564 #if ISAMB_DEBUG
1565             logf(LOG_DEBUG,"isamb_pp_forward climbed to node %d off=%d",
1566                             p->pos, p->offset);
1567 #endif
1568             assert(!p->leaf);
1569             assert(p->offset <= p->size);
1570             /* skip the child we have handled */
1571             if (p->offset != p->size)
1572             { 
1573                 src = p->bytes + p->offset;
1574                 decode_ptr(&src, &item_len);
1575 #if ISAMB_DEBUG         
1576                 (*pp->isamb->method->log_item)(LOG_DEBUG, src,
1577                                                " isamb_pp_forward "
1578                                                "climb skipping old key");
1579 #endif
1580                 src += item_len;
1581                 decode_ptr(&src,&pos);
1582                 p->offset = src - (char*) p->bytes;
1583                 break; /* even if this puts us at the end of the block, we
1584                           need to descend to the last pos. UGLY coding,
1585                           clean up some day */
1586             }
1587         }
1588         if (!p->leaf)
1589         { 
1590             src = p->bytes + p->offset;
1591             if (p->offset == p->size)
1592                 cmp=-2 ; /* descend to the last node, as we have
1593                             no value to cmp */
1594             else
1595             {
1596                 decode_ptr(&src, &item_len);
1597 #if ISAMB_DEBUG
1598                 logf(LOG_DEBUG,"isamb_pp_forward (B) on a high node. "
1599                      "ofs=%d sz=%d nxtpos=%d ",
1600                         p->offset,p->size,pos);
1601                 (*pp->isamb->method->log_item)(LOG_DEBUG, src, "");
1602 #endif
1603                 if (untilbuf)
1604                     cmp=(*pp->isamb->method->compare_item)(untilbuf,src);
1605                 else
1606                     cmp=-2;
1607                 src += item_len;
1608                 decode_ptr(&src,&nxtpos);
1609             }
1610             if (cmp<2)
1611             { 
1612 #if ISAMB_DEBUG
1613                 logf(LOG_DEBUG,"isambb_pp_forward descending l=%d p=%d ",
1614                             pp->level, pos);
1615 #endif
1616                 descending=1; /* prevent climbing for a while */
1617                 ++(pp->level);
1618                 p = open_block(pp->isamb,pos);
1619                 pp->block[pp->level] = p ;
1620                 pp->total_size += p->size;
1621                 (pp->accessed_nodes[pp->maxlevel - pp->level])++;
1622                 pp->no_blocks++;
1623                 if ( !p->leaf)
1624                 { /* block starts with a pos */
1625                     src = p->bytes + p->offset;
1626                     decode_ptr(&src,&pos);
1627                     p->offset=src-(char*) p->bytes;
1628 #if ISAMB_DEBUG
1629                     logf(LOG_DEBUG,"isamb_pp_forward: block %d starts with %d",
1630                                     p->pos, pos);
1631 #endif
1632                 } 
1633             } /* descend to the node */
1634             else
1635             { /* skip the node */
1636                 p->offset = src - (char*) p->bytes;
1637                 pos=nxtpos;
1638                 (pp->skipped_nodes[pp->maxlevel - pp->level -1])++;
1639 #if ISAMB_DEBUG
1640                 logf(LOG_DEBUG,
1641                     "isamb_pp_forward: skipping block on level %d, noting "
1642                      "on %d (%d)",
1643                     pp->level, pp->maxlevel - pp->level-1 , 
1644                     pp->skipped_nodes[pp->maxlevel - pp->level-1 ]);
1645 #endif
1646                 /* 0 is always leafs, 1 is one level above leafs etc, no
1647                  * matter how high tree */
1648             }
1649         } /* not on a leaf */
1650         else
1651         { /* on a leaf */
1652             if (p->offset == p->size) { 
1653                 descending = 0;
1654             }
1655             else
1656             {
1657                 assert (p->offset < p->size);
1658                 src = p->bytes + p->offset;
1659                 dst=buf;
1660                 (*pp->isamb->method->code_item)(ISAMC_DECODE, p->decodeClientData,
1661                                                 &dst, &src);
1662                 p->offset = src - (char*) p->bytes;
1663                 if (untilbuf)
1664                     cmp=(*pp->isamb->method->compare_item)(untilbuf,buf);
1665                 else
1666                     cmp=-2;
1667 #if ISAMB_DEBUG
1668                 logf(LOG_DEBUG,"isamb_pp_forward on a leaf. cmp=%d", 
1669                      cmp);
1670                 (*pp->isamb->method->log_item)(LOG_DEBUG, buf, "");
1671 #endif
1672                 if (cmp <2)
1673                 {
1674 #if ISAMB_DEBUG
1675                     if (untilbuf)
1676                     {
1677                         (*pp->isamb->method->log_item)(
1678                             LOG_DEBUG, buf,  "isamb_pp_forward returning 1");
1679                     }
1680                     else
1681                     {
1682                         (*pp->isamb->method->log_item)(
1683                             LOG_DEBUG, buf, "isamb_pp_read returning 1 (fwd)");
1684                     }
1685 #endif
1686                     pp->returned_numbers++;
1687                     return 1;
1688                 }
1689                 else
1690                     pp->skipped_numbers++;
1691             }
1692         } /* leaf */
1693     } /* main loop */
1694 }
1695
1696 #elif NEW_FORWARD == 2
1697
1698 int isamb_pp_forward (ISAMB_PP pp, void *buf, const void *untilb)
1699 {
1700     char *dst = buf;
1701     char *src;
1702     struct ISAMB_block *p = pp->block[pp->level];
1703     if (!p)
1704         return 0;
1705
1706 again:
1707     while (p->offset == p->size)
1708     {
1709         int pos, item_len;
1710         while (p->offset == p->size)
1711         {
1712             if (pp->level == 0)
1713                 return 0;
1714             close_block (pp->isamb, pp->block[pp->level]);
1715             pp->block[pp->level] = 0;
1716             (pp->level)--;
1717             p = pp->block[pp->level];
1718             assert (!p->leaf);  
1719         }
1720
1721         assert(!p->leaf);
1722         src = p->bytes + p->offset;
1723         
1724         decode_ptr (&src, &item_len);
1725         src += item_len;
1726         decode_ptr (&src, &pos);
1727         
1728         p->offset = src - (char*) p->bytes;
1729
1730         src = p->bytes + p->offset;
1731
1732         while(1)
1733         {
1734             if (!untilb || p->offset == p->size)
1735                 break;
1736             assert(p->offset < p->size);
1737             decode_ptr (&src, &item_len);
1738             if ((*pp->isamb->method->compare_item)(untilb, src) <= 1)
1739                 break;
1740             src += item_len;
1741             decode_ptr (&src, &pos);
1742             p->offset = src - (char*) p->bytes;
1743         }
1744
1745         pp->level++;
1746
1747         while (1)
1748         {
1749             pp->block[pp->level] = p = open_block (pp->isamb, pos);
1750
1751             pp->total_size += p->size;
1752             pp->no_blocks++;
1753             
1754             if (p->leaf) 
1755             {
1756                 break;
1757             }
1758             
1759             src = p->bytes + p->offset;
1760             while(1)
1761             {
1762                 decode_ptr (&src, &pos);
1763                 p->offset = src - (char*) p->bytes;
1764                 
1765                 if (!untilb || p->offset == p->size)
1766                     break;
1767                 assert(p->offset < p->size);
1768                 decode_ptr (&src, &item_len);
1769                 if ((*pp->isamb->method->compare_item)(untilb, src) <= 1)
1770                     break;
1771                 src += item_len;
1772             }
1773             pp->level++;
1774         }
1775     }
1776     assert (p->offset < p->size);
1777     assert (p->leaf);
1778     while(1)
1779     {
1780         char *dst0 = dst;
1781         src = p->bytes + p->offset;
1782         (*pp->isamb->method->code_item)(ISAMC_DECODE, p->decodeClientData,
1783                                     &dst, &src);
1784         p->offset = src - (char*) p->bytes;
1785         if (!untilb || (*pp->isamb->method->compare_item)(untilb, dst0) <= 1)
1786             break;
1787         dst = dst0;
1788         if (p->offset == p->size) goto again;
1789     }
1790     /* key_logdump_txt(LOG_DEBUG,buf, "isamb_pp_read returning 1"); */
1791     return 1;
1792 }
1793
1794 #endif
1795
1796 int isamb_pp_num (ISAMB_PP pp)
1797 {
1798     return 1;
1799 }
1800
1801 static void isamb_pp_leaf_pos( ISAMB_PP pp, 
1802                                int *current, int *total, void *dummybuf )
1803 {
1804     struct ISAMB_block *p = pp->block[pp->level];
1805     char *src=p->bytes;
1806     char *end=p->bytes+p->size;
1807     char *cur=p->bytes+p->offset;
1808     char *dst;
1809     assert(p->offset <= p->size);
1810     assert(cur <= end);
1811     assert(p->leaf);
1812     *current=0;
1813     *total=0;
1814
1815     while(src < end) {
1816         dst=dummybuf;
1817         (*pp->isamb->method->code_item)
1818            (ISAMC_DECODE, p->decodeClientData,&dst, &src);
1819         assert(dst<dummybuf+100); /*FIXME */
1820         (*total)++;
1821         if (src<=cur)
1822              (*current)++;
1823     }
1824     logf(LOG_DEBUG, "isamb_pp_leaf_pos: cur=%d tot=%d ofs=%d sz=%d lev=%d",
1825                     *current, *total, p->offset, p->size, pp->level);
1826     assert(src==end);
1827 }
1828
1829 static void isamb_pp_upper_pos( ISAMB_PP pp, int *current, int *total, 
1830                                 int size, int level )
1831 { /* estimates total/current occurrences from here up, excl leaf */
1832     struct ISAMB_block *p = pp->block[level];
1833     char *src=p->bytes;
1834     char *end=p->bytes+p->size;
1835     char *cur=p->bytes+p->offset;
1836     int item_size;
1837     int child;
1838     assert(level>=0);
1839     assert(!p->leaf);
1840     logf(LOG_DEBUG,"isamb_pp_upper_pos at beginning     l=%d "
1841                    "cur=%d tot=%d ofs=%d sz=%d pos=%d", 
1842                    level, *current, *total, p->offset, p->size, p->pos);
1843     assert (p->offset <= p->size);
1844         decode_ptr (&src, &child ); /* first child */
1845     while(src < end) {
1846         if (src!=cur) {
1847             *total += size;
1848             if (src < cur)
1849                 *current +=size;
1850         }
1851             decode_ptr (&src, &item_size ); 
1852         assert(src+item_size<=end);
1853         src += item_size;
1854             decode_ptr (&src, &child );
1855     }
1856     if (level>0)
1857         isamb_pp_upper_pos(pp, current, total, *total, level-1);
1858 } /* upper_pos */
1859
1860 void isamb_pp_pos( ISAMB_PP pp, int *current, int *total )
1861 { /* return an estimate of the current position and of the total number of */
1862   /* occureences in the isam tree, based on the current leaf */
1863     struct ISAMB_block *p = pp->block[pp->level];
1864     char dummy[100]; /* 100 bytes/entry must be enough */
1865     assert(total);
1866     assert(current);
1867     assert(p->leaf);
1868     isamb_pp_leaf_pos(pp,current, total, dummy);
1869     if (pp->level>0)
1870         isamb_pp_upper_pos(pp, current, total, *total, pp->level-1);
1871     /*
1872     logf(LOG_DEBUG,"isamb_pp_pos: C=%d T=%d =%6.2f%%",
1873                     *current, *total, 100.0*(*current)/(*total));
1874     */
1875 }