Added the scope parameter to rsets, and using it in all forwards and reads
[idzebra-moved-to-github.git] / isamb / isamb.c
1 /* $Id: isamb.c,v 1.58 2004-09-09 10:08:05 heikki Exp $
2    Copyright (C) 1995,1996,1997,1998,1999,2000,2001,2002,2003,2004
3    Index Data Aps
4
5 This file is part of the Zebra server.
6
7 Zebra is free software; you can redistribute it and/or modify it under
8 the terms of the GNU General Public License as published by the Free
9 Software Foundation; either version 2, or (at your option) any later
10 version.
11
12 Zebra is distributed in the hope that it will be useful, but WITHOUT ANY
13 WARRANTY; without even the implied warranty of MERCHANTABILITY or
14 FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
15 for more details.
16
17 You should have received a copy of the GNU General Public License
18 along with Zebra; see the file LICENSE.zebra.  If not, write to the
19 Free Software Foundation, 59 Temple Place - Suite 330, Boston, MA
20 02111-1307, USA.
21 */
22
23 #include <string.h>
24 #include <yaz/xmalloc.h>
25 #include <yaz/log.h>
26 #include <isamb.h>
27 #include <assert.h>
28
29 #ifndef ISAMB_DEBUG
30 #define ISAMB_DEBUG 0
31 #endif
32
33 #define ISAMB_MAJOR_VERSION 2
34 #define ISAMB_MINOR_VERSION 0
35
36 struct ISAMB_head {
37     zint first_block;
38     zint last_block;
39     zint free_list;
40     zint no_items;
41     int block_size;
42     int block_max;
43     int block_offset;
44 };
45
46 /* maximum size of encoded buffer */
47 #define DST_ITEM_MAX 256
48
49 #define ISAMB_MAX_LEVEL 10
50 /* approx 2*max page + max size of item */
51 #define DST_BUF_SIZE 16840
52
53 #define ISAMB_CACHE_ENTRY_SIZE 4096
54
55 /* CAT_MAX: _must_ be power of 2 */
56 #define CAT_MAX 4
57 #define CAT_MASK (CAT_MAX-1)
58 /* CAT_NO: <= CAT_MAX */
59 #define CAT_NO 4
60
61 /* ISAMB_PTR_CODEC=1 var, =0 fixed */
62 #define ISAMB_PTR_CODEC 1
63
64 struct ISAMB_cache_entry {
65     ISAMB_P pos;
66     unsigned char *buf;
67     int dirty;
68     int hits;
69     struct ISAMB_cache_entry *next;
70 };
71
72 struct ISAMB_file {
73     BFile bf;
74     int head_dirty;
75     struct ISAMB_head head;
76     struct ISAMB_cache_entry *cache_entries;
77 };
78
79 struct ISAMB_s {
80     BFiles bfs;
81     ISAMC_M *method;
82
83     struct ISAMB_file *file;
84     int no_cat;
85     int cache; /* 0=no cache, 1=use cache, -1=dummy isam (for testing only) */
86     int log_io;        /* log level for bf_read/bf_write calls */
87     int log_freelist;  /* log level for freelist handling */
88     zint skipped_numbers; /* on a leaf node */
89     zint returned_numbers; 
90     zint skipped_nodes[ISAMB_MAX_LEVEL]; /* [0]=skipped leaves, 1=higher etc */
91     zint accessed_nodes[ISAMB_MAX_LEVEL]; /* nodes we did not skip */
92 };
93
94 struct ISAMB_block {
95     ISAMB_P pos;
96     int cat;
97     int size;
98     int leaf;
99     int dirty;
100     int deleted;
101     int offset;
102     zint no_items;  /* number of nodes in this + children */
103     char *bytes;
104     char *cbuf;
105     unsigned char *buf;
106     void *decodeClientData;
107     int log_rw;
108 };
109
110 struct ISAMB_PP_s {
111     ISAMB isamb;
112     ISAMB_P pos;
113     int level;
114     int maxlevel; /* total depth */
115     zint total_size;
116     zint no_blocks;
117     zint skipped_numbers; /* on a leaf node */
118     zint returned_numbers; 
119     zint skipped_nodes[ISAMB_MAX_LEVEL]; /* [0]=skipped leaves, 1=higher etc */
120     zint accessed_nodes[ISAMB_MAX_LEVEL]; /* nodes we did not skip */
121     struct ISAMB_block **block;
122     int scope;  /* on what level we forward */
123 };
124
125
126 #if ISAMB_PTR_CODEC
127 static void encode_ptr (char **dst, zint pos)
128 {
129     unsigned char *bp = (unsigned char*) *dst;
130
131     while (pos > 127)
132     {
133          *bp++ = 128 | (pos & 127);
134          pos = pos >> 7;
135     }
136     *bp++ = pos;
137     *dst = (char *) bp;
138 }
139 #else
140 static void encode_ptr (char **dst, zint pos)
141 {
142     memcpy(*dst, &pos, sizeof(pos));
143     (*dst) += sizeof(pos);
144 }
145 #endif
146
147 #if ISAMB_PTR_CODEC
148 static void decode_ptr (const char **src1, zint *pos)
149 {
150     const unsigned char **src = (const unsigned char **) src1;
151     zint d = 0;
152     unsigned char c;
153     unsigned r = 0;
154
155     while (((c = *(*src)++) & 128))
156     {
157         d += ((zint) (c & 127) << r);
158         r += 7;
159     }
160     d += ((zint) c << r);
161     *pos = d;
162 }
163 #else
164 static void decode_ptr (const char **src, zint *pos)
165 {
166      memcpy (pos, *src, sizeof(*pos));
167      (*src) += sizeof(*pos);
168 }
169 #endif
170
171 ISAMB isamb_open (BFiles bfs, const char *name, int writeflag, ISAMC_M *method,
172                   int cache)
173 {
174     ISAMB isamb = xmalloc (sizeof(*isamb));
175     int i, b_size = 32;
176
177     isamb->bfs = bfs;
178     isamb->method = (ISAMC_M *) xmalloc (sizeof(*method));
179     memcpy (isamb->method, method, sizeof(*method));
180     isamb->no_cat = CAT_NO;
181     isamb->log_io = 0;
182     isamb->log_freelist = 0;
183     isamb->cache = cache;
184     isamb->skipped_numbers=0;
185     isamb->returned_numbers=0;
186     for (i=0;i<ISAMB_MAX_LEVEL;i++)
187       isamb->skipped_nodes[i]= isamb->accessed_nodes[i]=0;
188
189     assert (cache == 0);
190     isamb->file = xmalloc (sizeof(*isamb->file) * isamb->no_cat);
191     for (i = 0; i < isamb->no_cat; i++)
192     {
193         char fname[DST_BUF_SIZE];
194         char hbuf[DST_BUF_SIZE];
195         isamb->file[i].cache_entries = 0;
196         isamb->file[i].head_dirty = 0;
197         sprintf (fname, "%s%c", name, i+'A');
198         if (cache)
199             isamb->file[i].bf = bf_open (bfs, fname, ISAMB_CACHE_ENTRY_SIZE,
200                                          writeflag);
201         else
202             isamb->file[i].bf = bf_open (bfs, fname, b_size, writeflag);
203
204         /* fill-in default values (for empty isamb) */
205         isamb->file[i].head.first_block = ISAMB_CACHE_ENTRY_SIZE/b_size+1;
206         isamb->file[i].head.last_block = isamb->file[i].head.first_block;
207         isamb->file[i].head.block_size = b_size;
208         if (i == isamb->no_cat-1 || b_size > 128)
209             isamb->file[i].head.block_offset = 8;
210         else
211             isamb->file[i].head.block_offset = 4;
212         isamb->file[i].head.block_max =
213             b_size - isamb->file[i].head.block_offset;
214         isamb->file[i].head.free_list = 0;
215         if (bf_read (isamb->file[i].bf, 0, 0, 0, hbuf))
216         {
217             /* got header assume "isamb"major minor len can fit in 16 bytes */
218             zint zint_tmp;
219             int major, minor, len, pos = 0;
220             int left;
221             const char *src = 0;
222             if (memcmp(hbuf, "isamb", 5))
223             {
224                 logf(LOG_WARN, "bad isamb header for file %s", fname);
225                 return 0;
226             }
227             if (sscanf(hbuf+5, "%d %d %d", &major, &minor, &len) != 3)
228             {
229                 logf(LOG_WARN, "bad isamb header for file %s", fname);
230                 return 0;
231             }
232             if (major != ISAMB_MAJOR_VERSION)
233             {
234                 logf(LOG_WARN, "bad major version for file %s %d, must be %d",
235                      fname, major, ISAMB_MAJOR_VERSION);
236                 return 0;
237             }
238             for (left = len - b_size; left > 0; left = left - b_size)
239             {
240                 pos++;
241                 if (!bf_read (isamb->file[i].bf, pos, 0, 0, hbuf + pos*b_size))
242                 {
243                     logf(LOG_WARN, "truncated isamb header for " 
244                          "file=%s len=%d pos=%d",
245                          fname, len, pos);
246                     return 0;
247                 }
248             }
249             src = hbuf + 16;
250             decode_ptr(&src, &isamb->file[i].head.first_block);
251             decode_ptr(&src, &isamb->file[i].head.last_block);
252             decode_ptr(&src, &zint_tmp);
253             isamb->file[i].head.block_size = zint_tmp;
254             decode_ptr(&src, &zint_tmp);
255             isamb->file[i].head.block_max = zint_tmp;
256             decode_ptr(&src, &isamb->file[i].head.free_list);
257         }
258         assert (isamb->file[i].head.block_size >= isamb->file[i].head.block_offset);
259         isamb->file[i].head_dirty = 0;
260         assert(isamb->file[i].head.block_size == b_size);
261         b_size = b_size * 4;
262     }
263 #if ISAMB_DEBUG
264     logf(LOG_WARN, "isamb debug enabled. Things will be slower than usual");
265 #endif
266     return isamb;
267 }
268
269 static void flush_blocks (ISAMB b, int cat)
270 {
271     while (b->file[cat].cache_entries)
272     {
273         struct ISAMB_cache_entry *ce_this = b->file[cat].cache_entries;
274         b->file[cat].cache_entries = ce_this->next;
275
276         if (ce_this->dirty)
277         {
278             yaz_log (b->log_io, "bf_write: flush_blocks");
279             bf_write (b->file[cat].bf, ce_this->pos, 0, 0, ce_this->buf);
280         }
281         xfree (ce_this->buf);
282         xfree (ce_this);
283     }
284 }
285
286 static int get_block (ISAMB b, ISAMC_P pos, char *userbuf, int wr)
287 {
288     int cat = (int) (pos&CAT_MASK);
289     int off = (int) (((pos/CAT_MAX) & 
290                (ISAMB_CACHE_ENTRY_SIZE / b->file[cat].head.block_size - 1))
291         * b->file[cat].head.block_size);
292     zint norm = pos / (CAT_MASK*ISAMB_CACHE_ENTRY_SIZE / b->file[cat].head.block_size);
293     int no = 0;
294     struct ISAMB_cache_entry **ce, *ce_this = 0, **ce_last = 0;
295
296     if (!b->cache)
297         return 0;
298
299     assert (ISAMB_CACHE_ENTRY_SIZE >= b->file[cat].head.block_size);
300     for (ce = &b->file[cat].cache_entries; *ce; ce = &(*ce)->next, no++)
301     {
302         ce_last = ce;
303         if ((*ce)->pos == norm)
304         {
305             ce_this = *ce;
306             *ce = (*ce)->next;   /* remove from list */
307             
308             ce_this->next = b->file[cat].cache_entries;  /* move to front */
309             b->file[cat].cache_entries = ce_this;
310             
311             if (wr)
312             {
313                 memcpy (ce_this->buf + off, userbuf, 
314                         b->file[cat].head.block_size);
315                 ce_this->dirty = 1;
316             }
317             else
318                 memcpy (userbuf, ce_this->buf + off,
319                         b->file[cat].head.block_size);
320             return 1;
321         }
322     }
323     if (no >= 40)
324     {
325         assert (no == 40);
326         assert (ce_last && *ce_last);
327         ce_this = *ce_last;
328         *ce_last = 0;  /* remove the last entry from list */
329         if (ce_this->dirty)
330         {
331             yaz_log (b->log_io, "bf_write: get_block");
332             bf_write (b->file[cat].bf, ce_this->pos, 0, 0, ce_this->buf);
333         }
334         xfree (ce_this->buf);
335         xfree (ce_this);
336     }
337     ce_this = xmalloc (sizeof(*ce_this));
338     ce_this->next = b->file[cat].cache_entries;
339     b->file[cat].cache_entries = ce_this;
340     ce_this->buf = xmalloc (ISAMB_CACHE_ENTRY_SIZE);
341     ce_this->pos = norm;
342     yaz_log (b->log_io, "bf_read: get_block");
343     if (!bf_read (b->file[cat].bf, norm, 0, 0, ce_this->buf))
344         memset (ce_this->buf, 0, ISAMB_CACHE_ENTRY_SIZE);
345     if (wr)
346     {
347         memcpy (ce_this->buf + off, userbuf, b->file[cat].head.block_size);
348         ce_this->dirty = 1;
349     }
350     else
351     {
352         ce_this->dirty = 0;
353         memcpy (userbuf, ce_this->buf + off, b->file[cat].head.block_size);
354     }
355     return 1;
356 }
357
358
359 void isamb_close (ISAMB isamb)
360 {
361     int i;
362     for (i=0;isamb->accessed_nodes[i];i++)
363         logf(LOG_DEBUG,"isamb_close  level leaf-%d: "ZINT_FORMAT" read, "
364                         ZINT_FORMAT" skipped",
365              i, isamb->accessed_nodes[i], isamb->skipped_nodes[i]);
366     logf(LOG_DEBUG,"isamb_close returned "ZINT_FORMAT" values, "
367                    "skipped "ZINT_FORMAT,
368          isamb->skipped_numbers, isamb->returned_numbers);
369     for (i = 0; i<isamb->no_cat; i++)
370     {
371         flush_blocks (isamb, i);
372         if (isamb->file[i].head_dirty)
373         {
374             char hbuf[DST_BUF_SIZE];
375             int major = ISAMB_MAJOR_VERSION;
376             int minor = ISAMB_MINOR_VERSION;
377             int len = 16;
378             char *dst = hbuf + 16;
379             int pos = 0, left;
380             int b_size = isamb->file[i].head.block_size;
381
382             encode_ptr(&dst, isamb->file[i].head.first_block);
383             encode_ptr(&dst, isamb->file[i].head.last_block);
384             encode_ptr(&dst, isamb->file[i].head.block_size);
385             encode_ptr(&dst, isamb->file[i].head.block_max);
386             encode_ptr(&dst, isamb->file[i].head.free_list);
387             memset(dst, '\0', 16); /* ensure no random bytes are written */
388
389             len = dst - hbuf;
390
391             /* print exactly 16 bytes (including trailing 0) */
392             sprintf(hbuf, "isamb%02d %02d %02d\r\n", major, minor, len);
393
394             bf_write (isamb->file[i].bf, pos, 0, 0, hbuf);
395
396             for (left = len - b_size; left > 0; left = left - b_size)
397             {
398                 pos++;
399                 bf_write (isamb->file[i].bf, pos, 0, 0, hbuf + pos*b_size);
400             }
401         }
402         bf_close (isamb->file[i].bf);
403     }
404     xfree (isamb->file);
405     xfree (isamb->method);
406     xfree (isamb);
407 }
408
409 /* open_block: read one block at pos.
410    Decode leading sys bytes .. consisting of
411    Offset:Meaning
412    0: leader byte, != 0 leaf, == 0, non-leaf
413    1-2: used size of block
414    3-7*: number of items and all children
415    
416    * Reserve 5 bytes for large block sizes. 1 for small ones .. Number
417    of items. We can thus have at most 2^40 nodes. 
418 */
419 static struct ISAMB_block *open_block (ISAMB b, ISAMC_P pos)
420 {
421     int cat = (int) (pos&CAT_MASK);
422     const char *src;
423     int offset = b->file[cat].head.block_offset;
424     struct ISAMB_block *p;
425     if (!pos)
426         return 0;
427     p = xmalloc (sizeof(*p));
428     p->pos = pos;
429     p->cat = (int) (pos & CAT_MASK);
430     p->buf = xmalloc (b->file[cat].head.block_size);
431     p->cbuf = 0;
432
433     if (!get_block (b, pos, p->buf, 0))
434     {
435         yaz_log (b->log_io, "bf_read: open_block");
436         if (!bf_read (b->file[cat].bf, pos/CAT_MAX, 0, 0, p->buf))
437         {
438             yaz_log (LOG_FATAL, "isamb: read fail for pos=%ld block=%ld",
439                      (long) pos, (long) pos/CAT_MAX);
440             abort();
441         }
442     }
443     p->bytes = p->buf + offset;
444     p->leaf = p->buf[0];
445     p->size = (p->buf[1] + 256 * p->buf[2]) - offset;
446     if (p->size < 0)
447     {
448         yaz_log (LOG_FATAL, "Bad block size %d in pos=" ZINT_FORMAT "\n",
449                  p->size, pos);
450     }
451     assert (p->size >= 0);
452     src = p->buf + 3;
453     decode_ptr(&src, &p->no_items);
454
455     p->offset = 0;
456     p->dirty = 0;
457     p->deleted = 0;
458     p->decodeClientData = (*b->method->codec.start)();
459     return p;
460 }
461
462 struct ISAMB_block *new_block (ISAMB b, int leaf, int cat)
463 {
464     struct ISAMB_block *p;
465
466     p = xmalloc (sizeof(*p));
467     p->buf = xmalloc (b->file[cat].head.block_size);
468
469     if (!b->file[cat].head.free_list)
470     {
471         zint block_no;
472         block_no = b->file[cat].head.last_block++;
473         p->pos = block_no * CAT_MAX + cat;
474     }
475     else
476     {
477         p->pos = b->file[cat].head.free_list;
478         assert((p->pos & CAT_MASK) == cat);
479         if (!get_block (b, p->pos, p->buf, 0))
480         {
481             yaz_log (b->log_io, "bf_read: new_block");
482             if (!bf_read (b->file[cat].bf, p->pos/CAT_MAX, 0, 0, p->buf))
483             {
484                 yaz_log (LOG_FATAL, "isamb: read fail for pos=%ld block=%ld",
485                          (long) p->pos/CAT_MAX, (long) p->pos/CAT_MAX);
486                 abort ();
487             }
488         }
489         yaz_log (b->log_freelist, "got block " ZINT_FORMAT " from freelist %d:" ZINT_FORMAT, p->pos,
490                  cat, p->pos/CAT_MAX);
491         memcpy (&b->file[cat].head.free_list, p->buf, sizeof(int));
492     }
493     p->cat = cat;
494     b->file[cat].head_dirty = 1;
495     memset (p->buf, 0, b->file[cat].head.block_size);
496     p->bytes = p->buf + b->file[cat].head.block_offset;
497     p->leaf = leaf;
498     p->size = 0;
499     p->dirty = 1;
500     p->deleted = 0;
501     p->offset = 0;
502     p->no_items = 0;
503     p->decodeClientData = (*b->method->codec.start)();
504     return p;
505 }
506
507 struct ISAMB_block *new_leaf (ISAMB b, int cat)
508 {
509     return new_block (b, 1, cat);
510 }
511
512
513 struct ISAMB_block *new_int (ISAMB b, int cat)
514 {
515     return new_block (b, 0, cat);
516 }
517
518 static void check_block (ISAMB b, struct ISAMB_block *p)
519 {
520     assert(b); /* mostly to make the compiler shut up about unused b */
521     if (p->leaf)
522     {
523         ;
524     }
525     else
526     {
527         /* sanity check */
528         char *startp = p->bytes;
529         const char *src = startp;
530         char *endp = p->bytes + p->size;
531         ISAMB_P pos;
532             
533         decode_ptr (&src, &pos);
534         assert ((pos&CAT_MASK) == p->cat);
535         while (src != endp)
536         {
537             zint item_len;
538             decode_ptr (&src, &item_len);
539             assert (item_len > 0 && item_len < 80);
540             src += item_len;
541             decode_ptr (&src, &pos);
542             if ((pos&CAT_MASK) != p->cat)
543             {
544                 assert ((pos&CAT_MASK) == p->cat);
545             }
546         }
547     }
548 }
549
550 void close_block (ISAMB b, struct ISAMB_block *p)
551 {
552     if (!p)
553         return;
554     if (p->deleted)
555     {
556         yaz_log (b->log_freelist, "release block " ZINT_FORMAT " from freelist %d:" ZINT_FORMAT,
557                  p->pos, p->cat, p->pos/CAT_MAX);
558         memcpy (p->buf, &b->file[p->cat].head.free_list, sizeof(int));
559         b->file[p->cat].head.free_list = p->pos;
560         if (!get_block (b, p->pos, p->buf, 1))
561         {
562             yaz_log (b->log_io, "bf_write: close_block (deleted)");
563             bf_write (b->file[p->cat].bf, p->pos/CAT_MAX, 0, 0, p->buf);
564         }
565     }
566     else if (p->dirty)
567     {
568         int offset = b->file[p->cat].head.block_offset;
569         int size = p->size + offset;
570         char *dst =  p->buf + 3;
571         assert (p->size >= 0);
572         
573         /* memset becuase encode_ptr usually does not write all bytes */
574         memset(p->buf, 0, b->file[p->cat].head.block_offset);
575         p->buf[0] = p->leaf;
576         p->buf[1] = size & 255;
577         p->buf[2] = size >> 8;
578         encode_ptr(&dst, p->no_items);
579         check_block(b, p);
580         if (!get_block (b, p->pos, p->buf, 1))
581         {
582             yaz_log (b->log_io, "bf_write: close_block");
583             bf_write (b->file[p->cat].bf, p->pos/CAT_MAX, 0, 0, p->buf);
584         }
585     }
586     (*b->method->codec.stop)(p->decodeClientData);
587     xfree (p->buf);
588     xfree (p);
589 }
590
591 int insert_sub (ISAMB b, struct ISAMB_block **p,
592                 void *new_item, int *mode,
593                 ISAMC_I *stream,
594                 struct ISAMB_block **sp,
595                 void *sub_item, int *sub_size,
596                 const void *max_item);
597
598 int insert_int (ISAMB b, struct ISAMB_block *p, void *lookahead_item,
599                 int *mode,
600                 ISAMC_I *stream, struct ISAMB_block **sp,
601                 void *split_item, int *split_size, const void *last_max_item)
602 {
603     char *startp = p->bytes;
604     const char *src = startp;
605     char *endp = p->bytes + p->size;
606     ISAMB_P pos;
607     struct ISAMB_block *sub_p1 = 0, *sub_p2 = 0;
608     char sub_item[DST_ITEM_MAX];
609     int sub_size;
610     int more = 0;
611     zint diff_terms = 0;
612
613     *sp = 0;
614
615     assert(p->size >= 0);
616     decode_ptr (&src, &pos);
617     while (src != endp)
618     {
619         zint item_len;
620         int d;
621         const char *src0 = src;
622         decode_ptr (&src, &item_len);
623         d = (*b->method->compare_item)(src, lookahead_item);
624         if (d > 0)
625         {
626             sub_p1 = open_block (b, pos);
627             assert (sub_p1);
628             diff_terms -= sub_p1->no_items;
629             more = insert_sub (b, &sub_p1, lookahead_item, mode,
630                                stream, &sub_p2, 
631                                sub_item, &sub_size, src);
632             diff_terms += sub_p1->no_items;
633             src = src0;
634             break;
635         }
636         src += item_len;
637         decode_ptr (&src, &pos);
638     }
639     if (!sub_p1)
640     {
641         sub_p1 = open_block (b, pos);
642         assert (sub_p1);
643         diff_terms -= sub_p1->no_items;
644         more = insert_sub (b, &sub_p1, lookahead_item, mode, stream, &sub_p2, 
645                            sub_item, &sub_size, last_max_item);
646         diff_terms += sub_p1->no_items;
647     }
648     if (sub_p2)
649         diff_terms += sub_p2->no_items;
650     if (diff_terms)
651     {
652         p->dirty = 1;
653         p->no_items += diff_terms;
654     }
655     if (sub_p2)
656     {
657         /* there was a split - must insert pointer in this one */
658         char dst_buf[DST_BUF_SIZE];
659         char *dst = dst_buf;
660
661         assert (sub_size < 80 && sub_size > 1);
662
663         memcpy (dst, startp, src - startp);
664                 
665         dst += src - startp;
666
667         encode_ptr (&dst, sub_size);      /* sub length and item */
668         memcpy (dst, sub_item, sub_size);
669         dst += sub_size;
670
671         encode_ptr (&dst, sub_p2->pos);   /* pos */
672
673         if (endp - src)                   /* remaining data */
674         {
675             memcpy (dst, src, endp - src);
676             dst += endp - src;
677         }
678         p->size = dst - dst_buf;
679         assert (p->size >= 0);
680
681
682         if (p->size <= b->file[p->cat].head.block_max)
683         {
684             memcpy (startp, dst_buf, dst - dst_buf);
685         }
686         else
687         {
688             struct ISAMB_block *sub_p3;
689             zint split_size_tmp;
690             zint no_items_first_half = 0;
691             int p_new_size;
692             const char *half;
693             src = dst_buf;
694             endp = dst;
695
696             half = src + b->file[p->cat].head.block_size/2;
697             decode_ptr (&src, &pos);
698
699             /* read sub block so we can get no_items for it */
700             sub_p3 = open_block(b, pos);
701             no_items_first_half += sub_p3->no_items;
702             close_block(b, sub_p3);
703
704             while (src <= half)
705             {
706                 decode_ptr (&src, &split_size_tmp);
707                 *split_size = (int) split_size_tmp;
708
709                 src += *split_size;
710                 decode_ptr (&src, &pos);
711
712                 /* read sub block so we can get no_items for it */
713                 sub_p3 = open_block(b, pos);
714                 no_items_first_half += sub_p3->no_items;
715                 close_block(b, sub_p3);
716             }
717             /*  p is first half */
718             p_new_size = src - dst_buf;
719             memcpy (p->bytes, dst_buf, p_new_size);
720
721             decode_ptr (&src, &split_size_tmp);
722             *split_size = (int) split_size_tmp;
723             memcpy (split_item, src, *split_size);
724             src += *split_size;
725
726             /*  *sp is second half */
727             *sp = new_int (b, p->cat);
728             (*sp)->size = endp - src;
729             memcpy ((*sp)->bytes, src, (*sp)->size);
730
731             p->size = p_new_size;
732
733             /*  adjust no_items in first&second half */
734             (*sp)->no_items = p->no_items - no_items_first_half;
735             p->no_items = no_items_first_half;
736         }
737         p->dirty = 1;
738         close_block (b, sub_p2);
739     }
740     close_block (b, sub_p1);
741     return more;
742 }
743
744 int insert_leaf (ISAMB b, struct ISAMB_block **sp1, void *lookahead_item,
745                  int *lookahead_mode, ISAMC_I *stream,
746                  struct ISAMB_block **sp2,
747                  void *sub_item, int *sub_size,
748                  const void *max_item)
749 {
750     struct ISAMB_block *p = *sp1;
751     char *endp = 0;
752     const char *src = 0;
753     char dst_buf[DST_BUF_SIZE], *dst = dst_buf;
754     int new_size;
755     void *c1 = (*b->method->codec.start)();
756     void *c2 = (*b->method->codec.start)();
757     int more = 1;
758     int quater = b->file[b->no_cat-1].head.block_max / 4;
759     char *mid_cut = dst_buf + quater * 2;
760     char *tail_cut = dst_buf + quater * 3;
761     char *maxp = dst_buf + b->file[b->no_cat-1].head.block_max;
762     char *half1 = 0;
763     char *half2 = 0;
764     char cut_item_buf[DST_ITEM_MAX];
765     int cut_item_size = 0;
766     int no_items = 0;    /* number of items (total) */
767     int no_items_1 = 0;  /* number of items (first half) */
768
769     if (p && p->size)
770     {
771         char file_item_buf[DST_ITEM_MAX];
772         char *file_item = file_item_buf;
773             
774         src = p->bytes;
775         endp = p->bytes + p->size;
776         (*b->method->codec.decode)(c1, &file_item, &src);
777         while (1)
778         {
779             const char *dst_item = 0; /* resulting item to be inserted */
780             char *lookahead_next;
781             int d = -1;
782             
783             if (lookahead_item)
784                 d = (*b->method->compare_item)(file_item_buf, lookahead_item);
785             
786             /* d now holds comparison between existing file item and
787                lookahead item 
788                d = 0: equal
789                d > 0: lookahead before file
790                d < 0: lookahead after file
791             */
792             if (d > 0)
793             {
794                 /* lookahead must be inserted */
795                 dst_item = lookahead_item;
796                 /* if this is not an insertion, it's really bad .. */
797                 if (!*lookahead_mode)
798                 {
799                     yaz_log (LOG_WARN, "isamb: Inconsistent register (1)");
800                     assert (*lookahead_mode);
801                 }
802             }
803             else
804                 dst_item = file_item_buf;
805
806                
807             if (!*lookahead_mode && d == 0)
808             {
809                 /* it's a deletion and they match so there is nothing to be
810                    inserted anyway .. But mark the thing bad (file item
811                    was part of input.. The item will not be part of output */
812                 p->dirty = 1;
813             }
814             else if (!half1 && dst > mid_cut)
815             {
816                 /* we have reached the splitting point for the first time */
817                 const char *dst_item_0 = dst_item;
818                 half1 = dst; /* candidate for splitting */
819
820                 /* encode the resulting item */
821                 (*b->method->codec.encode)(c2, &dst, &dst_item);
822                 
823                 cut_item_size = dst_item - dst_item_0;
824                 assert(cut_item_size > 0);
825                 memcpy (cut_item_buf, dst_item_0, cut_item_size);
826                 
827                 half2 = dst;
828                 no_items_1 = no_items;
829                 no_items++;
830             }
831             else
832             {
833                 /* encode the resulting item */
834                 (*b->method->codec.encode)(c2, &dst, &dst_item);
835                 no_items++;
836             }
837
838             /* now move "pointers" .. result has been encoded .. */
839             if (d > 0)  
840             {
841                 /* we must move the lookahead pointer */
842
843                 if (dst > maxp)
844                     /* no more room. Mark lookahead as "gone".. */
845                     lookahead_item = 0;
846                 else
847                 {
848                     /* move it really.. */
849                     lookahead_next = lookahead_item;
850                     if (!(*stream->read_item)(stream->clientData,
851                                               &lookahead_next,
852                                               lookahead_mode))
853                     {
854                         /* end of stream reached: no "more" and no lookahead */
855                         lookahead_item = 0;
856                         more = 0;
857                     }
858                     if (lookahead_item && max_item &&
859                         (*b->method->compare_item)(max_item, lookahead_item) <= 0)
860                     {
861                         /* the lookahead goes beyond what we allow in this
862                            leaf. Mark it as "gone" */
863                         lookahead_item = 0;
864                     }
865                     
866                     p->dirty = 1;
867                 }
868             }
869             else if (d == 0)
870             {
871                 /* exact match .. move both pointers */
872
873                 lookahead_next = lookahead_item;
874                 if (!(*stream->read_item)(stream->clientData,
875                                           &lookahead_next, lookahead_mode))
876                 {
877                     lookahead_item = 0;
878                     more = 0;
879                 }
880                 if (src == endp)
881                     break;  /* end of file stream reached .. */
882                 file_item = file_item_buf; /* move file pointer */
883                 (*b->method->codec.decode)(c1, &file_item, &src);
884             }
885             else
886             {
887                 /* file pointer must be moved */
888                 if (src == endp)
889                     break;
890                 file_item = file_item_buf;
891                 (*b->method->codec.decode)(c1, &file_item, &src);
892             }
893         }
894     }
895     maxp = dst_buf + b->file[b->no_cat-1].head.block_max + quater;
896     /* this loop runs when we are "appending" to a leaf page. That is
897        either it's empty (new) or all file items have been read in
898        previous loop */
899     while (lookahead_item)
900     {
901         char *dst_item;
902         const char *src = lookahead_item;
903         char *dst_0 = dst;
904         
905         /* compare lookahead with max item */
906         if (max_item &&
907             (*b->method->compare_item)(max_item, lookahead_item) <= 0)
908         {
909             /* stop if we have reached the value of max item */
910             break;
911         }
912         if (!*lookahead_mode)
913         {
914             /* this is append. So a delete is bad */
915             yaz_log (LOG_WARN, "isamb: Inconsistent register (2)");
916             abort();
917         }
918         else if (!half1 && dst > tail_cut)
919         {
920             const char *src_0 = src;
921             half1 = dst; /* candidate for splitting */
922             
923             (*b->method->codec.encode)(c2, &dst, &src);
924             
925             cut_item_size = src - src_0;
926             assert(cut_item_size > 0);
927             memcpy (cut_item_buf, src_0, cut_item_size);
928             
929             no_items_1 = no_items;
930             half2 = dst;
931         }
932         else
933             (*b->method->codec.encode)(c2, &dst, &src);
934
935         if (dst > maxp)
936         {
937             dst = dst_0;
938             break;
939         }
940         no_items++;
941         if (p)
942             p->dirty = 1;
943         dst_item = lookahead_item;
944         if (!(*stream->read_item)(stream->clientData, &dst_item,
945                                   lookahead_mode))
946         {
947             lookahead_item = 0;
948             more = 0;
949         }
950     }
951     new_size = dst - dst_buf;
952     if (p && p->cat != b->no_cat-1 && 
953         new_size > b->file[p->cat].head.block_max)
954     {
955         /* non-btree block will be removed */
956         p->deleted = 1;
957         close_block (b, p);
958         /* delete it too!! */
959         p = 0; /* make a new one anyway */
960     }
961     if (!p)
962     {   /* must create a new one */
963         int i;
964         for (i = 0; i < b->no_cat; i++)
965             if (new_size <= b->file[i].head.block_max)
966                 break;
967         if (i == b->no_cat)
968             i = b->no_cat - 1;
969         p = new_leaf (b, i);
970     }
971     if (new_size > b->file[p->cat].head.block_max)
972     {
973         char *first_dst;
974         const char *cut_item = cut_item_buf;
975
976         assert (half1);
977         assert (half2);
978
979         assert(cut_item_size > 0);
980         
981         /* first half */
982         p->size = half1 - dst_buf;
983         memcpy (p->bytes, dst_buf, half1 - dst_buf);
984         p->no_items = no_items_1;
985
986         /* second half */
987         *sp2 = new_leaf (b, p->cat);
988
989         (*b->method->codec.reset)(c2);
990
991         first_dst = (*sp2)->bytes;
992
993         (*b->method->codec.encode)(c2, &first_dst, &cut_item);
994
995         memcpy (first_dst, half2, dst - half2);
996
997         (*sp2)->size = (first_dst - (*sp2)->bytes) + (dst - half2);
998         (*sp2)->no_items = no_items - no_items_1;
999         (*sp2)->dirty = 1;
1000         p->dirty = 1;
1001         memcpy (sub_item, cut_item_buf, cut_item_size);
1002         *sub_size = cut_item_size;
1003     }
1004     else
1005     {
1006         memcpy (p->bytes, dst_buf, dst - dst_buf);
1007         p->size = new_size;
1008         p->no_items = no_items;
1009     }
1010     (*b->method->codec.stop)(c1);
1011     (*b->method->codec.stop)(c2);
1012     *sp1 = p;
1013     return more;
1014 }
1015
1016 int insert_sub (ISAMB b, struct ISAMB_block **p, void *new_item,
1017                 int *mode,
1018                 ISAMC_I *stream,
1019                 struct ISAMB_block **sp,
1020                 void *sub_item, int *sub_size,
1021                 const void *max_item)
1022 {
1023     if (!*p || (*p)->leaf)
1024         return insert_leaf (b, p, new_item, mode, stream, sp, sub_item, 
1025                             sub_size, max_item);
1026     else
1027         return insert_int (b, *p, new_item, mode, stream, sp, sub_item,
1028                            sub_size, max_item);
1029 }
1030
1031 int isamb_unlink (ISAMB b, ISAMC_P pos)
1032 {
1033     struct ISAMB_block *p1;
1034
1035     if (!pos)
1036         return 0;
1037     p1 = open_block(b, pos);
1038     p1->deleted = 1;
1039     if (!p1->leaf)
1040     {
1041         zint sub_p;
1042         zint item_len;
1043         const char *src = p1->bytes + p1->offset;
1044
1045         decode_ptr(&src, &sub_p);
1046         isamb_unlink(b, sub_p);
1047         
1048         while (src != p1->bytes + p1->size)
1049         {
1050             decode_ptr(&src, &item_len);
1051             src += item_len;
1052             decode_ptr(&src, &sub_p);
1053             isamb_unlink(b, sub_p);
1054         }
1055     }
1056     close_block(b, p1);
1057     return 0;
1058 }
1059
1060 ISAMB_P isamb_merge (ISAMB b, ISAMC_P pos, ISAMC_I *stream)
1061 {
1062     char item_buf[DST_ITEM_MAX];
1063     char *item_ptr;
1064     int i_mode;
1065     int more;
1066     int must_delete = 0;
1067
1068     if (b->cache < 0)
1069     {
1070         int more = 1;
1071         while (more)
1072         {
1073             item_ptr = item_buf;
1074             more =
1075                 (*stream->read_item)(stream->clientData, &item_ptr, &i_mode);
1076         }
1077         return 1;
1078     }
1079     item_ptr = item_buf;
1080     more = (*stream->read_item)(stream->clientData, &item_ptr, &i_mode);
1081     while (more)
1082     {
1083         struct ISAMB_block *p = 0, *sp = 0;
1084         char sub_item[DST_ITEM_MAX];
1085         int sub_size;
1086         
1087         if (pos)
1088             p = open_block (b, pos);
1089         more = insert_sub (b, &p, item_buf, &i_mode, stream, &sp,
1090                            sub_item, &sub_size, 0);
1091         if (sp)
1092         {    /* increase level of tree by one */
1093             struct ISAMB_block *p2 = new_int (b, p->cat);
1094             char *dst = p2->bytes + p2->size;
1095             
1096             encode_ptr (&dst, p->pos);
1097             assert (sub_size < 40);
1098             encode_ptr (&dst, sub_size);
1099             memcpy (dst, sub_item, sub_size);
1100             dst += sub_size;
1101             encode_ptr (&dst, sp->pos);
1102             
1103             p2->size = dst - p2->bytes;
1104             p2->no_items = p->no_items + sp->no_items;
1105             pos = p2->pos;  /* return new super page */
1106             close_block (b, sp);
1107             close_block (b, p2);
1108         }
1109         else
1110         {
1111             pos = p->pos;   /* return current one (again) */
1112         }
1113         if (p->no_items == 0)
1114             must_delete = 1;
1115         else
1116             must_delete = 0;
1117         close_block (b, p);
1118     }
1119     if (must_delete)
1120     {
1121         isamb_unlink(b, pos);
1122         return 0;
1123     }
1124     return pos;
1125 }
1126
1127 ISAMB_PP isamb_pp_open_x (ISAMB isamb, ISAMB_P pos, int *level, int scope)
1128 {
1129     ISAMB_PP pp = xmalloc (sizeof(*pp));
1130     int i;
1131
1132     pp->isamb = isamb;
1133     pp->block = xmalloc (ISAMB_MAX_LEVEL * sizeof(*pp->block));
1134
1135     pp->pos = pos;
1136     pp->level = 0;
1137     pp->maxlevel=0;
1138     pp->total_size = 0;
1139     pp->no_blocks = 0;
1140     pp->skipped_numbers=0;
1141     pp->returned_numbers=0;
1142     pp->scope=scope;
1143     for (i=0;i<ISAMB_MAX_LEVEL;i++)
1144         pp->skipped_nodes[i] = pp->accessed_nodes[i]=0;
1145     while (1)
1146     {
1147         struct ISAMB_block *p = open_block (isamb, pos);
1148         const char *src = p->bytes + p->offset;
1149         pp->block[pp->level] = p;
1150
1151         pp->total_size += p->size;
1152         pp->no_blocks++;
1153         if (p->leaf)
1154             break;
1155
1156                                         
1157         decode_ptr (&src, &pos);
1158         p->offset = src - p->bytes;
1159         pp->level++;
1160         pp->accessed_nodes[pp->level]++; 
1161     }
1162     pp->block[pp->level+1] = 0;
1163     pp->maxlevel=pp->level;
1164     if (level)
1165         *level = pp->level;
1166     return pp;
1167 }
1168
1169 ISAMB_PP isamb_pp_open (ISAMB isamb, ISAMB_P pos, int scope)
1170 {
1171     return isamb_pp_open_x (isamb, pos, 0, scope);
1172 }
1173
1174 void isamb_pp_close_x (ISAMB_PP pp, int *size, int *blocks)
1175 {
1176     int i;
1177     if (!pp)
1178         return;
1179     logf(LOG_DEBUG,"isamb_pp_close lev=%d returned "ZINT_FORMAT" values," 
1180                     "skipped "ZINT_FORMAT,
1181         pp->maxlevel, pp->skipped_numbers, pp->returned_numbers);
1182     for (i=pp->maxlevel;i>=0;i--)
1183         if ( pp->skipped_nodes[i] || pp->accessed_nodes[i])
1184             logf(LOG_DEBUG,"isamb_pp_close  level leaf-%d: "
1185                             ZINT_FORMAT" read, "ZINT_FORMAT" skipped", i,
1186                  pp->accessed_nodes[i], pp->skipped_nodes[i]);
1187     pp->isamb->skipped_numbers += pp->skipped_numbers;
1188     pp->isamb->returned_numbers += pp->returned_numbers;
1189     for (i=pp->maxlevel;i>=0;i--)
1190     {
1191         pp->isamb->accessed_nodes[i] += pp->accessed_nodes[i];
1192         pp->isamb->skipped_nodes[i] += pp->skipped_nodes[i];
1193     }
1194     if (size)
1195         *size = pp->total_size;
1196     if (blocks)
1197         *blocks = pp->no_blocks;
1198     for (i = 0; i <= pp->level; i++)
1199         close_block (pp->isamb, pp->block[i]);
1200     xfree (pp->block);
1201     xfree (pp);
1202 }
1203
1204 int isamb_block_info (ISAMB isamb, int cat)
1205 {
1206     if (cat >= 0 && cat < isamb->no_cat)
1207         return isamb->file[cat].head.block_size;
1208     return -1;
1209 }
1210
1211 void isamb_pp_close (ISAMB_PP pp)
1212 {
1213     isamb_pp_close_x (pp, 0, 0);
1214 }
1215
1216 /* simple recursive dumper .. */
1217 static void isamb_dump_r (ISAMB b, ISAMB_P pos, void (*pr)(const char *str),
1218                           int level)
1219 {
1220     char buf[1024];
1221     char prefix_str[1024];
1222     if (pos)
1223     {
1224         struct ISAMB_block *p = open_block (b, pos);
1225         sprintf(prefix_str, "%*s " ZINT_FORMAT " cat=%d size=%d max=%d items="
1226                 ZINT_FORMAT, level*2, "",
1227                 pos, p->cat, p->size, b->file[p->cat].head.block_max,
1228                 p->no_items);
1229         (*pr)(prefix_str);
1230         sprintf(prefix_str, "%*s " ZINT_FORMAT, level*2, "", pos);
1231         if (p->leaf)
1232         {
1233             while (p->offset < p->size)
1234             {
1235                 const char *src = p->bytes + p->offset;
1236                 char *dst = buf;
1237                 (*b->method->codec.decode)(p->decodeClientData, &dst, &src);
1238                 (*b->method->log_item)(LOG_DEBUG, buf, prefix_str);
1239                 p->offset = src - (char*) p->bytes;
1240             }
1241             assert(p->offset == p->size);
1242         }
1243         else
1244         {
1245             const char *src = p->bytes + p->offset;
1246             ISAMB_P sub;
1247             zint item_len;
1248
1249             decode_ptr (&src, &sub);
1250             p->offset = src - (char*) p->bytes;
1251
1252             isamb_dump_r(b, sub, pr, level+1);
1253             
1254             while (p->offset < p->size)
1255             {
1256                 decode_ptr (&src, &item_len);
1257                 (*b->method->log_item)(LOG_DEBUG, src, prefix_str);
1258                 src += item_len;
1259                 decode_ptr (&src, &sub);
1260                 
1261                 p->offset = src - (char*) p->bytes;
1262                 
1263                 isamb_dump_r(b, sub, pr, level+1);
1264             }           
1265         }
1266         close_block(b,p);
1267     }
1268 }
1269
1270 void isamb_dump (ISAMB b, ISAMB_P pos, void (*pr)(const char *str))
1271 {
1272     isamb_dump_r(b, pos, pr, 0);
1273 }
1274
1275 int isamb_pp_read (ISAMB_PP pp, void *buf)
1276 {
1277     return isamb_pp_forward(pp, buf, 0);
1278 }
1279
1280
1281 static int isamb_pp_on_right_node(ISAMB_PP pp, int level, const void *untilbuf)
1282 { /* looks one node higher to see if we should be on this node at all */
1283   /* useful in backing off quickly, and in avoiding tail descends */
1284   /* call with pp->level to begin with */
1285     struct ISAMB_block *p;
1286     int cmp;
1287     const char *src;
1288     zint item_len;
1289     assert(level>=0);
1290     if ( level == 0) {
1291 #if ISAMB_DEBUG
1292             logf(LOG_DEBUG,"isamb_pp_on_right returning true for root");
1293 #endif
1294         return 1; /* we can never skip the root node */
1295     }
1296     level--;
1297     p=pp->block[level];
1298     assert(p->offset <= p->size);
1299     if (p->offset < p->size )
1300     {
1301         assert(p->offset>0); 
1302         src=p->bytes + p->offset;
1303         decode_ptr(&src, &item_len);
1304 #if ISAMB_DEBUG
1305         (*pp->isamb->method->codec.log_item)(LOG_DEBUG,untilbuf,"on_leaf: until");
1306         (*pp->isamb->method->codec.log_item)(LOG_DEBUG,src,"on_leaf: value");
1307 #endif
1308         cmp=(*pp->isamb->method->compare_item)(untilbuf,src);
1309         if (cmp<pp->scope) {  /* cmp<2 */
1310 #if ISAMB_DEBUG
1311             logf(LOG_DEBUG,"isamb_pp_on_right returning true "
1312                             "cmp=%d lev=%d ofs=%d",cmp,level,p->offset);
1313 #endif
1314             return 1; 
1315         }
1316         else {
1317 #if ISAMB_DEBUG
1318             logf(LOG_DEBUG,"isamb_pp_on_right returning false "
1319                             "cmp=%d lev=%d ofs=%d",cmp,level,p->offset);
1320 #endif
1321             return 0; 
1322         }
1323     }
1324     else {
1325 #if ISAMB_DEBUG
1326         logf(LOG_DEBUG,"isamb_pp_on_right at tail, looking higher "
1327                         "lev=%d",level);
1328 #endif
1329         return isamb_pp_on_right_node(pp, level, untilbuf);
1330     }
1331 } /* isamb_pp_on_right_node */
1332
1333 static int isamb_pp_read_on_leaf(ISAMB_PP pp, void *buf)
1334 { /* reads the next item on the current leaf, returns 0 if end of leaf*/
1335     struct ISAMB_block *p = pp->block[pp->level];
1336     char *dst;
1337     const char *src;
1338     assert(pp);
1339     assert(buf);
1340     if (p->offset == p->size) {
1341 #if ISAMB_DEBUG
1342         logf(LOG_DEBUG,"isamb_pp_read_on_leaf returning 0 on node %d",p->pos);
1343 #endif
1344         return 0; /* at end of leaf */
1345     }
1346     src=p->bytes + p->offset;
1347     dst=buf;
1348     (*pp->isamb->method->codec.decode)(p->decodeClientData,&dst, &src);
1349     p->offset = src - (char*) p->bytes;
1350 #if ISAMB_DEBUG
1351     (*pp->isamb->method->codec.log_item)(LOG_DEBUG, buf, "read_on_leaf returning 1");
1352 #endif
1353     pp->returned_numbers++;
1354     return 1;
1355 } /* read_on_leaf */
1356
1357 static int isamb_pp_forward_on_leaf(ISAMB_PP pp, void *buf, const void *untilbuf)
1358 { /* forwards on the current leaf, returns 0 if not found */
1359     int cmp;
1360     int skips=0;
1361     while (1){
1362         if (!isamb_pp_read_on_leaf(pp,buf))
1363             return 0;
1364         /* FIXME - this is an extra function call, inline the read? */
1365         cmp=(*pp->isamb->method->compare_item)(untilbuf,buf);
1366         if (cmp <pp->scope){  /* cmp<2  found a good one */
1367 #if ISAMB_DEBUG
1368             if (skips)
1369                 logf(LOG_DEBUG, "isam_pp_fwd_on_leaf skipped %d items",skips);
1370 #endif
1371             pp->returned_numbers++;
1372             return 1;
1373         }
1374         if (!skips)
1375             if (!isamb_pp_on_right_node(pp, pp->level, untilbuf))
1376                 return 0; /* never mind the rest of this leaf */
1377         pp->skipped_numbers++;
1378         skips++;
1379     }
1380 } /* forward_on_leaf */
1381
1382 static int isamb_pp_climb_level(ISAMB_PP pp, ISAMB_P *pos)
1383 { /* climbs higher in the tree, until finds a level with data left */
1384   /* returns the node to (consider to) descend to in *pos) */
1385     struct ISAMB_block *p = pp->block[pp->level];
1386     const char *src;
1387     zint item_len;
1388 #if ISAMB_DEBUG
1389     logf(LOG_DEBUG,"isamb_pp_climb_level starting "
1390                    "at level %d node %d ofs=%d sz=%d",
1391                     pp->level, p->pos, p->offset, p->size);
1392 #endif
1393     assert(pp->level >= 0);
1394     assert(p->offset <= p->size);
1395     if (pp->level==0)
1396     {
1397 #if ISAMB_DEBUG
1398         logf(LOG_DEBUG,"isamb_pp_climb_level returning 0 at root");
1399 #endif
1400         return 0;
1401     }
1402     assert(pp->level>0); 
1403     close_block(pp->isamb, pp->block[pp->level]);
1404     pp->block[pp->level]=0;
1405     (pp->level)--;
1406     p=pp->block[pp->level];
1407 #if ISAMB_DEBUG
1408     logf(LOG_DEBUG,"isamb_pp_climb_level climbed to level %d node %d ofs=%d",
1409                     pp->level, p->pos, p->offset);
1410 #endif
1411     assert(!p->leaf);
1412     assert(p->offset <= p->size);
1413     if (p->offset == p->size ) {
1414         /* we came from the last pointer, climb on */
1415         if (!isamb_pp_climb_level(pp,pos))
1416             return 0;
1417         p=pp->block[pp->level];
1418     }
1419     else
1420     {
1421         /* skip the child we just came from */
1422 #if ISAMB_DEBUG
1423         logf(LOG_DEBUG,"isam_pp_climb_level: skipping lev=%d ofs=%d sz=%d", 
1424                         pp->level, p->offset, p->size);
1425 #endif
1426         assert (p->offset < p->size );
1427         src=p->bytes + p->offset;
1428         decode_ptr(&src, &item_len);
1429         src += item_len;
1430         decode_ptr(&src, pos);
1431         p->offset=src - (char *)p->bytes;
1432             
1433     }
1434     return 1;
1435 } /* climb_level */
1436
1437
1438 static zint isamb_pp_forward_unode(ISAMB_PP pp, zint pos, const void *untilbuf)
1439 { /* scans a upper node until it finds a child <= untilbuf */
1440   /* pp points to the key value, as always. pos is the child read from */
1441   /* the buffer */
1442   /* if all values are too small, returns the last child in the node */
1443   /* FIXME - this can be detected, and avoided by looking at the */
1444   /* parent node, but that gets messy. Presumably the cost is */
1445   /* pretty low anyway */
1446     struct ISAMB_block *p = pp->block[pp->level];
1447     const char *src=p->bytes + p->offset;
1448     zint item_len;
1449     int cmp;
1450     zint nxtpos;
1451 #if ISAMB_DEBUG
1452     int skips=0;
1453     logf(LOG_DEBUG,"isamb_pp_forward_unode starting "
1454                    "at level %d node %d ofs=%di sz=%d",
1455                     pp->level, p->pos, p->offset, p->size);
1456 #endif
1457     assert(!p->leaf);
1458     assert(p->offset <= p->size);
1459     if (p->offset == p->size) {
1460 #if ISAMB_DEBUG
1461             logf(LOG_DEBUG,"isamb_pp_forward_unode returning at end "
1462                    "at level %d node %d ofs=%di sz=%d",
1463                     pp->level, p->pos, p->offset, p->size);
1464 #endif
1465         return pos; /* already at the end of it */
1466     }
1467     while(p->offset < p->size) {
1468         decode_ptr(&src,&item_len);
1469         cmp=(*pp->isamb->method->compare_item)(untilbuf,src);
1470         src+=item_len;
1471         decode_ptr(&src,&nxtpos);
1472         if (cmp<pp->scope) /* cmp<2 */
1473         {
1474 #if ISAMB_DEBUG
1475             logf(LOG_DEBUG,"isamb_pp_forward_unode returning a hit "
1476                    "at level %d node %d ofs=%d sz=%d",
1477                     pp->level, p->pos, p->offset, p->size);
1478 #endif
1479             return pos;
1480         } /* found one */
1481         pos=nxtpos;
1482         p->offset=src-(char*)p->bytes;
1483         (pp->skipped_nodes[pp->maxlevel - pp->level -1])++;
1484 #if ISAMB_DEBUG
1485         skips++;
1486 #endif
1487     }
1488 #if ISAMB_DEBUG
1489             logf(LOG_DEBUG,"isamb_pp_forward_unode returning at tail "
1490                    "at level %d node %d ofs=%d sz=%d skips=%d",
1491                     pp->level, p->pos, p->offset, p->size, skips);
1492 #endif
1493     return pos; /* that's the last one in the line */
1494     
1495 } /* forward_unode */
1496
1497 static void isamb_pp_descend_to_leaf(ISAMB_PP pp, ISAMB_P pos, const void *untilbuf)
1498 { /* climbs down the tree, from pos, to the leftmost leaf */
1499     struct ISAMB_block *p = pp->block[pp->level];
1500     const char *src;
1501     assert(!p->leaf);
1502 #if ISAMB_DEBUG
1503     logf(LOG_DEBUG,"isamb_pp_descend_to_leaf "
1504                    "starting at lev %d node %d ofs=%d lf=%d u=%p", 
1505                    pp->level, p->pos, p->offset, p->leaf, untilbuf);
1506 #endif
1507     if (untilbuf)
1508         pos=isamb_pp_forward_unode(pp,pos,untilbuf);
1509     ++(pp->level);
1510     assert(pos);
1511     p=open_block(pp->isamb, pos);
1512     pp->block[pp->level]=p;
1513     ++(pp->accessed_nodes[pp->maxlevel-pp->level]);
1514     ++(pp->no_blocks);
1515 #if ISAMB_DEBUG
1516     logf(LOG_DEBUG,"isamb_pp_descend_to_leaf "
1517                    "got lev %d node %d lf=%d", 
1518                    pp->level, p->pos, p->leaf);
1519 #endif
1520     if (p->leaf)
1521         return;
1522     assert (p->offset==0 );
1523     src=p->bytes + p->offset;
1524     decode_ptr(&src, &pos);
1525     p->offset=src-(char*)p->bytes;
1526     isamb_pp_descend_to_leaf(pp,pos,untilbuf);
1527 #if ISAMB_DEBUG
1528     logf(LOG_DEBUG,"isamb_pp_descend_to_leaf "
1529                    "returning at lev %d node %d ofs=%d lf=%d", 
1530                    pp->level, p->pos, p->offset, p->leaf);
1531 #endif
1532 } /* descend_to_leaf */
1533
1534 static int isamb_pp_find_next_leaf(ISAMB_PP pp)
1535 { /* finds the next leaf by climbing up and down */
1536     ISAMB_P pos;
1537     if (!isamb_pp_climb_level(pp,&pos))
1538         return 0;
1539     isamb_pp_descend_to_leaf(pp, pos,0);
1540     return 1;
1541 }
1542
1543 static int isamb_pp_climb_desc(ISAMB_PP pp,  const void *untilbuf)
1544 { /* climbs up and descends to a leaf where values >= *untilbuf are found */
1545     ISAMB_P pos;
1546 #if ISAMB_DEBUG
1547     struct ISAMB_block *p = pp->block[pp->level];
1548     logf(LOG_DEBUG,"isamb_pp_climb_desc starting "
1549                    "at level %d node %d ofs=%d sz=%d",
1550                     pp->level, p->pos, p->offset, p->size);
1551 #endif
1552     if (!isamb_pp_climb_level(pp,&pos))
1553         return 0;
1554     /* see if it would pay to climb one higher */
1555     if (!isamb_pp_on_right_node(pp, pp->level, untilbuf))
1556         if (!isamb_pp_climb_level(pp,&pos))
1557             return 0;
1558     isamb_pp_descend_to_leaf(pp, pos,untilbuf);
1559 #if ISAMB_DEBUG
1560     p = pp->block[pp->level];
1561     logf(LOG_DEBUG,"isamb_pp_climb_desc done "
1562                    "at level %d node %d ofs=%d sz=%d",
1563                     pp->level, p->pos, p->offset, p->size);
1564 #endif
1565     return 1;
1566 } /* climb_desc */
1567
1568 int isamb_pp_forward (ISAMB_PP pp, void *buf, const void *untilbuf)
1569 {
1570 #if ISAMB_DEBUG
1571     struct ISAMB_block *p = pp->block[pp->level];
1572     assert(p->leaf);
1573     logf(LOG_DEBUG,"isamb_pp_forward starting "
1574                    "at level %d node %d ofs=%d sz=%d u=%p sc=%d",
1575                     pp->level, p->pos, p->offset, p->size,untilbuf, scope);
1576 #endif
1577     if (untilbuf) {
1578         if (isamb_pp_forward_on_leaf( pp, buf, untilbuf)) {
1579 #if ISAMB_DEBUG
1580             logf(LOG_DEBUG,"isamb_pp_forward (f) returning (A) "
1581                    "at level %d node %d ofs=%d sz=%d",
1582                     pp->level, p->pos, p->offset, p->size);
1583 #endif
1584             return 1;
1585         }
1586         if (! isamb_pp_climb_desc( pp, untilbuf)) {
1587 #if ISAMB_DEBUG
1588             logf(LOG_DEBUG,"isamb_pp_forward (f) returning notfound (B) "
1589                    "at level %d node %d ofs=%d sz=%d",
1590                     pp->level, p->pos, p->offset, p->size);
1591 #endif
1592             return 0; /* could not find a leaf */
1593         }
1594         do{
1595             if (isamb_pp_forward_on_leaf( pp, buf, untilbuf)) {
1596 #if ISAMB_DEBUG
1597             logf(LOG_DEBUG,"isamb_pp_forward (f) returning (C) "
1598                    "at level %d node %d ofs=%d sz=%d",
1599                     pp->level, p->pos, p->offset, p->size);
1600 #endif
1601                 return 1;
1602             }
1603         }while ( isamb_pp_find_next_leaf(pp));
1604         return 0; /* could not find at all */
1605     }
1606     else { /* no untilbuf, a straight read */
1607         /* FIXME - this should be moved
1608          * directly into the pp_read */
1609         /* keeping here now, to keep same
1610          * interface as the old fwd */
1611         if (isamb_pp_read_on_leaf( pp, buf)) {
1612 #if ISAMB_DEBUG
1613             logf(LOG_DEBUG,"isamb_pp_forward (read) returning (D) "
1614                    "at level %d node %d ofs=%d sz=%d",
1615                     pp->level, p->pos, p->offset, p->size);
1616 #endif
1617             return 1;
1618         }
1619         if (isamb_pp_find_next_leaf(pp)) {
1620 #if ISAMB_DEBUG
1621             logf(LOG_DEBUG,"isamb_pp_forward (read) returning (E) "
1622                    "at level %d node %d ofs=%d sz=%d",
1623                     pp->level, p->pos, p->offset, p->size);
1624 #endif
1625             return isamb_pp_read_on_leaf(pp, buf);
1626         }
1627         else
1628             return 0;
1629     }
1630 } /* isam_pp_forward (new version) */
1631
1632 void isamb_pp_pos( ISAMB_PP pp, double *current, double *total )
1633 { /* return an estimate of the current position and of the total number of */
1634   /* occureences in the isam tree, based on the current leaf */
1635     struct ISAMB_block *p = pp->block[pp->level];
1636     assert(total);
1637     assert(current);
1638     assert(p->leaf);
1639
1640     *total = pp->block[0]->no_items;
1641     *current = (double) pp->returned_numbers;
1642 #if ISAMB_DEBUG
1643     logf(LOG_LOG, "isamb_pp_pos returning: cur= %0.1f tot=%0.1f rn="
1644          ZINT_FORMAT, *current, *total, pp->returned_numbers);
1645 #endif
1646 }