Limiting hits to INT_MAX, because yaz can't handle 64-bit hitcounts
[idzebra-moved-to-github.git] / isamb / isamb.c
1 /* $Id: isamb.c,v 1.53 2004-08-10 08:54:40 heikki Exp $
2    Copyright (C) 1995,1996,1997,1998,1999,2000,2001,2002,2003,2004
3    Index Data Aps
4
5 This file is part of the Zebra server.
6
7 Zebra is free software; you can redistribute it and/or modify it under
8 the terms of the GNU General Public License as published by the Free
9 Software Foundation; either version 2, or (at your option) any later
10 version.
11
12 Zebra is distributed in the hope that it will be useful, but WITHOUT ANY
13 WARRANTY; without even the implied warranty of MERCHANTABILITY or
14 FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
15 for more details.
16
17 You should have received a copy of the GNU General Public License
18 along with Zebra; see the file LICENSE.zebra.  If not, write to the
19 Free Software Foundation, 59 Temple Place - Suite 330, Boston, MA
20 02111-1307, USA.
21 */
22
23 #include <string.h>
24 #include <yaz/xmalloc.h>
25 #include <yaz/log.h>
26 #include <isamb.h>
27 #include <assert.h>
28
29 #ifndef ISAMB_DEBUG
30 #define ISAMB_DEBUG 0
31 #endif
32
33 struct ISAMB_head {
34     zint first_block;
35     zint last_block;
36     int block_size;
37     int block_max;
38     zint free_list;
39 };
40
41 #define ISAMB_DATA_OFFSET 3
42
43 /* maximum size of encoded buffer */
44 #define DST_ITEM_MAX 256
45
46 #define ISAMB_MAX_LEVEL 10
47 /* approx 2*max page + max size of item */
48 #define DST_BUF_SIZE 16840
49
50 #define ISAMB_CACHE_ENTRY_SIZE 4096
51
52 /* CAT_MAX: _must_ be power of 2 */
53 #define CAT_MAX 4
54 #define CAT_MASK (CAT_MAX-1)
55 /* CAT_NO: <= CAT_MAX */
56 #define CAT_NO 4
57
58 /* ISAMB_PTR_CODEC=1 var, =0 fixed */
59 #define ISAMB_PTR_CODEC 0
60
61 struct ISAMB_cache_entry {
62     ISAMB_P pos;
63     unsigned char *buf;
64     int dirty;
65     int hits;
66     struct ISAMB_cache_entry *next;
67 };
68
69 struct ISAMB_file {
70     BFile bf;
71     int head_dirty;
72     struct ISAMB_head head;
73     struct ISAMB_cache_entry *cache_entries;
74 };
75
76 struct ISAMB_s {
77     BFiles bfs;
78     ISAMC_M *method;
79
80     struct ISAMB_file *file;
81     int no_cat;
82     int cache; /* 0=no cache, 1=use cache, -1=dummy isam (for testing only) */
83     int log_io;        /* log level for bf_read/bf_write calls */
84     int log_freelist;  /* log level for freelist handling */
85     zint skipped_numbers; /* on a leaf node */
86     zint returned_numbers; 
87     zint skipped_nodes[ISAMB_MAX_LEVEL]; /* [0]=skipped leaves, 1=higher etc */
88     zint accessed_nodes[ISAMB_MAX_LEVEL]; /* nodes we did not skip */
89 };
90
91 struct ISAMB_block {
92     ISAMB_P pos;
93     int cat;
94     int size;
95     int leaf;
96     int dirty;
97     int deleted;
98     int offset;
99     char *bytes;
100     char *cbuf;
101     unsigned char *buf;
102     void *decodeClientData;
103     int log_rw;
104 };
105
106 struct ISAMB_PP_s {
107     ISAMB isamb;
108     ISAMB_P pos;
109     int level;
110     int maxlevel; /* total depth */
111     zint total_size;
112     zint no_blocks;
113     zint skipped_numbers; /* on a leaf node */
114     zint returned_numbers; 
115     zint skipped_nodes[ISAMB_MAX_LEVEL]; /* [0]=skipped leaves, 1=higher etc */
116     zint accessed_nodes[ISAMB_MAX_LEVEL]; /* nodes we did not skip */
117     struct ISAMB_block **block;
118 };
119
120
121 #if ISAMB_PTR_CODEC
122 static void encode_ptr (char **dst, zint pos)
123 {
124     unsigned char *bp = (unsigned char*) *dst;
125
126     while (pos > 127)
127     {
128          *bp++ = 128 | (pos & 127);
129          pos = pos >> 7;
130     }
131     *bp++ = pos;
132     *dst = (char *) bp;
133 }
134 #else
135 static void encode_ptr (char **dst, zint pos)
136 {
137     memcpy(*dst, &pos, sizeof(pos));
138     (*dst) += sizeof(pos);
139 }
140 #endif
141
142 #if ISAMB_PTR_CODEC
143 static void decode_ptr (const char **src1, zint *pos)
144 {
145     const unsigned char **src = (const unsigned char **) src1;
146     zint d = 0;
147     unsigned char c;
148     unsigned r = 0;
149
150     while (((c = *(*src)++) & 128))
151     {
152         d += ((zint) (c & 127) << r);
153         r += 7;
154     }
155     d += ((zint) c << r);
156     *pos = d;
157 }
158 #else
159 static void decode_ptr (const char **src, zint *pos)
160 {
161      memcpy (pos, *src, sizeof(*pos));
162      (*src) += sizeof(*pos);
163 }
164 #endif
165
166 ISAMB isamb_open (BFiles bfs, const char *name, int writeflag, ISAMC_M *method,
167                   int cache)
168 {
169     ISAMB isamb = xmalloc (sizeof(*isamb));
170     int i, b_size = 32;
171
172     isamb->bfs = bfs;
173     isamb->method = (ISAMC_M *) xmalloc (sizeof(*method));
174     memcpy (isamb->method, method, sizeof(*method));
175     isamb->no_cat = CAT_NO;
176     isamb->log_io = 0;
177     isamb->log_freelist = 0;
178     isamb->cache = cache;
179     isamb->skipped_numbers=0;
180     isamb->returned_numbers=0;
181     for (i=0;i<ISAMB_MAX_LEVEL;i++)
182       isamb->skipped_nodes[i]= isamb->accessed_nodes[i]=0;
183
184     assert (cache == 0);
185     isamb->file = xmalloc (sizeof(*isamb->file) * isamb->no_cat);
186     for (i = 0; i<isamb->no_cat; i++)
187     {
188         char fname[DST_BUF_SIZE];
189         isamb->file[i].cache_entries = 0;
190         isamb->file[i].head_dirty = 0;
191         sprintf (fname, "%s%c", name, i+'A');
192         if (cache)
193             isamb->file[i].bf = bf_open (bfs, fname, ISAMB_CACHE_ENTRY_SIZE,
194                                          writeflag);
195         else
196             isamb->file[i].bf = bf_open (bfs, fname, b_size, writeflag);
197
198         
199         if (!bf_read (isamb->file[i].bf, 0, 0, sizeof(struct ISAMB_head),
200                       &isamb->file[i].head))
201         {
202             isamb->file[i].head.first_block = ISAMB_CACHE_ENTRY_SIZE/b_size+1;
203             isamb->file[i].head.last_block = isamb->file[i].head.first_block;
204             isamb->file[i].head.block_size = b_size;
205             isamb->file[i].head.block_max = b_size - ISAMB_DATA_OFFSET;
206             isamb->file[i].head.free_list = 0;
207         }
208         assert (isamb->file[i].head.block_size >= ISAMB_DATA_OFFSET);
209         isamb->file[i].head_dirty = 0;
210         assert(isamb->file[i].head.block_size == b_size);
211         b_size = b_size * 4;
212     }
213 #if ISAMB_DEBUG
214     logf(LOG_WARN, "isamb debug enabled. Things will be slower than usual");
215 #endif
216     return isamb;
217 }
218
219 static void flush_blocks (ISAMB b, int cat)
220 {
221     while (b->file[cat].cache_entries)
222     {
223         struct ISAMB_cache_entry *ce_this = b->file[cat].cache_entries;
224         b->file[cat].cache_entries = ce_this->next;
225
226         if (ce_this->dirty)
227         {
228             yaz_log (b->log_io, "bf_write: flush_blocks");
229             bf_write (b->file[cat].bf, ce_this->pos, 0, 0, ce_this->buf);
230         }
231         xfree (ce_this->buf);
232         xfree (ce_this);
233     }
234 }
235
236 static int get_block (ISAMB b, ISAMC_P pos, char *userbuf, int wr)
237 {
238     int cat = (int) (pos&CAT_MASK);
239     int off = (int) (((pos/CAT_MAX) & 
240                (ISAMB_CACHE_ENTRY_SIZE / b->file[cat].head.block_size - 1))
241         * b->file[cat].head.block_size);
242     zint norm = pos / (CAT_MASK*ISAMB_CACHE_ENTRY_SIZE / b->file[cat].head.block_size);
243     int no = 0;
244     struct ISAMB_cache_entry **ce, *ce_this = 0, **ce_last = 0;
245
246     if (!b->cache)
247         return 0;
248
249     assert (ISAMB_CACHE_ENTRY_SIZE >= b->file[cat].head.block_size);
250     for (ce = &b->file[cat].cache_entries; *ce; ce = &(*ce)->next, no++)
251     {
252         ce_last = ce;
253         if ((*ce)->pos == norm)
254         {
255             ce_this = *ce;
256             *ce = (*ce)->next;   /* remove from list */
257             
258             ce_this->next = b->file[cat].cache_entries;  /* move to front */
259             b->file[cat].cache_entries = ce_this;
260             
261             if (wr)
262             {
263                 memcpy (ce_this->buf + off, userbuf, 
264                         b->file[cat].head.block_size);
265                 ce_this->dirty = 1;
266             }
267             else
268                 memcpy (userbuf, ce_this->buf + off,
269                         b->file[cat].head.block_size);
270             return 1;
271         }
272     }
273     if (no >= 40)
274     {
275         assert (no == 40);
276         assert (ce_last && *ce_last);
277         ce_this = *ce_last;
278         *ce_last = 0;  /* remove the last entry from list */
279         if (ce_this->dirty)
280         {
281             yaz_log (b->log_io, "bf_write: get_block");
282             bf_write (b->file[cat].bf, ce_this->pos, 0, 0, ce_this->buf);
283         }
284         xfree (ce_this->buf);
285         xfree (ce_this);
286     }
287     ce_this = xmalloc (sizeof(*ce_this));
288     ce_this->next = b->file[cat].cache_entries;
289     b->file[cat].cache_entries = ce_this;
290     ce_this->buf = xmalloc (ISAMB_CACHE_ENTRY_SIZE);
291     ce_this->pos = norm;
292     yaz_log (b->log_io, "bf_read: get_block");
293     if (!bf_read (b->file[cat].bf, norm, 0, 0, ce_this->buf))
294         memset (ce_this->buf, 0, ISAMB_CACHE_ENTRY_SIZE);
295     if (wr)
296     {
297         memcpy (ce_this->buf + off, userbuf, b->file[cat].head.block_size);
298         ce_this->dirty = 1;
299     }
300     else
301     {
302         ce_this->dirty = 0;
303         memcpy (userbuf, ce_this->buf + off, b->file[cat].head.block_size);
304     }
305     return 1;
306 }
307
308
309 void isamb_close (ISAMB isamb)
310 {
311     int i;
312     for (i=0;isamb->accessed_nodes[i];i++)
313         logf(LOG_DEBUG,"isamb_close  level leaf-%d: "ZINT_FORMAT" read, "
314                         ZINT_FORMAT" skipped",
315              i, isamb->accessed_nodes[i], isamb->skipped_nodes[i]);
316     logf(LOG_DEBUG,"isamb_close returned "ZINT_FORMAT" values, "
317                    "skipped "ZINT_FORMAT,
318          isamb->skipped_numbers, isamb->returned_numbers);
319     for (i = 0; i<isamb->no_cat; i++)
320     {
321         flush_blocks (isamb, i);
322         if (isamb->file[i].head_dirty)
323             bf_write (isamb->file[i].bf, 0, 0,
324                       sizeof(struct ISAMB_head), &isamb->file[i].head);
325         
326         bf_close (isamb->file[i].bf);
327     }
328     xfree (isamb->file);
329     xfree (isamb->method);
330     xfree (isamb);
331 }
332
333 static struct ISAMB_block *open_block (ISAMB b, ISAMC_P pos)
334 {
335     int cat = (int) (pos&CAT_MASK);
336     struct ISAMB_block *p;
337     if (!pos)
338         return 0;
339     p = xmalloc (sizeof(*p));
340     p->pos = pos;
341     p->cat = (int) (pos & CAT_MASK);
342     p->buf = xmalloc (b->file[cat].head.block_size);
343     p->cbuf = 0;
344
345     if (!get_block (b, pos, p->buf, 0))
346     {
347         yaz_log (b->log_io, "bf_read: open_block");
348         if (!bf_read (b->file[cat].bf, pos/CAT_MAX, 0, 0, p->buf))
349         {
350             yaz_log (LOG_FATAL, "isamb: read fail for pos=%ld block=%ld",
351                      (long) pos, (long) pos/CAT_MAX);
352             abort();
353         }
354     }
355     p->bytes = p->buf + ISAMB_DATA_OFFSET;
356     p->leaf = p->buf[0];
357     p->size = (p->buf[1] + 256 * p->buf[2]) - ISAMB_DATA_OFFSET;
358     if (p->size < 0)
359     {
360         yaz_log (LOG_FATAL, "Bad block size %d in pos=" ZINT_FORMAT "\n",
361                  p->size, pos);
362     }
363     assert (p->size >= 0);
364     p->offset = 0;
365     p->dirty = 0;
366     p->deleted = 0;
367     p->decodeClientData = (*b->method->codec.start)();
368     yaz_log (LOG_DEBUG, "isamb_open_block: Opened block " ZINT_FORMAT " ofs=%d",pos, p->offset);
369     return p;
370 }
371
372 struct ISAMB_block *new_block (ISAMB b, int leaf, int cat)
373 {
374     struct ISAMB_block *p;
375
376     p = xmalloc (sizeof(*p));
377     p->buf = xmalloc (b->file[cat].head.block_size);
378
379     if (!b->file[cat].head.free_list)
380     {
381         zint block_no;
382         block_no = b->file[cat].head.last_block++;
383         p->pos = block_no * CAT_MAX + cat;
384     }
385     else
386     {
387         p->pos = b->file[cat].head.free_list;
388         assert((p->pos & CAT_MASK) == cat);
389         if (!get_block (b, p->pos, p->buf, 0))
390         {
391             yaz_log (b->log_io, "bf_read: new_block");
392             if (!bf_read (b->file[cat].bf, p->pos/CAT_MAX, 0, 0, p->buf))
393             {
394                 yaz_log (LOG_FATAL, "isamb: read fail for pos=%ld block=%ld",
395                          (long) p->pos/CAT_MAX, (long) p->pos/CAT_MAX);
396                 abort ();
397             }
398         }
399         yaz_log (b->log_freelist, "got block " ZINT_FORMAT " from freelist %d:" ZINT_FORMAT, p->pos,
400                  cat, p->pos/CAT_MAX);
401         memcpy (&b->file[cat].head.free_list, p->buf, sizeof(int));
402     }
403     p->cat = cat;
404     b->file[cat].head_dirty = 1;
405     memset (p->buf, 0, b->file[cat].head.block_size);
406     p->bytes = p->buf + ISAMB_DATA_OFFSET;
407     p->leaf = leaf;
408     p->size = 0;
409     p->dirty = 1;
410     p->deleted = 0;
411     p->offset = 0;
412     p->decodeClientData = (*b->method->codec.start)();
413     return p;
414 }
415
416 struct ISAMB_block *new_leaf (ISAMB b, int cat)
417 {
418     return new_block (b, 1, cat);
419 }
420
421
422 struct ISAMB_block *new_int (ISAMB b, int cat)
423 {
424     return new_block (b, 0, cat);
425 }
426
427 static void check_block (ISAMB b, struct ISAMB_block *p)
428 {
429     assert(b); /* mostly to make the compiler shut up about unused b */
430     if (p->leaf)
431     {
432         ;
433     }
434     else
435     {
436         /* sanity check */
437         char *startp = p->bytes;
438         const char *src = startp;
439         char *endp = p->bytes + p->size;
440         ISAMB_P pos;
441             
442         decode_ptr (&src, &pos);
443         assert ((pos&CAT_MASK) == p->cat);
444         while (src != endp)
445         {
446             zint item_len;
447             decode_ptr (&src, &item_len);
448             assert (item_len > 0 && item_len < 80);
449             src += item_len;
450             decode_ptr (&src, &pos);
451             assert ((pos&CAT_MASK) == p->cat);
452         }
453     }
454 }
455
456 void close_block (ISAMB b, struct ISAMB_block *p)
457 {
458     if (!p)
459         return;
460     if (p->deleted)
461     {
462         yaz_log (b->log_freelist, "release block " ZINT_FORMAT " from freelist %d:" ZINT_FORMAT,
463                  p->pos, p->cat, p->pos/CAT_MAX);
464         memcpy (p->buf, &b->file[p->cat].head.free_list, sizeof(int));
465         b->file[p->cat].head.free_list = p->pos;
466         if (!get_block (b, p->pos, p->buf, 1))
467         {
468             yaz_log (b->log_io, "bf_write: close_block (deleted)");
469             bf_write (b->file[p->cat].bf, p->pos/CAT_MAX, 0, 0, p->buf);
470         }
471     }
472     else if (p->dirty)
473     {
474         int size = p->size + ISAMB_DATA_OFFSET;
475         assert (p->size >= 0);
476         p->buf[0] = p->leaf;
477         p->buf[1] = size & 255;
478         p->buf[2] = size >> 8;
479         check_block(b, p);
480         if (!get_block (b, p->pos, p->buf, 1))
481         {
482             yaz_log (b->log_io, "bf_write: close_block");
483             bf_write (b->file[p->cat].bf, p->pos/CAT_MAX, 0, 0, p->buf);
484         }
485     }
486     (*b->method->codec.stop)(p->decodeClientData);
487     xfree (p->buf);
488     xfree (p);
489 }
490
491 int insert_sub (ISAMB b, struct ISAMB_block **p,
492                 void *new_item, int *mode,
493                 ISAMC_I *stream,
494                 struct ISAMB_block **sp,
495                 void *sub_item, int *sub_size,
496                 const void *max_item);
497
498 int insert_int (ISAMB b, struct ISAMB_block *p, void *lookahead_item,
499                 int *mode,
500                 ISAMC_I *stream, struct ISAMB_block **sp,
501                 void *split_item, int *split_size, const void *last_max_item)
502 {
503     char *startp = p->bytes;
504     const char *src = startp;
505     char *endp = p->bytes + p->size;
506     ISAMB_P pos;
507     struct ISAMB_block *sub_p1 = 0, *sub_p2 = 0;
508     char sub_item[DST_ITEM_MAX];
509     int sub_size;
510     int more = 0;
511
512     *sp = 0;
513
514     assert(p->size >= 0);
515     decode_ptr (&src, &pos);
516     while (src != endp)
517     {
518         zint item_len;
519         int d;
520         const char *src0 = src;
521         decode_ptr (&src, &item_len);
522         d = (*b->method->compare_item)(src, lookahead_item);
523         if (d > 0)
524         {
525             sub_p1 = open_block (b, pos);
526             assert (sub_p1);
527             more = insert_sub (b, &sub_p1, lookahead_item, mode,
528                                stream, &sub_p2, 
529                                sub_item, &sub_size, src);
530             src = src0;
531             break;
532         }
533         src += item_len;
534         decode_ptr (&src, &pos);
535     }
536     if (!sub_p1)
537     {
538         sub_p1 = open_block (b, pos);
539         assert (sub_p1);
540         more = insert_sub (b, &sub_p1, lookahead_item, mode, stream, &sub_p2, 
541                            sub_item, &sub_size, last_max_item);
542     }
543     if (sub_p2)
544     {
545         /* there was a split - must insert pointer in this one */
546         char dst_buf[DST_BUF_SIZE];
547         char *dst = dst_buf;
548
549         assert (sub_size < 80 && sub_size > 1);
550
551         memcpy (dst, startp, src - startp);
552                 
553         dst += src - startp;
554
555         encode_ptr (&dst, sub_size);      /* sub length and item */
556         memcpy (dst, sub_item, sub_size);
557         dst += sub_size;
558
559         encode_ptr (&dst, sub_p2->pos);   /* pos */
560
561         if (endp - src)                   /* remaining data */
562         {
563             memcpy (dst, src, endp - src);
564             dst += endp - src;
565         }
566         p->size = dst - dst_buf;
567         assert (p->size >= 0);
568         if (p->size <= b->file[p->cat].head.block_max)
569         {
570             memcpy (startp, dst_buf, dst - dst_buf);
571         }
572         else
573         {
574             zint split_size_tmp;
575             int p_new_size;
576             const char *half;
577             src = dst_buf;
578             endp = dst;
579
580             half = src + b->file[p->cat].head.block_size/2;
581             decode_ptr (&src, &pos);
582             while (src <= half)
583             {
584                 decode_ptr (&src, &split_size_tmp);
585                 *split_size = (int) split_size_tmp;
586
587                 src += *split_size;
588                 decode_ptr (&src, &pos);
589             }
590             p_new_size = src - dst_buf;
591             memcpy (p->bytes, dst_buf, p_new_size);
592
593             decode_ptr (&src, &split_size_tmp);
594             *split_size = (int) split_size_tmp;
595             memcpy (split_item, src, *split_size);
596             src += *split_size;
597
598             *sp = new_int (b, p->cat);
599             (*sp)->size = endp - src;
600             memcpy ((*sp)->bytes, src, (*sp)->size);
601
602             p->size = p_new_size;
603         }
604         p->dirty = 1;
605         close_block (b, sub_p2);
606     }
607     close_block (b, sub_p1);
608     return more;
609 }
610
611 int insert_leaf (ISAMB b, struct ISAMB_block **sp1, void *lookahead_item,
612                  int *lookahead_mode, ISAMC_I *stream,
613                  struct ISAMB_block **sp2,
614                  void *sub_item, int *sub_size,
615                  const void *max_item)
616 {
617     struct ISAMB_block *p = *sp1;
618     char *endp = 0;
619     const char *src = 0;
620     char dst_buf[DST_BUF_SIZE], *dst = dst_buf;
621     int new_size;
622     void *c1 = (*b->method->codec.start)();
623     void *c2 = (*b->method->codec.start)();
624     int more = 1;
625     int quater = b->file[b->no_cat-1].head.block_max / CAT_MAX;
626     char *cut = dst_buf + quater * 2;
627     char *maxp = dst_buf + b->file[b->no_cat-1].head.block_max;
628     char *half1 = 0;
629     char *half2 = 0;
630     char cut_item_buf[DST_ITEM_MAX];
631     int cut_item_size = 0;
632
633     if (p && p->size)
634     {
635         char file_item_buf[DST_ITEM_MAX];
636         char *file_item = file_item_buf;
637             
638         src = p->bytes;
639         endp = p->bytes + p->size;
640         (*b->method->codec.decode)(c1, &file_item, &src);
641         while (1)
642         {
643             const char *dst_item = 0;
644             char *dst_0 = dst;
645             char *lookahead_next;
646             int d = -1;
647             
648             if (lookahead_item)
649                 d = (*b->method->compare_item)(file_item_buf, lookahead_item);
650             
651             if (d > 0)
652             {
653                 dst_item = lookahead_item;
654                 if (!*lookahead_mode)
655                 {
656                     yaz_log (LOG_WARN, "isamb: Inconsistent register (1)");
657                     assert (*lookahead_mode);
658                 }
659             }
660             else
661                 dst_item = file_item_buf;
662             if (!*lookahead_mode && d == 0)
663             {
664                 p->dirty = 1;
665             }
666             else if (!half1 && dst > cut)
667             {
668                 const char *dst_item_0 = dst_item;
669                 half1 = dst; /* candidate for splitting */
670                 
671                 (*b->method->codec.encode)(c2, &dst, &dst_item);
672                 
673                 cut_item_size = dst_item - dst_item_0;
674                 assert(cut_item_size > 0);
675                 memcpy (cut_item_buf, dst_item_0, cut_item_size);
676                 
677                 half2 = dst;
678             }
679             else
680                 (*b->method->codec.encode)(c2, &dst, &dst_item);
681             if (d > 0)  
682             {
683                 if (dst > maxp)
684                 {
685                     dst = dst_0;
686                     lookahead_item = 0;
687                 }
688                 else
689                 {
690                     lookahead_next = lookahead_item;
691                     if (!(*stream->read_item)(stream->clientData,
692                                               &lookahead_next,
693                                               lookahead_mode))
694                     {
695                         lookahead_item = 0;
696                         more = 0;
697                     }
698                     if (lookahead_item && max_item &&
699                         (*b->method->compare_item)(max_item, lookahead_item) <= 0)
700                     {
701                         /* max_item 1 */
702                         lookahead_item = 0;
703                     }
704                     
705                     p->dirty = 1;
706                 }
707             }
708             else if (d == 0)
709             {
710                 lookahead_next = lookahead_item;
711                 if (!(*stream->read_item)(stream->clientData,
712                                           &lookahead_next, lookahead_mode))
713                 {
714                     lookahead_item = 0;
715                     more = 0;
716                 }
717                 if (src == endp)
718                     break;
719                 file_item = file_item_buf;
720                 (*b->method->codec.decode)(c1, &file_item, &src);
721             }
722             else
723             {
724                 if (src == endp)
725                     break;
726                 file_item = file_item_buf;
727                 (*b->method->codec.decode)(c1, &file_item, &src);
728             }
729         }
730     }
731     maxp = dst_buf + b->file[b->no_cat-1].head.block_max + quater;
732     while (lookahead_item)
733     {
734         char *dst_item;
735         const char *src = lookahead_item;
736         char *dst_0 = dst;
737         
738         if (max_item &&
739             (*b->method->compare_item)(max_item, lookahead_item) <= 0)
740         {
741             /* max_item 2 */
742             break;
743         }
744         if (!*lookahead_mode)
745         {
746             yaz_log (LOG_WARN, "isamb: Inconsistent register (2)");
747             abort();
748         }
749         else if (!half1 && dst > cut)   
750         {
751             const char *src_0 = src;
752             half1 = dst; /* candidate for splitting */
753             
754             (*b->method->codec.encode)(c2, &dst, &src);
755             
756             cut_item_size = src - src_0;
757             assert(cut_item_size > 0);
758             memcpy (cut_item_buf, src_0, cut_item_size);
759             
760             half2 = dst;
761         }
762         else
763             (*b->method->codec.encode)(c2, &dst, &src);
764
765         if (dst > maxp)
766         {
767             dst = dst_0;
768             break;
769         }
770         if (p)
771             p->dirty = 1;
772         dst_item = lookahead_item;
773         if (!(*stream->read_item)(stream->clientData, &dst_item,
774                                   lookahead_mode))
775         {
776             lookahead_item = 0;
777             more = 0;
778         }
779     }
780     new_size = dst - dst_buf;
781     if (p && p->cat != b->no_cat-1 && 
782         new_size > b->file[p->cat].head.block_max)
783     {
784         /* non-btree block will be removed */
785         p->deleted = 1;
786         close_block (b, p);
787         /* delete it too!! */
788         p = 0; /* make a new one anyway */
789     }
790     if (!p)
791     {   /* must create a new one */
792         int i;
793         for (i = 0; i < b->no_cat; i++)
794             if (new_size <= b->file[i].head.block_max)
795                 break;
796         if (i == b->no_cat)
797             i = b->no_cat - 1;
798         p = new_leaf (b, i);
799     }
800     if (new_size > b->file[p->cat].head.block_max)
801     {
802         char *first_dst;
803         const char *cut_item = cut_item_buf;
804
805         assert (half1);
806         assert (half2);
807
808         assert(cut_item_size > 0);
809         
810         /* first half */
811         p->size = half1 - dst_buf;
812         memcpy (p->bytes, dst_buf, half1 - dst_buf);
813
814         /* second half */
815         *sp2 = new_leaf (b, p->cat);
816
817         (*b->method->codec.reset)(c2);
818
819         first_dst = (*sp2)->bytes;
820
821         (*b->method->codec.encode)(c2, &first_dst, &cut_item);
822
823         memcpy (first_dst, half2, dst - half2);
824
825         (*sp2)->size = (first_dst - (*sp2)->bytes) + (dst - half2);
826         (*sp2)->dirty = 1;
827         p->dirty = 1;
828         memcpy (sub_item, cut_item_buf, cut_item_size);
829         *sub_size = cut_item_size;
830     }
831     else
832     {
833         memcpy (p->bytes, dst_buf, dst - dst_buf);
834         p->size = new_size;
835     }
836     (*b->method->codec.stop)(c1);
837     (*b->method->codec.stop)(c2);
838     *sp1 = p;
839     return more;
840 }
841
842 int insert_sub (ISAMB b, struct ISAMB_block **p, void *new_item,
843                 int *mode,
844                 ISAMC_I *stream,
845                 struct ISAMB_block **sp,
846                 void *sub_item, int *sub_size,
847                 const void *max_item)
848 {
849     if (!*p || (*p)->leaf)
850         return insert_leaf (b, p, new_item, mode, stream, sp, sub_item, 
851                             sub_size, max_item);
852     else
853         return insert_int (b, *p, new_item, mode, stream, sp, sub_item,
854                            sub_size, max_item);
855 }
856
857 int isamb_unlink (ISAMB b, ISAMC_P pos)
858 {
859     struct ISAMB_block *p1;
860
861     if (!pos)
862         return 0;
863     p1 = open_block(b, pos);
864     p1->deleted = 1;
865     if (!p1->leaf)
866     {
867         zint sub_p;
868         zint item_len;
869         const char *src = p1->bytes + p1->offset;
870
871         decode_ptr(&src, &sub_p);
872         isamb_unlink(b, sub_p);
873         
874         while (src != p1->bytes + p1->size)
875         {
876             decode_ptr(&src, &item_len);
877             src += item_len;
878             decode_ptr(&src, &sub_p);
879             isamb_unlink(b, sub_p);
880         }
881     }
882     close_block(b, p1);
883     return 0;
884 }
885
886 ISAMB_P isamb_merge (ISAMB b, ISAMC_P pos, ISAMC_I *stream)
887 {
888     char item_buf[DST_ITEM_MAX];
889     char *item_ptr;
890     int i_mode;
891     int more;
892
893     if (b->cache < 0)
894     {
895         int more = 1;
896         while (more)
897         {
898             item_ptr = item_buf;
899             more =
900                 (*stream->read_item)(stream->clientData, &item_ptr, &i_mode);
901         }
902         return 1;
903     }
904     item_ptr = item_buf;
905     more = (*stream->read_item)(stream->clientData, &item_ptr, &i_mode);
906     while (more)
907     {
908         struct ISAMB_block *p = 0, *sp = 0;
909         char sub_item[DST_ITEM_MAX];
910         int sub_size;
911         
912         if (pos)
913             p = open_block (b, pos);
914         more = insert_sub (b, &p, item_buf, &i_mode, stream, &sp,
915                             sub_item, &sub_size, 0);
916         if (sp)
917         {    /* increase level of tree by one */
918             struct ISAMB_block *p2 = new_int (b, p->cat);
919             char *dst = p2->bytes + p2->size;
920             
921             encode_ptr (&dst, p->pos);
922             assert (sub_size < 40);
923             encode_ptr (&dst, sub_size);
924             memcpy (dst, sub_item, sub_size);
925             dst += sub_size;
926             encode_ptr (&dst, sp->pos);
927             
928             p2->size = dst - p2->bytes;
929             pos = p2->pos;  /* return new super page */
930             close_block (b, sp);
931             close_block (b, p2);
932         }
933         else
934             pos = p->pos;   /* return current one (again) */
935         close_block (b, p);
936     }
937     return pos;
938 }
939
940 ISAMB_PP isamb_pp_open_x (ISAMB isamb, ISAMB_P pos, int *level)
941 {
942     ISAMB_PP pp = xmalloc (sizeof(*pp));
943     int i;
944
945     pp->isamb = isamb;
946     pp->block = xmalloc (ISAMB_MAX_LEVEL * sizeof(*pp->block));
947
948     pp->pos = pos;
949     pp->level = 0;
950     pp->maxlevel=0;
951     pp->total_size = 0;
952     pp->no_blocks = 0;
953     pp->skipped_numbers=0;
954     pp->returned_numbers=0;
955     for (i=0;i<ISAMB_MAX_LEVEL;i++)
956         pp->skipped_nodes[i] = pp->accessed_nodes[i]=0;
957     while (1)
958     {
959         struct ISAMB_block *p = open_block (isamb, pos);
960         const char *src = p->bytes + p->offset;
961         pp->block[pp->level] = p;
962
963         pp->total_size += p->size;
964         pp->no_blocks++;
965         if (p->leaf)
966             break;
967
968                                         
969         decode_ptr (&src, &pos);
970         p->offset = src - p->bytes;
971         pp->level++;
972         pp->accessed_nodes[pp->level]++; 
973     }
974     pp->block[pp->level+1] = 0;
975     pp->maxlevel=pp->level;
976     if (level)
977         *level = pp->level;
978     return pp;
979 }
980
981 ISAMB_PP isamb_pp_open (ISAMB isamb, ISAMB_P pos)
982 {
983     return isamb_pp_open_x (isamb, pos, 0);
984 }
985
986 void isamb_pp_close_x (ISAMB_PP pp, int *size, int *blocks)
987 {
988     int i;
989     if (!pp)
990         return;
991     logf(LOG_DEBUG,"isamb_pp_close lev=%d returned "ZINT_FORMAT" values," 
992                     "skipped "ZINT_FORMAT,
993         pp->maxlevel, pp->skipped_numbers, pp->returned_numbers);
994     for (i=pp->maxlevel;i>=0;i--)
995         if ( pp->skipped_nodes[i] || pp->accessed_nodes[i])
996             logf(LOG_DEBUG,"isamb_pp_close  level leaf-%d: "
997                             ZINT_FORMAT" read, "ZINT_FORMAT" skipped", i,
998                  pp->accessed_nodes[i], pp->skipped_nodes[i]);
999     pp->isamb->skipped_numbers += pp->skipped_numbers;
1000     pp->isamb->returned_numbers += pp->returned_numbers;
1001     for (i=pp->maxlevel;i>=0;i--)
1002     {
1003         pp->isamb->accessed_nodes[i] += pp->accessed_nodes[i];
1004         pp->isamb->skipped_nodes[i] += pp->skipped_nodes[i];
1005     }
1006     if (size)
1007         *size = pp->total_size;
1008     if (blocks)
1009         *blocks = pp->no_blocks;
1010     for (i = 0; i <= pp->level; i++)
1011         close_block (pp->isamb, pp->block[i]);
1012     xfree (pp->block);
1013     xfree (pp);
1014 }
1015
1016 int isamb_block_info (ISAMB isamb, int cat)
1017 {
1018     if (cat >= 0 && cat < isamb->no_cat)
1019         return isamb->file[cat].head.block_size;
1020     return -1;
1021 }
1022
1023 void isamb_pp_close (ISAMB_PP pp)
1024 {
1025     isamb_pp_close_x (pp, 0, 0);
1026 }
1027
1028 /* simple recursive dumper .. */
1029 static void isamb_dump_r (ISAMB b, ISAMB_P pos, void (*pr)(const char *str),
1030                           int level)
1031 {
1032     char buf[1024];
1033     char prefix_str[1024];
1034     if (pos)
1035     {
1036         struct ISAMB_block *p = open_block (b, pos);
1037         sprintf(prefix_str, "%*s " ZINT_FORMAT " cat=%d size=%d max=%d", level*2, "",
1038                 pos, p->cat, p->size, b->file[p->cat].head.block_max);
1039         (*pr)(prefix_str);
1040         sprintf(prefix_str, "%*s " ZINT_FORMAT, level*2, "", pos);
1041         if (p->leaf)
1042         {
1043             while (p->offset < p->size)
1044             {
1045                 const char *src = p->bytes + p->offset;
1046                 char *dst = buf;
1047                 (*b->method->codec.decode)(p->decodeClientData, &dst, &src);
1048                 (*b->method->log_item)(LOG_DEBUG, buf, prefix_str);
1049                 p->offset = src - (char*) p->bytes;
1050             }
1051             assert(p->offset == p->size);
1052         }
1053         else
1054         {
1055             const char *src = p->bytes + p->offset;
1056             ISAMB_P sub;
1057             zint item_len;
1058
1059             decode_ptr (&src, &sub);
1060             p->offset = src - (char*) p->bytes;
1061
1062             isamb_dump_r(b, sub, pr, level+1);
1063             
1064             while (p->offset < p->size)
1065             {
1066                 decode_ptr (&src, &item_len);
1067                 (*b->method->log_item)(LOG_DEBUG, src, prefix_str);
1068                 src += item_len;
1069                 decode_ptr (&src, &sub);
1070                 
1071                 p->offset = src - (char*) p->bytes;
1072                 
1073                 isamb_dump_r(b, sub, pr, level+1);
1074             }           
1075         }
1076         close_block(b,p);
1077     }
1078 }
1079
1080 void isamb_dump (ISAMB b, ISAMB_P pos, void (*pr)(const char *str))
1081 {
1082     isamb_dump_r(b, pos, pr, 0);
1083 }
1084
1085 #if 0
1086 /* Old isamb_pp_read that Adam wrote, kept as a reference in case we need to
1087    debug the more complex pp_read that also forwards. May be deleted near end
1088    of 2004, if it has not shown to be useful */
1089
1090
1091 int isamb_pp_read (ISAMB_PP pp, void *buf)
1092 {
1093     char *dst = buf;
1094     char *src;
1095     struct ISAMB_block *p = pp->block[pp->level];
1096     if (!p)
1097         return 0;
1098
1099     while (p->offset == p->size)
1100     {
1101         int pos, item_len;
1102         while (p->offset == p->size)
1103         {
1104             if (pp->level == 0)
1105                 return 0;
1106             close_block (pp->isamb, pp->block[pp->level]);
1107             pp->block[pp->level] = 0;
1108             (pp->level)--;
1109             p = pp->block[pp->level];
1110             assert (!p->leaf);  
1111         }
1112         src = p->bytes + p->offset;
1113         
1114         decode_ptr (&src, &item_len);
1115         src += item_len;
1116         decode_ptr (&src, &pos);
1117         
1118         p->offset = src - (char*) p->bytes;
1119
1120         ++(pp->level);
1121         
1122         while (1)
1123         {
1124             pp->block[pp->level] = p = open_block (pp->isamb, pos);
1125
1126             pp->total_size += p->size;
1127             pp->no_blocks++;
1128             
1129             if (p->leaf) 
1130             {
1131                 break;
1132             }
1133             src = p->bytes + p->offset;
1134             decode_ptr (&src, &pos);
1135             p->offset = src - (char*) p->bytes;
1136             pp->level++;
1137         }
1138     }
1139     assert (p->offset < p->size);
1140     assert (p->leaf);
1141     src = p->bytes + p->offset;
1142     (*pp->isamb->method->codec.code_item)(ISAMC_DECODE, p->decodeClientData,
1143                                     &dst, &src);
1144     p->offset = src - (char*) p->bytes;
1145     /* key_logdump_txt(LOG_DEBUG,buf, "isamb_pp_read returning 1"); */
1146     return 1;
1147 }
1148
1149 #else
1150 int isamb_pp_read (ISAMB_PP pp, void *buf)
1151 {
1152     return isamb_pp_forward(pp, buf, 0);
1153 }
1154 #endif
1155
1156 #define NEW_FORWARD 1
1157
1158 #if NEW_FORWARD == 1
1159
1160 static int isamb_pp_on_right_node(ISAMB_PP pp, int level, const void *untilbuf)
1161 { /* looks one node higher to see if we should be on this node at all */
1162   /* useful in backing off quickly, and in avoiding tail descends */
1163   /* call with pp->level to begin with */
1164     struct ISAMB_block *p;
1165     int cmp;
1166     const char *src;
1167     zint item_len;
1168     assert(level>=0);
1169     if ( level == 0) {
1170 #if ISAMB_DEBUG
1171             logf(LOG_DEBUG,"isamb_pp_on_right returning true for root");
1172 #endif
1173         return 1; /* we can never skip the root node */
1174     }
1175     level--;
1176     p=pp->block[level];
1177     assert(p->offset <= p->size);
1178     if (p->offset < p->size )
1179     {
1180         assert(p->offset>0); 
1181         src=p->bytes + p->offset;
1182         decode_ptr(&src, &item_len);
1183 #if ISAMB_DEBUG
1184         (*pp->isamb->method->codec.log_item)(LOG_DEBUG,untilbuf,"on_leaf: until");
1185         (*pp->isamb->method->codec.log_item)(LOG_DEBUG,src,"on_leaf: value");
1186 #endif
1187         cmp=(*pp->isamb->method->compare_item)(untilbuf,src);
1188         if (cmp<2) {
1189 #if ISAMB_DEBUG
1190             logf(LOG_DEBUG,"isamb_pp_on_right returning true "
1191                             "cmp=%d lev=%d ofs=%d",cmp,level,p->offset);
1192 #endif
1193             return 1; 
1194         }
1195         else {
1196 #if ISAMB_DEBUG
1197             logf(LOG_DEBUG,"isamb_pp_on_right returning false "
1198                             "cmp=%d lev=%d ofs=%d",cmp,level,p->offset);
1199 #endif
1200             return 0; 
1201         }
1202     }
1203     else {
1204 #if ISAMB_DEBUG
1205         logf(LOG_DEBUG,"isamb_pp_on_right at tail, looking higher "
1206                         "lev=%d",level);
1207 #endif
1208         return isamb_pp_on_right_node(pp, level, untilbuf);
1209     }
1210 } /* isamb_pp_on_right_node */
1211
1212 static int isamb_pp_read_on_leaf(ISAMB_PP pp, void *buf)
1213 { /* reads the next item on the current leaf, returns 0 if end of leaf*/
1214     struct ISAMB_block *p = pp->block[pp->level];
1215     char *dst;
1216     const char *src;
1217     assert(pp);
1218     assert(buf);
1219     if (p->offset == p->size) {
1220 #if ISAMB_DEBUG
1221         logf(LOG_DEBUG,"isamb_pp_read_on_leaf returning 0 on node %d",p->pos);
1222 #endif
1223         return 0; /* at end of leaf */
1224     }
1225     src=p->bytes + p->offset;
1226     dst=buf;
1227     (*pp->isamb->method->codec.decode)(p->decodeClientData,&dst, &src);
1228     p->offset = src - (char*) p->bytes;
1229     /*
1230 #if ISAMB_DEBUG
1231     (*pp->isamb->method->codec.log_item)(LOG_DEBUG, buf, "read_on_leaf returning 1");
1232 #endif
1233 */
1234     pp->returned_numbers++;
1235     return 1;
1236 } /* read_on_leaf */
1237
1238 static int isamb_pp_forward_on_leaf(ISAMB_PP pp, void *buf, const void *untilbuf)
1239 { /* forwards on the current leaf, returns 0 if not found */
1240     int cmp;
1241     int skips=0;
1242     while (1){
1243         if (!isamb_pp_read_on_leaf(pp,buf))
1244             return 0;
1245         /* FIXME - this is an extra function call, inline the read? */
1246         cmp=(*pp->isamb->method->compare_item)(untilbuf,buf);
1247         if (cmp <2){  /* found a good one */
1248 #if ISAMB_DEBUG
1249             if (skips)
1250                 logf(LOG_DEBUG, "isam_pp_fwd_on_leaf skipped %d items",skips);
1251 #endif
1252             pp->returned_numbers++;
1253             return 1;
1254         }
1255         if (!skips)
1256             if (!isamb_pp_on_right_node(pp, pp->level, untilbuf))
1257                 return 0; /* never mind the rest of this leaf */
1258         pp->skipped_numbers++;
1259         skips++;
1260     }
1261 } /* forward_on_leaf */
1262
1263 static int isamb_pp_climb_level(ISAMB_PP pp, ISAMB_P *pos)
1264 { /* climbs higher in the tree, until finds a level with data left */
1265   /* returns the node to (consider to) descend to in *pos) */
1266     struct ISAMB_block *p = pp->block[pp->level];
1267     const char *src;
1268     zint item_len;
1269 #if ISAMB_DEBUG
1270     logf(LOG_DEBUG,"isamb_pp_climb_level starting "
1271                    "at level %d node %d ofs=%d sz=%d",
1272                     pp->level, p->pos, p->offset, p->size);
1273 #endif
1274     assert(pp->level >= 0);
1275     assert(p->offset <= p->size);
1276     if (pp->level==0)
1277     {
1278 #if ISAMB_DEBUG
1279         logf(LOG_DEBUG,"isamb_pp_climb_level returning 0 at root");
1280 #endif
1281         return 0;
1282     }
1283     assert(pp->level>0); 
1284     close_block(pp->isamb, pp->block[pp->level]);
1285     pp->block[pp->level]=0;
1286     (pp->level)--;
1287     p=pp->block[pp->level];
1288 #if ISAMB_DEBUG
1289     logf(LOG_DEBUG,"isamb_pp_climb_level climbed to level %d node %d ofs=%d",
1290                     pp->level, p->pos, p->offset);
1291 #endif
1292     assert(!p->leaf);
1293     assert(p->offset <= p->size);
1294     if (p->offset == p->size ) {
1295         /* we came from the last pointer, climb on */
1296         if (!isamb_pp_climb_level(pp,pos))
1297             return 0;
1298         p=pp->block[pp->level];
1299     }
1300     else
1301     {
1302         /* skip the child we just came from */
1303 #if ISAMB_DEBUG
1304         logf(LOG_DEBUG,"isam_pp_climb_level: skipping lev=%d ofs=%d sz=%d", 
1305                         pp->level, p->offset, p->size);
1306 #endif
1307         assert (p->offset < p->size );
1308         src=p->bytes + p->offset;
1309         decode_ptr(&src, &item_len);
1310         src += item_len;
1311         decode_ptr(&src, pos);
1312         p->offset=src - (char *)p->bytes;
1313             
1314     }
1315     return 1;
1316 } /* climb_level */
1317
1318
1319 static zint isamb_pp_forward_unode(ISAMB_PP pp, zint pos, const void *untilbuf)
1320 { /* scans a upper node until it finds a child <= untilbuf */
1321   /* pp points to the key value, as always. pos is the child read from */
1322   /* the buffer */
1323   /* if all values are too small, returns the last child in the node */
1324   /* FIXME - this can be detected, and avoided by looking at the */
1325   /* parent node, but that gets messy. Presumably the cost is */
1326   /* pretty low anyway */
1327     struct ISAMB_block *p = pp->block[pp->level];
1328     const char *src=p->bytes + p->offset;
1329     zint item_len;
1330     int cmp;
1331     zint nxtpos;
1332 #if ISAMB_DEBUG
1333     int skips=0;
1334     logf(LOG_DEBUG,"isamb_pp_forward_unode starting "
1335                    "at level %d node %d ofs=%di sz=%d",
1336                     pp->level, p->pos, p->offset, p->size);
1337 #endif
1338     assert(!p->leaf);
1339     assert(p->offset <= p->size);
1340     if (p->offset == p->size) {
1341 #if ISAMB_DEBUG
1342             logf(LOG_DEBUG,"isamb_pp_forward_unode returning at end "
1343                    "at level %d node %d ofs=%di sz=%d",
1344                     pp->level, p->pos, p->offset, p->size);
1345 #endif
1346         return pos; /* already at the end of it */
1347     }
1348     while(p->offset < p->size) {
1349         decode_ptr(&src,&item_len);
1350         cmp=(*pp->isamb->method->compare_item)(untilbuf,src);
1351         src+=item_len;
1352         decode_ptr(&src,&nxtpos);
1353         if (cmp<2)
1354         {
1355 #if ISAMB_DEBUG
1356             logf(LOG_DEBUG,"isamb_pp_forward_unode returning a hit "
1357                    "at level %d node %d ofs=%d sz=%d",
1358                     pp->level, p->pos, p->offset, p->size);
1359 #endif
1360             return pos;
1361         } /* found one */
1362         pos=nxtpos;
1363         p->offset=src-(char*)p->bytes;
1364         (pp->skipped_nodes[pp->maxlevel - pp->level -1])++;
1365 #if ISAMB_DEBUG
1366         skips++;
1367 #endif
1368     }
1369 #if ISAMB_DEBUG
1370             logf(LOG_DEBUG,"isamb_pp_forward_unode returning at tail "
1371                    "at level %d node %d ofs=%d sz=%d skips=%d",
1372                     pp->level, p->pos, p->offset, p->size, skips);
1373 #endif
1374     return pos; /* that's the last one in the line */
1375     
1376 } /* forward_unode */
1377
1378 static void isamb_pp_descend_to_leaf(ISAMB_PP pp, ISAMB_P pos, const void *untilbuf)
1379 { /* climbs down the tree, from pos, to the leftmost leaf */
1380     struct ISAMB_block *p = pp->block[pp->level];
1381     const char *src;
1382     assert(!p->leaf);
1383 #if ISAMB_DEBUG
1384     logf(LOG_DEBUG,"isamb_pp_descend_to_leaf "
1385                    "starting at lev %d node %d ofs=%d lf=%d u=%p", 
1386                    pp->level, p->pos, p->offset, p->leaf, untilbuf);
1387 #endif
1388     if (untilbuf)
1389         pos=isamb_pp_forward_unode(pp,pos,untilbuf);
1390     ++(pp->level);
1391     assert(pos);
1392     p=open_block(pp->isamb, pos);
1393     pp->block[pp->level]=p;
1394     ++(pp->accessed_nodes[pp->maxlevel-pp->level]);
1395     ++(pp->no_blocks);
1396 #if ISAMB_DEBUG
1397     logf(LOG_DEBUG,"isamb_pp_descend_to_leaf "
1398                    "got lev %d node %d lf=%d", 
1399                    pp->level, p->pos, p->leaf);
1400 #endif
1401     if (p->leaf)
1402         return;
1403     assert (p->offset==0 );
1404     src=p->bytes + p->offset;
1405     decode_ptr(&src, &pos);
1406     p->offset=src-(char*)p->bytes;
1407     isamb_pp_descend_to_leaf(pp,pos,untilbuf);
1408 #if ISAMB_DEBUG
1409     logf(LOG_DEBUG,"isamb_pp_descend_to_leaf "
1410                    "returning at lev %d node %d ofs=%d lf=%d", 
1411                    pp->level, p->pos, p->offset, p->leaf);
1412 #endif
1413 } /* descend_to_leaf */
1414
1415 static int isamb_pp_find_next_leaf(ISAMB_PP pp)
1416 { /* finds the next leaf by climbing up and down */
1417     ISAMB_P pos;
1418     if (!isamb_pp_climb_level(pp,&pos))
1419         return 0;
1420     isamb_pp_descend_to_leaf(pp, pos,0);
1421     return 1;
1422 }
1423
1424 static int isamb_pp_climb_desc(ISAMB_PP pp,  const void *untilbuf)
1425 { /* climbs up and descends to a leaf where values >= *untilbuf are found */
1426     ISAMB_P pos;
1427 #if ISAMB_DEBUG
1428     struct ISAMB_block *p = pp->block[pp->level];
1429     logf(LOG_DEBUG,"isamb_pp_climb_desc starting "
1430                    "at level %d node %d ofs=%d sz=%d",
1431                     pp->level, p->pos, p->offset, p->size);
1432 #endif
1433     if (!isamb_pp_climb_level(pp,&pos))
1434         return 0;
1435     /* see if it would pay to climb one higher */
1436     if (!isamb_pp_on_right_node(pp, pp->level, untilbuf))
1437         if (!isamb_pp_climb_level(pp,&pos))
1438             return 0;
1439     isamb_pp_descend_to_leaf(pp, pos,untilbuf);
1440 #if ISAMB_DEBUG
1441     p = pp->block[pp->level];
1442     logf(LOG_DEBUG,"isamb_pp_climb_desc done "
1443                    "at level %d node %d ofs=%d sz=%d",
1444                     pp->level, p->pos, p->offset, p->size);
1445 #endif
1446     return 1;
1447 } /* climb_desc */
1448
1449 int isamb_pp_forward (ISAMB_PP pp, void *buf, const void *untilbuf)
1450 {
1451 #if ISAMB_DEBUG
1452     struct ISAMB_block *p = pp->block[pp->level];
1453     assert(p->leaf);
1454     logf(LOG_DEBUG,"isamb_pp_forward starting "
1455                    "at level %d node %d ofs=%d sz=%d u=%p",
1456                     pp->level, p->pos, p->offset, p->size,untilbuf);
1457 #endif
1458     if (untilbuf) {
1459         if (isamb_pp_forward_on_leaf( pp, buf, untilbuf)) {
1460 #if ISAMB_DEBUG
1461             logf(LOG_DEBUG,"isamb_pp_forward (f) returning (A) "
1462                    "at level %d node %d ofs=%d sz=%d",
1463                     pp->level, p->pos, p->offset, p->size);
1464 #endif
1465             return 1;
1466         }
1467         if (! isamb_pp_climb_desc( pp, untilbuf)) {
1468 #if ISAMB_DEBUG
1469             logf(LOG_DEBUG,"isamb_pp_forward (f) returning notfound (B) "
1470                    "at level %d node %d ofs=%d sz=%d",
1471                     pp->level, p->pos, p->offset, p->size);
1472 #endif
1473             return 0; /* could not find a leaf */
1474         }
1475         do{
1476             if (isamb_pp_forward_on_leaf( pp, buf, untilbuf)) {
1477 #if ISAMB_DEBUG
1478             logf(LOG_DEBUG,"isamb_pp_forward (f) returning (C) "
1479                    "at level %d node %d ofs=%d sz=%d",
1480                     pp->level, p->pos, p->offset, p->size);
1481 #endif
1482                 return 1;
1483             }
1484         }while ( isamb_pp_find_next_leaf(pp));
1485         return 0; /* could not find at all */
1486     }
1487     else { /* no untilbuf, a straight read */
1488         /* FIXME - this should be moved
1489          * directly into the pp_read */
1490         /* keeping here now, to keep same
1491          * interface as the old fwd */
1492         if (isamb_pp_read_on_leaf( pp, buf)) {
1493 #if ISAMB_DEBUG
1494             logf(LOG_DEBUG,"isamb_pp_forward (read) returning (D) "
1495                    "at level %d node %d ofs=%d sz=%d",
1496                     pp->level, p->pos, p->offset, p->size);
1497 #endif
1498             return 1;
1499         }
1500         if (isamb_pp_find_next_leaf(pp)) {
1501 #if ISAMB_DEBUG
1502             logf(LOG_DEBUG,"isamb_pp_forward (read) returning (E) "
1503                    "at level %d node %d ofs=%d sz=%d",
1504                     pp->level, p->pos, p->offset, p->size);
1505 #endif
1506             return isamb_pp_read_on_leaf(pp, buf);
1507         }
1508         else
1509             return 0;
1510     }
1511 } /* isam_pp_forward (new version) */
1512
1513 #elif NEW_FORWARD == 0
1514
1515 int isamb_pp_forward (ISAMB_PP pp, void *buf, const void *untilbuf)
1516 {
1517     /* pseudocode:
1518      *   while 1
1519      *     while at end of node
1520      *       climb higher. If out, return 0
1521      *     while not on a leaf (and not at its end)
1522      *       decode next
1523      *       if cmp
1524      *         descend to node
1525      *     decode next
1526      *     if cmp
1527      *       return 1
1528      */
1529      /* 
1530       * The upper nodes consist of a sequence of nodenumbers and keys
1531       * When opening a block,  the first node number is read in, and
1532       * offset points to the first key, which is the upper limit of keys
1533       * in the node just read.
1534       */
1535     char *dst = buf;
1536     const char *src;
1537     struct ISAMB_block *p = pp->block[pp->level];
1538     int cmp;
1539     int item_len;
1540     int pos;
1541     int nxtpos;
1542     int descending=0; /* used to prevent a border condition error */
1543     if (!p)
1544         return 0;
1545 #if ISAMB_DEBUG
1546     logf(LOG_DEBUG,"isamb_pp_forward starting [%p] p=%d",pp,p->pos);
1547     
1548     (*pp->isamb->method->codec.log_item)(LOG_DEBUG, untilbuf, "until");
1549     (*pp->isamb->method->codec.log_item)(LOG_DEBUG, buf, "buf");
1550 #endif
1551
1552     while (1)
1553     {
1554         while ( (p->offset == p->size) && !descending )
1555         {  /* end of this block - climb higher */
1556             assert (p->offset <= p->size);
1557 #if ISAMB_DEBUG
1558             logf(LOG_DEBUG,"isamb_pp_forward climbing from l=%d",
1559                             pp->level);
1560 #endif
1561             if (pp->level == 0)
1562             {
1563 #if ISAMB_DEBUG
1564                 logf(LOG_DEBUG,"isamb_pp_forward returning 0 at root");
1565 #endif
1566                 return 0; /* at end of the root, nothing left */
1567             }
1568             close_block(pp->isamb, pp->block[pp->level]);
1569             pp->block[pp->level]=0;
1570             (pp->level)--;
1571             p=pp->block[pp->level];
1572 #if ISAMB_DEBUG
1573             logf(LOG_DEBUG,"isamb_pp_forward climbed to node %d off=%d",
1574                             p->pos, p->offset);
1575 #endif
1576             assert(!p->leaf);
1577             assert(p->offset <= p->size);
1578             /* skip the child we have handled */
1579             if (p->offset != p->size)
1580             { 
1581                 src = p->bytes + p->offset;
1582                 decode_ptr(&src, &item_len);
1583 #if ISAMB_DEBUG         
1584                 (*pp->isamb->method->codec.log_item)(LOG_DEBUG, src,
1585                                                " isamb_pp_forward "
1586                                                "climb skipping old key");
1587 #endif
1588                 src += item_len;
1589                 decode_ptr(&src,&pos);
1590                 p->offset = src - (char*) p->bytes;
1591                 break; /* even if this puts us at the end of the block, we
1592                           need to descend to the last pos. UGLY coding,
1593                           clean up some day */
1594             }
1595         }
1596         if (!p->leaf)
1597         { 
1598             src = p->bytes + p->offset;
1599             if (p->offset == p->size)
1600                 cmp=-2 ; /* descend to the last node, as we have
1601                             no value to cmp */
1602             else
1603             {
1604                 decode_ptr(&src, &item_len);
1605 #if ISAMB_DEBUG
1606                 logf(LOG_DEBUG,"isamb_pp_forward (B) on a high node. "
1607                      "ofs=%d sz=%d nxtpos=%d ",
1608                         p->offset,p->size,pos);
1609                 (*pp->isamb->method->codec.log_item)(LOG_DEBUG, src, "");
1610 #endif
1611                 if (untilbuf)
1612                     cmp=(*pp->isamb->method->compare_item)(untilbuf,src);
1613                 else
1614                     cmp=-2;
1615                 src += item_len;
1616                 decode_ptr(&src,&nxtpos);
1617             }
1618             if (cmp<2)
1619             { 
1620 #if ISAMB_DEBUG
1621                 logf(LOG_DEBUG,"isambb_pp_forward descending l=%d p=%d ",
1622                             pp->level, pos);
1623 #endif
1624                 descending=1; /* prevent climbing for a while */
1625                 ++(pp->level);
1626                 p = open_block(pp->isamb,pos);
1627                 pp->block[pp->level] = p ;
1628                 pp->total_size += p->size;
1629                 (pp->accessed_nodes[pp->maxlevel - pp->level])++;
1630                 pp->no_blocks++;
1631                 if ( !p->leaf)
1632                 { /* block starts with a pos */
1633                     src = p->bytes + p->offset;
1634                     decode_ptr(&src,&pos);
1635                     p->offset=src-(char*) p->bytes;
1636 #if ISAMB_DEBUG
1637                     logf(LOG_DEBUG,"isamb_pp_forward: block %d starts with %d",
1638                                     p->pos, pos);
1639 #endif
1640                 } 
1641             } /* descend to the node */
1642             else
1643             { /* skip the node */
1644                 p->offset = src - (char*) p->bytes;
1645                 pos=nxtpos;
1646                 (pp->skipped_nodes[pp->maxlevel - pp->level -1])++;
1647 #if ISAMB_DEBUG
1648                 logf(LOG_DEBUG,
1649                     "isamb_pp_forward: skipping block on level %d, noting "
1650                      "on %d (%d)",
1651                     pp->level, pp->maxlevel - pp->level-1 , 
1652                     pp->skipped_nodes[pp->maxlevel - pp->level-1 ]);
1653 #endif
1654                 /* 0 is always leafs, 1 is one level above leafs etc, no
1655                  * matter how high tree */
1656             }
1657         } /* not on a leaf */
1658         else
1659         { /* on a leaf */
1660             if (p->offset == p->size) { 
1661                 descending = 0;
1662             }
1663             else
1664             {
1665                 assert (p->offset < p->size);
1666                 src = p->bytes + p->offset;
1667                 dst=buf;
1668                 (*pp->isamb->method->codec.decode)(p->decodeClientData,
1669                                                 &dst, &src);
1670                 p->offset = src - (char*) p->bytes;
1671                 if (untilbuf)
1672                     cmp=(*pp->isamb->method->compare_item)(untilbuf,buf);
1673                 else
1674                     cmp=-2;
1675 #if ISAMB_DEBUG
1676                 logf(LOG_DEBUG,"isamb_pp_forward on a leaf. cmp=%d", 
1677                      cmp);
1678                 (*pp->isamb->method->codec.log_item)(LOG_DEBUG, buf, "");
1679 #endif
1680                 if (cmp <2)
1681                 {
1682 #if ISAMB_DEBUG
1683                     if (untilbuf)
1684                     {
1685                         (*pp->isamb->method->codec.log_item)(
1686                             LOG_DEBUG, buf,  "isamb_pp_forward returning 1");
1687                     }
1688                     else
1689                     {
1690                         (*pp->isamb->method->codec.log_item)(
1691                             LOG_DEBUG, buf, "isamb_pp_read returning 1 (fwd)");
1692                     }
1693 #endif
1694                     pp->returned_numbers++;
1695                     return 1;
1696                 }
1697                 else
1698                     pp->skipped_numbers++;
1699             }
1700         } /* leaf */
1701     } /* main loop */
1702 }
1703
1704 #elif NEW_FORWARD == 2
1705
1706 int isamb_pp_forward (ISAMB_PP pp, void *buf, const void *untilb)
1707 {
1708     char *dst = buf;
1709     const char *src;
1710     struct ISAMB_block *p = pp->block[pp->level];
1711     if (!p)
1712         return 0;
1713
1714 again:
1715     while (p->offset == p->size)
1716     {
1717         int pos, item_len;
1718         while (p->offset == p->size)
1719         {
1720             if (pp->level == 0)
1721                 return 0;
1722             close_block (pp->isamb, pp->block[pp->level]);
1723             pp->block[pp->level] = 0;
1724             (pp->level)--;
1725             p = pp->block[pp->level];
1726             assert (!p->leaf);  
1727         }
1728
1729         assert(!p->leaf);
1730         src = p->bytes + p->offset;
1731         
1732         decode_ptr (&src, &item_len);
1733         src += item_len;
1734         decode_ptr (&src, &pos);
1735         
1736         p->offset = src - (char*) p->bytes;
1737
1738         src = p->bytes + p->offset;
1739
1740         while(1)
1741         {
1742             if (!untilb || p->offset == p->size)
1743                 break;
1744             assert(p->offset < p->size);
1745             decode_ptr (&src, &item_len);
1746             if ((*pp->isamb->method->compare_item)(untilb, src) <= 1)
1747                 break;
1748             src += item_len;
1749             decode_ptr (&src, &pos);
1750             p->offset = src - (char*) p->bytes;
1751         }
1752
1753         pp->level++;
1754
1755         while (1)
1756         {
1757             pp->block[pp->level] = p = open_block (pp->isamb, pos);
1758
1759             pp->total_size += p->size;
1760             pp->no_blocks++;
1761             
1762             if (p->leaf) 
1763             {
1764                 break;
1765             }
1766             
1767             src = p->bytes + p->offset;
1768             while(1)
1769             {
1770                 decode_ptr (&src, &pos);
1771                 p->offset = src - (char*) p->bytes;
1772                 
1773                 if (!untilb || p->offset == p->size)
1774                     break;
1775                 assert(p->offset < p->size);
1776                 decode_ptr (&src, &item_len);
1777                 if ((*pp->isamb->method->compare_item)(untilb, src) <= 1)
1778                     break;
1779                 src += item_len;
1780             }
1781             pp->level++;
1782         }
1783     }
1784     assert (p->offset < p->size);
1785     assert (p->leaf);
1786     while(1)
1787     {
1788         char *dst0 = dst;
1789         src = p->bytes + p->offset;
1790         (*pp->isamb->method->codec.decode)(p->decodeClientData, &dst, &src);
1791         p->offset = src - (char*) p->bytes;
1792         if (!untilb || (*pp->isamb->method->compare_item)(untilb, dst0) <= 1)
1793             break;
1794         dst = dst0;
1795         if (p->offset == p->size) goto again;
1796     }
1797     /* key_logdump_txt(LOG_DEBUG,buf, "isamb_pp_read returning 1"); */
1798     return 1;
1799 }
1800
1801 #endif
1802
1803 int isamb_pp_num (ISAMB_PP pp)
1804 {
1805     assert(pp); /* shut up about unused arguments */
1806     return 1;
1807 }
1808
1809 static void isamb_pp_leaf_pos( ISAMB_PP pp, 
1810                                double *current, double *total, 
1811                                void *dummybuf )
1812 {
1813     struct ISAMB_block *p = pp->block[pp->level];
1814     const char *src=p->bytes;
1815     char *end=p->bytes+p->size;
1816     char *cur=p->bytes+p->offset;
1817     char *dst;
1818     void *decodeClientData;
1819     assert(p->offset <= p->size);
1820     assert(cur <= end);
1821     assert(p->leaf);
1822     *current=0;
1823     *total=0;
1824
1825     decodeClientData = (pp->isamb->method->codec.start)();
1826
1827     while(src < end) {
1828         dst=dummybuf;
1829         (*pp->isamb->method->codec.decode)(decodeClientData,&dst, &src);
1830         assert(dst<(char*) dummybuf+100); /*FIXME */
1831         (*total)++;
1832         if (src<=cur)
1833              (*current)++;
1834     }
1835 #if ISAMB_DEBUG
1836     logf(LOG_DEBUG, "isamb_pp_leaf_pos: cur= %0.1f tot=%0.1f "
1837                     " ofs=%d sz=%d lev=%d",
1838                     *current, *total, p->offset, p->size, pp->level);
1839 #endif
1840     assert(src==end);
1841     (pp->isamb->method->codec.stop)(decodeClientData);
1842 }
1843
1844 static void isamb_pp_upper_pos( ISAMB_PP pp, double *current, double *total, 
1845                                 double size, int level )
1846 { /* estimates total/current occurrences from here up, excl leaf */
1847     struct ISAMB_block *p = pp->block[level];
1848     const char *src=p->bytes;
1849     char *end=p->bytes+p->size;
1850     char *cur=p->bytes+p->offset;
1851     zint item_size;
1852     ISAMB_P child;
1853     
1854     assert(level>=0);
1855     assert(!p->leaf);
1856    
1857 #if  1 // ISAMB_DEBUG
1858     logf(LOG_DEBUG,"isamb_pp_upper_pos at beginning     l=%d "
1859                    "cur=%0.1f tot=%0.1f "
1860                    " ofs=%d sz=%d pos=" ZINT_FORMAT, 
1861                    level, *current, *total, p->offset, p->size, p->pos);
1862 #endif    
1863     assert (p->offset <= p->size);
1864     decode_ptr (&src, &child ); /* first child */
1865     if (src!=cur) {
1866         *total += size;
1867         if (src < cur)
1868             *current +=size;
1869     }
1870     while(src < end) {
1871         decode_ptr (&src, &item_size ); 
1872         assert(src+item_size<=end);
1873         src += item_size;
1874         decode_ptr (&src, &child );
1875         if (src!=cur) {
1876             *total += size;
1877             if (src < cur)
1878                 *current +=size;
1879         }
1880     }
1881 #if ISAMB_DEBUG
1882     logf(LOG_DEBUG,"isamb_pp_upper_pos before recursion l=%d "
1883                    "cur=%0.1f tot=%0.1f "
1884                    " ofs=%d sz=%d pos=" ZINT_FORMAT, 
1885                    level, *current, *total, p->offset, p->size, p->pos);
1886 #endif    
1887     if (level>0)
1888         isamb_pp_upper_pos(pp, current, total, *total, level-1);
1889 } /* upper_pos */
1890
1891 void isamb_pp_pos( ISAMB_PP pp, double *current, double *total )
1892 { /* return an estimate of the current position and of the total number of */
1893   /* occureences in the isam tree, based on the current leaf */
1894         /* FIXME - Isam-B ought to know how many we have, so we could return */
1895         /* that directly */
1896     struct ISAMB_block *p = pp->block[pp->level];
1897     char dummy[100]; /* 100 bytes/entry must be enough */
1898     assert(total);
1899     assert(current);
1900     assert(p->leaf);
1901     isamb_pp_leaf_pos(pp,current, total, dummy);
1902     if (pp->level>0)
1903         isamb_pp_upper_pos(pp, current, total, *total, pp->level-1);
1904     *current = (double) pp->returned_numbers;
1905     /* use the precise number, since we have it! */
1906 #if ISAMB_DEBUG
1907     logf(LOG_LOG, "isamb_pp_pos returning: cur= %0.1f tot=%0.1f rn="ZINT_FORMAT,
1908                     *current, *total, pp->returned_numbers);
1909 #endif
1910 }