Fixed a bug regarding ISAMB tree splitting. Code fixed in similar way
[idzebra-moved-to-github.git] / isamb / isamb.c
1 /* $Id: isamb.c,v 1.47.2.6 2006-08-14 17:14:08 adam Exp $
2    Copyright (C) 1995,1996,1997,1998,1999,2000,2001,2002,2003,2004
3    Index Data Aps
4
5 This file is part of the Zebra server.
6
7 Zebra is free software; you can redistribute it and/or modify it under
8 the terms of the GNU General Public License as published by the Free
9 Software Foundation; either version 2, or (at your option) any later
10 version.
11
12 Zebra is distributed in the hope that it will be useful, but WITHOUT ANY
13 WARRANTY; without even the implied warranty of MERCHANTABILITY or
14 FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
15 for more details.
16
17 You should have received a copy of the GNU General Public License
18 along with this program; if not, write to the Free Software
19 Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA  02110-1301  USA
20
21 */
22
23 #include <string.h>
24 #include <stdlib.h>
25 #include <yaz/xmalloc.h>
26 #include <yaz/log.h>
27 #include <isamb.h>
28 #include <assert.h>
29
30 #ifndef ISAMB_DEBUG
31 #define ISAMB_DEBUG 0
32 #endif
33
34 struct ISAMB_head {
35     int first_block;
36     int last_block;
37     int block_size;
38     int block_max;
39     int free_list;
40 };
41
42 #define ISAMB_DATA_OFFSET 3
43
44 /* maximum size of encoded buffer */
45 #define DST_ITEM_MAX 256
46
47 #define ISAMB_MAX_LEVEL 10
48 /* approx 2*max page + max size of item */
49 #define DST_BUF_SIZE 16840
50
51 #define ISAMB_CACHE_ENTRY_SIZE 4096
52
53 /* CAT_MAX: _must_ be power of 2 */
54 #define CAT_MAX 4
55 #define CAT_MASK (CAT_MAX-1)
56 /* CAT_NO: <= CAT_MAX */
57 #define CAT_NO 4
58
59 /* ISAMB_PTR_CODEC=1 var, =0 fixed */
60 #define ISAMB_PTR_CODEC  0
61
62 struct ISAMB_cache_entry {
63     ISAMB_P pos;
64     unsigned char *buf;
65     int dirty;
66     int hits;
67     struct ISAMB_cache_entry *next;
68 };
69
70 struct ISAMB_file {
71     BFile bf;
72     int head_dirty;
73     struct ISAMB_head head;
74     struct ISAMB_cache_entry *cache_entries;
75 };
76
77 struct ISAMB_s {
78     BFiles bfs;
79     ISAMC_M *method;
80
81     struct ISAMB_file *file;
82     int no_cat;
83     int cache; /* 0=no cache, 1=use cache, -1=dummy isam (for testing only) */
84     int log_io;        /* log level for bf_read/bf_write calls */
85     int log_freelist;  /* log level for freelist handling */
86     int skipped_numbers; /* on a leaf node */
87     int returned_numbers; 
88     int skipped_nodes[ISAMB_MAX_LEVEL]; /* [0]=skipped leaves, 1=higher etc */
89     int accessed_nodes[ISAMB_MAX_LEVEL]; /* nodes we did not skip */
90 };
91
92 struct ISAMB_block {
93     ISAMB_P pos;
94     int cat;
95     int size;
96     int leaf;
97     int dirty;
98     int deleted;
99     int offset;
100     char *bytes;
101     char *cbuf;
102     unsigned char *buf;
103     void *decodeClientData;
104     int log_rw;
105 };
106
107 struct ISAMB_PP_s {
108     ISAMB isamb;
109     ISAMB_P pos;
110     int level;
111     int maxlevel; /* total depth */
112     int total_size;
113     int no_blocks;
114     int skipped_numbers; /* on a leaf node */
115     int returned_numbers; 
116     int skipped_nodes[ISAMB_MAX_LEVEL]; /* [0]=skipped leaves, 1=higher etc */
117     int accessed_nodes[ISAMB_MAX_LEVEL]; /* nodes we did not skip */
118     struct ISAMB_block **block;
119 };
120
121
122 #if ISAMB_PTR_CODEC
123 static void encode_ptr (char **dst, unsigned pos)
124 {
125     unsigned char *bp = (unsigned char*) *dst;
126
127     while (pos > 127)
128     {
129          *bp++ = 128 | (pos & 127);
130          pos = pos >> 7;
131     }
132     *bp++ = pos;
133     *dst = (char *) bp;
134 }
135 #else
136 static void encode_ptr (char **dst, unsigned pos)
137 {
138     memcpy(*dst, &pos, sizeof(pos));
139     (*dst) += sizeof(pos);
140 }
141 #endif
142
143 #if ISAMB_PTR_CODEC
144 static void decode_ptr (char **src1, int *pos)
145 {
146     unsigned char **src = (unsigned char **) src1;
147     unsigned d = 0;
148     unsigned char c;
149     unsigned r = 0;
150
151     while (((c = *(*src)++) & 128))
152     {
153         d += ((c & 127) << r);
154         r += 7;
155     }
156     d += (c << r);
157     *pos = d;
158 }
159 #else
160 static void decode_ptr (char **src, int *pos)
161 {
162      memcpy (pos, *src, sizeof(*pos));
163      (*src) += sizeof(*pos);
164 }
165 #endif
166
167 ISAMB isamb_open (BFiles bfs, const char *name, int writeflag, ISAMC_M *method,
168                   int cache)
169 {
170     ISAMB isamb = xmalloc (sizeof(*isamb));
171     int i, b_size = 32;
172
173     isamb->bfs = bfs;
174     isamb->method = (ISAMC_M *) xmalloc (sizeof(*method));
175     memcpy (isamb->method, method, sizeof(*method));
176     isamb->no_cat = CAT_NO;
177     isamb->log_io = 0;
178     isamb->log_freelist = 0;
179     isamb->cache = cache;
180     isamb->skipped_numbers=0;
181     isamb->returned_numbers=0;
182     for (i=0;i<ISAMB_MAX_LEVEL;i++)
183       isamb->skipped_nodes[i]= isamb->accessed_nodes[i]=0;
184
185     assert (cache == 0);
186     isamb->file = xmalloc (sizeof(*isamb->file) * isamb->no_cat);
187     for (i = 0; i<isamb->no_cat; i++)
188     {
189         char fname[DST_BUF_SIZE];
190         isamb->file[i].cache_entries = 0;
191         isamb->file[i].head_dirty = 0;
192         sprintf (fname, "%s%c", name, i+'A');
193         if (cache)
194             isamb->file[i].bf = bf_open (bfs, fname, ISAMB_CACHE_ENTRY_SIZE,
195                                          writeflag);
196         else
197             isamb->file[i].bf = bf_open (bfs, fname, b_size, writeflag);
198
199         
200         if (!bf_read (isamb->file[i].bf, 0, 0, sizeof(struct ISAMB_head),
201                       &isamb->file[i].head))
202         {
203             isamb->file[i].head.first_block = ISAMB_CACHE_ENTRY_SIZE/b_size+1;
204             isamb->file[i].head.last_block = isamb->file[i].head.first_block;
205             isamb->file[i].head.block_size = b_size;
206             isamb->file[i].head.block_max = b_size - ISAMB_DATA_OFFSET;
207             isamb->file[i].head.free_list = 0;
208         }
209         assert (isamb->file[i].head.block_size >= ISAMB_DATA_OFFSET);
210         isamb->file[i].head_dirty = 0;
211         assert(isamb->file[i].head.block_size == b_size);
212         b_size = b_size * 4;
213     }
214 #if ISAMB_DEBUG
215     logf(LOG_WARN, "isamb debug enabled. Things will be slower than usual");
216 #endif
217     return isamb;
218 }
219
220 static void flush_blocks (ISAMB b, int cat)
221 {
222     while (b->file[cat].cache_entries)
223     {
224         struct ISAMB_cache_entry *ce_this = b->file[cat].cache_entries;
225         b->file[cat].cache_entries = ce_this->next;
226
227         if (ce_this->dirty)
228         {
229             yaz_log (b->log_io, "bf_write: flush_blocks");
230             bf_write (b->file[cat].bf, ce_this->pos, 0, 0, ce_this->buf);
231         }
232         xfree (ce_this->buf);
233         xfree (ce_this);
234     }
235 }
236
237 static int get_block (ISAMB b, ISAMC_P pos, char *userbuf, int wr)
238 {
239     int cat = pos&CAT_MASK;
240     int off = ((pos/CAT_MAX) & 
241                (ISAMB_CACHE_ENTRY_SIZE / b->file[cat].head.block_size - 1))
242         * b->file[cat].head.block_size;
243     int norm = pos / (CAT_MASK*ISAMB_CACHE_ENTRY_SIZE / b->file[cat].head.block_size);
244     int no = 0;
245     struct ISAMB_cache_entry **ce, *ce_this = 0, **ce_last = 0;
246
247     if (!b->cache)
248         return 0;
249
250     assert (ISAMB_CACHE_ENTRY_SIZE >= b->file[cat].head.block_size);
251     for (ce = &b->file[cat].cache_entries; *ce; ce = &(*ce)->next, no++)
252     {
253         ce_last = ce;
254         if ((*ce)->pos == norm)
255         {
256             ce_this = *ce;
257             *ce = (*ce)->next;   /* remove from list */
258             
259             ce_this->next = b->file[cat].cache_entries;  /* move to front */
260             b->file[cat].cache_entries = ce_this;
261             
262             if (wr)
263             {
264                 memcpy (ce_this->buf + off, userbuf, 
265                         b->file[cat].head.block_size);
266                 ce_this->dirty = 1;
267             }
268             else
269                 memcpy (userbuf, ce_this->buf + off,
270                         b->file[cat].head.block_size);
271             return 1;
272         }
273     }
274     if (no >= 40)
275     {
276         assert (no == 40);
277         assert (ce_last && *ce_last);
278         ce_this = *ce_last;
279         *ce_last = 0;  /* remove the last entry from list */
280         if (ce_this->dirty)
281         {
282             yaz_log (b->log_io, "bf_write: get_block");
283             bf_write (b->file[cat].bf, ce_this->pos, 0, 0, ce_this->buf);
284         }
285         xfree (ce_this->buf);
286         xfree (ce_this);
287     }
288     ce_this = xmalloc (sizeof(*ce_this));
289     ce_this->next = b->file[cat].cache_entries;
290     b->file[cat].cache_entries = ce_this;
291     ce_this->buf = xmalloc (ISAMB_CACHE_ENTRY_SIZE);
292     ce_this->pos = norm;
293     yaz_log (b->log_io, "bf_read: get_block");
294     if (!bf_read (b->file[cat].bf, norm, 0, 0, ce_this->buf))
295         memset (ce_this->buf, 0, ISAMB_CACHE_ENTRY_SIZE);
296     if (wr)
297     {
298         memcpy (ce_this->buf + off, userbuf, b->file[cat].head.block_size);
299         ce_this->dirty = 1;
300     }
301     else
302     {
303         ce_this->dirty = 0;
304         memcpy (userbuf, ce_this->buf + off, b->file[cat].head.block_size);
305     }
306     return 1;
307 }
308
309
310 void isamb_close (ISAMB isamb)
311 {
312     int i;
313     for (i=0;isamb->accessed_nodes[i];i++)
314         logf(LOG_DEBUG,"isamb_close  level leaf-%d: %d read, %d skipped",
315              i, isamb->accessed_nodes[i], isamb->skipped_nodes[i]);
316     logf(LOG_DEBUG,"isamb_close returned %d values, skipped %d",
317          isamb->skipped_numbers, isamb->returned_numbers);
318     for (i = 0; i<isamb->no_cat; i++)
319     {
320         flush_blocks (isamb, i);
321         if (isamb->file[i].head_dirty)
322             bf_write (isamb->file[i].bf, 0, 0,
323                       sizeof(struct ISAMB_head), &isamb->file[i].head);
324         
325         bf_close (isamb->file[i].bf);
326     }
327     xfree (isamb->file);
328     xfree (isamb->method);
329     xfree (isamb);
330 }
331
332 static struct ISAMB_block *open_block (ISAMB b, ISAMC_P pos)
333 {
334     int cat = pos&CAT_MASK;
335     struct ISAMB_block *p;
336     if (!pos)
337         return 0;
338     p = xmalloc (sizeof(*p));
339     p->pos = pos;
340     p->cat = pos & CAT_MASK;
341     p->buf = xmalloc (b->file[cat].head.block_size);
342     p->cbuf = 0;
343
344     if (!get_block (b, pos, p->buf, 0))
345     {
346         yaz_log (b->log_io, "bf_read: open_block");
347         if (!bf_read (b->file[cat].bf, pos/CAT_MAX, 0, 0, p->buf))
348         {
349             yaz_log (LOG_FATAL, "isamb: read fail for pos=%ld block=%ld",
350                      (long) pos, (long) pos/CAT_MAX);
351             abort();
352         }
353     }
354     p->bytes = p->buf + ISAMB_DATA_OFFSET;
355     p->leaf = p->buf[0];
356     p->size = (p->buf[1] + 256 * p->buf[2]) - ISAMB_DATA_OFFSET;
357     if (p->size < 0)
358     {
359         yaz_log (LOG_FATAL, "Bad block size %d in pos=%d\n", p->size, pos);
360     }
361     assert (p->size >= 0);
362     p->offset = 0;
363     p->dirty = 0;
364     p->deleted = 0;
365     p->decodeClientData = (*b->method->code_start)(ISAMC_DECODE);
366     yaz_log (LOG_DEBUG, "isamb_open_block: Opened block %d ofs=%d",pos, p->offset);
367     return p;
368 }
369
370 struct ISAMB_block *new_block (ISAMB b, int leaf, int cat)
371 {
372     struct ISAMB_block *p;
373
374     p = xmalloc (sizeof(*p));
375     p->buf = xmalloc (b->file[cat].head.block_size);
376
377     if (!b->file[cat].head.free_list)
378     {
379         int block_no;
380         block_no = b->file[cat].head.last_block++;
381         p->pos = block_no * CAT_MAX + cat;
382     }
383     else
384     {
385         p->pos = b->file[cat].head.free_list;
386         assert((p->pos & CAT_MASK) == cat);
387         if (!get_block (b, p->pos, p->buf, 0))
388         {
389             yaz_log (b->log_io, "bf_read: new_block");
390             if (!bf_read (b->file[cat].bf, p->pos/CAT_MAX, 0, 0, p->buf))
391             {
392                 yaz_log (LOG_FATAL, "isamb: read fail for pos=%ld block=%ld",
393                          (long) p->pos/CAT_MAX, (long) p->pos/CAT_MAX);
394                 abort ();
395             }
396         }
397         yaz_log (b->log_freelist, "got block %d from freelist %d:%d", p->pos,
398                  cat, p->pos/CAT_MAX);
399         memcpy (&b->file[cat].head.free_list, p->buf, sizeof(int));
400     }
401     p->cat = cat;
402     b->file[cat].head_dirty = 1;
403     memset (p->buf, 0, b->file[cat].head.block_size);
404     p->bytes = p->buf + ISAMB_DATA_OFFSET;
405     p->leaf = leaf;
406     p->size = 0;
407     p->dirty = 1;
408     p->deleted = 0;
409     p->offset = 0;
410     p->decodeClientData = (*b->method->code_start)(ISAMC_DECODE);
411     return p;
412 }
413
414 struct ISAMB_block *new_leaf (ISAMB b, int cat)
415 {
416     return new_block (b, 1, cat);
417 }
418
419
420 struct ISAMB_block *new_int (ISAMB b, int cat)
421 {
422     return new_block (b, 0, cat);
423 }
424
425 static void check_block (ISAMB b, struct ISAMB_block *p)
426 {
427     if (p->leaf)
428     {
429         ;
430     }
431     else
432     {
433         /* sanity check */
434         char *startp = p->bytes;
435         char *src = startp;
436         char *endp = p->bytes + p->size;
437         int pos;
438             
439         decode_ptr (&src, &pos);
440         assert ((pos&CAT_MASK) == p->cat);
441         while (src != endp)
442         {
443             int item_len;
444             decode_ptr (&src, &item_len);
445             assert (item_len > 0 && item_len < 30);
446             src += item_len;
447             decode_ptr (&src, &pos);
448             assert ((pos&CAT_MASK) == p->cat);
449         }
450     }
451 }
452
453 void close_block (ISAMB b, struct ISAMB_block *p)
454 {
455     if (!p)
456         return;
457     if (p->deleted)
458     {
459         yaz_log (b->log_freelist, "release block %d from freelist %d:%d",
460                  p->pos, p->cat, p->pos/CAT_MAX);
461         memcpy (p->buf, &b->file[p->cat].head.free_list, sizeof(int));
462         b->file[p->cat].head.free_list = p->pos;
463         if (!get_block (b, p->pos, p->buf, 1))
464         {
465             yaz_log (b->log_io, "bf_write: close_block (deleted)");
466             bf_write (b->file[p->cat].bf, p->pos/CAT_MAX, 0, 0, p->buf);
467         }
468     }
469     else if (p->dirty)
470     {
471         int size = p->size + ISAMB_DATA_OFFSET;
472         assert (p->size >= 0);
473         p->buf[0] = p->leaf;
474         p->buf[1] = size & 255;
475         p->buf[2] = size >> 8;
476         check_block(b, p);
477         if (!get_block (b, p->pos, p->buf, 1))
478         {
479             yaz_log (b->log_io, "bf_write: close_block");
480             bf_write (b->file[p->cat].bf, p->pos/CAT_MAX, 0, 0, p->buf);
481         }
482     }
483     (*b->method->code_stop)(ISAMC_DECODE, p->decodeClientData);
484     xfree (p->buf);
485     xfree (p);
486 }
487
488 int insert_sub (ISAMB b, struct ISAMB_block **p,
489                 void *new_item, int *mode,
490                 ISAMC_I *stream,
491                 struct ISAMB_block **sp,
492                 void *sub_item, int *sub_size,
493                 void *max_item);
494
495 int insert_int (ISAMB b, struct ISAMB_block *p, void *lookahead_item,
496                 int *mode,
497                 ISAMC_I *stream, struct ISAMB_block **sp,
498                 void *split_item, int *split_size, void *last_max_item)
499 {
500     char *startp = p->bytes;
501     char *src = startp;
502     char *endp = p->bytes + p->size;
503     int pos;
504     struct ISAMB_block *sub_p1 = 0, *sub_p2 = 0;
505     char sub_item[DST_ITEM_MAX];
506     int sub_size;
507     int more = 0;
508
509     *sp = 0;
510
511     assert(p->size >= 0);
512     decode_ptr (&src, &pos);
513     while (src != endp)
514     {
515         int item_len;
516         int d;
517         char *src0 = src;
518         decode_ptr (&src, &item_len);
519         d = (*b->method->compare_item)(src, lookahead_item);
520         if (d > 0)
521         {
522             sub_p1 = open_block (b, pos);
523             assert (sub_p1);
524             more = insert_sub (b, &sub_p1, lookahead_item, mode,
525                                stream, &sub_p2, 
526                                sub_item, &sub_size, src);
527             src = src0;
528             break;
529         }
530         src += item_len;
531         decode_ptr (&src, &pos);
532     }
533     if (!sub_p1)
534     {
535         sub_p1 = open_block (b, pos);
536         assert (sub_p1);
537         more = insert_sub (b, &sub_p1, lookahead_item, mode, stream, &sub_p2, 
538                            sub_item, &sub_size, last_max_item);
539     }
540     if (sub_p2)
541     {
542         /* there was a split - must insert pointer in this one */
543         char dst_buf[DST_BUF_SIZE];
544         char *dst = dst_buf;
545
546         assert (sub_size < 30 && sub_size > 1);
547
548         memcpy (dst, startp, src - startp);
549                 
550         dst += src - startp;
551
552         encode_ptr (&dst, sub_size);      /* sub length and item */
553         memcpy (dst, sub_item, sub_size);
554         dst += sub_size;
555
556         encode_ptr (&dst, sub_p2->pos);   /* pos */
557
558         if (endp - src)                   /* remaining data */
559         {
560             memcpy (dst, src, endp - src);
561             dst += endp - src;
562         }
563         p->size = dst - dst_buf;
564         assert (p->size >= 0);
565         if (p->size <= b->file[p->cat].head.block_max)
566         {
567             memcpy (startp, dst_buf, dst - dst_buf);
568
569             close_block(b, sub_p2);
570         }
571         else
572         {
573             int p_new_size;
574             char *half;
575             src = dst_buf;
576             endp = dst;
577
578             p->dirty = 1;
579             close_block(b, sub_p2);
580
581             half = src + b->file[p->cat].head.block_size/2;
582             decode_ptr (&src, &pos);
583             while (src <= half)
584             {
585                 decode_ptr (&src, split_size);
586                 src += *split_size;
587                 decode_ptr (&src, &pos);
588             }
589             p_new_size = src - dst_buf;
590             memcpy (p->bytes, dst_buf, p_new_size);
591
592             decode_ptr (&src, split_size);
593             memcpy (split_item, src, *split_size);
594             src += *split_size;
595
596             *sp = new_int (b, p->cat);
597             (*sp)->size = endp - src;
598             memcpy ((*sp)->bytes, src, (*sp)->size);
599
600             p->size = p_new_size;
601         }
602         p->dirty = 1;
603     }
604     close_block (b, sub_p1);
605     return more;
606 }
607
608
609 int insert_leaf (ISAMB b, struct ISAMB_block **sp1, void *lookahead_item,
610                  int *lookahead_mode, ISAMC_I *stream,
611                  struct ISAMB_block **sp2,
612                  void *sub_item, int *sub_size,
613                  void *max_item)
614 {
615     struct ISAMB_block *p = *sp1;
616     char *src = 0, *endp = 0;
617     char dst_buf[DST_BUF_SIZE], *dst = dst_buf;
618     int new_size;
619     void *c1 = (*b->method->code_start)(ISAMC_DECODE);
620     void *c2 = (*b->method->code_start)(ISAMC_ENCODE);
621     int more = 1;
622     int quater = b->file[b->no_cat-1].head.block_max / CAT_MAX;
623     char *cut = dst_buf + quater * 2;
624     char *maxp = dst_buf + b->file[b->no_cat-1].head.block_max;
625     char *half1 = 0;
626     char *half2 = 0;
627     char cut_item_buf[DST_ITEM_MAX];
628     int cut_item_size = 0;
629
630     if (p && p->size)
631     {
632         char file_item_buf[DST_ITEM_MAX];
633         char *file_item = file_item_buf;
634             
635         src = p->bytes;
636         endp = p->bytes + p->size;
637         (*b->method->code_item)(ISAMC_DECODE, c1, &file_item, &src);
638         while (1)
639         {
640             char *dst_item = 0;
641             char *lookahead_next;
642             int d = -1;
643             
644             if (lookahead_item)
645                 d = (*b->method->compare_item)(file_item_buf, lookahead_item);
646             
647             if (d > 0)
648             {
649                 dst_item = lookahead_item;
650                 if (!*lookahead_mode)
651                 {
652                     yaz_log (LOG_WARN, "isamb: Inconsistent register (1)");
653                     assert (*lookahead_mode);
654                 }
655             }
656             else
657                 dst_item = file_item_buf;
658             if (!*lookahead_mode && d == 0)
659             {
660                 p->dirty = 1;
661             }
662             else if (!half1 && dst > cut)
663             {
664                 char *dst_item_0 = dst_item;
665                 half1 = dst; /* candidate for splitting */
666                 
667                 (*b->method->code_item)(ISAMC_ENCODE, c2, &dst, &dst_item);
668                 
669                 cut_item_size = dst_item - dst_item_0;
670                 memcpy (cut_item_buf, dst_item_0, cut_item_size);
671                 
672                 half2 = dst;
673             }
674             else
675                 (*b->method->code_item)(ISAMC_ENCODE, c2, &dst, &dst_item);
676             if (d > 0)  
677             {
678                 if (dst > maxp)
679                     lookahead_item = 0;
680                 else
681                 {
682                     lookahead_next = lookahead_item;
683                     if (!(*stream->read_item)(stream->clientData,
684                                               &lookahead_next,
685                                               lookahead_mode))
686                     {
687                         lookahead_item = 0;
688                         more = 0;
689                     }
690                     if (lookahead_item && max_item &&
691                         (*b->method->compare_item)(max_item, lookahead_item) <= 0)
692                     {
693                         /* max_item 1 */
694                         lookahead_item = 0;
695                     }
696                     
697                     p->dirty = 1;
698                 }
699             }
700             else if (d == 0)
701             {
702                 lookahead_next = lookahead_item;
703                 if (!(*stream->read_item)(stream->clientData,
704                                           &lookahead_next, lookahead_mode))
705                 {
706                     lookahead_item = 0;
707                     more = 0;
708                 }
709                 if (src == endp)
710                     break;
711                 file_item = file_item_buf;
712                 (*b->method->code_item)(ISAMC_DECODE, c1, &file_item, &src);
713             }
714             else
715             {
716                 if (src == endp)
717                     break;
718                 file_item = file_item_buf;
719                 (*b->method->code_item)(ISAMC_DECODE, c1, &file_item, &src);
720             }
721         }
722     }
723     maxp = dst_buf + b->file[b->no_cat-1].head.block_max + quater;
724     while (lookahead_item)
725     {
726         char *dst_item = lookahead_item;
727         char *dst_0 = dst;
728         
729         if (max_item &&
730             (*b->method->compare_item)(max_item, lookahead_item) <= 0)
731         {
732             /* max_item 2 */
733             break;
734         }
735         if (!*lookahead_mode)
736         {
737             yaz_log (LOG_WARN, "isamb: Inconsistent register (2)");
738             abort();
739         }
740         else if (!half1 && dst > cut)   
741         {
742             char *dst_item_0 = dst_item;
743             half1 = dst; /* candidate for splitting */
744             
745             (*b->method->code_item)(ISAMC_ENCODE, c2, &dst, &dst_item);
746             
747             cut_item_size = dst_item - dst_item_0;
748             memcpy (cut_item_buf, dst_item_0, cut_item_size);
749             
750             half2 = dst;
751         }
752         else
753             (*b->method->code_item)(ISAMC_ENCODE, c2, &dst, &dst_item);
754
755         if (dst > maxp)
756         {
757             dst = dst_0;
758             break;
759         }
760         if (p)
761             p->dirty = 1;
762         dst_item = lookahead_item;
763         if (!(*stream->read_item)(stream->clientData, &dst_item,
764                                   lookahead_mode))
765         {
766             lookahead_item = 0;
767             more = 0;
768         }
769     }
770     new_size = dst - dst_buf;
771     if (p && p->cat != b->no_cat-1 && 
772         new_size > b->file[p->cat].head.block_max)
773     {
774         /* non-btree block will be removed */
775         p->deleted = 1;
776         close_block (b, p);
777         /* delete it too!! */
778         p = 0; /* make a new one anyway */
779     }
780     if (!p)
781     {   /* must create a new one */
782         int i;
783         for (i = 0; i < b->no_cat; i++)
784             if (new_size <= b->file[i].head.block_max)
785                 break;
786         if (i == b->no_cat)
787             i = b->no_cat - 1;
788         p = new_leaf (b, i);
789     }
790     if (new_size > b->file[p->cat].head.block_max)
791     {
792         char *first_dst;
793         char *cut_item = cut_item_buf;
794
795         assert (half1);
796         assert (half2);
797
798        /* first half */
799         p->size = half1 - dst_buf;
800         memcpy (p->bytes, dst_buf, half1 - dst_buf);
801
802         /* second half */
803         *sp2 = new_leaf (b, p->cat);
804
805         (*b->method->code_reset)(c2);
806
807         first_dst = (*sp2)->bytes;
808
809         (*b->method->code_item)(ISAMC_ENCODE, c2, &first_dst, &cut_item);
810
811         memcpy (first_dst, half2, dst - half2);
812
813         (*sp2)->size = (first_dst - (*sp2)->bytes) + (dst - half2);
814         (*sp2)->dirty = 1;
815         p->dirty = 1;
816         memcpy (sub_item, cut_item_buf, cut_item_size);
817         *sub_size = cut_item_size;
818     }
819     else
820     {
821         memcpy (p->bytes, dst_buf, dst - dst_buf);
822         p->size = new_size;
823     }
824     (*b->method->code_stop)(ISAMC_DECODE, c1);
825     (*b->method->code_stop)(ISAMC_ENCODE, c2);
826     *sp1 = p;
827     return more;
828 }
829
830 int insert_sub (ISAMB b, struct ISAMB_block **p, void *new_item,
831                 int *mode,
832                 ISAMC_I *stream,
833                 struct ISAMB_block **sp,
834                 void *sub_item, int *sub_size,
835                 void *max_item)
836 {
837     if (!*p || (*p)->leaf)
838         return insert_leaf (b, p, new_item, mode, stream, sp, sub_item, 
839                             sub_size, max_item);
840     else
841         return insert_int (b, *p, new_item, mode, stream, sp, sub_item,
842                            sub_size, max_item);
843 }
844
845 int isamb_unlink (ISAMB b, ISAMC_P pos)
846 {
847     struct ISAMB_block *p1;
848
849     if (!pos)
850         return 0;
851     p1 = open_block(b, pos);
852     p1->deleted = 1;
853     if (!p1->leaf)
854     {
855         int sub_p;
856         int item_len;
857         char *src = p1->bytes + p1->offset;
858
859         decode_ptr(&src, &sub_p);
860         isamb_unlink(b, sub_p);
861         
862         while (src != p1->bytes + p1->size)
863         {
864             decode_ptr(&src, &item_len);
865             src += item_len;
866             decode_ptr(&src, &sub_p);
867             isamb_unlink(b, sub_p);
868         }
869     }
870     close_block(b, p1);
871     return 0;
872 }
873
874 int isamb_merge (ISAMB b, ISAMC_P pos, ISAMC_I *stream)
875 {
876     char item_buf[DST_ITEM_MAX];
877     char *item_ptr;
878     int i_mode;
879     int more;
880
881     if (b->cache < 0)
882     {
883         int more = 1;
884         while (more)
885         {
886             item_ptr = item_buf;
887             more =
888                 (*stream->read_item)(stream->clientData, &item_ptr, &i_mode);
889         }
890         return 1;
891     }
892     item_ptr = item_buf;
893     more = (*stream->read_item)(stream->clientData, &item_ptr, &i_mode);
894     while (more)
895     {
896         struct ISAMB_block *p = 0, *sp = 0;
897         char sub_item[DST_ITEM_MAX];
898         int sub_size;
899         
900         if (pos)
901             p = open_block (b, pos);
902         more = insert_sub (b, &p, item_buf, &i_mode, stream, &sp,
903                             sub_item, &sub_size, 0);
904         if (sp)
905         {    /* increase level of tree by one */
906             struct ISAMB_block *p2 = new_int (b, p->cat);
907             char *dst = p2->bytes + p2->size;
908             
909             encode_ptr (&dst, p->pos);
910             assert (sub_size < 20);
911             encode_ptr (&dst, sub_size);
912             memcpy (dst, sub_item, sub_size);
913             dst += sub_size;
914             encode_ptr (&dst, sp->pos);
915             
916             p2->size = dst - p2->bytes;
917             pos = p2->pos;  /* return new super page */
918             close_block (b, sp);
919             close_block (b, p2);
920         }
921         else
922             pos = p->pos;   /* return current one (again) */
923         close_block (b, p);
924     }
925     return pos;
926 }
927
928 ISAMB_PP isamb_pp_open_x (ISAMB isamb, ISAMB_P pos, int *level)
929 {
930     ISAMB_PP pp = xmalloc (sizeof(*pp));
931     int i;
932
933     pp->isamb = isamb;
934     pp->block = xmalloc (ISAMB_MAX_LEVEL * sizeof(*pp->block));
935
936     pp->pos = pos;
937     pp->level = 0;
938     pp->maxlevel=0;
939     pp->total_size = 0;
940     pp->no_blocks = 0;
941     pp->skipped_numbers=0;
942     pp->returned_numbers=0;
943     for (i = 0; i<ISAMB_MAX_LEVEL; i++)
944         pp->skipped_nodes[i] = pp->accessed_nodes[i] = 0;
945     pp->block[0] = 0;
946     if (pos)
947     {
948         while (1)
949         {
950             struct ISAMB_block *p = open_block (isamb, pos);
951             char *src = p->bytes + p->offset;
952             pp->block[pp->level] = p;
953             
954             pp->total_size += p->size;
955             pp->no_blocks++;
956             if (p->leaf)
957                 break;
958             
959             decode_ptr (&src, &pos);
960             p->offset = src - p->bytes;
961             pp->level++;
962             pp->accessed_nodes[pp->level]++; 
963         }
964     }
965     pp->block[pp->level+1] = 0;
966     pp->maxlevel = pp->level;
967     if (level)
968         *level = pp->level;
969     return pp;
970 }
971
972 ISAMB_PP isamb_pp_open (ISAMB isamb, ISAMB_P pos)
973 {
974     return isamb_pp_open_x (isamb, pos, 0);
975 }
976
977 void isamb_pp_close_x (ISAMB_PP pp, int *size, int *blocks)
978 {
979     int i;
980     if (!pp)
981         return;
982     logf(LOG_DEBUG,"isamb_pp_close lev=%d returned %d values, skipped %d",
983         pp->maxlevel, pp->skipped_numbers, pp->returned_numbers);
984     for (i=pp->maxlevel;i>=0;i--)
985         if ( pp->skipped_nodes[i] || pp->accessed_nodes[i])
986             logf(LOG_DEBUG,"isamb_pp_close  level leaf-%d: %d read, %d skipped", i,
987                  pp->accessed_nodes[i], pp->skipped_nodes[i]);
988     pp->isamb->skipped_numbers += pp->skipped_numbers;
989     pp->isamb->returned_numbers += pp->returned_numbers;
990     for (i=pp->maxlevel;i>=0;i--)
991     {
992         pp->isamb->accessed_nodes[i] += pp->accessed_nodes[i];
993         pp->isamb->skipped_nodes[i] += pp->skipped_nodes[i];
994     }
995     if (size)
996         *size = pp->total_size;
997     if (blocks)
998         *blocks = pp->no_blocks;
999     for (i = 0; i <= pp->level; i++)
1000         close_block (pp->isamb, pp->block[i]);
1001     xfree (pp->block);
1002     xfree (pp);
1003 }
1004
1005 int isamb_block_info (ISAMB isamb, int cat)
1006 {
1007     if (cat >= 0 && cat < isamb->no_cat)
1008         return isamb->file[cat].head.block_size;
1009     return -1;
1010 }
1011
1012 void isamb_pp_close (ISAMB_PP pp)
1013 {
1014     isamb_pp_close_x (pp, 0, 0);
1015 }
1016
1017 /* simple recursive dumper .. */
1018 static void isamb_dump_r (ISAMB b, ISAMB_P pos, void (*pr)(const char *str),
1019                           int level)
1020 {
1021     char buf[1024];
1022     char prefix_str[1024];
1023     if (pos)
1024     {
1025         struct ISAMB_block *p = open_block (b, pos);
1026         sprintf(prefix_str, "%*s %d cat=%d size=%d max=%d", level*2, "",
1027                 pos, p->cat, p->size, b->file[p->cat].head.block_max);
1028         (*pr)(prefix_str);
1029         sprintf(prefix_str, "%*s %d", level*2, "", pos);
1030         if (p->leaf)
1031         {
1032             while (p->offset < p->size)
1033             {
1034                 char *src = p->bytes + p->offset;
1035                 char *dst = buf;
1036                 (*b->method->code_item)(ISAMC_DECODE, p->decodeClientData,
1037                                         &dst, &src);
1038                 (*b->method->log_item)(LOG_DEBUG, buf, prefix_str);
1039                 p->offset = src - (char*) p->bytes;
1040             }
1041             assert(p->offset == p->size);
1042         }
1043         else
1044         {
1045             char *src = p->bytes + p->offset;
1046             int sub;
1047             int item_len;
1048
1049             decode_ptr (&src, &sub);
1050             p->offset = src - (char*) p->bytes;
1051
1052             isamb_dump_r(b, sub, pr, level+1);
1053             
1054             while (p->offset < p->size)
1055             {
1056                 decode_ptr (&src, &item_len);
1057                 (*b->method->log_item)(LOG_DEBUG, src, prefix_str);
1058                 src += item_len;
1059                 decode_ptr (&src, &sub);
1060                 
1061                 p->offset = src - (char*) p->bytes;
1062                 
1063                 isamb_dump_r(b, sub, pr, level+1);
1064             }           
1065         }
1066         close_block(b,p);
1067     }
1068 }
1069
1070 void isamb_dump (ISAMB b, ISAMB_P pos, void (*pr)(const char *str))
1071 {
1072     isamb_dump_r(b, pos, pr, 0);
1073 }
1074
1075 #if 0
1076 /* Old isamb_pp_read that Adam wrote, kept as a reference in case we need to
1077    debug the more complex pp_read that also forwards. May be deleted near end
1078    of 2004, if it has not shown to be useful */
1079
1080
1081 int isamb_pp_read (ISAMB_PP pp, void *buf)
1082 {
1083     char *dst = buf;
1084     char *src;
1085     struct ISAMB_block *p = pp->block[pp->level];
1086     if (!p)
1087         return 0;
1088
1089     while (p->offset == p->size)
1090     {
1091         int pos, item_len;
1092         while (p->offset == p->size)
1093         {
1094             if (pp->level == 0)
1095                 return 0;
1096             close_block (pp->isamb, pp->block[pp->level]);
1097             pp->block[pp->level] = 0;
1098             (pp->level)--;
1099             p = pp->block[pp->level];
1100             assert (!p->leaf);  
1101         }
1102         src = p->bytes + p->offset;
1103         
1104         decode_ptr (&src, &item_len);
1105         src += item_len;
1106         decode_ptr (&src, &pos);
1107         
1108         p->offset = src - (char*) p->bytes;
1109
1110         ++(pp->level);
1111         
1112         while (1)
1113         {
1114             pp->block[pp->level] = p = open_block (pp->isamb, pos);
1115
1116             pp->total_size += p->size;
1117             pp->no_blocks++;
1118             
1119             if (p->leaf) 
1120             {
1121                 break;
1122             }
1123             src = p->bytes + p->offset;
1124             decode_ptr (&src, &pos);
1125             p->offset = src - (char*) p->bytes;
1126             pp->level++;
1127         }
1128     }
1129     assert (p->offset < p->size);
1130     assert (p->leaf);
1131     src = p->bytes + p->offset;
1132     (*pp->isamb->method->code_item)(ISAMC_DECODE, p->decodeClientData,
1133                                     &dst, &src);
1134     p->offset = src - (char*) p->bytes;
1135     /* key_logdump_txt(LOG_DEBUG,buf, "isamb_pp_read returning 1"); */
1136     return 1;
1137 }
1138
1139 #else
1140 int isamb_pp_read (ISAMB_PP pp, void *buf)
1141 {
1142     return isamb_pp_forward(pp,buf,0);
1143 }
1144 #endif
1145
1146 #define NEW_FORWARD 1
1147
1148 #if NEW_FORWARD == 1
1149
1150 /*
1151 #undef ISAMB_DEBUB
1152 #define ISAMB_DEBUG 1
1153 */
1154 static int isamb_pp_on_right_node(ISAMB_PP pp, int level, const void *untilbuf)
1155 { /* looks one node higher to see if we should be on this node at all */
1156   /* useful in backing off quickly, and in avoiding tail descends */
1157   /* call with pp->level to begin with */
1158     struct ISAMB_block *p;
1159     int cmp;
1160     char *src;
1161     int item_len;
1162     assert(level>=0);
1163     if ( level == 0) {
1164 #if ISAMB_DEBUG
1165             logf(LOG_DEBUG,"isamb_pp_on_right returning true for root");
1166 #endif
1167         return 1; /* we can never skip the root node */
1168     }
1169     level--;
1170     p=pp->block[level];
1171     assert(p->offset <= p->size);
1172     if (p->offset < p->size )
1173     {
1174         assert(p->offset>0); 
1175         src=p->bytes + p->offset;
1176         decode_ptr(&src,&item_len);
1177 #if ISAMB_DEBUG
1178         (*pp->isamb->method->log_item)(LOG_DEBUG,untilbuf,"on_leaf: until");
1179         (*pp->isamb->method->log_item)(LOG_DEBUG,src,"on_leaf: value");
1180 #endif
1181         cmp=(*pp->isamb->method->compare_item)(untilbuf,src);
1182         if (cmp<2) {
1183 #if ISAMB_DEBUG
1184             logf(LOG_DEBUG,"isamb_pp_on_right returning true "
1185                             "cmp=%d lev=%d ofs=%d",cmp,level,p->offset);
1186 #endif
1187             return 1; 
1188         }
1189         else {
1190 #if ISAMB_DEBUG
1191             logf(LOG_DEBUG,"isamb_pp_on_right returning false "
1192                             "cmp=%d lev=%d ofs=%d",cmp,level,p->offset);
1193 #endif
1194             return 0; 
1195         }
1196     }
1197     else {
1198 #if ISAMB_DEBUG
1199         logf(LOG_DEBUG,"isamb_pp_on_right at tail, looking higher "
1200                         "lev=%d",level);
1201 #endif
1202         return isamb_pp_on_right_node(pp, level, untilbuf);
1203     }
1204 } /* isamb_pp_on_right_node */
1205
1206 static int isamb_pp_read_on_leaf(ISAMB_PP pp, void *buf)
1207 { /* reads the next item on the current leaf, returns 0 if end of leaf*/
1208     struct ISAMB_block *p = pp->block[pp->level];
1209     char *dst;
1210     char *src;
1211     assert(pp);
1212     assert(buf);
1213     if (!p || p->offset == p->size) {
1214 #if ISAMB_DEBUG
1215         logf(LOG_DEBUG,"isamb_pp_read_on_leaf returning 0 on node %d",p->pos);
1216 #endif
1217         return 0; /* at end of leaf */
1218     }
1219     src=p->bytes + p->offset;
1220     dst=buf;
1221     (*pp->isamb->method->code_item)
1222            (ISAMC_DECODE, p->decodeClientData,&dst, &src);
1223     p->offset = src - (char*) p->bytes;
1224     /*
1225 #if ISAMB_DEBUG
1226     (*pp->isamb->method->log_item)(LOG_DEBUG, buf, "read_on_leaf returning 1");
1227 #endif
1228 */
1229     return 1;
1230 } /* read_on_leaf */
1231
1232 static int isamb_pp_forward_on_leaf(ISAMB_PP pp, void *buf, const void *untilbuf)
1233 { /* forwards on the current leaf, returns 0 if not found */
1234     int cmp;
1235     int skips=0;
1236     while (1){
1237         if (!isamb_pp_read_on_leaf(pp,buf))
1238             return 0;
1239         /* FIXME - this is an extra function call, inline the read? */
1240         cmp=(*pp->isamb->method->compare_item)(untilbuf,buf);
1241         if (cmp <2){  /* found a good one */
1242 #if ISAMB_DEBUG
1243             if (skips)
1244                 logf(LOG_DEBUG, "isam_pp_fwd_on_leaf skipped %d items",skips);
1245 #endif
1246             pp->returned_numbers++;
1247             return 1;
1248         }
1249         if (!skips)
1250             if (!isamb_pp_on_right_node(pp, pp->level, untilbuf))
1251                 return 0; /* never mind the rest of this leaf */
1252         pp->skipped_numbers++;
1253         skips++;
1254     }
1255 } /* forward_on_leaf */
1256
1257 static int isamb_pp_climb_level(ISAMB_PP pp, int *pos)
1258 { /* climbs higher in the tree, until finds a level with data left */
1259   /* returns the node to (consider to) descend to in *pos) */
1260     struct ISAMB_block *p = pp->block[pp->level];
1261     char *src;
1262     int item_len;
1263 #if ISAMB_DEBUG
1264     logf(LOG_DEBUG,"isamb_pp_climb_level starting "
1265                    "at level %d node %d ofs=%d sz=%d",
1266                     pp->level, p->pos, p->offset, p->size);
1267 #endif
1268     assert(pp->level >= 0);
1269     if (!p)
1270         return 0;
1271     assert(p->offset <= p->size);
1272     if (pp->level==0)
1273     {
1274 #if ISAMB_DEBUG
1275         logf(LOG_DEBUG,"isamb_pp_climb_level returning 0 at root");
1276 #endif
1277         return 0;
1278     }
1279     assert(pp->level>0); 
1280     close_block(pp->isamb, pp->block[pp->level]);
1281     pp->block[pp->level]=0;
1282     (pp->level)--;
1283     p=pp->block[pp->level];
1284 #if ISAMB_DEBUG
1285     logf(LOG_DEBUG,"isamb_pp_climb_level climbed to level %d node %d ofs=%d",
1286                     pp->level, p->pos, p->offset);
1287 #endif
1288     assert(!p->leaf);
1289     assert(p->offset <= p->size);
1290     if (p->offset == p->size ) {
1291         /* we came from the last pointer, climb on */
1292         if (!isamb_pp_climb_level(pp,pos))
1293             return 0;
1294         p=pp->block[pp->level];
1295     }
1296     else
1297     {
1298         /* skip the child we just came from */
1299 #if ISAMB_DEBUG
1300         logf(LOG_DEBUG,"isam_pp_climb_level: skipping lev=%d ofs=%d sz=%d", 
1301                         pp->level, p->offset, p->size);
1302 #endif
1303         assert (p->offset < p->size );
1304         src=p->bytes + p->offset;
1305         decode_ptr(&src, &item_len);
1306         src += item_len;
1307         decode_ptr(&src, pos);
1308         p->offset=src - (char *)p->bytes;
1309             
1310     }
1311     return 1;
1312 } /* climb_level */
1313
1314
1315 static int isamb_pp_forward_unode(ISAMB_PP pp, int pos, const void *untilbuf)
1316 { /* scans a upper node until it finds a child <= untilbuf */
1317   /* pp points to the key value, as always. pos is the child read from */
1318   /* the buffer */
1319   /* if all values are too small, returns the last child in the node */
1320   /* FIXME - this can be detected, and avoided by looking at the */
1321   /* parent node, but that gets messy. Presumably the cost is */
1322   /* pretty low anyway */
1323     struct ISAMB_block *p = pp->block[pp->level];
1324     char *src = p->bytes + p->offset;
1325     int item_len;
1326     int cmp;
1327     int nxtpos;
1328 #if ISAMB_DEBUG
1329     int skips = 0;
1330     if (p)
1331         logf(LOG_DEBUG,"isamb_pp_forward_unode starting "
1332              "at level %d node %d ofs=%di sz=%d",
1333              pp->level, p->pos, p->offset, p->size);
1334 #endif
1335     assert(!p->leaf);
1336     assert(p->offset <= p->size);
1337     if (p->offset == p->size) {
1338 #if ISAMB_DEBUG
1339         logf(LOG_DEBUG,"isamb_pp_forward_unode returning at end "
1340              "at level %d node %d ofs=%di sz=%d",
1341              pp->level, p->pos, p->offset, p->size);
1342 #endif
1343         return pos; /* already at the end of it */
1344     }
1345     while(p->offset < p->size)
1346     {
1347         decode_ptr(&src,&item_len);
1348         cmp=(*pp->isamb->method->compare_item)(untilbuf,src);
1349         src+=item_len;
1350         decode_ptr(&src,&nxtpos);
1351         if (cmp<2)
1352         {
1353 #if ISAMB_DEBUG
1354             logf(LOG_DEBUG,"isamb_pp_forward_unode returning a hit "
1355                  "at level %d node %d ofs=%d sz=%d",
1356                  pp->level, p->pos, p->offset, p->size);
1357 #endif
1358             return pos;
1359         } /* found one */
1360         pos=nxtpos;
1361         p->offset=src-(char*)p->bytes;
1362         (pp->skipped_nodes[pp->maxlevel - pp->level -1])++;
1363 #if ISAMB_DEBUG
1364         skips++;
1365 #endif
1366     }
1367 #if ISAMB_DEBUG
1368             logf(LOG_DEBUG,"isamb_pp_forward_unode returning at tail "
1369                    "at level %d node %d ofs=%d sz=%d skips=%d",
1370                     pp->level, p->pos, p->offset, p->size, skips);
1371 #endif
1372     return pos; /* that's the last one in the line */
1373     
1374 } /* forward_unode */
1375
1376 static void isamb_pp_descend_to_leaf(ISAMB_PP pp, int pos, const void *untilbuf)
1377 { /* climbs down the tree, from pos, to the leftmost leaf */
1378     struct ISAMB_block *p = pp->block[pp->level];
1379     char *src;
1380     assert(!p->leaf);
1381 #if ISAMB_DEBUG
1382     logf(LOG_DEBUG,"isamb_pp_descend_to_leaf "
1383                    "starting at lev %d node %d ofs=%d lf=%d u=%p", 
1384                    pp->level, p->pos, p->offset, p->leaf, untilbuf);
1385 #endif
1386     if (untilbuf)
1387         pos=isamb_pp_forward_unode(pp,pos,untilbuf);
1388     ++(pp->level);
1389     assert(pos);
1390     p = open_block(pp->isamb, pos);
1391     pp->block[pp->level] = p;
1392     ++(pp->accessed_nodes[pp->maxlevel-pp->level]);
1393     ++(pp->no_blocks);
1394 #if ISAMB_DEBUG
1395     logf(LOG_DEBUG,"isamb_pp_descend_to_leaf "
1396                    "got lev %d node %d lf=%d", 
1397                    pp->level, p->pos, p->leaf);
1398 #endif
1399     if (p->leaf)
1400         return;
1401     assert (p->offset==0);
1402     src = p->bytes + p->offset;
1403     decode_ptr(&src, &pos);
1404     p->offset = src-(char*)p->bytes;
1405     isamb_pp_descend_to_leaf(pp,pos,untilbuf);
1406 #if ISAMB_DEBUG
1407     logf(LOG_DEBUG,"isamb_pp_descend_to_leaf "
1408                    "returning at lev %d node %d ofs=%d lf=%d", 
1409                    pp->level, p->pos, p->offset, p->leaf);
1410 #endif
1411 } /* descend_to_leaf */
1412
1413 static int isamb_pp_find_next_leaf(ISAMB_PP pp)
1414 { /* finds the next leaf by climbing up and down */
1415     int pos;
1416     if (!isamb_pp_climb_level(pp,&pos))
1417         return 0;
1418     isamb_pp_descend_to_leaf(pp, pos,0);
1419     return 1;
1420 }
1421
1422 static int isamb_pp_climb_desc(ISAMB_PP pp, void *buf, const void *untilbuf)
1423 { /* climbs up and descends to a leaf where values >= *untilbuf are found */
1424     int pos;
1425 #if ISAMB_DEBUG
1426     struct ISAMB_block *p = pp->block[pp->level];
1427     logf(LOG_DEBUG,"isamb_pp_climb_desc starting "
1428                    "at level %d node %d ofs=%d sz=%d",
1429                     pp->level, p->pos, p->offset, p->size);
1430 #endif
1431     if (!isamb_pp_climb_level(pp,&pos))
1432         return 0;
1433     /* see if it would pay to climb one higher */
1434     if (!isamb_pp_on_right_node(pp, pp->level, untilbuf))
1435         if (!isamb_pp_climb_level(pp,&pos))
1436             return 0;
1437     isamb_pp_descend_to_leaf(pp, pos,untilbuf);
1438 #if ISAMB_DEBUG
1439     p = pp->block[pp->level];
1440     logf(LOG_DEBUG,"isamb_pp_climb_desc done "
1441                    "at level %d node %d ofs=%d sz=%d",
1442                     pp->level, p->pos, p->offset, p->size);
1443 #endif
1444     return 1;
1445 } /* climb_desc */
1446
1447 int isamb_pp_forward (ISAMB_PP pp, void *buf, const void *untilbuf)
1448 {
1449 #if ISAMB_DEBUG
1450     struct ISAMB_block *p = pp->block[pp->level];
1451     if (p)
1452     {
1453         assert(p->leaf);
1454         logf(LOG_DEBUG,"isamb_pp_forward starting "
1455              "at level %d node %d ofs=%d sz=%d u=%p",
1456              pp->level, p->pos, p->offset, p->size,untilbuf);
1457     }
1458 #endif
1459     if (untilbuf) {
1460         if (isamb_pp_forward_on_leaf( pp, buf, untilbuf)) {
1461 #if ISAMB_DEBUG
1462             if (p)
1463                 logf(LOG_DEBUG,"isamb_pp_forward (f) returning (A) "
1464                      "at level %d node %d ofs=%d sz=%d",
1465                     pp->level, p->pos, p->offset, p->size);
1466 #endif
1467             return 1;
1468         }
1469         if (! isamb_pp_climb_desc( pp, buf, untilbuf)) {
1470 #if ISAMB_DEBUG
1471             if (p)
1472                 logf(LOG_DEBUG,"isamb_pp_forward (f) returning notfound (B) "
1473                      "at level %d node %d ofs=%d sz=%d",
1474                      pp->level, p->pos, p->offset, p->size);
1475 #endif
1476             return 0; /* could not find a leaf */
1477         }
1478         do {
1479             if (isamb_pp_forward_on_leaf( pp, buf, untilbuf)) {
1480 #if ISAMB_DEBUG
1481                 if(p)
1482                     logf(LOG_DEBUG,"isamb_pp_forward (f) returning (C) "
1483                          "at level %d node %d ofs=%d sz=%d",
1484                          pp->level, p->pos, p->offset, p->size);
1485 #endif
1486                 return 1;
1487             }
1488         } while (isamb_pp_find_next_leaf(pp));
1489         return 0; /* could not find at all */
1490     }
1491     else { /* no untilbuf, a straight read */
1492         /* FIXME - this should be moved
1493          * directly into the pp_read */
1494         /* keeping here now, to keep same
1495          * interface as the old fwd */
1496         if (isamb_pp_read_on_leaf( pp, buf)) {
1497 #if ISAMB_DEBUG
1498             if (p)
1499                 logf(LOG_DEBUG,"isamb_pp_forward (read) returning (D) "
1500                      "at level %d node %d ofs=%d sz=%d",
1501                      pp->level, p->pos, p->offset, p->size);
1502 #endif
1503             return 1;
1504         }
1505         if (isamb_pp_find_next_leaf(pp)) {
1506 #if ISAMB_DEBUG
1507             if (p)
1508                 logf(LOG_DEBUG,"isamb_pp_forward (read) returning (E) "
1509                      "at level %d node %d ofs=%d sz=%d",
1510                     pp->level, p->pos, p->offset, p->size);
1511 #endif
1512             return isamb_pp_read_on_leaf(pp, buf);
1513         }
1514         else
1515             return 0;
1516     }
1517 } /* isam_pp_forward (new version) */
1518
1519 #elif NEW_FORWARD == 0
1520
1521 int isamb_pp_forward (ISAMB_PP pp, void *buf, const void *untilbuf)
1522 {
1523     /* pseudocode:
1524      *   while 1
1525      *     while at end of node
1526      *       climb higher. If out, return 0
1527      *     while not on a leaf (and not at its end)
1528      *       decode next
1529      *       if cmp
1530      *         descend to node
1531      *     decode next
1532      *     if cmp
1533      *       return 1
1534      */
1535      /* 
1536       * The upper nodes consist of a sequence of nodenumbers and keys
1537       * When opening a block,  the first node number is read in, and
1538       * offset points to the first key, which is the upper limit of keys
1539       * in the node just read.
1540       */
1541     char *dst = buf;
1542     char *src;
1543     struct ISAMB_block *p = pp->block[pp->level];
1544     int cmp;
1545     int item_len;
1546     int pos;
1547     int nxtpos;
1548     int descending=0; /* used to prevent a border condition error */
1549     if (!p)
1550         return 0;
1551 #if ISAMB_DEBUG
1552     logf(LOG_DEBUG,"isamb_pp_forward starting [%p] p=%d",pp,p->pos);
1553     
1554     (*pp->isamb->method->log_item)(LOG_DEBUG, untilbuf, "until");
1555     (*pp->isamb->method->log_item)(LOG_DEBUG, buf, "buf");
1556 #endif
1557
1558     while (1)
1559     {
1560         while ( (p->offset == p->size) && !descending )
1561         {  /* end of this block - climb higher */
1562             assert (p->offset <= p->size);
1563 #if ISAMB_DEBUG
1564             logf(LOG_DEBUG,"isamb_pp_forward climbing from l=%d",
1565                             pp->level);
1566 #endif
1567             if (pp->level == 0)
1568             {
1569 #if ISAMB_DEBUG
1570                 logf(LOG_DEBUG,"isamb_pp_forward returning 0 at root");
1571 #endif
1572                 return 0; /* at end of the root, nothing left */
1573             }
1574             close_block(pp->isamb, pp->block[pp->level]);
1575             pp->block[pp->level]=0;
1576             (pp->level)--;
1577             p=pp->block[pp->level];
1578 #if ISAMB_DEBUG
1579             logf(LOG_DEBUG,"isamb_pp_forward climbed to node %d off=%d",
1580                             p->pos, p->offset);
1581 #endif
1582             assert(!p->leaf);
1583             assert(p->offset <= p->size);
1584             /* skip the child we have handled */
1585             if (p->offset != p->size)
1586             { 
1587                 src = p->bytes + p->offset;
1588                 decode_ptr(&src, &item_len);
1589 #if ISAMB_DEBUG         
1590                 (*pp->isamb->method->log_item)(LOG_DEBUG, src,
1591                                                " isamb_pp_forward "
1592                                                "climb skipping old key");
1593 #endif
1594                 src += item_len;
1595                 decode_ptr(&src,&pos);
1596                 p->offset = src - (char*) p->bytes;
1597                 break; /* even if this puts us at the end of the block, we
1598                           need to descend to the last pos. UGLY coding,
1599                           clean up some day */
1600             }
1601         }
1602         if (!p->leaf)
1603         { 
1604             src = p->bytes + p->offset;
1605             if (p->offset == p->size)
1606                 cmp=-2 ; /* descend to the last node, as we have
1607                             no value to cmp */
1608             else
1609             {
1610                 decode_ptr(&src, &item_len);
1611 #if ISAMB_DEBUG
1612                 logf(LOG_DEBUG,"isamb_pp_forward (B) on a high node. "
1613                      "ofs=%d sz=%d nxtpos=%d ",
1614                         p->offset,p->size,pos);
1615                 (*pp->isamb->method->log_item)(LOG_DEBUG, src, "");
1616 #endif
1617                 if (untilbuf)
1618                     cmp=(*pp->isamb->method->compare_item)(untilbuf,src);
1619                 else
1620                     cmp=-2;
1621                 src += item_len;
1622                 decode_ptr(&src,&nxtpos);
1623             }
1624             if (cmp<2)
1625             { 
1626 #if ISAMB_DEBUG
1627                 logf(LOG_DEBUG,"isambb_pp_forward descending l=%d p=%d ",
1628                             pp->level, pos);
1629 #endif
1630                 descending=1; /* prevent climbing for a while */
1631                 ++(pp->level);
1632                 p = open_block(pp->isamb,pos);
1633                 pp->block[pp->level] = p ;
1634                 pp->total_size += p->size;
1635                 (pp->accessed_nodes[pp->maxlevel - pp->level])++;
1636                 pp->no_blocks++;
1637                 if ( !p->leaf)
1638                 { /* block starts with a pos */
1639                     src = p->bytes + p->offset;
1640                     decode_ptr(&src,&pos);
1641                     p->offset=src-(char*) p->bytes;
1642 #if ISAMB_DEBUG
1643                     logf(LOG_DEBUG,"isamb_pp_forward: block %d starts with %d",
1644                                     p->pos, pos);
1645 #endif
1646                 } 
1647             } /* descend to the node */
1648             else
1649             { /* skip the node */
1650                 p->offset = src - (char*) p->bytes;
1651                 pos=nxtpos;
1652                 (pp->skipped_nodes[pp->maxlevel - pp->level -1])++;
1653 #if ISAMB_DEBUG
1654                 logf(LOG_DEBUG,
1655                     "isamb_pp_forward: skipping block on level %d, noting "
1656                      "on %d (%d)",
1657                     pp->level, pp->maxlevel - pp->level-1 , 
1658                     pp->skipped_nodes[pp->maxlevel - pp->level-1 ]);
1659 #endif
1660                 /* 0 is always leafs, 1 is one level above leafs etc, no
1661                  * matter how high tree */
1662             }
1663         } /* not on a leaf */
1664         else
1665         { /* on a leaf */
1666             if (p->offset == p->size) { 
1667                 descending = 0;
1668             }
1669             else
1670             {
1671                 assert (p->offset < p->size);
1672                 src = p->bytes + p->offset;
1673                 dst=buf;
1674                 (*pp->isamb->method->code_item)(ISAMC_DECODE, p->decodeClientData,
1675                                                 &dst, &src);
1676                 p->offset = src - (char*) p->bytes;
1677                 if (untilbuf)
1678                     cmp=(*pp->isamb->method->compare_item)(untilbuf,buf);
1679                 else
1680                     cmp=-2;
1681 #if ISAMB_DEBUG
1682                 logf(LOG_DEBUG,"isamb_pp_forward on a leaf. cmp=%d", 
1683                      cmp);
1684                 (*pp->isamb->method->log_item)(LOG_DEBUG, buf, "");
1685 #endif
1686                 if (cmp <2)
1687                 {
1688 #if ISAMB_DEBUG
1689                     if (untilbuf)
1690                     {
1691                         (*pp->isamb->method->log_item)(
1692                             LOG_DEBUG, buf,  "isamb_pp_forward returning 1");
1693                     }
1694                     else
1695                     {
1696                         (*pp->isamb->method->log_item)(
1697                             LOG_DEBUG, buf, "isamb_pp_read returning 1 (fwd)");
1698                     }
1699 #endif
1700                     pp->returned_numbers++;
1701                     return 1;
1702                 }
1703                 else
1704                     pp->skipped_numbers++;
1705             }
1706         } /* leaf */
1707     } /* main loop */
1708 }
1709
1710 #elif NEW_FORWARD == 2
1711
1712 int isamb_pp_forward (ISAMB_PP pp, void *buf, const void *untilb)
1713 {
1714     char *dst = buf;
1715     char *src;
1716     struct ISAMB_block *p = pp->block[pp->level];
1717     if (!p)
1718         return 0;
1719
1720 again:
1721     while (p->offset == p->size)
1722     {
1723         int pos, item_len;
1724         while (p->offset == p->size)
1725         {
1726             if (pp->level == 0)
1727                 return 0;
1728             close_block (pp->isamb, pp->block[pp->level]);
1729             pp->block[pp->level] = 0;
1730             (pp->level)--;
1731             p = pp->block[pp->level];
1732             assert (!p->leaf);  
1733         }
1734
1735         assert(!p->leaf);
1736         src = p->bytes + p->offset;
1737         
1738         decode_ptr (&src, &item_len);
1739         src += item_len;
1740         decode_ptr (&src, &pos);
1741         
1742         p->offset = src - (char*) p->bytes;
1743
1744         src = p->bytes + p->offset;
1745
1746         while(1)
1747         {
1748             if (!untilb || p->offset == p->size)
1749                 break;
1750             assert(p->offset < p->size);
1751             decode_ptr (&src, &item_len);
1752             if ((*pp->isamb->method->compare_item)(untilb, src) <= 1)
1753                 break;
1754             src += item_len;
1755             decode_ptr (&src, &pos);
1756             p->offset = src - (char*) p->bytes;
1757         }
1758
1759         pp->level++;
1760
1761         while (1)
1762         {
1763             pp->block[pp->level] = p = open_block (pp->isamb, pos);
1764
1765             pp->total_size += p->size;
1766             pp->no_blocks++;
1767             
1768             if (p->leaf) 
1769             {
1770                 break;
1771             }
1772             
1773             src = p->bytes + p->offset;
1774             while(1)
1775             {
1776                 decode_ptr (&src, &pos);
1777                 p->offset = src - (char*) p->bytes;
1778                 
1779                 if (!untilb || p->offset == p->size)
1780                     break;
1781                 assert(p->offset < p->size);
1782                 decode_ptr (&src, &item_len);
1783                 if ((*pp->isamb->method->compare_item)(untilb, src) <= 1)
1784                     break;
1785                 src += item_len;
1786             }
1787             pp->level++;
1788         }
1789     }
1790     assert (p->offset < p->size);
1791     assert (p->leaf);
1792     while(1)
1793     {
1794         char *dst0 = dst;
1795         src = p->bytes + p->offset;
1796         (*pp->isamb->method->code_item)(ISAMC_DECODE, p->decodeClientData,
1797                                     &dst, &src);
1798         p->offset = src - (char*) p->bytes;
1799         if (!untilb || (*pp->isamb->method->compare_item)(untilb, dst0) <= 1)
1800             break;
1801         dst = dst0;
1802         if (p->offset == p->size) goto again;
1803     }
1804     /* key_logdump_txt(LOG_DEBUG,buf, "isamb_pp_read returning 1"); */
1805     return 1;
1806 }
1807
1808 #endif
1809
1810 int isamb_pp_num (ISAMB_PP pp)
1811 {
1812     return 1;
1813 }
1814
1815 static void isamb_pp_leaf_pos( ISAMB_PP pp, 
1816                                int *current, int *total, void *dummybuf )
1817 {
1818     struct ISAMB_block *p = pp->block[pp->level];
1819     char *src=p->bytes;
1820     char *end=p->bytes+p->size;
1821     char *cur=p->bytes+p->offset;
1822     char *dst;
1823     assert(p->offset <= p->size);
1824     assert(cur <= end);
1825     assert(p->leaf);
1826     *current=0;
1827     *total=0;
1828
1829     while(src < end) {
1830         dst=dummybuf;
1831         (*pp->isamb->method->code_item)
1832            (ISAMC_DECODE, p->decodeClientData,&dst, &src);
1833         assert(dst<(char*) dummybuf+100); /*FIXME */
1834         (*total)++;
1835         if (src<=cur)
1836              (*current)++;
1837     }
1838     logf(LOG_DEBUG, "isamb_pp_leaf_pos: cur=%d tot=%d ofs=%d sz=%d lev=%d",
1839                     *current, *total, p->offset, p->size, pp->level);
1840     assert(src==end);
1841 }
1842
1843 static void isamb_pp_upper_pos( ISAMB_PP pp, int *current, int *total, 
1844                                 int size, int level )
1845 { /* estimates total/current occurrences from here up, excl leaf */
1846     struct ISAMB_block *p = pp->block[level];
1847     char *src=p->bytes;
1848     char *end=p->bytes+p->size;
1849     char *cur=p->bytes+p->offset;
1850     int item_size;
1851     int child;
1852     assert(level>=0);
1853     assert(!p->leaf);
1854     logf(LOG_DEBUG,"isamb_pp_upper_pos at beginning     l=%d "
1855                    "cur=%d tot=%d ofs=%d sz=%d pos=%d", 
1856                    level, *current, *total, p->offset, p->size, p->pos);
1857     assert (p->offset <= p->size);
1858         decode_ptr (&src, &child ); /* first child */
1859     while(src < end) {
1860         if (src!=cur) {
1861             *total += size;
1862             if (src < cur)
1863                 *current +=size;
1864         }
1865             decode_ptr (&src, &item_size ); 
1866         assert(src+item_size<=end);
1867         src += item_size;
1868             decode_ptr (&src, &child );
1869     }
1870     if (level>0)
1871         isamb_pp_upper_pos(pp, current, total, *total, level-1);
1872 } /* upper_pos */
1873
1874 void isamb_pp_pos( ISAMB_PP pp, int *current, int *total )
1875 { /* return an estimate of the current position and of the total number of */
1876   /* occureences in the isam tree, based on the current leaf */
1877     struct ISAMB_block *p = pp->block[pp->level];
1878     char dummy[100]; /* 100 bytes/entry must be enough */
1879     assert(total);
1880     assert(current);
1881     assert(p->leaf);
1882     isamb_pp_leaf_pos(pp,current, total, dummy);
1883     if (pp->level>0)
1884         isamb_pp_upper_pos(pp, current, total, *total, pp->level-1);
1885     /*
1886     logf(LOG_DEBUG,"isamb_pp_pos: C=%d T=%d =%6.2f%%",
1887                     *current, *total, 100.0*(*current)/(*total));
1888     */
1889 }