Added several type casts and changed many types - mostly from int to zint.
[idzebra-moved-to-github.git] / isamb / isamb.c
1 /* $Id: isamb.c,v 1.52 2004-08-06 12:28:23 adam Exp $
2    Copyright (C) 1995,1996,1997,1998,1999,2000,2001,2002,2003,2004
3    Index Data Aps
4
5 This file is part of the Zebra server.
6
7 Zebra is free software; you can redistribute it and/or modify it under
8 the terms of the GNU General Public License as published by the Free
9 Software Foundation; either version 2, or (at your option) any later
10 version.
11
12 Zebra is distributed in the hope that it will be useful, but WITHOUT ANY
13 WARRANTY; without even the implied warranty of MERCHANTABILITY or
14 FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
15 for more details.
16
17 You should have received a copy of the GNU General Public License
18 along with Zebra; see the file LICENSE.zebra.  If not, write to the
19 Free Software Foundation, 59 Temple Place - Suite 330, Boston, MA
20 02111-1307, USA.
21 */
22
23 #include <string.h>
24 #include <yaz/xmalloc.h>
25 #include <yaz/log.h>
26 #include <isamb.h>
27 #include <assert.h>
28
29 #ifndef ISAMB_DEBUG
30 #define ISAMB_DEBUG 0
31 #endif
32
33 struct ISAMB_head {
34     zint first_block;
35     zint last_block;
36     int block_size;
37     int block_max;
38     zint free_list;
39 };
40
41 #define ISAMB_DATA_OFFSET 3
42
43 /* maximum size of encoded buffer */
44 #define DST_ITEM_MAX 256
45
46 #define ISAMB_MAX_LEVEL 10
47 /* approx 2*max page + max size of item */
48 #define DST_BUF_SIZE 16840
49
50 #define ISAMB_CACHE_ENTRY_SIZE 4096
51
52 /* CAT_MAX: _must_ be power of 2 */
53 #define CAT_MAX 4
54 #define CAT_MASK (CAT_MAX-1)
55 /* CAT_NO: <= CAT_MAX */
56 #define CAT_NO 4
57
58 /* ISAMB_PTR_CODEC=1 var, =0 fixed */
59 #define ISAMB_PTR_CODEC 0
60
61 struct ISAMB_cache_entry {
62     ISAMB_P pos;
63     unsigned char *buf;
64     int dirty;
65     int hits;
66     struct ISAMB_cache_entry *next;
67 };
68
69 struct ISAMB_file {
70     BFile bf;
71     int head_dirty;
72     struct ISAMB_head head;
73     struct ISAMB_cache_entry *cache_entries;
74 };
75
76 struct ISAMB_s {
77     BFiles bfs;
78     ISAMC_M *method;
79
80     struct ISAMB_file *file;
81     int no_cat;
82     int cache; /* 0=no cache, 1=use cache, -1=dummy isam (for testing only) */
83     int log_io;        /* log level for bf_read/bf_write calls */
84     int log_freelist;  /* log level for freelist handling */
85     int skipped_numbers; /* on a leaf node */
86     int returned_numbers; 
87     int skipped_nodes[ISAMB_MAX_LEVEL]; /* [0]=skipped leaves, 1=higher etc */
88     int accessed_nodes[ISAMB_MAX_LEVEL]; /* nodes we did not skip */
89 };
90
91 struct ISAMB_block {
92     ISAMB_P pos;
93     int cat;
94     int size;
95     int leaf;
96     int dirty;
97     int deleted;
98     int offset;
99     char *bytes;
100     char *cbuf;
101     unsigned char *buf;
102     void *decodeClientData;
103     int log_rw;
104 };
105
106 struct ISAMB_PP_s {
107     ISAMB isamb;
108     ISAMB_P pos;
109     int level;
110     int maxlevel; /* total depth */
111     int total_size;
112     int no_blocks;
113     int skipped_numbers; /* on a leaf node */
114     int returned_numbers; 
115     int skipped_nodes[ISAMB_MAX_LEVEL]; /* [0]=skipped leaves, 1=higher etc */
116     int accessed_nodes[ISAMB_MAX_LEVEL]; /* nodes we did not skip */
117     struct ISAMB_block **block;
118 };
119
120
121 #if ISAMB_PTR_CODEC
122 static void encode_ptr (char **dst, zint pos)
123 {
124     unsigned char *bp = (unsigned char*) *dst;
125
126     while (pos > 127)
127     {
128          *bp++ = 128 | (pos & 127);
129          pos = pos >> 7;
130     }
131     *bp++ = pos;
132     *dst = (char *) bp;
133 }
134 #else
135 static void encode_ptr (char **dst, zint pos)
136 {
137     memcpy(*dst, &pos, sizeof(pos));
138     (*dst) += sizeof(pos);
139 }
140 #endif
141
142 #if ISAMB_PTR_CODEC
143 static void decode_ptr (const char **src1, zint *pos)
144 {
145     const unsigned char **src = (const unsigned char **) src1;
146     zint d = 0;
147     unsigned char c;
148     unsigned r = 0;
149
150     while (((c = *(*src)++) & 128))
151     {
152         d += ((zint) (c & 127) << r);
153         r += 7;
154     }
155     d += ((zint) c << r);
156     *pos = d;
157 }
158 #else
159 static void decode_ptr (const char **src, zint *pos)
160 {
161      memcpy (pos, *src, sizeof(*pos));
162      (*src) += sizeof(*pos);
163 }
164 #endif
165
166 ISAMB isamb_open (BFiles bfs, const char *name, int writeflag, ISAMC_M *method,
167                   int cache)
168 {
169     ISAMB isamb = xmalloc (sizeof(*isamb));
170     int i, b_size = 32;
171
172     isamb->bfs = bfs;
173     isamb->method = (ISAMC_M *) xmalloc (sizeof(*method));
174     memcpy (isamb->method, method, sizeof(*method));
175     isamb->no_cat = CAT_NO;
176     isamb->log_io = 0;
177     isamb->log_freelist = 0;
178     isamb->cache = cache;
179     isamb->skipped_numbers=0;
180     isamb->returned_numbers=0;
181     for (i=0;i<ISAMB_MAX_LEVEL;i++)
182       isamb->skipped_nodes[i]= isamb->accessed_nodes[i]=0;
183
184     assert (cache == 0);
185     isamb->file = xmalloc (sizeof(*isamb->file) * isamb->no_cat);
186     for (i = 0; i<isamb->no_cat; i++)
187     {
188         char fname[DST_BUF_SIZE];
189         isamb->file[i].cache_entries = 0;
190         isamb->file[i].head_dirty = 0;
191         sprintf (fname, "%s%c", name, i+'A');
192         if (cache)
193             isamb->file[i].bf = bf_open (bfs, fname, ISAMB_CACHE_ENTRY_SIZE,
194                                          writeflag);
195         else
196             isamb->file[i].bf = bf_open (bfs, fname, b_size, writeflag);
197
198         
199         if (!bf_read (isamb->file[i].bf, 0, 0, sizeof(struct ISAMB_head),
200                       &isamb->file[i].head))
201         {
202             isamb->file[i].head.first_block = ISAMB_CACHE_ENTRY_SIZE/b_size+1;
203             isamb->file[i].head.last_block = isamb->file[i].head.first_block;
204             isamb->file[i].head.block_size = b_size;
205             isamb->file[i].head.block_max = b_size - ISAMB_DATA_OFFSET;
206             isamb->file[i].head.free_list = 0;
207         }
208         assert (isamb->file[i].head.block_size >= ISAMB_DATA_OFFSET);
209         isamb->file[i].head_dirty = 0;
210         assert(isamb->file[i].head.block_size == b_size);
211         b_size = b_size * 4;
212     }
213 #if ISAMB_DEBUG
214     logf(LOG_WARN, "isamb debug enabled. Things will be slower than usual");
215 #endif
216     return isamb;
217 }
218
219 static void flush_blocks (ISAMB b, int cat)
220 {
221     while (b->file[cat].cache_entries)
222     {
223         struct ISAMB_cache_entry *ce_this = b->file[cat].cache_entries;
224         b->file[cat].cache_entries = ce_this->next;
225
226         if (ce_this->dirty)
227         {
228             yaz_log (b->log_io, "bf_write: flush_blocks");
229             bf_write (b->file[cat].bf, ce_this->pos, 0, 0, ce_this->buf);
230         }
231         xfree (ce_this->buf);
232         xfree (ce_this);
233     }
234 }
235
236 static int get_block (ISAMB b, ISAMC_P pos, char *userbuf, int wr)
237 {
238     int cat = (int) (pos&CAT_MASK);
239     int off = (int) (((pos/CAT_MAX) & 
240                (ISAMB_CACHE_ENTRY_SIZE / b->file[cat].head.block_size - 1))
241         * b->file[cat].head.block_size);
242     zint norm = pos / (CAT_MASK*ISAMB_CACHE_ENTRY_SIZE / b->file[cat].head.block_size);
243     int no = 0;
244     struct ISAMB_cache_entry **ce, *ce_this = 0, **ce_last = 0;
245
246     if (!b->cache)
247         return 0;
248
249     assert (ISAMB_CACHE_ENTRY_SIZE >= b->file[cat].head.block_size);
250     for (ce = &b->file[cat].cache_entries; *ce; ce = &(*ce)->next, no++)
251     {
252         ce_last = ce;
253         if ((*ce)->pos == norm)
254         {
255             ce_this = *ce;
256             *ce = (*ce)->next;   /* remove from list */
257             
258             ce_this->next = b->file[cat].cache_entries;  /* move to front */
259             b->file[cat].cache_entries = ce_this;
260             
261             if (wr)
262             {
263                 memcpy (ce_this->buf + off, userbuf, 
264                         b->file[cat].head.block_size);
265                 ce_this->dirty = 1;
266             }
267             else
268                 memcpy (userbuf, ce_this->buf + off,
269                         b->file[cat].head.block_size);
270             return 1;
271         }
272     }
273     if (no >= 40)
274     {
275         assert (no == 40);
276         assert (ce_last && *ce_last);
277         ce_this = *ce_last;
278         *ce_last = 0;  /* remove the last entry from list */
279         if (ce_this->dirty)
280         {
281             yaz_log (b->log_io, "bf_write: get_block");
282             bf_write (b->file[cat].bf, ce_this->pos, 0, 0, ce_this->buf);
283         }
284         xfree (ce_this->buf);
285         xfree (ce_this);
286     }
287     ce_this = xmalloc (sizeof(*ce_this));
288     ce_this->next = b->file[cat].cache_entries;
289     b->file[cat].cache_entries = ce_this;
290     ce_this->buf = xmalloc (ISAMB_CACHE_ENTRY_SIZE);
291     ce_this->pos = norm;
292     yaz_log (b->log_io, "bf_read: get_block");
293     if (!bf_read (b->file[cat].bf, norm, 0, 0, ce_this->buf))
294         memset (ce_this->buf, 0, ISAMB_CACHE_ENTRY_SIZE);
295     if (wr)
296     {
297         memcpy (ce_this->buf + off, userbuf, b->file[cat].head.block_size);
298         ce_this->dirty = 1;
299     }
300     else
301     {
302         ce_this->dirty = 0;
303         memcpy (userbuf, ce_this->buf + off, b->file[cat].head.block_size);
304     }
305     return 1;
306 }
307
308
309 void isamb_close (ISAMB isamb)
310 {
311     int i;
312     for (i=0;isamb->accessed_nodes[i];i++)
313         logf(LOG_DEBUG,"isamb_close  level leaf-%d: %d read, %d skipped",
314              i, isamb->accessed_nodes[i], isamb->skipped_nodes[i]);
315     logf(LOG_DEBUG,"isamb_close returned %d values, skipped %d",
316          isamb->skipped_numbers, isamb->returned_numbers);
317     for (i = 0; i<isamb->no_cat; i++)
318     {
319         flush_blocks (isamb, i);
320         if (isamb->file[i].head_dirty)
321             bf_write (isamb->file[i].bf, 0, 0,
322                       sizeof(struct ISAMB_head), &isamb->file[i].head);
323         
324         bf_close (isamb->file[i].bf);
325     }
326     xfree (isamb->file);
327     xfree (isamb->method);
328     xfree (isamb);
329 }
330
331 static struct ISAMB_block *open_block (ISAMB b, ISAMC_P pos)
332 {
333     int cat = (int) (pos&CAT_MASK);
334     struct ISAMB_block *p;
335     if (!pos)
336         return 0;
337     p = xmalloc (sizeof(*p));
338     p->pos = pos;
339     p->cat = (int) (pos & CAT_MASK);
340     p->buf = xmalloc (b->file[cat].head.block_size);
341     p->cbuf = 0;
342
343     if (!get_block (b, pos, p->buf, 0))
344     {
345         yaz_log (b->log_io, "bf_read: open_block");
346         if (!bf_read (b->file[cat].bf, pos/CAT_MAX, 0, 0, p->buf))
347         {
348             yaz_log (LOG_FATAL, "isamb: read fail for pos=%ld block=%ld",
349                      (long) pos, (long) pos/CAT_MAX);
350             abort();
351         }
352     }
353     p->bytes = p->buf + ISAMB_DATA_OFFSET;
354     p->leaf = p->buf[0];
355     p->size = (p->buf[1] + 256 * p->buf[2]) - ISAMB_DATA_OFFSET;
356     if (p->size < 0)
357     {
358         yaz_log (LOG_FATAL, "Bad block size %d in pos=" ZINT_FORMAT "\n",
359                  p->size, pos);
360     }
361     assert (p->size >= 0);
362     p->offset = 0;
363     p->dirty = 0;
364     p->deleted = 0;
365     p->decodeClientData = (*b->method->codec.start)();
366     yaz_log (LOG_DEBUG, "isamb_open_block: Opened block " ZINT_FORMAT " ofs=%d",pos, p->offset);
367     return p;
368 }
369
370 struct ISAMB_block *new_block (ISAMB b, int leaf, int cat)
371 {
372     struct ISAMB_block *p;
373
374     p = xmalloc (sizeof(*p));
375     p->buf = xmalloc (b->file[cat].head.block_size);
376
377     if (!b->file[cat].head.free_list)
378     {
379         zint block_no;
380         block_no = b->file[cat].head.last_block++;
381         p->pos = block_no * CAT_MAX + cat;
382     }
383     else
384     {
385         p->pos = b->file[cat].head.free_list;
386         assert((p->pos & CAT_MASK) == cat);
387         if (!get_block (b, p->pos, p->buf, 0))
388         {
389             yaz_log (b->log_io, "bf_read: new_block");
390             if (!bf_read (b->file[cat].bf, p->pos/CAT_MAX, 0, 0, p->buf))
391             {
392                 yaz_log (LOG_FATAL, "isamb: read fail for pos=%ld block=%ld",
393                          (long) p->pos/CAT_MAX, (long) p->pos/CAT_MAX);
394                 abort ();
395             }
396         }
397         yaz_log (b->log_freelist, "got block " ZINT_FORMAT " from freelist %d:" ZINT_FORMAT, p->pos,
398                  cat, p->pos/CAT_MAX);
399         memcpy (&b->file[cat].head.free_list, p->buf, sizeof(int));
400     }
401     p->cat = cat;
402     b->file[cat].head_dirty = 1;
403     memset (p->buf, 0, b->file[cat].head.block_size);
404     p->bytes = p->buf + ISAMB_DATA_OFFSET;
405     p->leaf = leaf;
406     p->size = 0;
407     p->dirty = 1;
408     p->deleted = 0;
409     p->offset = 0;
410     p->decodeClientData = (*b->method->codec.start)();
411     return p;
412 }
413
414 struct ISAMB_block *new_leaf (ISAMB b, int cat)
415 {
416     return new_block (b, 1, cat);
417 }
418
419
420 struct ISAMB_block *new_int (ISAMB b, int cat)
421 {
422     return new_block (b, 0, cat);
423 }
424
425 static void check_block (ISAMB b, struct ISAMB_block *p)
426 {
427     if (p->leaf)
428     {
429         ;
430     }
431     else
432     {
433         /* sanity check */
434         char *startp = p->bytes;
435         const char *src = startp;
436         char *endp = p->bytes + p->size;
437         ISAMB_P pos;
438             
439         decode_ptr (&src, &pos);
440         assert ((pos&CAT_MASK) == p->cat);
441         while (src != endp)
442         {
443             zint item_len;
444             decode_ptr (&src, &item_len);
445             assert (item_len > 0 && item_len < 80);
446             src += item_len;
447             decode_ptr (&src, &pos);
448             assert ((pos&CAT_MASK) == p->cat);
449         }
450     }
451 }
452
453 void close_block (ISAMB b, struct ISAMB_block *p)
454 {
455     if (!p)
456         return;
457     if (p->deleted)
458     {
459         yaz_log (b->log_freelist, "release block " ZINT_FORMAT " from freelist %d:" ZINT_FORMAT,
460                  p->pos, p->cat, p->pos/CAT_MAX);
461         memcpy (p->buf, &b->file[p->cat].head.free_list, sizeof(int));
462         b->file[p->cat].head.free_list = p->pos;
463         if (!get_block (b, p->pos, p->buf, 1))
464         {
465             yaz_log (b->log_io, "bf_write: close_block (deleted)");
466             bf_write (b->file[p->cat].bf, p->pos/CAT_MAX, 0, 0, p->buf);
467         }
468     }
469     else if (p->dirty)
470     {
471         int size = p->size + ISAMB_DATA_OFFSET;
472         assert (p->size >= 0);
473         p->buf[0] = p->leaf;
474         p->buf[1] = size & 255;
475         p->buf[2] = size >> 8;
476         check_block(b, p);
477         if (!get_block (b, p->pos, p->buf, 1))
478         {
479             yaz_log (b->log_io, "bf_write: close_block");
480             bf_write (b->file[p->cat].bf, p->pos/CAT_MAX, 0, 0, p->buf);
481         }
482     }
483     (*b->method->codec.stop)(p->decodeClientData);
484     xfree (p->buf);
485     xfree (p);
486 }
487
488 int insert_sub (ISAMB b, struct ISAMB_block **p,
489                 void *new_item, int *mode,
490                 ISAMC_I *stream,
491                 struct ISAMB_block **sp,
492                 void *sub_item, int *sub_size,
493                 const void *max_item);
494
495 int insert_int (ISAMB b, struct ISAMB_block *p, void *lookahead_item,
496                 int *mode,
497                 ISAMC_I *stream, struct ISAMB_block **sp,
498                 void *split_item, int *split_size, const void *last_max_item)
499 {
500     char *startp = p->bytes;
501     const char *src = startp;
502     char *endp = p->bytes + p->size;
503     ISAMB_P pos;
504     struct ISAMB_block *sub_p1 = 0, *sub_p2 = 0;
505     char sub_item[DST_ITEM_MAX];
506     int sub_size;
507     int more = 0;
508
509     *sp = 0;
510
511     assert(p->size >= 0);
512     decode_ptr (&src, &pos);
513     while (src != endp)
514     {
515         zint item_len;
516         int d;
517         const char *src0 = src;
518         decode_ptr (&src, &item_len);
519         d = (*b->method->compare_item)(src, lookahead_item);
520         if (d > 0)
521         {
522             sub_p1 = open_block (b, pos);
523             assert (sub_p1);
524             more = insert_sub (b, &sub_p1, lookahead_item, mode,
525                                stream, &sub_p2, 
526                                sub_item, &sub_size, src);
527             src = src0;
528             break;
529         }
530         src += item_len;
531         decode_ptr (&src, &pos);
532     }
533     if (!sub_p1)
534     {
535         sub_p1 = open_block (b, pos);
536         assert (sub_p1);
537         more = insert_sub (b, &sub_p1, lookahead_item, mode, stream, &sub_p2, 
538                            sub_item, &sub_size, last_max_item);
539     }
540     if (sub_p2)
541     {
542         /* there was a split - must insert pointer in this one */
543         char dst_buf[DST_BUF_SIZE];
544         char *dst = dst_buf;
545
546         assert (sub_size < 80 && sub_size > 1);
547
548         memcpy (dst, startp, src - startp);
549                 
550         dst += src - startp;
551
552         encode_ptr (&dst, sub_size);      /* sub length and item */
553         memcpy (dst, sub_item, sub_size);
554         dst += sub_size;
555
556         encode_ptr (&dst, sub_p2->pos);   /* pos */
557
558         if (endp - src)                   /* remaining data */
559         {
560             memcpy (dst, src, endp - src);
561             dst += endp - src;
562         }
563         p->size = dst - dst_buf;
564         assert (p->size >= 0);
565         if (p->size <= b->file[p->cat].head.block_max)
566         {
567             memcpy (startp, dst_buf, dst - dst_buf);
568         }
569         else
570         {
571             zint split_size_tmp;
572             int p_new_size;
573             const char *half;
574             src = dst_buf;
575             endp = dst;
576
577             half = src + b->file[p->cat].head.block_size/2;
578             decode_ptr (&src, &pos);
579             while (src <= half)
580             {
581                 decode_ptr (&src, &split_size_tmp);
582                 *split_size = (int) split_size_tmp;
583
584                 src += *split_size;
585                 decode_ptr (&src, &pos);
586             }
587             p_new_size = src - dst_buf;
588             memcpy (p->bytes, dst_buf, p_new_size);
589
590             decode_ptr (&src, &split_size_tmp);
591             *split_size = (int) split_size_tmp;
592             memcpy (split_item, src, *split_size);
593             src += *split_size;
594
595             *sp = new_int (b, p->cat);
596             (*sp)->size = endp - src;
597             memcpy ((*sp)->bytes, src, (*sp)->size);
598
599             p->size = p_new_size;
600         }
601         p->dirty = 1;
602         close_block (b, sub_p2);
603     }
604     close_block (b, sub_p1);
605     return more;
606 }
607
608 int insert_leaf (ISAMB b, struct ISAMB_block **sp1, void *lookahead_item,
609                  int *lookahead_mode, ISAMC_I *stream,
610                  struct ISAMB_block **sp2,
611                  void *sub_item, int *sub_size,
612                  const void *max_item)
613 {
614     struct ISAMB_block *p = *sp1;
615     char *endp = 0;
616     const char *src = 0;
617     char dst_buf[DST_BUF_SIZE], *dst = dst_buf;
618     int new_size;
619     void *c1 = (*b->method->codec.start)();
620     void *c2 = (*b->method->codec.start)();
621     int more = 1;
622     int quater = b->file[b->no_cat-1].head.block_max / CAT_MAX;
623     char *cut = dst_buf + quater * 2;
624     char *maxp = dst_buf + b->file[b->no_cat-1].head.block_max;
625     char *half1 = 0;
626     char *half2 = 0;
627     char cut_item_buf[DST_ITEM_MAX];
628     int cut_item_size = 0;
629
630     if (p && p->size)
631     {
632         char file_item_buf[DST_ITEM_MAX];
633         char *file_item = file_item_buf;
634             
635         src = p->bytes;
636         endp = p->bytes + p->size;
637         (*b->method->codec.decode)(c1, &file_item, &src);
638         while (1)
639         {
640             const char *dst_item = 0;
641             char *dst_0 = dst;
642             char *lookahead_next;
643             int d = -1;
644             
645             if (lookahead_item)
646                 d = (*b->method->compare_item)(file_item_buf, lookahead_item);
647             
648             if (d > 0)
649             {
650                 dst_item = lookahead_item;
651                 if (!*lookahead_mode)
652                 {
653                     yaz_log (LOG_WARN, "isamb: Inconsistent register (1)");
654                     assert (*lookahead_mode);
655                 }
656             }
657             else
658                 dst_item = file_item_buf;
659             if (!*lookahead_mode && d == 0)
660             {
661                 p->dirty = 1;
662             }
663             else if (!half1 && dst > cut)
664             {
665                 const char *dst_item_0 = dst_item;
666                 half1 = dst; /* candidate for splitting */
667                 
668                 (*b->method->codec.encode)(c2, &dst, &dst_item);
669                 
670                 cut_item_size = dst_item - dst_item_0;
671                 assert(cut_item_size > 0);
672                 memcpy (cut_item_buf, dst_item_0, cut_item_size);
673                 
674                 half2 = dst;
675             }
676             else
677                 (*b->method->codec.encode)(c2, &dst, &dst_item);
678             if (d > 0)  
679             {
680                 if (dst > maxp)
681                 {
682                     dst = dst_0;
683                     lookahead_item = 0;
684                 }
685                 else
686                 {
687                     lookahead_next = lookahead_item;
688                     if (!(*stream->read_item)(stream->clientData,
689                                               &lookahead_next,
690                                               lookahead_mode))
691                     {
692                         lookahead_item = 0;
693                         more = 0;
694                     }
695                     if (lookahead_item && max_item &&
696                         (*b->method->compare_item)(max_item, lookahead_item) <= 0)
697                     {
698                         /* max_item 1 */
699                         lookahead_item = 0;
700                     }
701                     
702                     p->dirty = 1;
703                 }
704             }
705             else if (d == 0)
706             {
707                 lookahead_next = lookahead_item;
708                 if (!(*stream->read_item)(stream->clientData,
709                                           &lookahead_next, lookahead_mode))
710                 {
711                     lookahead_item = 0;
712                     more = 0;
713                 }
714                 if (src == endp)
715                     break;
716                 file_item = file_item_buf;
717                 (*b->method->codec.decode)(c1, &file_item, &src);
718             }
719             else
720             {
721                 if (src == endp)
722                     break;
723                 file_item = file_item_buf;
724                 (*b->method->codec.decode)(c1, &file_item, &src);
725             }
726         }
727     }
728     maxp = dst_buf + b->file[b->no_cat-1].head.block_max + quater;
729     while (lookahead_item)
730     {
731         char *dst_item;
732         const char *src = lookahead_item;
733         char *dst_0 = dst;
734         
735         if (max_item &&
736             (*b->method->compare_item)(max_item, lookahead_item) <= 0)
737         {
738             /* max_item 2 */
739             break;
740         }
741         if (!*lookahead_mode)
742         {
743             yaz_log (LOG_WARN, "isamb: Inconsistent register (2)");
744             abort();
745         }
746         else if (!half1 && dst > cut)   
747         {
748             const char *src_0 = src;
749             half1 = dst; /* candidate for splitting */
750             
751             (*b->method->codec.encode)(c2, &dst, &src);
752             
753             cut_item_size = src - src_0;
754             assert(cut_item_size > 0);
755             memcpy (cut_item_buf, src_0, cut_item_size);
756             
757             half2 = dst;
758         }
759         else
760             (*b->method->codec.encode)(c2, &dst, &src);
761
762         if (dst > maxp)
763         {
764             dst = dst_0;
765             break;
766         }
767         if (p)
768             p->dirty = 1;
769         dst_item = lookahead_item;
770         if (!(*stream->read_item)(stream->clientData, &dst_item,
771                                   lookahead_mode))
772         {
773             lookahead_item = 0;
774             more = 0;
775         }
776     }
777     new_size = dst - dst_buf;
778     if (p && p->cat != b->no_cat-1 && 
779         new_size > b->file[p->cat].head.block_max)
780     {
781         /* non-btree block will be removed */
782         p->deleted = 1;
783         close_block (b, p);
784         /* delete it too!! */
785         p = 0; /* make a new one anyway */
786     }
787     if (!p)
788     {   /* must create a new one */
789         int i;
790         for (i = 0; i < b->no_cat; i++)
791             if (new_size <= b->file[i].head.block_max)
792                 break;
793         if (i == b->no_cat)
794             i = b->no_cat - 1;
795         p = new_leaf (b, i);
796     }
797     if (new_size > b->file[p->cat].head.block_max)
798     {
799         char *first_dst;
800         const char *cut_item = cut_item_buf;
801
802         assert (half1);
803         assert (half2);
804
805         assert(cut_item_size > 0);
806         
807         /* first half */
808         p->size = half1 - dst_buf;
809         memcpy (p->bytes, dst_buf, half1 - dst_buf);
810
811         /* second half */
812         *sp2 = new_leaf (b, p->cat);
813
814         (*b->method->codec.reset)(c2);
815
816         first_dst = (*sp2)->bytes;
817
818         (*b->method->codec.encode)(c2, &first_dst, &cut_item);
819
820         memcpy (first_dst, half2, dst - half2);
821
822         (*sp2)->size = (first_dst - (*sp2)->bytes) + (dst - half2);
823         (*sp2)->dirty = 1;
824         p->dirty = 1;
825         memcpy (sub_item, cut_item_buf, cut_item_size);
826         *sub_size = cut_item_size;
827     }
828     else
829     {
830         memcpy (p->bytes, dst_buf, dst - dst_buf);
831         p->size = new_size;
832     }
833     (*b->method->codec.stop)(c1);
834     (*b->method->codec.stop)(c2);
835     *sp1 = p;
836     return more;
837 }
838
839 int insert_sub (ISAMB b, struct ISAMB_block **p, void *new_item,
840                 int *mode,
841                 ISAMC_I *stream,
842                 struct ISAMB_block **sp,
843                 void *sub_item, int *sub_size,
844                 const void *max_item)
845 {
846     if (!*p || (*p)->leaf)
847         return insert_leaf (b, p, new_item, mode, stream, sp, sub_item, 
848                             sub_size, max_item);
849     else
850         return insert_int (b, *p, new_item, mode, stream, sp, sub_item,
851                            sub_size, max_item);
852 }
853
854 int isamb_unlink (ISAMB b, ISAMC_P pos)
855 {
856     struct ISAMB_block *p1;
857
858     if (!pos)
859         return 0;
860     p1 = open_block(b, pos);
861     p1->deleted = 1;
862     if (!p1->leaf)
863     {
864         zint sub_p;
865         zint item_len;
866         const char *src = p1->bytes + p1->offset;
867
868         decode_ptr(&src, &sub_p);
869         isamb_unlink(b, sub_p);
870         
871         while (src != p1->bytes + p1->size)
872         {
873             decode_ptr(&src, &item_len);
874             src += item_len;
875             decode_ptr(&src, &sub_p);
876             isamb_unlink(b, sub_p);
877         }
878     }
879     close_block(b, p1);
880     return 0;
881 }
882
883 ISAMB_P isamb_merge (ISAMB b, ISAMC_P pos, ISAMC_I *stream)
884 {
885     char item_buf[DST_ITEM_MAX];
886     char *item_ptr;
887     int i_mode;
888     int more;
889
890     if (b->cache < 0)
891     {
892         int more = 1;
893         while (more)
894         {
895             item_ptr = item_buf;
896             more =
897                 (*stream->read_item)(stream->clientData, &item_ptr, &i_mode);
898         }
899         return 1;
900     }
901     item_ptr = item_buf;
902     more = (*stream->read_item)(stream->clientData, &item_ptr, &i_mode);
903     while (more)
904     {
905         struct ISAMB_block *p = 0, *sp = 0;
906         char sub_item[DST_ITEM_MAX];
907         int sub_size;
908         
909         if (pos)
910             p = open_block (b, pos);
911         more = insert_sub (b, &p, item_buf, &i_mode, stream, &sp,
912                             sub_item, &sub_size, 0);
913         if (sp)
914         {    /* increase level of tree by one */
915             struct ISAMB_block *p2 = new_int (b, p->cat);
916             char *dst = p2->bytes + p2->size;
917             
918             encode_ptr (&dst, p->pos);
919             assert (sub_size < 40);
920             encode_ptr (&dst, sub_size);
921             memcpy (dst, sub_item, sub_size);
922             dst += sub_size;
923             encode_ptr (&dst, sp->pos);
924             
925             p2->size = dst - p2->bytes;
926             pos = p2->pos;  /* return new super page */
927             close_block (b, sp);
928             close_block (b, p2);
929         }
930         else
931             pos = p->pos;   /* return current one (again) */
932         close_block (b, p);
933     }
934     return pos;
935 }
936
937 ISAMB_PP isamb_pp_open_x (ISAMB isamb, ISAMB_P pos, int *level)
938 {
939     ISAMB_PP pp = xmalloc (sizeof(*pp));
940     int i;
941
942     pp->isamb = isamb;
943     pp->block = xmalloc (ISAMB_MAX_LEVEL * sizeof(*pp->block));
944
945     pp->pos = pos;
946     pp->level = 0;
947     pp->maxlevel=0;
948     pp->total_size = 0;
949     pp->no_blocks = 0;
950     pp->skipped_numbers=0;
951     pp->returned_numbers=0;
952     for (i=0;i<ISAMB_MAX_LEVEL;i++)
953         pp->skipped_nodes[i] = pp->accessed_nodes[i]=0;
954     while (1)
955     {
956         struct ISAMB_block *p = open_block (isamb, pos);
957         const char *src = p->bytes + p->offset;
958         pp->block[pp->level] = p;
959
960         pp->total_size += p->size;
961         pp->no_blocks++;
962         if (p->leaf)
963             break;
964
965                                         
966         decode_ptr (&src, &pos);
967         p->offset = src - p->bytes;
968         pp->level++;
969         pp->accessed_nodes[pp->level]++; 
970     }
971     pp->block[pp->level+1] = 0;
972     pp->maxlevel=pp->level;
973     if (level)
974         *level = pp->level;
975     return pp;
976 }
977
978 ISAMB_PP isamb_pp_open (ISAMB isamb, ISAMB_P pos)
979 {
980     return isamb_pp_open_x (isamb, pos, 0);
981 }
982
983 void isamb_pp_close_x (ISAMB_PP pp, int *size, int *blocks)
984 {
985     int i;
986     if (!pp)
987         return;
988     logf(LOG_DEBUG,"isamb_pp_close lev=%d returned %d values, skipped %d",
989         pp->maxlevel, pp->skipped_numbers, pp->returned_numbers);
990     for (i=pp->maxlevel;i>=0;i--)
991         if ( pp->skipped_nodes[i] || pp->accessed_nodes[i])
992             logf(LOG_DEBUG,"isamb_pp_close  level leaf-%d: %d read, %d skipped", i,
993                  pp->accessed_nodes[i], pp->skipped_nodes[i]);
994     pp->isamb->skipped_numbers += pp->skipped_numbers;
995     pp->isamb->returned_numbers += pp->returned_numbers;
996     for (i=pp->maxlevel;i>=0;i--)
997     {
998         pp->isamb->accessed_nodes[i] += pp->accessed_nodes[i];
999         pp->isamb->skipped_nodes[i] += pp->skipped_nodes[i];
1000     }
1001     if (size)
1002         *size = pp->total_size;
1003     if (blocks)
1004         *blocks = pp->no_blocks;
1005     for (i = 0; i <= pp->level; i++)
1006         close_block (pp->isamb, pp->block[i]);
1007     xfree (pp->block);
1008     xfree (pp);
1009 }
1010
1011 int isamb_block_info (ISAMB isamb, int cat)
1012 {
1013     if (cat >= 0 && cat < isamb->no_cat)
1014         return isamb->file[cat].head.block_size;
1015     return -1;
1016 }
1017
1018 void isamb_pp_close (ISAMB_PP pp)
1019 {
1020     isamb_pp_close_x (pp, 0, 0);
1021 }
1022
1023 /* simple recursive dumper .. */
1024 static void isamb_dump_r (ISAMB b, ISAMB_P pos, void (*pr)(const char *str),
1025                           int level)
1026 {
1027     char buf[1024];
1028     char prefix_str[1024];
1029     if (pos)
1030     {
1031         struct ISAMB_block *p = open_block (b, pos);
1032         sprintf(prefix_str, "%*s " ZINT_FORMAT " cat=%d size=%d max=%d", level*2, "",
1033                 pos, p->cat, p->size, b->file[p->cat].head.block_max);
1034         (*pr)(prefix_str);
1035         sprintf(prefix_str, "%*s " ZINT_FORMAT, level*2, "", pos);
1036         if (p->leaf)
1037         {
1038             while (p->offset < p->size)
1039             {
1040                 const char *src = p->bytes + p->offset;
1041                 char *dst = buf;
1042                 (*b->method->codec.decode)(p->decodeClientData, &dst, &src);
1043                 (*b->method->log_item)(LOG_DEBUG, buf, prefix_str);
1044                 p->offset = src - (char*) p->bytes;
1045             }
1046             assert(p->offset == p->size);
1047         }
1048         else
1049         {
1050             const char *src = p->bytes + p->offset;
1051             ISAMB_P sub;
1052             zint item_len;
1053
1054             decode_ptr (&src, &sub);
1055             p->offset = src - (char*) p->bytes;
1056
1057             isamb_dump_r(b, sub, pr, level+1);
1058             
1059             while (p->offset < p->size)
1060             {
1061                 decode_ptr (&src, &item_len);
1062                 (*b->method->log_item)(LOG_DEBUG, src, prefix_str);
1063                 src += item_len;
1064                 decode_ptr (&src, &sub);
1065                 
1066                 p->offset = src - (char*) p->bytes;
1067                 
1068                 isamb_dump_r(b, sub, pr, level+1);
1069             }           
1070         }
1071         close_block(b,p);
1072     }
1073 }
1074
1075 void isamb_dump (ISAMB b, ISAMB_P pos, void (*pr)(const char *str))
1076 {
1077     isamb_dump_r(b, pos, pr, 0);
1078 }
1079
1080 #if 0
1081 /* Old isamb_pp_read that Adam wrote, kept as a reference in case we need to
1082    debug the more complex pp_read that also forwards. May be deleted near end
1083    of 2004, if it has not shown to be useful */
1084
1085
1086 int isamb_pp_read (ISAMB_PP pp, void *buf)
1087 {
1088     char *dst = buf;
1089     char *src;
1090     struct ISAMB_block *p = pp->block[pp->level];
1091     if (!p)
1092         return 0;
1093
1094     while (p->offset == p->size)
1095     {
1096         int pos, item_len;
1097         while (p->offset == p->size)
1098         {
1099             if (pp->level == 0)
1100                 return 0;
1101             close_block (pp->isamb, pp->block[pp->level]);
1102             pp->block[pp->level] = 0;
1103             (pp->level)--;
1104             p = pp->block[pp->level];
1105             assert (!p->leaf);  
1106         }
1107         src = p->bytes + p->offset;
1108         
1109         decode_ptr (&src, &item_len);
1110         src += item_len;
1111         decode_ptr (&src, &pos);
1112         
1113         p->offset = src - (char*) p->bytes;
1114
1115         ++(pp->level);
1116         
1117         while (1)
1118         {
1119             pp->block[pp->level] = p = open_block (pp->isamb, pos);
1120
1121             pp->total_size += p->size;
1122             pp->no_blocks++;
1123             
1124             if (p->leaf) 
1125             {
1126                 break;
1127             }
1128             src = p->bytes + p->offset;
1129             decode_ptr (&src, &pos);
1130             p->offset = src - (char*) p->bytes;
1131             pp->level++;
1132         }
1133     }
1134     assert (p->offset < p->size);
1135     assert (p->leaf);
1136     src = p->bytes + p->offset;
1137     (*pp->isamb->method->codec.code_item)(ISAMC_DECODE, p->decodeClientData,
1138                                     &dst, &src);
1139     p->offset = src - (char*) p->bytes;
1140     /* key_logdump_txt(LOG_DEBUG,buf, "isamb_pp_read returning 1"); */
1141     return 1;
1142 }
1143
1144 #else
1145 int isamb_pp_read (ISAMB_PP pp, void *buf)
1146 {
1147     return isamb_pp_forward(pp, buf, 0);
1148 }
1149 #endif
1150
1151 #define NEW_FORWARD 1
1152
1153 #if NEW_FORWARD == 1
1154
1155 static int isamb_pp_on_right_node(ISAMB_PP pp, int level, const void *untilbuf)
1156 { /* looks one node higher to see if we should be on this node at all */
1157   /* useful in backing off quickly, and in avoiding tail descends */
1158   /* call with pp->level to begin with */
1159     struct ISAMB_block *p;
1160     int cmp;
1161     const char *src;
1162     zint item_len;
1163     assert(level>=0);
1164     if ( level == 0) {
1165 #if ISAMB_DEBUG
1166             logf(LOG_DEBUG,"isamb_pp_on_right returning true for root");
1167 #endif
1168         return 1; /* we can never skip the root node */
1169     }
1170     level--;
1171     p=pp->block[level];
1172     assert(p->offset <= p->size);
1173     if (p->offset < p->size )
1174     {
1175         assert(p->offset>0); 
1176         src=p->bytes + p->offset;
1177         decode_ptr(&src, &item_len);
1178 #if ISAMB_DEBUG
1179         (*pp->isamb->method->codec.log_item)(LOG_DEBUG,untilbuf,"on_leaf: until");
1180         (*pp->isamb->method->codec.log_item)(LOG_DEBUG,src,"on_leaf: value");
1181 #endif
1182         cmp=(*pp->isamb->method->compare_item)(untilbuf,src);
1183         if (cmp<2) {
1184 #if ISAMB_DEBUG
1185             logf(LOG_DEBUG,"isamb_pp_on_right returning true "
1186                             "cmp=%d lev=%d ofs=%d",cmp,level,p->offset);
1187 #endif
1188             return 1; 
1189         }
1190         else {
1191 #if ISAMB_DEBUG
1192             logf(LOG_DEBUG,"isamb_pp_on_right returning false "
1193                             "cmp=%d lev=%d ofs=%d",cmp,level,p->offset);
1194 #endif
1195             return 0; 
1196         }
1197     }
1198     else {
1199 #if ISAMB_DEBUG
1200         logf(LOG_DEBUG,"isamb_pp_on_right at tail, looking higher "
1201                         "lev=%d",level);
1202 #endif
1203         return isamb_pp_on_right_node(pp, level, untilbuf);
1204     }
1205 } /* isamb_pp_on_right_node */
1206
1207 static int isamb_pp_read_on_leaf(ISAMB_PP pp, void *buf)
1208 { /* reads the next item on the current leaf, returns 0 if end of leaf*/
1209     struct ISAMB_block *p = pp->block[pp->level];
1210     char *dst;
1211     const char *src;
1212     assert(pp);
1213     assert(buf);
1214     if (p->offset == p->size) {
1215 #if ISAMB_DEBUG
1216         logf(LOG_DEBUG,"isamb_pp_read_on_leaf returning 0 on node %d",p->pos);
1217 #endif
1218         return 0; /* at end of leaf */
1219     }
1220     src=p->bytes + p->offset;
1221     dst=buf;
1222     (*pp->isamb->method->codec.decode)(p->decodeClientData,&dst, &src);
1223     p->offset = src - (char*) p->bytes;
1224     /*
1225 #if ISAMB_DEBUG
1226     (*pp->isamb->method->codec.log_item)(LOG_DEBUG, buf, "read_on_leaf returning 1");
1227 #endif
1228 */
1229     return 1;
1230 } /* read_on_leaf */
1231
1232 static int isamb_pp_forward_on_leaf(ISAMB_PP pp, void *buf, const void *untilbuf)
1233 { /* forwards on the current leaf, returns 0 if not found */
1234     int cmp;
1235     int skips=0;
1236     while (1){
1237         if (!isamb_pp_read_on_leaf(pp,buf))
1238             return 0;
1239         /* FIXME - this is an extra function call, inline the read? */
1240         cmp=(*pp->isamb->method->compare_item)(untilbuf,buf);
1241         if (cmp <2){  /* found a good one */
1242 #if ISAMB_DEBUG
1243             if (skips)
1244                 logf(LOG_DEBUG, "isam_pp_fwd_on_leaf skipped %d items",skips);
1245 #endif
1246             pp->returned_numbers++;
1247             return 1;
1248         }
1249         if (!skips)
1250             if (!isamb_pp_on_right_node(pp, pp->level, untilbuf))
1251                 return 0; /* never mind the rest of this leaf */
1252         pp->skipped_numbers++;
1253         skips++;
1254     }
1255 } /* forward_on_leaf */
1256
1257 static int isamb_pp_climb_level(ISAMB_PP pp, ISAMB_P *pos)
1258 { /* climbs higher in the tree, until finds a level with data left */
1259   /* returns the node to (consider to) descend to in *pos) */
1260     struct ISAMB_block *p = pp->block[pp->level];
1261     const char *src;
1262     zint item_len;
1263 #if ISAMB_DEBUG
1264     logf(LOG_DEBUG,"isamb_pp_climb_level starting "
1265                    "at level %d node %d ofs=%d sz=%d",
1266                     pp->level, p->pos, p->offset, p->size);
1267 #endif
1268     assert(pp->level >= 0);
1269     assert(p->offset <= p->size);
1270     if (pp->level==0)
1271     {
1272 #if ISAMB_DEBUG
1273         logf(LOG_DEBUG,"isamb_pp_climb_level returning 0 at root");
1274 #endif
1275         return 0;
1276     }
1277     assert(pp->level>0); 
1278     close_block(pp->isamb, pp->block[pp->level]);
1279     pp->block[pp->level]=0;
1280     (pp->level)--;
1281     p=pp->block[pp->level];
1282 #if ISAMB_DEBUG
1283     logf(LOG_DEBUG,"isamb_pp_climb_level climbed to level %d node %d ofs=%d",
1284                     pp->level, p->pos, p->offset);
1285 #endif
1286     assert(!p->leaf);
1287     assert(p->offset <= p->size);
1288     if (p->offset == p->size ) {
1289         /* we came from the last pointer, climb on */
1290         if (!isamb_pp_climb_level(pp,pos))
1291             return 0;
1292         p=pp->block[pp->level];
1293     }
1294     else
1295     {
1296         /* skip the child we just came from */
1297 #if ISAMB_DEBUG
1298         logf(LOG_DEBUG,"isam_pp_climb_level: skipping lev=%d ofs=%d sz=%d", 
1299                         pp->level, p->offset, p->size);
1300 #endif
1301         assert (p->offset < p->size );
1302         src=p->bytes + p->offset;
1303         decode_ptr(&src, &item_len);
1304         src += item_len;
1305         decode_ptr(&src, pos);
1306         p->offset=src - (char *)p->bytes;
1307             
1308     }
1309     return 1;
1310 } /* climb_level */
1311
1312
1313 static zint isamb_pp_forward_unode(ISAMB_PP pp, zint pos, const void *untilbuf)
1314 { /* scans a upper node until it finds a child <= untilbuf */
1315   /* pp points to the key value, as always. pos is the child read from */
1316   /* the buffer */
1317   /* if all values are too small, returns the last child in the node */
1318   /* FIXME - this can be detected, and avoided by looking at the */
1319   /* parent node, but that gets messy. Presumably the cost is */
1320   /* pretty low anyway */
1321     struct ISAMB_block *p = pp->block[pp->level];
1322     const char *src=p->bytes + p->offset;
1323     zint item_len;
1324     int cmp;
1325     zint nxtpos;
1326 #if ISAMB_DEBUG
1327     int skips=0;
1328     logf(LOG_DEBUG,"isamb_pp_forward_unode starting "
1329                    "at level %d node %d ofs=%di sz=%d",
1330                     pp->level, p->pos, p->offset, p->size);
1331 #endif
1332     assert(!p->leaf);
1333     assert(p->offset <= p->size);
1334     if (p->offset == p->size) {
1335 #if ISAMB_DEBUG
1336             logf(LOG_DEBUG,"isamb_pp_forward_unode returning at end "
1337                    "at level %d node %d ofs=%di sz=%d",
1338                     pp->level, p->pos, p->offset, p->size);
1339 #endif
1340         return pos; /* already at the end of it */
1341     }
1342     while(p->offset < p->size) {
1343         decode_ptr(&src,&item_len);
1344         cmp=(*pp->isamb->method->compare_item)(untilbuf,src);
1345         src+=item_len;
1346         decode_ptr(&src,&nxtpos);
1347         if (cmp<2)
1348         {
1349 #if ISAMB_DEBUG
1350             logf(LOG_DEBUG,"isamb_pp_forward_unode returning a hit "
1351                    "at level %d node %d ofs=%d sz=%d",
1352                     pp->level, p->pos, p->offset, p->size);
1353 #endif
1354             return pos;
1355         } /* found one */
1356         pos=nxtpos;
1357         p->offset=src-(char*)p->bytes;
1358         (pp->skipped_nodes[pp->maxlevel - pp->level -1])++;
1359 #if ISAMB_DEBUG
1360         skips++;
1361 #endif
1362     }
1363 #if ISAMB_DEBUG
1364             logf(LOG_DEBUG,"isamb_pp_forward_unode returning at tail "
1365                    "at level %d node %d ofs=%d sz=%d skips=%d",
1366                     pp->level, p->pos, p->offset, p->size, skips);
1367 #endif
1368     return pos; /* that's the last one in the line */
1369     
1370 } /* forward_unode */
1371
1372 static void isamb_pp_descend_to_leaf(ISAMB_PP pp, ISAMB_P pos, const void *untilbuf)
1373 { /* climbs down the tree, from pos, to the leftmost leaf */
1374     struct ISAMB_block *p = pp->block[pp->level];
1375     const char *src;
1376     assert(!p->leaf);
1377 #if ISAMB_DEBUG
1378     logf(LOG_DEBUG,"isamb_pp_descend_to_leaf "
1379                    "starting at lev %d node %d ofs=%d lf=%d u=%p", 
1380                    pp->level, p->pos, p->offset, p->leaf, untilbuf);
1381 #endif
1382     if (untilbuf)
1383         pos=isamb_pp_forward_unode(pp,pos,untilbuf);
1384     ++(pp->level);
1385     assert(pos);
1386     p=open_block(pp->isamb, pos);
1387     pp->block[pp->level]=p;
1388     ++(pp->accessed_nodes[pp->maxlevel-pp->level]);
1389     ++(pp->no_blocks);
1390 #if ISAMB_DEBUG
1391     logf(LOG_DEBUG,"isamb_pp_descend_to_leaf "
1392                    "got lev %d node %d lf=%d", 
1393                    pp->level, p->pos, p->leaf);
1394 #endif
1395     if (p->leaf)
1396         return;
1397     assert (p->offset==0 );
1398     src=p->bytes + p->offset;
1399     decode_ptr(&src, &pos);
1400     p->offset=src-(char*)p->bytes;
1401     isamb_pp_descend_to_leaf(pp,pos,untilbuf);
1402 #if ISAMB_DEBUG
1403     logf(LOG_DEBUG,"isamb_pp_descend_to_leaf "
1404                    "returning at lev %d node %d ofs=%d lf=%d", 
1405                    pp->level, p->pos, p->offset, p->leaf);
1406 #endif
1407 } /* descend_to_leaf */
1408
1409 static int isamb_pp_find_next_leaf(ISAMB_PP pp)
1410 { /* finds the next leaf by climbing up and down */
1411     ISAMB_P pos;
1412     if (!isamb_pp_climb_level(pp,&pos))
1413         return 0;
1414     isamb_pp_descend_to_leaf(pp, pos,0);
1415     return 1;
1416 }
1417
1418 static int isamb_pp_climb_desc(ISAMB_PP pp, void *buf, const void *untilbuf)
1419 { /* climbs up and descends to a leaf where values >= *untilbuf are found */
1420     ISAMB_P pos;
1421 #if ISAMB_DEBUG
1422     struct ISAMB_block *p = pp->block[pp->level];
1423     logf(LOG_DEBUG,"isamb_pp_climb_desc starting "
1424                    "at level %d node %d ofs=%d sz=%d",
1425                     pp->level, p->pos, p->offset, p->size);
1426 #endif
1427     if (!isamb_pp_climb_level(pp,&pos))
1428         return 0;
1429     /* see if it would pay to climb one higher */
1430     if (!isamb_pp_on_right_node(pp, pp->level, untilbuf))
1431         if (!isamb_pp_climb_level(pp,&pos))
1432             return 0;
1433     isamb_pp_descend_to_leaf(pp, pos,untilbuf);
1434 #if ISAMB_DEBUG
1435     p = pp->block[pp->level];
1436     logf(LOG_DEBUG,"isamb_pp_climb_desc done "
1437                    "at level %d node %d ofs=%d sz=%d",
1438                     pp->level, p->pos, p->offset, p->size);
1439 #endif
1440     return 1;
1441 } /* climb_desc */
1442
1443 int isamb_pp_forward (ISAMB_PP pp, void *buf, const void *untilbuf)
1444 {
1445 #if ISAMB_DEBUG
1446     struct ISAMB_block *p = pp->block[pp->level];
1447     assert(p->leaf);
1448     logf(LOG_DEBUG,"isamb_pp_forward starting "
1449                    "at level %d node %d ofs=%d sz=%d u=%p",
1450                     pp->level, p->pos, p->offset, p->size,untilbuf);
1451 #endif
1452     if (untilbuf) {
1453         if (isamb_pp_forward_on_leaf( pp, buf, untilbuf)) {
1454 #if ISAMB_DEBUG
1455             logf(LOG_DEBUG,"isamb_pp_forward (f) returning (A) "
1456                    "at level %d node %d ofs=%d sz=%d",
1457                     pp->level, p->pos, p->offset, p->size);
1458 #endif
1459             return 1;
1460         }
1461         if (! isamb_pp_climb_desc( pp, buf, untilbuf)) {
1462 #if ISAMB_DEBUG
1463             logf(LOG_DEBUG,"isamb_pp_forward (f) returning notfound (B) "
1464                    "at level %d node %d ofs=%d sz=%d",
1465                     pp->level, p->pos, p->offset, p->size);
1466 #endif
1467             return 0; /* could not find a leaf */
1468         }
1469         do{
1470             if (isamb_pp_forward_on_leaf( pp, buf, untilbuf)) {
1471 #if ISAMB_DEBUG
1472             logf(LOG_DEBUG,"isamb_pp_forward (f) returning (C) "
1473                    "at level %d node %d ofs=%d sz=%d",
1474                     pp->level, p->pos, p->offset, p->size);
1475 #endif
1476                 return 1;
1477             }
1478         }while ( isamb_pp_find_next_leaf(pp));
1479         return 0; /* could not find at all */
1480     }
1481     else { /* no untilbuf, a straight read */
1482         /* FIXME - this should be moved
1483          * directly into the pp_read */
1484         /* keeping here now, to keep same
1485          * interface as the old fwd */
1486         if (isamb_pp_read_on_leaf( pp, buf)) {
1487 #if ISAMB_DEBUG
1488             logf(LOG_DEBUG,"isamb_pp_forward (read) returning (D) "
1489                    "at level %d node %d ofs=%d sz=%d",
1490                     pp->level, p->pos, p->offset, p->size);
1491 #endif
1492             return 1;
1493         }
1494         if (isamb_pp_find_next_leaf(pp)) {
1495 #if ISAMB_DEBUG
1496             logf(LOG_DEBUG,"isamb_pp_forward (read) returning (E) "
1497                    "at level %d node %d ofs=%d sz=%d",
1498                     pp->level, p->pos, p->offset, p->size);
1499 #endif
1500             return isamb_pp_read_on_leaf(pp, buf);
1501         }
1502         else
1503             return 0;
1504     }
1505 } /* isam_pp_forward (new version) */
1506
1507 #elif NEW_FORWARD == 0
1508
1509 int isamb_pp_forward (ISAMB_PP pp, void *buf, const void *untilbuf)
1510 {
1511     /* pseudocode:
1512      *   while 1
1513      *     while at end of node
1514      *       climb higher. If out, return 0
1515      *     while not on a leaf (and not at its end)
1516      *       decode next
1517      *       if cmp
1518      *         descend to node
1519      *     decode next
1520      *     if cmp
1521      *       return 1
1522      */
1523      /* 
1524       * The upper nodes consist of a sequence of nodenumbers and keys
1525       * When opening a block,  the first node number is read in, and
1526       * offset points to the first key, which is the upper limit of keys
1527       * in the node just read.
1528       */
1529     char *dst = buf;
1530     const char *src;
1531     struct ISAMB_block *p = pp->block[pp->level];
1532     int cmp;
1533     int item_len;
1534     int pos;
1535     int nxtpos;
1536     int descending=0; /* used to prevent a border condition error */
1537     if (!p)
1538         return 0;
1539 #if ISAMB_DEBUG
1540     logf(LOG_DEBUG,"isamb_pp_forward starting [%p] p=%d",pp,p->pos);
1541     
1542     (*pp->isamb->method->codec.log_item)(LOG_DEBUG, untilbuf, "until");
1543     (*pp->isamb->method->codec.log_item)(LOG_DEBUG, buf, "buf");
1544 #endif
1545
1546     while (1)
1547     {
1548         while ( (p->offset == p->size) && !descending )
1549         {  /* end of this block - climb higher */
1550             assert (p->offset <= p->size);
1551 #if ISAMB_DEBUG
1552             logf(LOG_DEBUG,"isamb_pp_forward climbing from l=%d",
1553                             pp->level);
1554 #endif
1555             if (pp->level == 0)
1556             {
1557 #if ISAMB_DEBUG
1558                 logf(LOG_DEBUG,"isamb_pp_forward returning 0 at root");
1559 #endif
1560                 return 0; /* at end of the root, nothing left */
1561             }
1562             close_block(pp->isamb, pp->block[pp->level]);
1563             pp->block[pp->level]=0;
1564             (pp->level)--;
1565             p=pp->block[pp->level];
1566 #if ISAMB_DEBUG
1567             logf(LOG_DEBUG,"isamb_pp_forward climbed to node %d off=%d",
1568                             p->pos, p->offset);
1569 #endif
1570             assert(!p->leaf);
1571             assert(p->offset <= p->size);
1572             /* skip the child we have handled */
1573             if (p->offset != p->size)
1574             { 
1575                 src = p->bytes + p->offset;
1576                 decode_ptr(&src, &item_len);
1577 #if ISAMB_DEBUG         
1578                 (*pp->isamb->method->codec.log_item)(LOG_DEBUG, src,
1579                                                " isamb_pp_forward "
1580                                                "climb skipping old key");
1581 #endif
1582                 src += item_len;
1583                 decode_ptr(&src,&pos);
1584                 p->offset = src - (char*) p->bytes;
1585                 break; /* even if this puts us at the end of the block, we
1586                           need to descend to the last pos. UGLY coding,
1587                           clean up some day */
1588             }
1589         }
1590         if (!p->leaf)
1591         { 
1592             src = p->bytes + p->offset;
1593             if (p->offset == p->size)
1594                 cmp=-2 ; /* descend to the last node, as we have
1595                             no value to cmp */
1596             else
1597             {
1598                 decode_ptr(&src, &item_len);
1599 #if ISAMB_DEBUG
1600                 logf(LOG_DEBUG,"isamb_pp_forward (B) on a high node. "
1601                      "ofs=%d sz=%d nxtpos=%d ",
1602                         p->offset,p->size,pos);
1603                 (*pp->isamb->method->codec.log_item)(LOG_DEBUG, src, "");
1604 #endif
1605                 if (untilbuf)
1606                     cmp=(*pp->isamb->method->compare_item)(untilbuf,src);
1607                 else
1608                     cmp=-2;
1609                 src += item_len;
1610                 decode_ptr(&src,&nxtpos);
1611             }
1612             if (cmp<2)
1613             { 
1614 #if ISAMB_DEBUG
1615                 logf(LOG_DEBUG,"isambb_pp_forward descending l=%d p=%d ",
1616                             pp->level, pos);
1617 #endif
1618                 descending=1; /* prevent climbing for a while */
1619                 ++(pp->level);
1620                 p = open_block(pp->isamb,pos);
1621                 pp->block[pp->level] = p ;
1622                 pp->total_size += p->size;
1623                 (pp->accessed_nodes[pp->maxlevel - pp->level])++;
1624                 pp->no_blocks++;
1625                 if ( !p->leaf)
1626                 { /* block starts with a pos */
1627                     src = p->bytes + p->offset;
1628                     decode_ptr(&src,&pos);
1629                     p->offset=src-(char*) p->bytes;
1630 #if ISAMB_DEBUG
1631                     logf(LOG_DEBUG,"isamb_pp_forward: block %d starts with %d",
1632                                     p->pos, pos);
1633 #endif
1634                 } 
1635             } /* descend to the node */
1636             else
1637             { /* skip the node */
1638                 p->offset = src - (char*) p->bytes;
1639                 pos=nxtpos;
1640                 (pp->skipped_nodes[pp->maxlevel - pp->level -1])++;
1641 #if ISAMB_DEBUG
1642                 logf(LOG_DEBUG,
1643                     "isamb_pp_forward: skipping block on level %d, noting "
1644                      "on %d (%d)",
1645                     pp->level, pp->maxlevel - pp->level-1 , 
1646                     pp->skipped_nodes[pp->maxlevel - pp->level-1 ]);
1647 #endif
1648                 /* 0 is always leafs, 1 is one level above leafs etc, no
1649                  * matter how high tree */
1650             }
1651         } /* not on a leaf */
1652         else
1653         { /* on a leaf */
1654             if (p->offset == p->size) { 
1655                 descending = 0;
1656             }
1657             else
1658             {
1659                 assert (p->offset < p->size);
1660                 src = p->bytes + p->offset;
1661                 dst=buf;
1662                 (*pp->isamb->method->codec.decode)(p->decodeClientData,
1663                                                 &dst, &src);
1664                 p->offset = src - (char*) p->bytes;
1665                 if (untilbuf)
1666                     cmp=(*pp->isamb->method->compare_item)(untilbuf,buf);
1667                 else
1668                     cmp=-2;
1669 #if ISAMB_DEBUG
1670                 logf(LOG_DEBUG,"isamb_pp_forward on a leaf. cmp=%d", 
1671                      cmp);
1672                 (*pp->isamb->method->codec.log_item)(LOG_DEBUG, buf, "");
1673 #endif
1674                 if (cmp <2)
1675                 {
1676 #if ISAMB_DEBUG
1677                     if (untilbuf)
1678                     {
1679                         (*pp->isamb->method->codec.log_item)(
1680                             LOG_DEBUG, buf,  "isamb_pp_forward returning 1");
1681                     }
1682                     else
1683                     {
1684                         (*pp->isamb->method->codec.log_item)(
1685                             LOG_DEBUG, buf, "isamb_pp_read returning 1 (fwd)");
1686                     }
1687 #endif
1688                     pp->returned_numbers++;
1689                     return 1;
1690                 }
1691                 else
1692                     pp->skipped_numbers++;
1693             }
1694         } /* leaf */
1695     } /* main loop */
1696 }
1697
1698 #elif NEW_FORWARD == 2
1699
1700 int isamb_pp_forward (ISAMB_PP pp, void *buf, const void *untilb)
1701 {
1702     char *dst = buf;
1703     const char *src;
1704     struct ISAMB_block *p = pp->block[pp->level];
1705     if (!p)
1706         return 0;
1707
1708 again:
1709     while (p->offset == p->size)
1710     {
1711         int pos, item_len;
1712         while (p->offset == p->size)
1713         {
1714             if (pp->level == 0)
1715                 return 0;
1716             close_block (pp->isamb, pp->block[pp->level]);
1717             pp->block[pp->level] = 0;
1718             (pp->level)--;
1719             p = pp->block[pp->level];
1720             assert (!p->leaf);  
1721         }
1722
1723         assert(!p->leaf);
1724         src = p->bytes + p->offset;
1725         
1726         decode_ptr (&src, &item_len);
1727         src += item_len;
1728         decode_ptr (&src, &pos);
1729         
1730         p->offset = src - (char*) p->bytes;
1731
1732         src = p->bytes + p->offset;
1733
1734         while(1)
1735         {
1736             if (!untilb || p->offset == p->size)
1737                 break;
1738             assert(p->offset < p->size);
1739             decode_ptr (&src, &item_len);
1740             if ((*pp->isamb->method->compare_item)(untilb, src) <= 1)
1741                 break;
1742             src += item_len;
1743             decode_ptr (&src, &pos);
1744             p->offset = src - (char*) p->bytes;
1745         }
1746
1747         pp->level++;
1748
1749         while (1)
1750         {
1751             pp->block[pp->level] = p = open_block (pp->isamb, pos);
1752
1753             pp->total_size += p->size;
1754             pp->no_blocks++;
1755             
1756             if (p->leaf) 
1757             {
1758                 break;
1759             }
1760             
1761             src = p->bytes + p->offset;
1762             while(1)
1763             {
1764                 decode_ptr (&src, &pos);
1765                 p->offset = src - (char*) p->bytes;
1766                 
1767                 if (!untilb || p->offset == p->size)
1768                     break;
1769                 assert(p->offset < p->size);
1770                 decode_ptr (&src, &item_len);
1771                 if ((*pp->isamb->method->compare_item)(untilb, src) <= 1)
1772                     break;
1773                 src += item_len;
1774             }
1775             pp->level++;
1776         }
1777     }
1778     assert (p->offset < p->size);
1779     assert (p->leaf);
1780     while(1)
1781     {
1782         char *dst0 = dst;
1783         src = p->bytes + p->offset;
1784         (*pp->isamb->method->codec.decode)(p->decodeClientData, &dst, &src);
1785         p->offset = src - (char*) p->bytes;
1786         if (!untilb || (*pp->isamb->method->compare_item)(untilb, dst0) <= 1)
1787             break;
1788         dst = dst0;
1789         if (p->offset == p->size) goto again;
1790     }
1791     /* key_logdump_txt(LOG_DEBUG,buf, "isamb_pp_read returning 1"); */
1792     return 1;
1793 }
1794
1795 #endif
1796
1797 int isamb_pp_num (ISAMB_PP pp)
1798 {
1799     return 1;
1800 }
1801
1802 static void isamb_pp_leaf_pos( ISAMB_PP pp, 
1803                                double *current, double *total, 
1804                                void *dummybuf )
1805 {
1806     struct ISAMB_block *p = pp->block[pp->level];
1807     const char *src=p->bytes;
1808     char *end=p->bytes+p->size;
1809     char *cur=p->bytes+p->offset;
1810     char *dst;
1811     void *decodeClientData;
1812     assert(p->offset <= p->size);
1813     assert(cur <= end);
1814     assert(p->leaf);
1815     *current=0;
1816     *total=0;
1817
1818     decodeClientData = (pp->isamb->method->codec.start)();
1819
1820     while(src < end) {
1821         dst=dummybuf;
1822         (*pp->isamb->method->codec.decode)(decodeClientData,&dst, &src);
1823         assert(dst<(char*) dummybuf+100); /*FIXME */
1824         (*total)++;
1825         if (src<=cur)
1826              (*current)++;
1827     }
1828 #if ISAMB_DEBUG
1829     logf(LOG_DEBUG, "isamb_pp_leaf_pos: cur= %0.1f tot=%0.1f "
1830                     " ofs=%d sz=%d lev=%d",
1831                     *current, *total, p->offset, p->size, pp->level);
1832 #endif
1833     assert(src==end);
1834     (pp->isamb->method->codec.stop)(decodeClientData);
1835 }
1836
1837 static void isamb_pp_upper_pos( ISAMB_PP pp, double *current, double *total, 
1838                                 zint size, int level )
1839 { /* estimates total/current occurrences from here up, excl leaf */
1840     struct ISAMB_block *p = pp->block[level];
1841     const char *src=p->bytes;
1842     char *end=p->bytes+p->size;
1843     char *cur=p->bytes+p->offset;
1844     zint item_size;
1845     ISAMB_P child;
1846     
1847     assert(level>=0);
1848     assert(!p->leaf);
1849    
1850 #if ISAMB_DEBUG
1851     logf(LOG_DEBUG,"isamb_pp_upper_pos at beginning     l=%d "
1852                    "cur="ZINT_FORMAT" tot="ZINT_FORMAT
1853                    " ofs=%d sz=%d pos=" ZINT_FORMAT, 
1854                    level, *current, *total, p->offset, p->size, p->pos);
1855 #endif    
1856     assert (p->offset <= p->size);
1857     decode_ptr (&src, &child ); /* first child */
1858     while(src < end) {
1859         if (src!=cur) {
1860             *total += size;
1861             if (src < cur)
1862                 *current +=size;
1863         }
1864         decode_ptr (&src, &item_size ); 
1865         assert(src+item_size<=end);
1866         src += item_size;
1867             decode_ptr (&src, &child );
1868     }
1869     if (level>0)
1870         isamb_pp_upper_pos(pp, current, total, (zint) *total, level-1);
1871 } /* upper_pos */
1872
1873 void isamb_pp_pos( ISAMB_PP pp, double *current, double *total )
1874 { /* return an estimate of the current position and of the total number of */
1875   /* occureences in the isam tree, based on the current leaf */
1876     struct ISAMB_block *p = pp->block[pp->level];
1877     char dummy[100]; /* 100 bytes/entry must be enough */
1878     assert(total);
1879     assert(current);
1880     assert(p->leaf);
1881     isamb_pp_leaf_pos(pp,current, total, dummy);
1882     if (pp->level>0)
1883         isamb_pp_upper_pos(pp, current, total, (zint) *total, pp->level-1);
1884 }