b6334ad8586f19e886141660c0a9fa74b23f39f7
[idzebra-moved-to-github.git] / isamb / isamb.c
1 /* $Id: isamb.c,v 1.54 2004-08-16 10:08:37 adam Exp $
2    Copyright (C) 1995,1996,1997,1998,1999,2000,2001,2002,2003,2004
3    Index Data Aps
4
5 This file is part of the Zebra server.
6
7 Zebra is free software; you can redistribute it and/or modify it under
8 the terms of the GNU General Public License as published by the Free
9 Software Foundation; either version 2, or (at your option) any later
10 version.
11
12 Zebra is distributed in the hope that it will be useful, but WITHOUT ANY
13 WARRANTY; without even the implied warranty of MERCHANTABILITY or
14 FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
15 for more details.
16
17 You should have received a copy of the GNU General Public License
18 along with Zebra; see the file LICENSE.zebra.  If not, write to the
19 Free Software Foundation, 59 Temple Place - Suite 330, Boston, MA
20 02111-1307, USA.
21 */
22
23 #include <string.h>
24 #include <yaz/xmalloc.h>
25 #include <yaz/log.h>
26 #include <isamb.h>
27 #include <assert.h>
28
29 #ifndef ISAMB_DEBUG
30 #define ISAMB_DEBUG 0
31 #endif
32
33 #define ISAMB_MAJOR_VERSION 1
34 #define ISAMB_MINOR_VERSION 0
35
36 struct ISAMB_head {
37     zint first_block;
38     zint last_block;
39     int block_size;
40     int block_max;
41     zint free_list;
42 };
43
44 #define ISAMB_DATA_OFFSET 3
45
46 /* maximum size of encoded buffer */
47 #define DST_ITEM_MAX 256
48
49 #define ISAMB_MAX_LEVEL 10
50 /* approx 2*max page + max size of item */
51 #define DST_BUF_SIZE 16840
52
53 #define ISAMB_CACHE_ENTRY_SIZE 4096
54
55 /* CAT_MAX: _must_ be power of 2 */
56 #define CAT_MAX 4
57 #define CAT_MASK (CAT_MAX-1)
58 /* CAT_NO: <= CAT_MAX */
59 #define CAT_NO 4
60
61 /* ISAMB_PTR_CODEC=1 var, =0 fixed */
62 #define ISAMB_PTR_CODEC 1
63
64 struct ISAMB_cache_entry {
65     ISAMB_P pos;
66     unsigned char *buf;
67     int dirty;
68     int hits;
69     struct ISAMB_cache_entry *next;
70 };
71
72 struct ISAMB_file {
73     BFile bf;
74     int head_dirty;
75     struct ISAMB_head head;
76     struct ISAMB_cache_entry *cache_entries;
77 };
78
79 struct ISAMB_s {
80     BFiles bfs;
81     ISAMC_M *method;
82
83     struct ISAMB_file *file;
84     int no_cat;
85     int cache; /* 0=no cache, 1=use cache, -1=dummy isam (for testing only) */
86     int log_io;        /* log level for bf_read/bf_write calls */
87     int log_freelist;  /* log level for freelist handling */
88     zint skipped_numbers; /* on a leaf node */
89     zint returned_numbers; 
90     zint skipped_nodes[ISAMB_MAX_LEVEL]; /* [0]=skipped leaves, 1=higher etc */
91     zint accessed_nodes[ISAMB_MAX_LEVEL]; /* nodes we did not skip */
92 };
93
94 struct ISAMB_block {
95     ISAMB_P pos;
96     int cat;
97     int size;
98     int leaf;
99     int dirty;
100     int deleted;
101     int offset;
102     char *bytes;
103     char *cbuf;
104     unsigned char *buf;
105     void *decodeClientData;
106     int log_rw;
107 };
108
109 struct ISAMB_PP_s {
110     ISAMB isamb;
111     ISAMB_P pos;
112     int level;
113     int maxlevel; /* total depth */
114     zint total_size;
115     zint no_blocks;
116     zint skipped_numbers; /* on a leaf node */
117     zint returned_numbers; 
118     zint skipped_nodes[ISAMB_MAX_LEVEL]; /* [0]=skipped leaves, 1=higher etc */
119     zint accessed_nodes[ISAMB_MAX_LEVEL]; /* nodes we did not skip */
120     struct ISAMB_block **block;
121 };
122
123
124 #if ISAMB_PTR_CODEC
125 static void encode_ptr (char **dst, zint pos)
126 {
127     unsigned char *bp = (unsigned char*) *dst;
128
129     while (pos > 127)
130     {
131          *bp++ = 128 | (pos & 127);
132          pos = pos >> 7;
133     }
134     *bp++ = pos;
135     *dst = (char *) bp;
136 }
137 #else
138 static void encode_ptr (char **dst, zint pos)
139 {
140     memcpy(*dst, &pos, sizeof(pos));
141     (*dst) += sizeof(pos);
142 }
143 #endif
144
145 #if ISAMB_PTR_CODEC
146 static void decode_ptr (const char **src1, zint *pos)
147 {
148     const unsigned char **src = (const unsigned char **) src1;
149     zint d = 0;
150     unsigned char c;
151     unsigned r = 0;
152
153     while (((c = *(*src)++) & 128))
154     {
155         d += ((zint) (c & 127) << r);
156         r += 7;
157     }
158     d += ((zint) c << r);
159     *pos = d;
160 }
161 #else
162 static void decode_ptr (const char **src, zint *pos)
163 {
164      memcpy (pos, *src, sizeof(*pos));
165      (*src) += sizeof(*pos);
166 }
167 #endif
168
169 ISAMB isamb_open (BFiles bfs, const char *name, int writeflag, ISAMC_M *method,
170                   int cache)
171 {
172     ISAMB isamb = xmalloc (sizeof(*isamb));
173     int i, b_size = 32;
174
175     isamb->bfs = bfs;
176     isamb->method = (ISAMC_M *) xmalloc (sizeof(*method));
177     memcpy (isamb->method, method, sizeof(*method));
178     isamb->no_cat = CAT_NO;
179     isamb->log_io = 0;
180     isamb->log_freelist = 0;
181     isamb->cache = cache;
182     isamb->skipped_numbers=0;
183     isamb->returned_numbers=0;
184     for (i=0;i<ISAMB_MAX_LEVEL;i++)
185       isamb->skipped_nodes[i]= isamb->accessed_nodes[i]=0;
186
187     assert (cache == 0);
188     isamb->file = xmalloc (sizeof(*isamb->file) * isamb->no_cat);
189     for (i = 0; i<isamb->no_cat; i++)
190     {
191         char fname[DST_BUF_SIZE];
192         char hbuf[DST_BUF_SIZE];
193         isamb->file[i].cache_entries = 0;
194         isamb->file[i].head_dirty = 0;
195         sprintf (fname, "%s%c", name, i+'A');
196         if (cache)
197             isamb->file[i].bf = bf_open (bfs, fname, ISAMB_CACHE_ENTRY_SIZE,
198                                          writeflag);
199         else
200             isamb->file[i].bf = bf_open (bfs, fname, b_size, writeflag);
201
202         /* fill-in default values (for empty isamb) */
203         isamb->file[i].head.first_block = ISAMB_CACHE_ENTRY_SIZE/b_size+1;
204         isamb->file[i].head.last_block = isamb->file[i].head.first_block;
205         isamb->file[i].head.block_size = b_size;
206         isamb->file[i].head.block_max = b_size - ISAMB_DATA_OFFSET;
207         isamb->file[i].head.free_list = 0;
208         if (bf_read (isamb->file[i].bf, 0, 0, 0, hbuf))
209         {
210             /* got header assume "isamb"major minor len can fit in 16 bytes */
211             zint zint_tmp;
212             int major, minor, len, pos = 0;
213             int left;
214             const char *src = 0;
215             if (memcmp(hbuf, "isamb", 5))
216             {
217                 logf(LOG_WARN, "bad isamb header for file %s", fname);
218                 return 0;
219             }
220             if (sscanf(hbuf+5, "%d %d %d", &major, &minor, &len) != 3)
221             {
222                 logf(LOG_WARN, "bad isamb header for file %s", fname);
223                 return 0;
224             }
225             if (major != ISAMB_MAJOR_VERSION)
226             {
227                 logf(LOG_WARN, "bad major version for file %s %d, must be %d",
228                      fname, major, ISAMB_MAJOR_VERSION);
229                 return 0;
230             }
231             for (left = len - b_size; left > 0; left = left - b_size)
232             {
233                 pos++;
234                 if (!bf_read (isamb->file[i].bf, pos, 0, 0, hbuf + pos*b_size))
235                 {
236                     logf(LOG_WARN, "truncated isamb header for " 
237                          "file=%s len=%d pos=%d",
238                          fname, len, pos);
239                     return 0;
240                 }
241             }
242             src = hbuf + 16;
243             decode_ptr(&src, &isamb->file[i].head.first_block);
244             decode_ptr(&src, &isamb->file[i].head.last_block);
245             decode_ptr(&src, &zint_tmp);
246             isamb->file[i].head.block_size = zint_tmp;
247             decode_ptr(&src, &zint_tmp);
248             isamb->file[i].head.block_max = zint_tmp;
249             decode_ptr(&src, &isamb->file[i].head.free_list);
250         }
251         assert (isamb->file[i].head.block_size >= ISAMB_DATA_OFFSET);
252         isamb->file[i].head_dirty = 0;
253         assert(isamb->file[i].head.block_size == b_size);
254         b_size = b_size * 4;
255     }
256 #if ISAMB_DEBUG
257     logf(LOG_WARN, "isamb debug enabled. Things will be slower than usual");
258 #endif
259     return isamb;
260 }
261
262 static void flush_blocks (ISAMB b, int cat)
263 {
264     while (b->file[cat].cache_entries)
265     {
266         struct ISAMB_cache_entry *ce_this = b->file[cat].cache_entries;
267         b->file[cat].cache_entries = ce_this->next;
268
269         if (ce_this->dirty)
270         {
271             yaz_log (b->log_io, "bf_write: flush_blocks");
272             bf_write (b->file[cat].bf, ce_this->pos, 0, 0, ce_this->buf);
273         }
274         xfree (ce_this->buf);
275         xfree (ce_this);
276     }
277 }
278
279 static int get_block (ISAMB b, ISAMC_P pos, char *userbuf, int wr)
280 {
281     int cat = (int) (pos&CAT_MASK);
282     int off = (int) (((pos/CAT_MAX) & 
283                (ISAMB_CACHE_ENTRY_SIZE / b->file[cat].head.block_size - 1))
284         * b->file[cat].head.block_size);
285     zint norm = pos / (CAT_MASK*ISAMB_CACHE_ENTRY_SIZE / b->file[cat].head.block_size);
286     int no = 0;
287     struct ISAMB_cache_entry **ce, *ce_this = 0, **ce_last = 0;
288
289     if (!b->cache)
290         return 0;
291
292     assert (ISAMB_CACHE_ENTRY_SIZE >= b->file[cat].head.block_size);
293     for (ce = &b->file[cat].cache_entries; *ce; ce = &(*ce)->next, no++)
294     {
295         ce_last = ce;
296         if ((*ce)->pos == norm)
297         {
298             ce_this = *ce;
299             *ce = (*ce)->next;   /* remove from list */
300             
301             ce_this->next = b->file[cat].cache_entries;  /* move to front */
302             b->file[cat].cache_entries = ce_this;
303             
304             if (wr)
305             {
306                 memcpy (ce_this->buf + off, userbuf, 
307                         b->file[cat].head.block_size);
308                 ce_this->dirty = 1;
309             }
310             else
311                 memcpy (userbuf, ce_this->buf + off,
312                         b->file[cat].head.block_size);
313             return 1;
314         }
315     }
316     if (no >= 40)
317     {
318         assert (no == 40);
319         assert (ce_last && *ce_last);
320         ce_this = *ce_last;
321         *ce_last = 0;  /* remove the last entry from list */
322         if (ce_this->dirty)
323         {
324             yaz_log (b->log_io, "bf_write: get_block");
325             bf_write (b->file[cat].bf, ce_this->pos, 0, 0, ce_this->buf);
326         }
327         xfree (ce_this->buf);
328         xfree (ce_this);
329     }
330     ce_this = xmalloc (sizeof(*ce_this));
331     ce_this->next = b->file[cat].cache_entries;
332     b->file[cat].cache_entries = ce_this;
333     ce_this->buf = xmalloc (ISAMB_CACHE_ENTRY_SIZE);
334     ce_this->pos = norm;
335     yaz_log (b->log_io, "bf_read: get_block");
336     if (!bf_read (b->file[cat].bf, norm, 0, 0, ce_this->buf))
337         memset (ce_this->buf, 0, ISAMB_CACHE_ENTRY_SIZE);
338     if (wr)
339     {
340         memcpy (ce_this->buf + off, userbuf, b->file[cat].head.block_size);
341         ce_this->dirty = 1;
342     }
343     else
344     {
345         ce_this->dirty = 0;
346         memcpy (userbuf, ce_this->buf + off, b->file[cat].head.block_size);
347     }
348     return 1;
349 }
350
351
352 void isamb_close (ISAMB isamb)
353 {
354     int i;
355     for (i=0;isamb->accessed_nodes[i];i++)
356         logf(LOG_DEBUG,"isamb_close  level leaf-%d: "ZINT_FORMAT" read, "
357                         ZINT_FORMAT" skipped",
358              i, isamb->accessed_nodes[i], isamb->skipped_nodes[i]);
359     logf(LOG_DEBUG,"isamb_close returned "ZINT_FORMAT" values, "
360                    "skipped "ZINT_FORMAT,
361          isamb->skipped_numbers, isamb->returned_numbers);
362     for (i = 0; i<isamb->no_cat; i++)
363     {
364         flush_blocks (isamb, i);
365         if (isamb->file[i].head_dirty)
366         {
367             char hbuf[DST_BUF_SIZE];
368             int major = ISAMB_MAJOR_VERSION;
369             int minor = ISAMB_MINOR_VERSION;
370             int len = 16;
371             char *dst = hbuf + 16;
372             int pos = 0, left;
373             int b_size = isamb->file[i].head.block_size;
374
375             encode_ptr(&dst, isamb->file[i].head.first_block);
376             encode_ptr(&dst, isamb->file[i].head.last_block);
377             encode_ptr(&dst, isamb->file[i].head.block_size);
378             encode_ptr(&dst, isamb->file[i].head.block_max);
379             encode_ptr(&dst, isamb->file[i].head.free_list);
380             memset(dst, '\0', 16); /* ensure no random bytes are written */
381
382             len = dst - hbuf;
383
384             /* print exactly 16 bytes (including trailing 0) */
385             sprintf(hbuf, "isamb%02d %02d %02d\r\n", major, minor, len);
386
387             bf_write (isamb->file[i].bf, pos, 0, 0, hbuf);
388
389             for (left = len - b_size; left > 0; left = left - b_size)
390             {
391                 pos++;
392                 bf_write (isamb->file[i].bf, pos, 0, 0, hbuf + pos*b_size);
393             }
394         }
395         bf_close (isamb->file[i].bf);
396     }
397     xfree (isamb->file);
398     xfree (isamb->method);
399     xfree (isamb);
400 }
401
402 static struct ISAMB_block *open_block (ISAMB b, ISAMC_P pos)
403 {
404     int cat = (int) (pos&CAT_MASK);
405     struct ISAMB_block *p;
406     if (!pos)
407         return 0;
408     p = xmalloc (sizeof(*p));
409     p->pos = pos;
410     p->cat = (int) (pos & CAT_MASK);
411     p->buf = xmalloc (b->file[cat].head.block_size);
412     p->cbuf = 0;
413
414     if (!get_block (b, pos, p->buf, 0))
415     {
416         yaz_log (b->log_io, "bf_read: open_block");
417         if (!bf_read (b->file[cat].bf, pos/CAT_MAX, 0, 0, p->buf))
418         {
419             yaz_log (LOG_FATAL, "isamb: read fail for pos=%ld block=%ld",
420                      (long) pos, (long) pos/CAT_MAX);
421             abort();
422         }
423     }
424     p->bytes = p->buf + ISAMB_DATA_OFFSET;
425     p->leaf = p->buf[0];
426     p->size = (p->buf[1] + 256 * p->buf[2]) - ISAMB_DATA_OFFSET;
427     if (p->size < 0)
428     {
429         yaz_log (LOG_FATAL, "Bad block size %d in pos=" ZINT_FORMAT "\n",
430                  p->size, pos);
431     }
432     assert (p->size >= 0);
433     p->offset = 0;
434     p->dirty = 0;
435     p->deleted = 0;
436     p->decodeClientData = (*b->method->codec.start)();
437     yaz_log (LOG_DEBUG, "isamb_open_block: Opened block " ZINT_FORMAT " ofs=%d",pos, p->offset);
438     return p;
439 }
440
441 struct ISAMB_block *new_block (ISAMB b, int leaf, int cat)
442 {
443     struct ISAMB_block *p;
444
445     p = xmalloc (sizeof(*p));
446     p->buf = xmalloc (b->file[cat].head.block_size);
447
448     if (!b->file[cat].head.free_list)
449     {
450         zint block_no;
451         block_no = b->file[cat].head.last_block++;
452         p->pos = block_no * CAT_MAX + cat;
453     }
454     else
455     {
456         p->pos = b->file[cat].head.free_list;
457         assert((p->pos & CAT_MASK) == cat);
458         if (!get_block (b, p->pos, p->buf, 0))
459         {
460             yaz_log (b->log_io, "bf_read: new_block");
461             if (!bf_read (b->file[cat].bf, p->pos/CAT_MAX, 0, 0, p->buf))
462             {
463                 yaz_log (LOG_FATAL, "isamb: read fail for pos=%ld block=%ld",
464                          (long) p->pos/CAT_MAX, (long) p->pos/CAT_MAX);
465                 abort ();
466             }
467         }
468         yaz_log (b->log_freelist, "got block " ZINT_FORMAT " from freelist %d:" ZINT_FORMAT, p->pos,
469                  cat, p->pos/CAT_MAX);
470         memcpy (&b->file[cat].head.free_list, p->buf, sizeof(int));
471     }
472     p->cat = cat;
473     b->file[cat].head_dirty = 1;
474     memset (p->buf, 0, b->file[cat].head.block_size);
475     p->bytes = p->buf + ISAMB_DATA_OFFSET;
476     p->leaf = leaf;
477     p->size = 0;
478     p->dirty = 1;
479     p->deleted = 0;
480     p->offset = 0;
481     p->decodeClientData = (*b->method->codec.start)();
482     return p;
483 }
484
485 struct ISAMB_block *new_leaf (ISAMB b, int cat)
486 {
487     return new_block (b, 1, cat);
488 }
489
490
491 struct ISAMB_block *new_int (ISAMB b, int cat)
492 {
493     return new_block (b, 0, cat);
494 }
495
496 static void check_block (ISAMB b, struct ISAMB_block *p)
497 {
498     assert(b); /* mostly to make the compiler shut up about unused b */
499     if (p->leaf)
500     {
501         ;
502     }
503     else
504     {
505         /* sanity check */
506         char *startp = p->bytes;
507         const char *src = startp;
508         char *endp = p->bytes + p->size;
509         ISAMB_P pos;
510             
511         decode_ptr (&src, &pos);
512         assert ((pos&CAT_MASK) == p->cat);
513         while (src != endp)
514         {
515             zint item_len;
516             decode_ptr (&src, &item_len);
517             assert (item_len > 0 && item_len < 80);
518             src += item_len;
519             decode_ptr (&src, &pos);
520             assert ((pos&CAT_MASK) == p->cat);
521         }
522     }
523 }
524
525 void close_block (ISAMB b, struct ISAMB_block *p)
526 {
527     if (!p)
528         return;
529     if (p->deleted)
530     {
531         yaz_log (b->log_freelist, "release block " ZINT_FORMAT " from freelist %d:" ZINT_FORMAT,
532                  p->pos, p->cat, p->pos/CAT_MAX);
533         memcpy (p->buf, &b->file[p->cat].head.free_list, sizeof(int));
534         b->file[p->cat].head.free_list = p->pos;
535         if (!get_block (b, p->pos, p->buf, 1))
536         {
537             yaz_log (b->log_io, "bf_write: close_block (deleted)");
538             bf_write (b->file[p->cat].bf, p->pos/CAT_MAX, 0, 0, p->buf);
539         }
540     }
541     else if (p->dirty)
542     {
543         int size = p->size + ISAMB_DATA_OFFSET;
544         assert (p->size >= 0);
545         p->buf[0] = p->leaf;
546         p->buf[1] = size & 255;
547         p->buf[2] = size >> 8;
548         check_block(b, p);
549         if (!get_block (b, p->pos, p->buf, 1))
550         {
551             yaz_log (b->log_io, "bf_write: close_block");
552             bf_write (b->file[p->cat].bf, p->pos/CAT_MAX, 0, 0, p->buf);
553         }
554     }
555     (*b->method->codec.stop)(p->decodeClientData);
556     xfree (p->buf);
557     xfree (p);
558 }
559
560 int insert_sub (ISAMB b, struct ISAMB_block **p,
561                 void *new_item, int *mode,
562                 ISAMC_I *stream,
563                 struct ISAMB_block **sp,
564                 void *sub_item, int *sub_size,
565                 const void *max_item);
566
567 int insert_int (ISAMB b, struct ISAMB_block *p, void *lookahead_item,
568                 int *mode,
569                 ISAMC_I *stream, struct ISAMB_block **sp,
570                 void *split_item, int *split_size, const void *last_max_item)
571 {
572     char *startp = p->bytes;
573     const char *src = startp;
574     char *endp = p->bytes + p->size;
575     ISAMB_P pos;
576     struct ISAMB_block *sub_p1 = 0, *sub_p2 = 0;
577     char sub_item[DST_ITEM_MAX];
578     int sub_size;
579     int more = 0;
580
581     *sp = 0;
582
583     assert(p->size >= 0);
584     decode_ptr (&src, &pos);
585     while (src != endp)
586     {
587         zint item_len;
588         int d;
589         const char *src0 = src;
590         decode_ptr (&src, &item_len);
591         d = (*b->method->compare_item)(src, lookahead_item);
592         if (d > 0)
593         {
594             sub_p1 = open_block (b, pos);
595             assert (sub_p1);
596             more = insert_sub (b, &sub_p1, lookahead_item, mode,
597                                stream, &sub_p2, 
598                                sub_item, &sub_size, src);
599             src = src0;
600             break;
601         }
602         src += item_len;
603         decode_ptr (&src, &pos);
604     }
605     if (!sub_p1)
606     {
607         sub_p1 = open_block (b, pos);
608         assert (sub_p1);
609         more = insert_sub (b, &sub_p1, lookahead_item, mode, stream, &sub_p2, 
610                            sub_item, &sub_size, last_max_item);
611     }
612     if (sub_p2)
613     {
614         /* there was a split - must insert pointer in this one */
615         char dst_buf[DST_BUF_SIZE];
616         char *dst = dst_buf;
617
618         assert (sub_size < 80 && sub_size > 1);
619
620         memcpy (dst, startp, src - startp);
621                 
622         dst += src - startp;
623
624         encode_ptr (&dst, sub_size);      /* sub length and item */
625         memcpy (dst, sub_item, sub_size);
626         dst += sub_size;
627
628         encode_ptr (&dst, sub_p2->pos);   /* pos */
629
630         if (endp - src)                   /* remaining data */
631         {
632             memcpy (dst, src, endp - src);
633             dst += endp - src;
634         }
635         p->size = dst - dst_buf;
636         assert (p->size >= 0);
637         if (p->size <= b->file[p->cat].head.block_max)
638         {
639             memcpy (startp, dst_buf, dst - dst_buf);
640         }
641         else
642         {
643             zint split_size_tmp;
644             int p_new_size;
645             const char *half;
646             src = dst_buf;
647             endp = dst;
648
649             half = src + b->file[p->cat].head.block_size/2;
650             decode_ptr (&src, &pos);
651             while (src <= half)
652             {
653                 decode_ptr (&src, &split_size_tmp);
654                 *split_size = (int) split_size_tmp;
655
656                 src += *split_size;
657                 decode_ptr (&src, &pos);
658             }
659             p_new_size = src - dst_buf;
660             memcpy (p->bytes, dst_buf, p_new_size);
661
662             decode_ptr (&src, &split_size_tmp);
663             *split_size = (int) split_size_tmp;
664             memcpy (split_item, src, *split_size);
665             src += *split_size;
666
667             *sp = new_int (b, p->cat);
668             (*sp)->size = endp - src;
669             memcpy ((*sp)->bytes, src, (*sp)->size);
670
671             p->size = p_new_size;
672         }
673         p->dirty = 1;
674         close_block (b, sub_p2);
675     }
676     close_block (b, sub_p1);
677     return more;
678 }
679
680 int insert_leaf (ISAMB b, struct ISAMB_block **sp1, void *lookahead_item,
681                  int *lookahead_mode, ISAMC_I *stream,
682                  struct ISAMB_block **sp2,
683                  void *sub_item, int *sub_size,
684                  const void *max_item)
685 {
686     struct ISAMB_block *p = *sp1;
687     char *endp = 0;
688     const char *src = 0;
689     char dst_buf[DST_BUF_SIZE], *dst = dst_buf;
690     int new_size;
691     void *c1 = (*b->method->codec.start)();
692     void *c2 = (*b->method->codec.start)();
693     int more = 1;
694     int quater = b->file[b->no_cat-1].head.block_max / CAT_MAX;
695     char *cut = dst_buf + quater * 2;
696     char *maxp = dst_buf + b->file[b->no_cat-1].head.block_max;
697     char *half1 = 0;
698     char *half2 = 0;
699     char cut_item_buf[DST_ITEM_MAX];
700     int cut_item_size = 0;
701
702     if (p && p->size)
703     {
704         char file_item_buf[DST_ITEM_MAX];
705         char *file_item = file_item_buf;
706             
707         src = p->bytes;
708         endp = p->bytes + p->size;
709         (*b->method->codec.decode)(c1, &file_item, &src);
710         while (1)
711         {
712             const char *dst_item = 0;
713             char *dst_0 = dst;
714             char *lookahead_next;
715             int d = -1;
716             
717             if (lookahead_item)
718                 d = (*b->method->compare_item)(file_item_buf, lookahead_item);
719             
720             if (d > 0)
721             {
722                 dst_item = lookahead_item;
723                 if (!*lookahead_mode)
724                 {
725                     yaz_log (LOG_WARN, "isamb: Inconsistent register (1)");
726                     assert (*lookahead_mode);
727                 }
728             }
729             else
730                 dst_item = file_item_buf;
731             if (!*lookahead_mode && d == 0)
732             {
733                 p->dirty = 1;
734             }
735             else if (!half1 && dst > cut)
736             {
737                 const char *dst_item_0 = dst_item;
738                 half1 = dst; /* candidate for splitting */
739                 
740                 (*b->method->codec.encode)(c2, &dst, &dst_item);
741                 
742                 cut_item_size = dst_item - dst_item_0;
743                 assert(cut_item_size > 0);
744                 memcpy (cut_item_buf, dst_item_0, cut_item_size);
745                 
746                 half2 = dst;
747             }
748             else
749                 (*b->method->codec.encode)(c2, &dst, &dst_item);
750             if (d > 0)  
751             {
752                 if (dst > maxp)
753                 {
754                     dst = dst_0;
755                     lookahead_item = 0;
756                 }
757                 else
758                 {
759                     lookahead_next = lookahead_item;
760                     if (!(*stream->read_item)(stream->clientData,
761                                               &lookahead_next,
762                                               lookahead_mode))
763                     {
764                         lookahead_item = 0;
765                         more = 0;
766                     }
767                     if (lookahead_item && max_item &&
768                         (*b->method->compare_item)(max_item, lookahead_item) <= 0)
769                     {
770                         /* max_item 1 */
771                         lookahead_item = 0;
772                     }
773                     
774                     p->dirty = 1;
775                 }
776             }
777             else if (d == 0)
778             {
779                 lookahead_next = lookahead_item;
780                 if (!(*stream->read_item)(stream->clientData,
781                                           &lookahead_next, lookahead_mode))
782                 {
783                     lookahead_item = 0;
784                     more = 0;
785                 }
786                 if (src == endp)
787                     break;
788                 file_item = file_item_buf;
789                 (*b->method->codec.decode)(c1, &file_item, &src);
790             }
791             else
792             {
793                 if (src == endp)
794                     break;
795                 file_item = file_item_buf;
796                 (*b->method->codec.decode)(c1, &file_item, &src);
797             }
798         }
799     }
800     maxp = dst_buf + b->file[b->no_cat-1].head.block_max + quater;
801     while (lookahead_item)
802     {
803         char *dst_item;
804         const char *src = lookahead_item;
805         char *dst_0 = dst;
806         
807         if (max_item &&
808             (*b->method->compare_item)(max_item, lookahead_item) <= 0)
809         {
810             /* max_item 2 */
811             break;
812         }
813         if (!*lookahead_mode)
814         {
815             yaz_log (LOG_WARN, "isamb: Inconsistent register (2)");
816             abort();
817         }
818         else if (!half1 && dst > cut)   
819         {
820             const char *src_0 = src;
821             half1 = dst; /* candidate for splitting */
822             
823             (*b->method->codec.encode)(c2, &dst, &src);
824             
825             cut_item_size = src - src_0;
826             assert(cut_item_size > 0);
827             memcpy (cut_item_buf, src_0, cut_item_size);
828             
829             half2 = dst;
830         }
831         else
832             (*b->method->codec.encode)(c2, &dst, &src);
833
834         if (dst > maxp)
835         {
836             dst = dst_0;
837             break;
838         }
839         if (p)
840             p->dirty = 1;
841         dst_item = lookahead_item;
842         if (!(*stream->read_item)(stream->clientData, &dst_item,
843                                   lookahead_mode))
844         {
845             lookahead_item = 0;
846             more = 0;
847         }
848     }
849     new_size = dst - dst_buf;
850     if (p && p->cat != b->no_cat-1 && 
851         new_size > b->file[p->cat].head.block_max)
852     {
853         /* non-btree block will be removed */
854         p->deleted = 1;
855         close_block (b, p);
856         /* delete it too!! */
857         p = 0; /* make a new one anyway */
858     }
859     if (!p)
860     {   /* must create a new one */
861         int i;
862         for (i = 0; i < b->no_cat; i++)
863             if (new_size <= b->file[i].head.block_max)
864                 break;
865         if (i == b->no_cat)
866             i = b->no_cat - 1;
867         p = new_leaf (b, i);
868     }
869     if (new_size > b->file[p->cat].head.block_max)
870     {
871         char *first_dst;
872         const char *cut_item = cut_item_buf;
873
874         assert (half1);
875         assert (half2);
876
877         assert(cut_item_size > 0);
878         
879         /* first half */
880         p->size = half1 - dst_buf;
881         memcpy (p->bytes, dst_buf, half1 - dst_buf);
882
883         /* second half */
884         *sp2 = new_leaf (b, p->cat);
885
886         (*b->method->codec.reset)(c2);
887
888         first_dst = (*sp2)->bytes;
889
890         (*b->method->codec.encode)(c2, &first_dst, &cut_item);
891
892         memcpy (first_dst, half2, dst - half2);
893
894         (*sp2)->size = (first_dst - (*sp2)->bytes) + (dst - half2);
895         (*sp2)->dirty = 1;
896         p->dirty = 1;
897         memcpy (sub_item, cut_item_buf, cut_item_size);
898         *sub_size = cut_item_size;
899     }
900     else
901     {
902         memcpy (p->bytes, dst_buf, dst - dst_buf);
903         p->size = new_size;
904     }
905     (*b->method->codec.stop)(c1);
906     (*b->method->codec.stop)(c2);
907     *sp1 = p;
908     return more;
909 }
910
911 int insert_sub (ISAMB b, struct ISAMB_block **p, void *new_item,
912                 int *mode,
913                 ISAMC_I *stream,
914                 struct ISAMB_block **sp,
915                 void *sub_item, int *sub_size,
916                 const void *max_item)
917 {
918     if (!*p || (*p)->leaf)
919         return insert_leaf (b, p, new_item, mode, stream, sp, sub_item, 
920                             sub_size, max_item);
921     else
922         return insert_int (b, *p, new_item, mode, stream, sp, sub_item,
923                            sub_size, max_item);
924 }
925
926 int isamb_unlink (ISAMB b, ISAMC_P pos)
927 {
928     struct ISAMB_block *p1;
929
930     if (!pos)
931         return 0;
932     p1 = open_block(b, pos);
933     p1->deleted = 1;
934     if (!p1->leaf)
935     {
936         zint sub_p;
937         zint item_len;
938         const char *src = p1->bytes + p1->offset;
939
940         decode_ptr(&src, &sub_p);
941         isamb_unlink(b, sub_p);
942         
943         while (src != p1->bytes + p1->size)
944         {
945             decode_ptr(&src, &item_len);
946             src += item_len;
947             decode_ptr(&src, &sub_p);
948             isamb_unlink(b, sub_p);
949         }
950     }
951     close_block(b, p1);
952     return 0;
953 }
954
955 ISAMB_P isamb_merge (ISAMB b, ISAMC_P pos, ISAMC_I *stream)
956 {
957     char item_buf[DST_ITEM_MAX];
958     char *item_ptr;
959     int i_mode;
960     int more;
961
962     if (b->cache < 0)
963     {
964         int more = 1;
965         while (more)
966         {
967             item_ptr = item_buf;
968             more =
969                 (*stream->read_item)(stream->clientData, &item_ptr, &i_mode);
970         }
971         return 1;
972     }
973     item_ptr = item_buf;
974     more = (*stream->read_item)(stream->clientData, &item_ptr, &i_mode);
975     while (more)
976     {
977         struct ISAMB_block *p = 0, *sp = 0;
978         char sub_item[DST_ITEM_MAX];
979         int sub_size;
980         
981         if (pos)
982             p = open_block (b, pos);
983         more = insert_sub (b, &p, item_buf, &i_mode, stream, &sp,
984                             sub_item, &sub_size, 0);
985         if (sp)
986         {    /* increase level of tree by one */
987             struct ISAMB_block *p2 = new_int (b, p->cat);
988             char *dst = p2->bytes + p2->size;
989             
990             encode_ptr (&dst, p->pos);
991             assert (sub_size < 40);
992             encode_ptr (&dst, sub_size);
993             memcpy (dst, sub_item, sub_size);
994             dst += sub_size;
995             encode_ptr (&dst, sp->pos);
996             
997             p2->size = dst - p2->bytes;
998             pos = p2->pos;  /* return new super page */
999             close_block (b, sp);
1000             close_block (b, p2);
1001         }
1002         else
1003             pos = p->pos;   /* return current one (again) */
1004         close_block (b, p);
1005     }
1006     return pos;
1007 }
1008
1009 ISAMB_PP isamb_pp_open_x (ISAMB isamb, ISAMB_P pos, int *level)
1010 {
1011     ISAMB_PP pp = xmalloc (sizeof(*pp));
1012     int i;
1013
1014     pp->isamb = isamb;
1015     pp->block = xmalloc (ISAMB_MAX_LEVEL * sizeof(*pp->block));
1016
1017     pp->pos = pos;
1018     pp->level = 0;
1019     pp->maxlevel=0;
1020     pp->total_size = 0;
1021     pp->no_blocks = 0;
1022     pp->skipped_numbers=0;
1023     pp->returned_numbers=0;
1024     for (i=0;i<ISAMB_MAX_LEVEL;i++)
1025         pp->skipped_nodes[i] = pp->accessed_nodes[i]=0;
1026     while (1)
1027     {
1028         struct ISAMB_block *p = open_block (isamb, pos);
1029         const char *src = p->bytes + p->offset;
1030         pp->block[pp->level] = p;
1031
1032         pp->total_size += p->size;
1033         pp->no_blocks++;
1034         if (p->leaf)
1035             break;
1036
1037                                         
1038         decode_ptr (&src, &pos);
1039         p->offset = src - p->bytes;
1040         pp->level++;
1041         pp->accessed_nodes[pp->level]++; 
1042     }
1043     pp->block[pp->level+1] = 0;
1044     pp->maxlevel=pp->level;
1045     if (level)
1046         *level = pp->level;
1047     return pp;
1048 }
1049
1050 ISAMB_PP isamb_pp_open (ISAMB isamb, ISAMB_P pos)
1051 {
1052     return isamb_pp_open_x (isamb, pos, 0);
1053 }
1054
1055 void isamb_pp_close_x (ISAMB_PP pp, int *size, int *blocks)
1056 {
1057     int i;
1058     if (!pp)
1059         return;
1060     logf(LOG_DEBUG,"isamb_pp_close lev=%d returned "ZINT_FORMAT" values," 
1061                     "skipped "ZINT_FORMAT,
1062         pp->maxlevel, pp->skipped_numbers, pp->returned_numbers);
1063     for (i=pp->maxlevel;i>=0;i--)
1064         if ( pp->skipped_nodes[i] || pp->accessed_nodes[i])
1065             logf(LOG_DEBUG,"isamb_pp_close  level leaf-%d: "
1066                             ZINT_FORMAT" read, "ZINT_FORMAT" skipped", i,
1067                  pp->accessed_nodes[i], pp->skipped_nodes[i]);
1068     pp->isamb->skipped_numbers += pp->skipped_numbers;
1069     pp->isamb->returned_numbers += pp->returned_numbers;
1070     for (i=pp->maxlevel;i>=0;i--)
1071     {
1072         pp->isamb->accessed_nodes[i] += pp->accessed_nodes[i];
1073         pp->isamb->skipped_nodes[i] += pp->skipped_nodes[i];
1074     }
1075     if (size)
1076         *size = pp->total_size;
1077     if (blocks)
1078         *blocks = pp->no_blocks;
1079     for (i = 0; i <= pp->level; i++)
1080         close_block (pp->isamb, pp->block[i]);
1081     xfree (pp->block);
1082     xfree (pp);
1083 }
1084
1085 int isamb_block_info (ISAMB isamb, int cat)
1086 {
1087     if (cat >= 0 && cat < isamb->no_cat)
1088         return isamb->file[cat].head.block_size;
1089     return -1;
1090 }
1091
1092 void isamb_pp_close (ISAMB_PP pp)
1093 {
1094     isamb_pp_close_x (pp, 0, 0);
1095 }
1096
1097 /* simple recursive dumper .. */
1098 static void isamb_dump_r (ISAMB b, ISAMB_P pos, void (*pr)(const char *str),
1099                           int level)
1100 {
1101     char buf[1024];
1102     char prefix_str[1024];
1103     if (pos)
1104     {
1105         struct ISAMB_block *p = open_block (b, pos);
1106         sprintf(prefix_str, "%*s " ZINT_FORMAT " cat=%d size=%d max=%d", level*2, "",
1107                 pos, p->cat, p->size, b->file[p->cat].head.block_max);
1108         (*pr)(prefix_str);
1109         sprintf(prefix_str, "%*s " ZINT_FORMAT, level*2, "", pos);
1110         if (p->leaf)
1111         {
1112             while (p->offset < p->size)
1113             {
1114                 const char *src = p->bytes + p->offset;
1115                 char *dst = buf;
1116                 (*b->method->codec.decode)(p->decodeClientData, &dst, &src);
1117                 (*b->method->log_item)(LOG_DEBUG, buf, prefix_str);
1118                 p->offset = src - (char*) p->bytes;
1119             }
1120             assert(p->offset == p->size);
1121         }
1122         else
1123         {
1124             const char *src = p->bytes + p->offset;
1125             ISAMB_P sub;
1126             zint item_len;
1127
1128             decode_ptr (&src, &sub);
1129             p->offset = src - (char*) p->bytes;
1130
1131             isamb_dump_r(b, sub, pr, level+1);
1132             
1133             while (p->offset < p->size)
1134             {
1135                 decode_ptr (&src, &item_len);
1136                 (*b->method->log_item)(LOG_DEBUG, src, prefix_str);
1137                 src += item_len;
1138                 decode_ptr (&src, &sub);
1139                 
1140                 p->offset = src - (char*) p->bytes;
1141                 
1142                 isamb_dump_r(b, sub, pr, level+1);
1143             }           
1144         }
1145         close_block(b,p);
1146     }
1147 }
1148
1149 void isamb_dump (ISAMB b, ISAMB_P pos, void (*pr)(const char *str))
1150 {
1151     isamb_dump_r(b, pos, pr, 0);
1152 }
1153
1154 #if 0
1155 /* Old isamb_pp_read that Adam wrote, kept as a reference in case we need to
1156    debug the more complex pp_read that also forwards. May be deleted near end
1157    of 2004, if it has not shown to be useful */
1158
1159
1160 int isamb_pp_read (ISAMB_PP pp, void *buf)
1161 {
1162     char *dst = buf;
1163     char *src;
1164     struct ISAMB_block *p = pp->block[pp->level];
1165     if (!p)
1166         return 0;
1167
1168     while (p->offset == p->size)
1169     {
1170         int pos, item_len;
1171         while (p->offset == p->size)
1172         {
1173             if (pp->level == 0)
1174                 return 0;
1175             close_block (pp->isamb, pp->block[pp->level]);
1176             pp->block[pp->level] = 0;
1177             (pp->level)--;
1178             p = pp->block[pp->level];
1179             assert (!p->leaf);  
1180         }
1181         src = p->bytes + p->offset;
1182         
1183         decode_ptr (&src, &item_len);
1184         src += item_len;
1185         decode_ptr (&src, &pos);
1186         
1187         p->offset = src - (char*) p->bytes;
1188
1189         ++(pp->level);
1190         
1191         while (1)
1192         {
1193             pp->block[pp->level] = p = open_block (pp->isamb, pos);
1194
1195             pp->total_size += p->size;
1196             pp->no_blocks++;
1197             
1198             if (p->leaf) 
1199             {
1200                 break;
1201             }
1202             src = p->bytes + p->offset;
1203             decode_ptr (&src, &pos);
1204             p->offset = src - (char*) p->bytes;
1205             pp->level++;
1206         }
1207     }
1208     assert (p->offset < p->size);
1209     assert (p->leaf);
1210     src = p->bytes + p->offset;
1211     (*pp->isamb->method->codec.code_item)(ISAMC_DECODE, p->decodeClientData,
1212                                     &dst, &src);
1213     p->offset = src - (char*) p->bytes;
1214     /* key_logdump_txt(LOG_DEBUG,buf, "isamb_pp_read returning 1"); */
1215     return 1;
1216 }
1217
1218 #else
1219 int isamb_pp_read (ISAMB_PP pp, void *buf)
1220 {
1221     return isamb_pp_forward(pp, buf, 0);
1222 }
1223 #endif
1224
1225 #define NEW_FORWARD 1
1226
1227 #if NEW_FORWARD == 1
1228
1229 static int isamb_pp_on_right_node(ISAMB_PP pp, int level, const void *untilbuf)
1230 { /* looks one node higher to see if we should be on this node at all */
1231   /* useful in backing off quickly, and in avoiding tail descends */
1232   /* call with pp->level to begin with */
1233     struct ISAMB_block *p;
1234     int cmp;
1235     const char *src;
1236     zint item_len;
1237     assert(level>=0);
1238     if ( level == 0) {
1239 #if ISAMB_DEBUG
1240             logf(LOG_DEBUG,"isamb_pp_on_right returning true for root");
1241 #endif
1242         return 1; /* we can never skip the root node */
1243     }
1244     level--;
1245     p=pp->block[level];
1246     assert(p->offset <= p->size);
1247     if (p->offset < p->size )
1248     {
1249         assert(p->offset>0); 
1250         src=p->bytes + p->offset;
1251         decode_ptr(&src, &item_len);
1252 #if ISAMB_DEBUG
1253         (*pp->isamb->method->codec.log_item)(LOG_DEBUG,untilbuf,"on_leaf: until");
1254         (*pp->isamb->method->codec.log_item)(LOG_DEBUG,src,"on_leaf: value");
1255 #endif
1256         cmp=(*pp->isamb->method->compare_item)(untilbuf,src);
1257         if (cmp<2) {
1258 #if ISAMB_DEBUG
1259             logf(LOG_DEBUG,"isamb_pp_on_right returning true "
1260                             "cmp=%d lev=%d ofs=%d",cmp,level,p->offset);
1261 #endif
1262             return 1; 
1263         }
1264         else {
1265 #if ISAMB_DEBUG
1266             logf(LOG_DEBUG,"isamb_pp_on_right returning false "
1267                             "cmp=%d lev=%d ofs=%d",cmp,level,p->offset);
1268 #endif
1269             return 0; 
1270         }
1271     }
1272     else {
1273 #if ISAMB_DEBUG
1274         logf(LOG_DEBUG,"isamb_pp_on_right at tail, looking higher "
1275                         "lev=%d",level);
1276 #endif
1277         return isamb_pp_on_right_node(pp, level, untilbuf);
1278     }
1279 } /* isamb_pp_on_right_node */
1280
1281 static int isamb_pp_read_on_leaf(ISAMB_PP pp, void *buf)
1282 { /* reads the next item on the current leaf, returns 0 if end of leaf*/
1283     struct ISAMB_block *p = pp->block[pp->level];
1284     char *dst;
1285     const char *src;
1286     assert(pp);
1287     assert(buf);
1288     if (p->offset == p->size) {
1289 #if ISAMB_DEBUG
1290         logf(LOG_DEBUG,"isamb_pp_read_on_leaf returning 0 on node %d",p->pos);
1291 #endif
1292         return 0; /* at end of leaf */
1293     }
1294     src=p->bytes + p->offset;
1295     dst=buf;
1296     (*pp->isamb->method->codec.decode)(p->decodeClientData,&dst, &src);
1297     p->offset = src - (char*) p->bytes;
1298     /*
1299 #if ISAMB_DEBUG
1300     (*pp->isamb->method->codec.log_item)(LOG_DEBUG, buf, "read_on_leaf returning 1");
1301 #endif
1302 */
1303     pp->returned_numbers++;
1304     return 1;
1305 } /* read_on_leaf */
1306
1307 static int isamb_pp_forward_on_leaf(ISAMB_PP pp, void *buf, const void *untilbuf)
1308 { /* forwards on the current leaf, returns 0 if not found */
1309     int cmp;
1310     int skips=0;
1311     while (1){
1312         if (!isamb_pp_read_on_leaf(pp,buf))
1313             return 0;
1314         /* FIXME - this is an extra function call, inline the read? */
1315         cmp=(*pp->isamb->method->compare_item)(untilbuf,buf);
1316         if (cmp <2){  /* found a good one */
1317 #if ISAMB_DEBUG
1318             if (skips)
1319                 logf(LOG_DEBUG, "isam_pp_fwd_on_leaf skipped %d items",skips);
1320 #endif
1321             pp->returned_numbers++;
1322             return 1;
1323         }
1324         if (!skips)
1325             if (!isamb_pp_on_right_node(pp, pp->level, untilbuf))
1326                 return 0; /* never mind the rest of this leaf */
1327         pp->skipped_numbers++;
1328         skips++;
1329     }
1330 } /* forward_on_leaf */
1331
1332 static int isamb_pp_climb_level(ISAMB_PP pp, ISAMB_P *pos)
1333 { /* climbs higher in the tree, until finds a level with data left */
1334   /* returns the node to (consider to) descend to in *pos) */
1335     struct ISAMB_block *p = pp->block[pp->level];
1336     const char *src;
1337     zint item_len;
1338 #if ISAMB_DEBUG
1339     logf(LOG_DEBUG,"isamb_pp_climb_level starting "
1340                    "at level %d node %d ofs=%d sz=%d",
1341                     pp->level, p->pos, p->offset, p->size);
1342 #endif
1343     assert(pp->level >= 0);
1344     assert(p->offset <= p->size);
1345     if (pp->level==0)
1346     {
1347 #if ISAMB_DEBUG
1348         logf(LOG_DEBUG,"isamb_pp_climb_level returning 0 at root");
1349 #endif
1350         return 0;
1351     }
1352     assert(pp->level>0); 
1353     close_block(pp->isamb, pp->block[pp->level]);
1354     pp->block[pp->level]=0;
1355     (pp->level)--;
1356     p=pp->block[pp->level];
1357 #if ISAMB_DEBUG
1358     logf(LOG_DEBUG,"isamb_pp_climb_level climbed to level %d node %d ofs=%d",
1359                     pp->level, p->pos, p->offset);
1360 #endif
1361     assert(!p->leaf);
1362     assert(p->offset <= p->size);
1363     if (p->offset == p->size ) {
1364         /* we came from the last pointer, climb on */
1365         if (!isamb_pp_climb_level(pp,pos))
1366             return 0;
1367         p=pp->block[pp->level];
1368     }
1369     else
1370     {
1371         /* skip the child we just came from */
1372 #if ISAMB_DEBUG
1373         logf(LOG_DEBUG,"isam_pp_climb_level: skipping lev=%d ofs=%d sz=%d", 
1374                         pp->level, p->offset, p->size);
1375 #endif
1376         assert (p->offset < p->size );
1377         src=p->bytes + p->offset;
1378         decode_ptr(&src, &item_len);
1379         src += item_len;
1380         decode_ptr(&src, pos);
1381         p->offset=src - (char *)p->bytes;
1382             
1383     }
1384     return 1;
1385 } /* climb_level */
1386
1387
1388 static zint isamb_pp_forward_unode(ISAMB_PP pp, zint pos, const void *untilbuf)
1389 { /* scans a upper node until it finds a child <= untilbuf */
1390   /* pp points to the key value, as always. pos is the child read from */
1391   /* the buffer */
1392   /* if all values are too small, returns the last child in the node */
1393   /* FIXME - this can be detected, and avoided by looking at the */
1394   /* parent node, but that gets messy. Presumably the cost is */
1395   /* pretty low anyway */
1396     struct ISAMB_block *p = pp->block[pp->level];
1397     const char *src=p->bytes + p->offset;
1398     zint item_len;
1399     int cmp;
1400     zint nxtpos;
1401 #if ISAMB_DEBUG
1402     int skips=0;
1403     logf(LOG_DEBUG,"isamb_pp_forward_unode starting "
1404                    "at level %d node %d ofs=%di sz=%d",
1405                     pp->level, p->pos, p->offset, p->size);
1406 #endif
1407     assert(!p->leaf);
1408     assert(p->offset <= p->size);
1409     if (p->offset == p->size) {
1410 #if ISAMB_DEBUG
1411             logf(LOG_DEBUG,"isamb_pp_forward_unode returning at end "
1412                    "at level %d node %d ofs=%di sz=%d",
1413                     pp->level, p->pos, p->offset, p->size);
1414 #endif
1415         return pos; /* already at the end of it */
1416     }
1417     while(p->offset < p->size) {
1418         decode_ptr(&src,&item_len);
1419         cmp=(*pp->isamb->method->compare_item)(untilbuf,src);
1420         src+=item_len;
1421         decode_ptr(&src,&nxtpos);
1422         if (cmp<2)
1423         {
1424 #if ISAMB_DEBUG
1425             logf(LOG_DEBUG,"isamb_pp_forward_unode returning a hit "
1426                    "at level %d node %d ofs=%d sz=%d",
1427                     pp->level, p->pos, p->offset, p->size);
1428 #endif
1429             return pos;
1430         } /* found one */
1431         pos=nxtpos;
1432         p->offset=src-(char*)p->bytes;
1433         (pp->skipped_nodes[pp->maxlevel - pp->level -1])++;
1434 #if ISAMB_DEBUG
1435         skips++;
1436 #endif
1437     }
1438 #if ISAMB_DEBUG
1439             logf(LOG_DEBUG,"isamb_pp_forward_unode returning at tail "
1440                    "at level %d node %d ofs=%d sz=%d skips=%d",
1441                     pp->level, p->pos, p->offset, p->size, skips);
1442 #endif
1443     return pos; /* that's the last one in the line */
1444     
1445 } /* forward_unode */
1446
1447 static void isamb_pp_descend_to_leaf(ISAMB_PP pp, ISAMB_P pos, const void *untilbuf)
1448 { /* climbs down the tree, from pos, to the leftmost leaf */
1449     struct ISAMB_block *p = pp->block[pp->level];
1450     const char *src;
1451     assert(!p->leaf);
1452 #if ISAMB_DEBUG
1453     logf(LOG_DEBUG,"isamb_pp_descend_to_leaf "
1454                    "starting at lev %d node %d ofs=%d lf=%d u=%p", 
1455                    pp->level, p->pos, p->offset, p->leaf, untilbuf);
1456 #endif
1457     if (untilbuf)
1458         pos=isamb_pp_forward_unode(pp,pos,untilbuf);
1459     ++(pp->level);
1460     assert(pos);
1461     p=open_block(pp->isamb, pos);
1462     pp->block[pp->level]=p;
1463     ++(pp->accessed_nodes[pp->maxlevel-pp->level]);
1464     ++(pp->no_blocks);
1465 #if ISAMB_DEBUG
1466     logf(LOG_DEBUG,"isamb_pp_descend_to_leaf "
1467                    "got lev %d node %d lf=%d", 
1468                    pp->level, p->pos, p->leaf);
1469 #endif
1470     if (p->leaf)
1471         return;
1472     assert (p->offset==0 );
1473     src=p->bytes + p->offset;
1474     decode_ptr(&src, &pos);
1475     p->offset=src-(char*)p->bytes;
1476     isamb_pp_descend_to_leaf(pp,pos,untilbuf);
1477 #if ISAMB_DEBUG
1478     logf(LOG_DEBUG,"isamb_pp_descend_to_leaf "
1479                    "returning at lev %d node %d ofs=%d lf=%d", 
1480                    pp->level, p->pos, p->offset, p->leaf);
1481 #endif
1482 } /* descend_to_leaf */
1483
1484 static int isamb_pp_find_next_leaf(ISAMB_PP pp)
1485 { /* finds the next leaf by climbing up and down */
1486     ISAMB_P pos;
1487     if (!isamb_pp_climb_level(pp,&pos))
1488         return 0;
1489     isamb_pp_descend_to_leaf(pp, pos,0);
1490     return 1;
1491 }
1492
1493 static int isamb_pp_climb_desc(ISAMB_PP pp,  const void *untilbuf)
1494 { /* climbs up and descends to a leaf where values >= *untilbuf are found */
1495     ISAMB_P pos;
1496 #if ISAMB_DEBUG
1497     struct ISAMB_block *p = pp->block[pp->level];
1498     logf(LOG_DEBUG,"isamb_pp_climb_desc starting "
1499                    "at level %d node %d ofs=%d sz=%d",
1500                     pp->level, p->pos, p->offset, p->size);
1501 #endif
1502     if (!isamb_pp_climb_level(pp,&pos))
1503         return 0;
1504     /* see if it would pay to climb one higher */
1505     if (!isamb_pp_on_right_node(pp, pp->level, untilbuf))
1506         if (!isamb_pp_climb_level(pp,&pos))
1507             return 0;
1508     isamb_pp_descend_to_leaf(pp, pos,untilbuf);
1509 #if ISAMB_DEBUG
1510     p = pp->block[pp->level];
1511     logf(LOG_DEBUG,"isamb_pp_climb_desc done "
1512                    "at level %d node %d ofs=%d sz=%d",
1513                     pp->level, p->pos, p->offset, p->size);
1514 #endif
1515     return 1;
1516 } /* climb_desc */
1517
1518 int isamb_pp_forward (ISAMB_PP pp, void *buf, const void *untilbuf)
1519 {
1520 #if ISAMB_DEBUG
1521     struct ISAMB_block *p = pp->block[pp->level];
1522     assert(p->leaf);
1523     logf(LOG_DEBUG,"isamb_pp_forward starting "
1524                    "at level %d node %d ofs=%d sz=%d u=%p",
1525                     pp->level, p->pos, p->offset, p->size,untilbuf);
1526 #endif
1527     if (untilbuf) {
1528         if (isamb_pp_forward_on_leaf( pp, buf, untilbuf)) {
1529 #if ISAMB_DEBUG
1530             logf(LOG_DEBUG,"isamb_pp_forward (f) returning (A) "
1531                    "at level %d node %d ofs=%d sz=%d",
1532                     pp->level, p->pos, p->offset, p->size);
1533 #endif
1534             return 1;
1535         }
1536         if (! isamb_pp_climb_desc( pp, untilbuf)) {
1537 #if ISAMB_DEBUG
1538             logf(LOG_DEBUG,"isamb_pp_forward (f) returning notfound (B) "
1539                    "at level %d node %d ofs=%d sz=%d",
1540                     pp->level, p->pos, p->offset, p->size);
1541 #endif
1542             return 0; /* could not find a leaf */
1543         }
1544         do{
1545             if (isamb_pp_forward_on_leaf( pp, buf, untilbuf)) {
1546 #if ISAMB_DEBUG
1547             logf(LOG_DEBUG,"isamb_pp_forward (f) returning (C) "
1548                    "at level %d node %d ofs=%d sz=%d",
1549                     pp->level, p->pos, p->offset, p->size);
1550 #endif
1551                 return 1;
1552             }
1553         }while ( isamb_pp_find_next_leaf(pp));
1554         return 0; /* could not find at all */
1555     }
1556     else { /* no untilbuf, a straight read */
1557         /* FIXME - this should be moved
1558          * directly into the pp_read */
1559         /* keeping here now, to keep same
1560          * interface as the old fwd */
1561         if (isamb_pp_read_on_leaf( pp, buf)) {
1562 #if ISAMB_DEBUG
1563             logf(LOG_DEBUG,"isamb_pp_forward (read) returning (D) "
1564                    "at level %d node %d ofs=%d sz=%d",
1565                     pp->level, p->pos, p->offset, p->size);
1566 #endif
1567             return 1;
1568         }
1569         if (isamb_pp_find_next_leaf(pp)) {
1570 #if ISAMB_DEBUG
1571             logf(LOG_DEBUG,"isamb_pp_forward (read) returning (E) "
1572                    "at level %d node %d ofs=%d sz=%d",
1573                     pp->level, p->pos, p->offset, p->size);
1574 #endif
1575             return isamb_pp_read_on_leaf(pp, buf);
1576         }
1577         else
1578             return 0;
1579     }
1580 } /* isam_pp_forward (new version) */
1581
1582 #elif NEW_FORWARD == 0
1583
1584 int isamb_pp_forward (ISAMB_PP pp, void *buf, const void *untilbuf)
1585 {
1586     /* pseudocode:
1587      *   while 1
1588      *     while at end of node
1589      *       climb higher. If out, return 0
1590      *     while not on a leaf (and not at its end)
1591      *       decode next
1592      *       if cmp
1593      *         descend to node
1594      *     decode next
1595      *     if cmp
1596      *       return 1
1597      */
1598      /* 
1599       * The upper nodes consist of a sequence of nodenumbers and keys
1600       * When opening a block,  the first node number is read in, and
1601       * offset points to the first key, which is the upper limit of keys
1602       * in the node just read.
1603       */
1604     char *dst = buf;
1605     const char *src;
1606     struct ISAMB_block *p = pp->block[pp->level];
1607     int cmp;
1608     int item_len;
1609     int pos;
1610     int nxtpos;
1611     int descending=0; /* used to prevent a border condition error */
1612     if (!p)
1613         return 0;
1614 #if ISAMB_DEBUG
1615     logf(LOG_DEBUG,"isamb_pp_forward starting [%p] p=%d",pp,p->pos);
1616     
1617     (*pp->isamb->method->codec.log_item)(LOG_DEBUG, untilbuf, "until");
1618     (*pp->isamb->method->codec.log_item)(LOG_DEBUG, buf, "buf");
1619 #endif
1620
1621     while (1)
1622     {
1623         while ( (p->offset == p->size) && !descending )
1624         {  /* end of this block - climb higher */
1625             assert (p->offset <= p->size);
1626 #if ISAMB_DEBUG
1627             logf(LOG_DEBUG,"isamb_pp_forward climbing from l=%d",
1628                             pp->level);
1629 #endif
1630             if (pp->level == 0)
1631             {
1632 #if ISAMB_DEBUG
1633                 logf(LOG_DEBUG,"isamb_pp_forward returning 0 at root");
1634 #endif
1635                 return 0; /* at end of the root, nothing left */
1636             }
1637             close_block(pp->isamb, pp->block[pp->level]);
1638             pp->block[pp->level]=0;
1639             (pp->level)--;
1640             p=pp->block[pp->level];
1641 #if ISAMB_DEBUG
1642             logf(LOG_DEBUG,"isamb_pp_forward climbed to node %d off=%d",
1643                             p->pos, p->offset);
1644 #endif
1645             assert(!p->leaf);
1646             assert(p->offset <= p->size);
1647             /* skip the child we have handled */
1648             if (p->offset != p->size)
1649             { 
1650                 src = p->bytes + p->offset;
1651                 decode_ptr(&src, &item_len);
1652 #if ISAMB_DEBUG         
1653                 (*pp->isamb->method->codec.log_item)(LOG_DEBUG, src,
1654                                                " isamb_pp_forward "
1655                                                "climb skipping old key");
1656 #endif
1657                 src += item_len;
1658                 decode_ptr(&src,&pos);
1659                 p->offset = src - (char*) p->bytes;
1660                 break; /* even if this puts us at the end of the block, we
1661                           need to descend to the last pos. UGLY coding,
1662                           clean up some day */
1663             }
1664         }
1665         if (!p->leaf)
1666         { 
1667             src = p->bytes + p->offset;
1668             if (p->offset == p->size)
1669                 cmp=-2 ; /* descend to the last node, as we have
1670                             no value to cmp */
1671             else
1672             {
1673                 decode_ptr(&src, &item_len);
1674 #if ISAMB_DEBUG
1675                 logf(LOG_DEBUG,"isamb_pp_forward (B) on a high node. "
1676                      "ofs=%d sz=%d nxtpos=%d ",
1677                         p->offset,p->size,pos);
1678                 (*pp->isamb->method->codec.log_item)(LOG_DEBUG, src, "");
1679 #endif
1680                 if (untilbuf)
1681                     cmp=(*pp->isamb->method->compare_item)(untilbuf,src);
1682                 else
1683                     cmp=-2;
1684                 src += item_len;
1685                 decode_ptr(&src,&nxtpos);
1686             }
1687             if (cmp<2)
1688             { 
1689 #if ISAMB_DEBUG
1690                 logf(LOG_DEBUG,"isambb_pp_forward descending l=%d p=%d ",
1691                             pp->level, pos);
1692 #endif
1693                 descending=1; /* prevent climbing for a while */
1694                 ++(pp->level);
1695                 p = open_block(pp->isamb,pos);
1696                 pp->block[pp->level] = p ;
1697                 pp->total_size += p->size;
1698                 (pp->accessed_nodes[pp->maxlevel - pp->level])++;
1699                 pp->no_blocks++;
1700                 if ( !p->leaf)
1701                 { /* block starts with a pos */
1702                     src = p->bytes + p->offset;
1703                     decode_ptr(&src,&pos);
1704                     p->offset=src-(char*) p->bytes;
1705 #if ISAMB_DEBUG
1706                     logf(LOG_DEBUG,"isamb_pp_forward: block %d starts with %d",
1707                                     p->pos, pos);
1708 #endif
1709                 } 
1710             } /* descend to the node */
1711             else
1712             { /* skip the node */
1713                 p->offset = src - (char*) p->bytes;
1714                 pos=nxtpos;
1715                 (pp->skipped_nodes[pp->maxlevel - pp->level -1])++;
1716 #if ISAMB_DEBUG
1717                 logf(LOG_DEBUG,
1718                     "isamb_pp_forward: skipping block on level %d, noting "
1719                      "on %d (%d)",
1720                     pp->level, pp->maxlevel - pp->level-1 , 
1721                     pp->skipped_nodes[pp->maxlevel - pp->level-1 ]);
1722 #endif
1723                 /* 0 is always leafs, 1 is one level above leafs etc, no
1724                  * matter how high tree */
1725             }
1726         } /* not on a leaf */
1727         else
1728         { /* on a leaf */
1729             if (p->offset == p->size) { 
1730                 descending = 0;
1731             }
1732             else
1733             {
1734                 assert (p->offset < p->size);
1735                 src = p->bytes + p->offset;
1736                 dst=buf;
1737                 (*pp->isamb->method->codec.decode)(p->decodeClientData,
1738                                                 &dst, &src);
1739                 p->offset = src - (char*) p->bytes;
1740                 if (untilbuf)
1741                     cmp=(*pp->isamb->method->compare_item)(untilbuf,buf);
1742                 else
1743                     cmp=-2;
1744 #if ISAMB_DEBUG
1745                 logf(LOG_DEBUG,"isamb_pp_forward on a leaf. cmp=%d", 
1746                      cmp);
1747                 (*pp->isamb->method->codec.log_item)(LOG_DEBUG, buf, "");
1748 #endif
1749                 if (cmp <2)
1750                 {
1751 #if ISAMB_DEBUG
1752                     if (untilbuf)
1753                     {
1754                         (*pp->isamb->method->codec.log_item)(
1755                             LOG_DEBUG, buf,  "isamb_pp_forward returning 1");
1756                     }
1757                     else
1758                     {
1759                         (*pp->isamb->method->codec.log_item)(
1760                             LOG_DEBUG, buf, "isamb_pp_read returning 1 (fwd)");
1761                     }
1762 #endif
1763                     pp->returned_numbers++;
1764                     return 1;
1765                 }
1766                 else
1767                     pp->skipped_numbers++;
1768             }
1769         } /* leaf */
1770     } /* main loop */
1771 }
1772
1773 #elif NEW_FORWARD == 2
1774
1775 int isamb_pp_forward (ISAMB_PP pp, void *buf, const void *untilb)
1776 {
1777     char *dst = buf;
1778     const char *src;
1779     struct ISAMB_block *p = pp->block[pp->level];
1780     if (!p)
1781         return 0;
1782
1783 again:
1784     while (p->offset == p->size)
1785     {
1786         int pos, item_len;
1787         while (p->offset == p->size)
1788         {
1789             if (pp->level == 0)
1790                 return 0;
1791             close_block (pp->isamb, pp->block[pp->level]);
1792             pp->block[pp->level] = 0;
1793             (pp->level)--;
1794             p = pp->block[pp->level];
1795             assert (!p->leaf);  
1796         }
1797
1798         assert(!p->leaf);
1799         src = p->bytes + p->offset;
1800         
1801         decode_ptr (&src, &item_len);
1802         src += item_len;
1803         decode_ptr (&src, &pos);
1804         
1805         p->offset = src - (char*) p->bytes;
1806
1807         src = p->bytes + p->offset;
1808
1809         while(1)
1810         {
1811             if (!untilb || p->offset == p->size)
1812                 break;
1813             assert(p->offset < p->size);
1814             decode_ptr (&src, &item_len);
1815             if ((*pp->isamb->method->compare_item)(untilb, src) <= 1)
1816                 break;
1817             src += item_len;
1818             decode_ptr (&src, &pos);
1819             p->offset = src - (char*) p->bytes;
1820         }
1821
1822         pp->level++;
1823
1824         while (1)
1825         {
1826             pp->block[pp->level] = p = open_block (pp->isamb, pos);
1827
1828             pp->total_size += p->size;
1829             pp->no_blocks++;
1830             
1831             if (p->leaf) 
1832             {
1833                 break;
1834             }
1835             
1836             src = p->bytes + p->offset;
1837             while(1)
1838             {
1839                 decode_ptr (&src, &pos);
1840                 p->offset = src - (char*) p->bytes;
1841                 
1842                 if (!untilb || p->offset == p->size)
1843                     break;
1844                 assert(p->offset < p->size);
1845                 decode_ptr (&src, &item_len);
1846                 if ((*pp->isamb->method->compare_item)(untilb, src) <= 1)
1847                     break;
1848                 src += item_len;
1849             }
1850             pp->level++;
1851         }
1852     }
1853     assert (p->offset < p->size);
1854     assert (p->leaf);
1855     while(1)
1856     {
1857         char *dst0 = dst;
1858         src = p->bytes + p->offset;
1859         (*pp->isamb->method->codec.decode)(p->decodeClientData, &dst, &src);
1860         p->offset = src - (char*) p->bytes;
1861         if (!untilb || (*pp->isamb->method->compare_item)(untilb, dst0) <= 1)
1862             break;
1863         dst = dst0;
1864         if (p->offset == p->size) goto again;
1865     }
1866     /* key_logdump_txt(LOG_DEBUG,buf, "isamb_pp_read returning 1"); */
1867     return 1;
1868 }
1869
1870 #endif
1871
1872 int isamb_pp_num (ISAMB_PP pp)
1873 {
1874     assert(pp); /* shut up about unused arguments */
1875     return 1;
1876 }
1877
1878 static void isamb_pp_leaf_pos( ISAMB_PP pp, 
1879                                double *current, double *total, 
1880                                void *dummybuf )
1881 {
1882     struct ISAMB_block *p = pp->block[pp->level];
1883     const char *src=p->bytes;
1884     char *end=p->bytes+p->size;
1885     char *cur=p->bytes+p->offset;
1886     char *dst;
1887     void *decodeClientData;
1888     assert(p->offset <= p->size);
1889     assert(cur <= end);
1890     assert(p->leaf);
1891     *current=0;
1892     *total=0;
1893
1894     decodeClientData = (pp->isamb->method->codec.start)();
1895
1896     while(src < end) {
1897         dst=dummybuf;
1898         (*pp->isamb->method->codec.decode)(decodeClientData,&dst, &src);
1899         assert(dst<(char*) dummybuf+100); /*FIXME */
1900         (*total)++;
1901         if (src<=cur)
1902              (*current)++;
1903     }
1904 #if ISAMB_DEBUG
1905     logf(LOG_DEBUG, "isamb_pp_leaf_pos: cur= %0.1f tot=%0.1f "
1906                     " ofs=%d sz=%d lev=%d",
1907                     *current, *total, p->offset, p->size, pp->level);
1908 #endif
1909     assert(src==end);
1910     (pp->isamb->method->codec.stop)(decodeClientData);
1911 }
1912
1913 static void isamb_pp_upper_pos( ISAMB_PP pp, double *current, double *total, 
1914                                 double size, int level )
1915 { /* estimates total/current occurrences from here up, excl leaf */
1916     struct ISAMB_block *p = pp->block[level];
1917     const char *src=p->bytes;
1918     char *end=p->bytes+p->size;
1919     char *cur=p->bytes+p->offset;
1920     zint item_size;
1921     ISAMB_P child;
1922     
1923     assert(level>=0);
1924     assert(!p->leaf);
1925    
1926 #if  1 // ISAMB_DEBUG
1927     logf(LOG_DEBUG,"isamb_pp_upper_pos at beginning     l=%d "
1928                    "cur=%0.1f tot=%0.1f "
1929                    " ofs=%d sz=%d pos=" ZINT_FORMAT, 
1930                    level, *current, *total, p->offset, p->size, p->pos);
1931 #endif    
1932     assert (p->offset <= p->size);
1933     decode_ptr (&src, &child ); /* first child */
1934     if (src!=cur) {
1935         *total += size;
1936         if (src < cur)
1937             *current +=size;
1938     }
1939     while(src < end) {
1940         decode_ptr (&src, &item_size ); 
1941         assert(src+item_size<=end);
1942         src += item_size;
1943         decode_ptr (&src, &child );
1944         if (src!=cur) {
1945             *total += size;
1946             if (src < cur)
1947                 *current +=size;
1948         }
1949     }
1950 #if ISAMB_DEBUG
1951     logf(LOG_DEBUG,"isamb_pp_upper_pos before recursion l=%d "
1952                    "cur=%0.1f tot=%0.1f "
1953                    " ofs=%d sz=%d pos=" ZINT_FORMAT, 
1954                    level, *current, *total, p->offset, p->size, p->pos);
1955 #endif    
1956     if (level>0)
1957         isamb_pp_upper_pos(pp, current, total, *total, level-1);
1958 } /* upper_pos */
1959
1960 void isamb_pp_pos( ISAMB_PP pp, double *current, double *total )
1961 { /* return an estimate of the current position and of the total number of */
1962   /* occureences in the isam tree, based on the current leaf */
1963         /* FIXME - Isam-B ought to know how many we have, so we could return */
1964         /* that directly */
1965     struct ISAMB_block *p = pp->block[pp->level];
1966     char dummy[100]; /* 100 bytes/entry must be enough */
1967     assert(total);
1968     assert(current);
1969     assert(p->leaf);
1970     isamb_pp_leaf_pos(pp,current, total, dummy);
1971     if (pp->level>0)
1972         isamb_pp_upper_pos(pp, current, total, *total, pp->level-1);
1973     *current = (double) pp->returned_numbers;
1974     /* use the precise number, since we have it! */
1975 #if ISAMB_DEBUG
1976     logf(LOG_LOG, "isamb_pp_pos returning: cur= %0.1f tot=%0.1f rn="ZINT_FORMAT,
1977                     *current, *total, pp->returned_numbers);
1978 #endif
1979 }