Allow ISAMB_PTR_CODEC=0 to work
[idzebra-moved-to-github.git] / isamb / isamb.c
1 /* $Id: isamb.c,v 1.63 2005-01-02 18:50:53 adam Exp $
2    Copyright (C) 1995-2005
3    Index Data Aps
4
5 This file is part of the Zebra server.
6
7 Zebra is free software; you can redistribute it and/or modify it under
8 the terms of the GNU General Public License as published by the Free
9 Software Foundation; either version 2, or (at your option) any later
10 version.
11
12 Zebra is distributed in the hope that it will be useful, but WITHOUT ANY
13 WARRANTY; without even the implied warranty of MERCHANTABILITY or
14 FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
15 for more details.
16
17 You should have received a copy of the GNU General Public License
18 along with Zebra; see the file LICENSE.zebra.  If not, write to the
19 Free Software Foundation, 59 Temple Place - Suite 330, Boston, MA
20 02111-1307, USA.
21 */
22
23 #include <string.h>
24 #include <yaz/log.h>
25 #include <yaz/xmalloc.h>
26 #include <idzebra/isamb.h>
27 #include <assert.h>
28
29 #ifndef ISAMB_DEBUG
30 #define ISAMB_DEBUG 0
31 #endif
32
33 #define ISAMB_MAJOR_VERSION 2
34 #define ISAMB_MINOR_VERSION 0
35
36 struct ISAMB_head {
37     zint first_block;
38     zint last_block;
39     zint free_list;
40     zint no_items;
41     int block_size;
42     int block_max;
43     int block_offset;
44 };
45
46 /* maximum size of encoded buffer */
47 #define DST_ITEM_MAX 256
48
49 #define ISAMB_MAX_LEVEL 10
50 /* approx 2*max page + max size of item */
51 #define DST_BUF_SIZE 16840
52
53 #define ISAMB_CACHE_ENTRY_SIZE 4096
54
55 /* CAT_MAX: _must_ be power of 2 */
56 #define CAT_MAX 4
57 #define CAT_MASK (CAT_MAX-1)
58 /* CAT_NO: <= CAT_MAX */
59 #define CAT_NO 4
60
61 /* ISAMB_PTR_CODEC=1 var, =0 fixed */
62 #define ISAMB_PTR_CODEC 1
63
64 struct ISAMB_cache_entry {
65     ISAMB_P pos;
66     unsigned char *buf;
67     int dirty;
68     int hits;
69     struct ISAMB_cache_entry *next;
70 };
71
72 struct ISAMB_file {
73     BFile bf;
74     int head_dirty;
75     struct ISAMB_head head;
76     struct ISAMB_cache_entry *cache_entries;
77 };
78
79 struct ISAMB_s {
80     BFiles bfs;
81     ISAMC_M *method;
82
83     struct ISAMB_file *file;
84     int no_cat;
85     int cache; /* 0=no cache, 1=use cache, -1=dummy isam (for testing only) */
86     int log_io;        /* log level for bf_read/bf_write calls */
87     int log_freelist;  /* log level for freelist handling */
88     zint skipped_numbers; /* on a leaf node */
89     zint returned_numbers; 
90     zint skipped_nodes[ISAMB_MAX_LEVEL]; /* [0]=skipped leaves, 1=higher etc */
91     zint accessed_nodes[ISAMB_MAX_LEVEL]; /* nodes we did not skip */
92 };
93
94 struct ISAMB_block {
95     ISAMB_P pos;
96     int cat;
97     int size;
98     int leaf;
99     int dirty;
100     int deleted;
101     int offset;
102     zint no_items;  /* number of nodes in this + children */
103     char *bytes;
104     char *cbuf;
105     unsigned char *buf;
106     void *decodeClientData;
107     int log_rw;
108 };
109
110 struct ISAMB_PP_s {
111     ISAMB isamb;
112     ISAMB_P pos;
113     int level;
114     int maxlevel; /* total depth */
115     zint total_size;
116     zint no_blocks;
117     zint skipped_numbers; /* on a leaf node */
118     zint returned_numbers; 
119     zint skipped_nodes[ISAMB_MAX_LEVEL]; /* [0]=skipped leaves, 1=higher etc */
120     zint accessed_nodes[ISAMB_MAX_LEVEL]; /* nodes we did not skip */
121     struct ISAMB_block **block;
122     int scope;  /* on what level we forward */
123 };
124
125
126 #if ISAMB_PTR_CODEC
127 static void encode_ptr (char **dst, zint pos)
128 {
129     unsigned char *bp = (unsigned char*) *dst;
130
131     while (pos > 127)
132     {
133          *bp++ = 128 | (pos & 127);
134          pos = pos >> 7;
135     }
136     *bp++ = pos;
137     *dst = (char *) bp;
138 }
139 #else
140 static void encode_ptr (char **dst, zint pos)
141 {
142     memcpy(*dst, &pos, sizeof(pos));
143     (*dst) += sizeof(pos);
144 }
145 #endif
146
147 #if ISAMB_PTR_CODEC
148 static void decode_ptr (const char **src1, zint *pos)
149 {
150     const unsigned char **src = (const unsigned char **) src1;
151     zint d = 0;
152     unsigned char c;
153     unsigned r = 0;
154
155     while (((c = *(*src)++) & 128))
156     {
157         d += ((zint) (c & 127) << r);
158         r += 7;
159     }
160     d += ((zint) c << r);
161     *pos = d;
162 }
163 #else
164 static void decode_ptr (const char **src, zint *pos)
165 {
166      memcpy (pos, *src, sizeof(*pos));
167      (*src) += sizeof(*pos);
168 }
169 #endif
170
171 ISAMB isamb_open (BFiles bfs, const char *name, int writeflag, ISAMC_M *method,
172                   int cache)
173 {
174     ISAMB isamb = xmalloc (sizeof(*isamb));
175     int i, b_size = 32;
176
177     isamb->bfs = bfs;
178     isamb->method = (ISAMC_M *) xmalloc (sizeof(*method));
179     memcpy (isamb->method, method, sizeof(*method));
180     isamb->no_cat = CAT_NO;
181     isamb->log_io = 0;
182     isamb->log_freelist = 0;
183     isamb->cache = cache;
184     isamb->skipped_numbers=0;
185     isamb->returned_numbers=0;
186     for (i=0;i<ISAMB_MAX_LEVEL;i++)
187       isamb->skipped_nodes[i]= isamb->accessed_nodes[i]=0;
188
189     assert (cache == 0);
190     isamb->file = xmalloc (sizeof(*isamb->file) * isamb->no_cat);
191     for (i = 0; i < isamb->no_cat; i++)
192     {
193         char fname[DST_BUF_SIZE];
194         char hbuf[DST_BUF_SIZE];
195         isamb->file[i].cache_entries = 0;
196         isamb->file[i].head_dirty = 0;
197         sprintf (fname, "%s%c", name, i+'A');
198         if (cache)
199             isamb->file[i].bf = bf_open (bfs, fname, ISAMB_CACHE_ENTRY_SIZE,
200                                          writeflag);
201         else
202             isamb->file[i].bf = bf_open (bfs, fname, b_size, writeflag);
203
204         /* fill-in default values (for empty isamb) */
205         isamb->file[i].head.first_block = ISAMB_CACHE_ENTRY_SIZE/b_size+1;
206         isamb->file[i].head.last_block = isamb->file[i].head.first_block;
207         isamb->file[i].head.block_size = b_size;
208 #if ISAMB_PTR_CODEC
209         if (i == isamb->no_cat-1 || b_size > 128)
210             isamb->file[i].head.block_offset = 8;
211         else
212             isamb->file[i].head.block_offset = 4;
213 #else
214         isamb->file[i].head.block_offset = 11;
215 #endif
216         isamb->file[i].head.block_max =
217             b_size - isamb->file[i].head.block_offset;
218         isamb->file[i].head.free_list = 0;
219         if (bf_read (isamb->file[i].bf, 0, 0, 0, hbuf))
220         {
221             /* got header assume "isamb"major minor len can fit in 16 bytes */
222             zint zint_tmp;
223             int major, minor, len, pos = 0;
224             int left;
225             const char *src = 0;
226             if (memcmp(hbuf, "isamb", 5))
227             {
228                 yaz_log(YLOG_WARN, "bad isamb header for file %s", fname);
229                 return 0;
230             }
231             if (sscanf(hbuf+5, "%d %d %d", &major, &minor, &len) != 3)
232             {
233                 yaz_log(YLOG_WARN, "bad isamb header for file %s", fname);
234                 return 0;
235             }
236             if (major != ISAMB_MAJOR_VERSION)
237             {
238                 yaz_log(YLOG_WARN, "bad major version for file %s %d, must be %d",
239                      fname, major, ISAMB_MAJOR_VERSION);
240                 return 0;
241             }
242             for (left = len - b_size; left > 0; left = left - b_size)
243             {
244                 pos++;
245                 if (!bf_read (isamb->file[i].bf, pos, 0, 0, hbuf + pos*b_size))
246                 {
247                     yaz_log(YLOG_WARN, "truncated isamb header for " 
248                          "file=%s len=%d pos=%d",
249                          fname, len, pos);
250                     return 0;
251                 }
252             }
253             src = hbuf + 16;
254             decode_ptr(&src, &isamb->file[i].head.first_block);
255             decode_ptr(&src, &isamb->file[i].head.last_block);
256             decode_ptr(&src, &zint_tmp);
257             isamb->file[i].head.block_size = zint_tmp;
258             decode_ptr(&src, &zint_tmp);
259             isamb->file[i].head.block_max = zint_tmp;
260             decode_ptr(&src, &isamb->file[i].head.free_list);
261         }
262         assert (isamb->file[i].head.block_size >= isamb->file[i].head.block_offset);
263         isamb->file[i].head_dirty = 0;
264         assert(isamb->file[i].head.block_size == b_size);
265         b_size = b_size * 4;
266     }
267 #if ISAMB_DEBUG
268     yaz_log(YLOG_WARN, "isamb debug enabled. Things will be slower than usual");
269 #endif
270     return isamb;
271 }
272
273 static void flush_blocks (ISAMB b, int cat)
274 {
275     while (b->file[cat].cache_entries)
276     {
277         struct ISAMB_cache_entry *ce_this = b->file[cat].cache_entries;
278         b->file[cat].cache_entries = ce_this->next;
279
280         if (ce_this->dirty)
281         {
282             yaz_log (b->log_io, "bf_write: flush_blocks");
283             bf_write (b->file[cat].bf, ce_this->pos, 0, 0, ce_this->buf);
284         }
285         xfree (ce_this->buf);
286         xfree (ce_this);
287     }
288 }
289
290 static int get_block (ISAMB b, ISAMC_P pos, char *userbuf, int wr)
291 {
292     int cat = (int) (pos&CAT_MASK);
293     int off = (int) (((pos/CAT_MAX) & 
294                (ISAMB_CACHE_ENTRY_SIZE / b->file[cat].head.block_size - 1))
295         * b->file[cat].head.block_size);
296     zint norm = pos / (CAT_MASK*ISAMB_CACHE_ENTRY_SIZE / b->file[cat].head.block_size);
297     int no = 0;
298     struct ISAMB_cache_entry **ce, *ce_this = 0, **ce_last = 0;
299
300     if (!b->cache)
301         return 0;
302
303     assert (ISAMB_CACHE_ENTRY_SIZE >= b->file[cat].head.block_size);
304     for (ce = &b->file[cat].cache_entries; *ce; ce = &(*ce)->next, no++)
305     {
306         ce_last = ce;
307         if ((*ce)->pos == norm)
308         {
309             ce_this = *ce;
310             *ce = (*ce)->next;   /* remove from list */
311             
312             ce_this->next = b->file[cat].cache_entries;  /* move to front */
313             b->file[cat].cache_entries = ce_this;
314             
315             if (wr)
316             {
317                 memcpy (ce_this->buf + off, userbuf, 
318                         b->file[cat].head.block_size);
319                 ce_this->dirty = 1;
320             }
321             else
322                 memcpy (userbuf, ce_this->buf + off,
323                         b->file[cat].head.block_size);
324             return 1;
325         }
326     }
327     if (no >= 40)
328     {
329         assert (no == 40);
330         assert (ce_last && *ce_last);
331         ce_this = *ce_last;
332         *ce_last = 0;  /* remove the last entry from list */
333         if (ce_this->dirty)
334         {
335             yaz_log (b->log_io, "bf_write: get_block");
336             bf_write (b->file[cat].bf, ce_this->pos, 0, 0, ce_this->buf);
337         }
338         xfree (ce_this->buf);
339         xfree (ce_this);
340     }
341     ce_this = xmalloc (sizeof(*ce_this));
342     ce_this->next = b->file[cat].cache_entries;
343     b->file[cat].cache_entries = ce_this;
344     ce_this->buf = xmalloc (ISAMB_CACHE_ENTRY_SIZE);
345     ce_this->pos = norm;
346     yaz_log (b->log_io, "bf_read: get_block");
347     if (!bf_read (b->file[cat].bf, norm, 0, 0, ce_this->buf))
348         memset (ce_this->buf, 0, ISAMB_CACHE_ENTRY_SIZE);
349     if (wr)
350     {
351         memcpy (ce_this->buf + off, userbuf, b->file[cat].head.block_size);
352         ce_this->dirty = 1;
353     }
354     else
355     {
356         ce_this->dirty = 0;
357         memcpy (userbuf, ce_this->buf + off, b->file[cat].head.block_size);
358     }
359     return 1;
360 }
361
362
363 void isamb_close (ISAMB isamb)
364 {
365     int i;
366     for (i=0;isamb->accessed_nodes[i];i++)
367         yaz_log(YLOG_DEBUG,"isamb_close  level leaf-%d: "ZINT_FORMAT" read, "
368                         ZINT_FORMAT" skipped",
369              i, isamb->accessed_nodes[i], isamb->skipped_nodes[i]);
370     yaz_log(YLOG_DEBUG,"isamb_close returned "ZINT_FORMAT" values, "
371                    "skipped "ZINT_FORMAT,
372          isamb->skipped_numbers, isamb->returned_numbers);
373     for (i = 0; i<isamb->no_cat; i++)
374     {
375         flush_blocks (isamb, i);
376         if (isamb->file[i].head_dirty)
377         {
378             char hbuf[DST_BUF_SIZE];
379             int major = ISAMB_MAJOR_VERSION;
380             int minor = ISAMB_MINOR_VERSION;
381             int len = 16;
382             char *dst = hbuf + 16;
383             int pos = 0, left;
384             int b_size = isamb->file[i].head.block_size;
385
386             encode_ptr(&dst, isamb->file[i].head.first_block);
387             encode_ptr(&dst, isamb->file[i].head.last_block);
388             encode_ptr(&dst, isamb->file[i].head.block_size);
389             encode_ptr(&dst, isamb->file[i].head.block_max);
390             encode_ptr(&dst, isamb->file[i].head.free_list);
391             memset(dst, '\0', b_size); /* ensure no random bytes are written */
392
393             len = dst - hbuf;
394
395             /* print exactly 16 bytes (including trailing 0) */
396             sprintf(hbuf, "isamb%02d %02d %02d\r\n", major, minor, len);
397
398             bf_write (isamb->file[i].bf, pos, 0, 0, hbuf);
399
400             for (left = len - b_size; left > 0; left = left - b_size)
401             {
402                 pos++;
403                 bf_write (isamb->file[i].bf, pos, 0, 0, hbuf + pos*b_size);
404             }
405         }
406         bf_close (isamb->file[i].bf);
407     }
408     xfree (isamb->file);
409     xfree (isamb->method);
410     xfree (isamb);
411 }
412
413 /* open_block: read one block at pos.
414    Decode leading sys bytes .. consisting of
415    Offset:Meaning
416    0: leader byte, != 0 leaf, == 0, non-leaf
417    1-2: used size of block
418    3-7*: number of items and all children
419    
420    * Reserve 5 bytes for large block sizes. 1 for small ones .. Number
421    of items. We can thus have at most 2^40 nodes. 
422 */
423 static struct ISAMB_block *open_block (ISAMB b, ISAMC_P pos)
424 {
425     int cat = (int) (pos&CAT_MASK);
426     const char *src;
427     int offset = b->file[cat].head.block_offset;
428     struct ISAMB_block *p;
429     if (!pos)
430         return 0;
431     p = xmalloc (sizeof(*p));
432     p->pos = pos;
433     p->cat = (int) (pos & CAT_MASK);
434     p->buf = xmalloc (b->file[cat].head.block_size);
435     p->cbuf = 0;
436
437     if (!get_block (b, pos, p->buf, 0))
438     {
439         yaz_log (b->log_io, "bf_read: open_block");
440         if (!bf_read (b->file[cat].bf, pos/CAT_MAX, 0, 0, p->buf))
441         {
442             yaz_log (YLOG_FATAL, "isamb: read fail for pos=%ld block=%ld",
443                      (long) pos, (long) pos/CAT_MAX);
444             abort();
445         }
446     }
447     p->bytes = p->buf + offset;
448     p->leaf = p->buf[0];
449     p->size = (p->buf[1] + 256 * p->buf[2]) - offset;
450     if (p->size < 0)
451     {
452         yaz_log (YLOG_FATAL, "Bad block size %d in pos=" ZINT_FORMAT "\n",
453                  p->size, pos);
454     }
455     assert (p->size >= 0);
456     src = p->buf + 3;
457     decode_ptr(&src, &p->no_items);
458
459     p->offset = 0;
460     p->dirty = 0;
461     p->deleted = 0;
462     p->decodeClientData = (*b->method->codec.start)();
463     return p;
464 }
465
466 struct ISAMB_block *new_block (ISAMB b, int leaf, int cat)
467 {
468     struct ISAMB_block *p;
469
470     p = xmalloc (sizeof(*p));
471     p->buf = xmalloc (b->file[cat].head.block_size);
472
473     if (!b->file[cat].head.free_list)
474     {
475         zint block_no;
476         block_no = b->file[cat].head.last_block++;
477         p->pos = block_no * CAT_MAX + cat;
478     }
479     else
480     {
481         p->pos = b->file[cat].head.free_list;
482         assert((p->pos & CAT_MASK) == cat);
483         if (!get_block (b, p->pos, p->buf, 0))
484         {
485             yaz_log (b->log_io, "bf_read: new_block");
486             if (!bf_read (b->file[cat].bf, p->pos/CAT_MAX, 0, 0, p->buf))
487             {
488                 yaz_log (YLOG_FATAL, "isamb: read fail for pos=%ld block=%ld",
489                          (long) p->pos/CAT_MAX, (long) p->pos/CAT_MAX);
490                 abort ();
491             }
492         }
493         yaz_log (b->log_freelist, "got block " ZINT_FORMAT " from freelist %d:" ZINT_FORMAT, p->pos,
494                  cat, p->pos/CAT_MAX);
495         memcpy (&b->file[cat].head.free_list, p->buf, sizeof(int));
496     }
497     p->cat = cat;
498     b->file[cat].head_dirty = 1;
499     memset (p->buf, 0, b->file[cat].head.block_size);
500     p->bytes = p->buf + b->file[cat].head.block_offset;
501     p->leaf = leaf;
502     p->size = 0;
503     p->dirty = 1;
504     p->deleted = 0;
505     p->offset = 0;
506     p->no_items = 0;
507     p->decodeClientData = (*b->method->codec.start)();
508     return p;
509 }
510
511 struct ISAMB_block *new_leaf (ISAMB b, int cat)
512 {
513     return new_block (b, 1, cat);
514 }
515
516
517 struct ISAMB_block *new_int (ISAMB b, int cat)
518 {
519     return new_block (b, 0, cat);
520 }
521
522 static void check_block (ISAMB b, struct ISAMB_block *p)
523 {
524     assert(b); /* mostly to make the compiler shut up about unused b */
525     if (p->leaf)
526     {
527         ;
528     }
529     else
530     {
531         /* sanity check */
532         char *startp = p->bytes;
533         const char *src = startp;
534         char *endp = p->bytes + p->size;
535         ISAMB_P pos;
536             
537         decode_ptr (&src, &pos);
538         assert ((pos&CAT_MASK) == p->cat);
539         while (src != endp)
540         {
541             zint item_len;
542             decode_ptr (&src, &item_len);
543             assert (item_len > 0 && item_len < 80);
544             src += item_len;
545             decode_ptr (&src, &pos);
546             if ((pos&CAT_MASK) != p->cat)
547             {
548                 assert ((pos&CAT_MASK) == p->cat);
549             }
550         }
551     }
552 }
553
554 void close_block (ISAMB b, struct ISAMB_block *p)
555 {
556     if (!p)
557         return;
558     if (p->deleted)
559     {
560         yaz_log (b->log_freelist, "release block " ZINT_FORMAT " from freelist %d:" ZINT_FORMAT,
561                  p->pos, p->cat, p->pos/CAT_MAX);
562         memcpy (p->buf, &b->file[p->cat].head.free_list, sizeof(int));
563         b->file[p->cat].head.free_list = p->pos;
564         if (!get_block (b, p->pos, p->buf, 1))
565         {
566             yaz_log (b->log_io, "bf_write: close_block (deleted)");
567             bf_write (b->file[p->cat].bf, p->pos/CAT_MAX, 0, 0, p->buf);
568         }
569     }
570     else if (p->dirty)
571     {
572         int offset = b->file[p->cat].head.block_offset;
573         int size = p->size + offset;
574         char *dst =  p->buf + 3;
575         assert (p->size >= 0);
576         
577         /* memset becuase encode_ptr usually does not write all bytes */
578         memset(p->buf, 0, b->file[p->cat].head.block_offset);
579         p->buf[0] = p->leaf;
580         p->buf[1] = size & 255;
581         p->buf[2] = size >> 8;
582         encode_ptr(&dst, p->no_items);
583         check_block(b, p);
584         if (!get_block (b, p->pos, p->buf, 1))
585         {
586             yaz_log (b->log_io, "bf_write: close_block");
587             bf_write (b->file[p->cat].bf, p->pos/CAT_MAX, 0, 0, p->buf);
588         }
589     }
590     (*b->method->codec.stop)(p->decodeClientData);
591     xfree (p->buf);
592     xfree (p);
593 }
594
595 int insert_sub (ISAMB b, struct ISAMB_block **p,
596                 void *new_item, int *mode,
597                 ISAMC_I *stream,
598                 struct ISAMB_block **sp,
599                 void *sub_item, int *sub_size,
600                 const void *max_item);
601
602 int insert_int (ISAMB b, struct ISAMB_block *p, void *lookahead_item,
603                 int *mode,
604                 ISAMC_I *stream, struct ISAMB_block **sp,
605                 void *split_item, int *split_size, const void *last_max_item)
606 {
607     char *startp = p->bytes;
608     const char *src = startp;
609     char *endp = p->bytes + p->size;
610     ISAMB_P pos;
611     struct ISAMB_block *sub_p1 = 0, *sub_p2 = 0;
612     char sub_item[DST_ITEM_MAX];
613     int sub_size;
614     int more = 0;
615     zint diff_terms = 0;
616
617     *sp = 0;
618
619     assert(p->size >= 0);
620     decode_ptr (&src, &pos);
621     while (src != endp)
622     {
623         zint item_len;
624         int d;
625         const char *src0 = src;
626         decode_ptr (&src, &item_len);
627         d = (*b->method->compare_item)(src, lookahead_item);
628         if (d > 0)
629         {
630             sub_p1 = open_block (b, pos);
631             assert (sub_p1);
632             diff_terms -= sub_p1->no_items;
633             more = insert_sub (b, &sub_p1, lookahead_item, mode,
634                                stream, &sub_p2, 
635                                sub_item, &sub_size, src);
636             diff_terms += sub_p1->no_items;
637             src = src0;
638             break;
639         }
640         src += item_len;
641         decode_ptr (&src, &pos);
642     }
643     if (!sub_p1)
644     {
645         sub_p1 = open_block (b, pos);
646         assert (sub_p1);
647         diff_terms -= sub_p1->no_items;
648         more = insert_sub (b, &sub_p1, lookahead_item, mode, stream, &sub_p2, 
649                            sub_item, &sub_size, last_max_item);
650         diff_terms += sub_p1->no_items;
651     }
652     if (sub_p2)
653         diff_terms += sub_p2->no_items;
654     if (diff_terms)
655     {
656         p->dirty = 1;
657         p->no_items += diff_terms;
658     }
659     if (sub_p2)
660     {
661         /* there was a split - must insert pointer in this one */
662         char dst_buf[DST_BUF_SIZE];
663         char *dst = dst_buf;
664
665         assert (sub_size < 80 && sub_size > 1);
666
667         memcpy (dst, startp, src - startp);
668                 
669         dst += src - startp;
670
671         encode_ptr (&dst, sub_size);      /* sub length and item */
672         memcpy (dst, sub_item, sub_size);
673         dst += sub_size;
674
675         encode_ptr (&dst, sub_p2->pos);   /* pos */
676
677         if (endp - src)                   /* remaining data */
678         {
679             memcpy (dst, src, endp - src);
680             dst += endp - src;
681         }
682         p->size = dst - dst_buf;
683         assert (p->size >= 0);
684
685
686         if (p->size <= b->file[p->cat].head.block_max)
687         {
688             memcpy (startp, dst_buf, dst - dst_buf);
689         }
690         else
691         {
692             struct ISAMB_block *sub_p3;
693             zint split_size_tmp;
694             zint no_items_first_half = 0;
695             int p_new_size;
696             const char *half;
697             src = dst_buf;
698             endp = dst;
699
700             half = src + b->file[p->cat].head.block_size/2;
701             decode_ptr (&src, &pos);
702
703             /* read sub block so we can get no_items for it */
704             sub_p3 = open_block(b, pos);
705             no_items_first_half += sub_p3->no_items;
706             close_block(b, sub_p3);
707
708             while (src <= half)
709             {
710                 decode_ptr (&src, &split_size_tmp);
711                 *split_size = (int) split_size_tmp;
712
713                 src += *split_size;
714                 decode_ptr (&src, &pos);
715
716                 /* read sub block so we can get no_items for it */
717                 sub_p3 = open_block(b, pos);
718                 no_items_first_half += sub_p3->no_items;
719                 close_block(b, sub_p3);
720             }
721             /*  p is first half */
722             p_new_size = src - dst_buf;
723             memcpy (p->bytes, dst_buf, p_new_size);
724
725             decode_ptr (&src, &split_size_tmp);
726             *split_size = (int) split_size_tmp;
727             memcpy (split_item, src, *split_size);
728             src += *split_size;
729
730             /*  *sp is second half */
731             *sp = new_int (b, p->cat);
732             (*sp)->size = endp - src;
733             memcpy ((*sp)->bytes, src, (*sp)->size);
734
735             p->size = p_new_size;
736
737             /*  adjust no_items in first&second half */
738             (*sp)->no_items = p->no_items - no_items_first_half;
739             p->no_items = no_items_first_half;
740         }
741         p->dirty = 1;
742         close_block (b, sub_p2);
743     }
744     close_block (b, sub_p1);
745     return more;
746 }
747
748 int insert_leaf (ISAMB b, struct ISAMB_block **sp1, void *lookahead_item,
749                  int *lookahead_mode, ISAMC_I *stream,
750                  struct ISAMB_block **sp2,
751                  void *sub_item, int *sub_size,
752                  const void *max_item)
753 {
754     struct ISAMB_block *p = *sp1;
755     char *endp = 0;
756     const char *src = 0;
757     char dst_buf[DST_BUF_SIZE], *dst = dst_buf;
758     int new_size;
759     void *c1 = (*b->method->codec.start)();
760     void *c2 = (*b->method->codec.start)();
761     int more = 1;
762     int quater = b->file[b->no_cat-1].head.block_max / 4;
763     char *mid_cut = dst_buf + quater * 2;
764     char *tail_cut = dst_buf + quater * 3;
765     char *maxp = dst_buf + b->file[b->no_cat-1].head.block_max;
766     char *half1 = 0;
767     char *half2 = 0;
768     char cut_item_buf[DST_ITEM_MAX];
769     int cut_item_size = 0;
770     int no_items = 0;    /* number of items (total) */
771     int no_items_1 = 0;  /* number of items (first half) */
772
773     if (p && p->size)
774     {
775         char file_item_buf[DST_ITEM_MAX];
776         char *file_item = file_item_buf;
777             
778         src = p->bytes;
779         endp = p->bytes + p->size;
780         (*b->method->codec.decode)(c1, &file_item, &src);
781         while (1)
782         {
783             const char *dst_item = 0; /* resulting item to be inserted */
784             char *lookahead_next;
785             int d = -1;
786             
787             if (lookahead_item)
788                 d = (*b->method->compare_item)(file_item_buf, lookahead_item);
789             
790             /* d now holds comparison between existing file item and
791                lookahead item 
792                d = 0: equal
793                d > 0: lookahead before file
794                d < 0: lookahead after file
795             */
796             if (d > 0)
797             {
798                 /* lookahead must be inserted */
799                 dst_item = lookahead_item;
800                 /* if this is not an insertion, it's really bad .. */
801                 if (!*lookahead_mode)
802                 {
803                     yaz_log (YLOG_WARN, "isamb: Inconsistent register (1)");
804                     assert (*lookahead_mode);
805                 }
806             }
807             else
808                 dst_item = file_item_buf;
809
810                
811             if (!*lookahead_mode && d == 0)
812             {
813                 /* it's a deletion and they match so there is nothing to be
814                    inserted anyway .. But mark the thing bad (file item
815                    was part of input.. The item will not be part of output */
816                 p->dirty = 1;
817             }
818             else if (!half1 && dst > mid_cut)
819             {
820                 /* we have reached the splitting point for the first time */
821                 const char *dst_item_0 = dst_item;
822                 half1 = dst; /* candidate for splitting */
823
824                 /* encode the resulting item */
825                 (*b->method->codec.encode)(c2, &dst, &dst_item);
826                 
827                 cut_item_size = dst_item - dst_item_0;
828                 assert(cut_item_size > 0);
829                 memcpy (cut_item_buf, dst_item_0, cut_item_size);
830                 
831                 half2 = dst;
832                 no_items_1 = no_items;
833                 no_items++;
834             }
835             else
836             {
837                 /* encode the resulting item */
838                 (*b->method->codec.encode)(c2, &dst, &dst_item);
839                 no_items++;
840             }
841
842             /* now move "pointers" .. result has been encoded .. */
843             if (d > 0)  
844             {
845                 /* we must move the lookahead pointer */
846
847                 if (dst > maxp)
848                     /* no more room. Mark lookahead as "gone".. */
849                     lookahead_item = 0;
850                 else
851                 {
852                     /* move it really.. */
853                     lookahead_next = lookahead_item;
854                     if (!(*stream->read_item)(stream->clientData,
855                                               &lookahead_next,
856                                               lookahead_mode))
857                     {
858                         /* end of stream reached: no "more" and no lookahead */
859                         lookahead_item = 0;
860                         more = 0;
861                     }
862                     if (lookahead_item && max_item &&
863                         (*b->method->compare_item)(max_item, lookahead_item) <= 0)
864                     {
865                         /* the lookahead goes beyond what we allow in this
866                            leaf. Mark it as "gone" */
867                         lookahead_item = 0;
868                     }
869                     
870                     p->dirty = 1;
871                 }
872             }
873             else if (d == 0)
874             {
875                 /* exact match .. move both pointers */
876
877                 lookahead_next = lookahead_item;
878                 if (!(*stream->read_item)(stream->clientData,
879                                           &lookahead_next, lookahead_mode))
880                 {
881                     lookahead_item = 0;
882                     more = 0;
883                 }
884                 if (src == endp)
885                     break;  /* end of file stream reached .. */
886                 file_item = file_item_buf; /* move file pointer */
887                 (*b->method->codec.decode)(c1, &file_item, &src);
888             }
889             else
890             {
891                 /* file pointer must be moved */
892                 if (src == endp)
893                     break;
894                 file_item = file_item_buf;
895                 (*b->method->codec.decode)(c1, &file_item, &src);
896             }
897         }
898     }
899     maxp = dst_buf + b->file[b->no_cat-1].head.block_max + quater;
900     /* this loop runs when we are "appending" to a leaf page. That is
901        either it's empty (new) or all file items have been read in
902        previous loop */
903     while (lookahead_item)
904     {
905         char *dst_item;
906         const char *src = lookahead_item;
907         char *dst_0 = dst;
908         
909         /* compare lookahead with max item */
910         if (max_item &&
911             (*b->method->compare_item)(max_item, lookahead_item) <= 0)
912         {
913             /* stop if we have reached the value of max item */
914             break;
915         }
916         if (!*lookahead_mode)
917         {
918             /* this is append. So a delete is bad */
919             yaz_log (YLOG_WARN, "isamb: Inconsistent register (2)");
920             abort();
921         }
922         else if (!half1 && dst > tail_cut)
923         {
924             const char *src_0 = src;
925             half1 = dst; /* candidate for splitting */
926             
927             (*b->method->codec.encode)(c2, &dst, &src);
928             
929             cut_item_size = src - src_0;
930             assert(cut_item_size > 0);
931             memcpy (cut_item_buf, src_0, cut_item_size);
932             
933             no_items_1 = no_items;
934             half2 = dst;
935         }
936         else
937             (*b->method->codec.encode)(c2, &dst, &src);
938
939         if (dst > maxp)
940         {
941             dst = dst_0;
942             break;
943         }
944         no_items++;
945         if (p)
946             p->dirty = 1;
947         dst_item = lookahead_item;
948         if (!(*stream->read_item)(stream->clientData, &dst_item,
949                                   lookahead_mode))
950         {
951             lookahead_item = 0;
952             more = 0;
953         }
954     }
955     new_size = dst - dst_buf;
956     if (p && p->cat != b->no_cat-1 && 
957         new_size > b->file[p->cat].head.block_max)
958     {
959         /* non-btree block will be removed */
960         p->deleted = 1;
961         close_block (b, p);
962         /* delete it too!! */
963         p = 0; /* make a new one anyway */
964     }
965     if (!p)
966     {   /* must create a new one */
967         int i;
968         for (i = 0; i < b->no_cat; i++)
969             if (new_size <= b->file[i].head.block_max)
970                 break;
971         if (i == b->no_cat)
972             i = b->no_cat - 1;
973         p = new_leaf (b, i);
974     }
975     if (new_size > b->file[p->cat].head.block_max)
976     {
977         char *first_dst;
978         const char *cut_item = cut_item_buf;
979
980         assert (half1);
981         assert (half2);
982
983         assert(cut_item_size > 0);
984         
985         /* first half */
986         p->size = half1 - dst_buf;
987         memcpy (p->bytes, dst_buf, half1 - dst_buf);
988         p->no_items = no_items_1;
989
990         /* second half */
991         *sp2 = new_leaf (b, p->cat);
992
993         (*b->method->codec.reset)(c2);
994
995         first_dst = (*sp2)->bytes;
996
997         (*b->method->codec.encode)(c2, &first_dst, &cut_item);
998
999         memcpy (first_dst, half2, dst - half2);
1000
1001         (*sp2)->size = (first_dst - (*sp2)->bytes) + (dst - half2);
1002         (*sp2)->no_items = no_items - no_items_1;
1003         (*sp2)->dirty = 1;
1004         p->dirty = 1;
1005         memcpy (sub_item, cut_item_buf, cut_item_size);
1006         *sub_size = cut_item_size;
1007     }
1008     else
1009     {
1010         memcpy (p->bytes, dst_buf, dst - dst_buf);
1011         p->size = new_size;
1012         p->no_items = no_items;
1013     }
1014     (*b->method->codec.stop)(c1);
1015     (*b->method->codec.stop)(c2);
1016     *sp1 = p;
1017     return more;
1018 }
1019
1020 int insert_sub (ISAMB b, struct ISAMB_block **p, void *new_item,
1021                 int *mode,
1022                 ISAMC_I *stream,
1023                 struct ISAMB_block **sp,
1024                 void *sub_item, int *sub_size,
1025                 const void *max_item)
1026 {
1027     if (!*p || (*p)->leaf)
1028         return insert_leaf (b, p, new_item, mode, stream, sp, sub_item, 
1029                             sub_size, max_item);
1030     else
1031         return insert_int (b, *p, new_item, mode, stream, sp, sub_item,
1032                            sub_size, max_item);
1033 }
1034
1035 int isamb_unlink (ISAMB b, ISAMC_P pos)
1036 {
1037     struct ISAMB_block *p1;
1038
1039     if (!pos)
1040         return 0;
1041     p1 = open_block(b, pos);
1042     p1->deleted = 1;
1043     if (!p1->leaf)
1044     {
1045         zint sub_p;
1046         zint item_len;
1047         const char *src = p1->bytes + p1->offset;
1048
1049         decode_ptr(&src, &sub_p);
1050         isamb_unlink(b, sub_p);
1051         
1052         while (src != p1->bytes + p1->size)
1053         {
1054             decode_ptr(&src, &item_len);
1055             src += item_len;
1056             decode_ptr(&src, &sub_p);
1057             isamb_unlink(b, sub_p);
1058         }
1059     }
1060     close_block(b, p1);
1061     return 0;
1062 }
1063
1064 ISAMB_P isamb_merge (ISAMB b, ISAMC_P pos, ISAMC_I *stream)
1065 {
1066     char item_buf[DST_ITEM_MAX];
1067     char *item_ptr;
1068     int i_mode;
1069     int more;
1070     int must_delete = 0;
1071
1072     if (b->cache < 0)
1073     {
1074         int more = 1;
1075         while (more)
1076         {
1077             item_ptr = item_buf;
1078             more =
1079                 (*stream->read_item)(stream->clientData, &item_ptr, &i_mode);
1080         }
1081         return 1;
1082     }
1083     item_ptr = item_buf;
1084     more = (*stream->read_item)(stream->clientData, &item_ptr, &i_mode);
1085     while (more)
1086     {
1087         struct ISAMB_block *p = 0, *sp = 0;
1088         char sub_item[DST_ITEM_MAX];
1089         int sub_size;
1090         
1091         if (pos)
1092             p = open_block (b, pos);
1093         more = insert_sub (b, &p, item_buf, &i_mode, stream, &sp,
1094                            sub_item, &sub_size, 0);
1095         if (sp)
1096         {    /* increase level of tree by one */
1097             struct ISAMB_block *p2 = new_int (b, p->cat);
1098             char *dst = p2->bytes + p2->size;
1099             
1100             encode_ptr (&dst, p->pos);
1101             assert (sub_size < 40);
1102             encode_ptr (&dst, sub_size);
1103             memcpy (dst, sub_item, sub_size);
1104             dst += sub_size;
1105             encode_ptr (&dst, sp->pos);
1106             
1107             p2->size = dst - p2->bytes;
1108             p2->no_items = p->no_items + sp->no_items;
1109             pos = p2->pos;  /* return new super page */
1110             close_block (b, sp);
1111             close_block (b, p2);
1112         }
1113         else
1114         {
1115             pos = p->pos;   /* return current one (again) */
1116         }
1117         if (p->no_items == 0)
1118             must_delete = 1;
1119         else
1120             must_delete = 0;
1121         close_block (b, p);
1122     }
1123     if (must_delete)
1124     {
1125         isamb_unlink(b, pos);
1126         return 0;
1127     }
1128     return pos;
1129 }
1130
1131 ISAMB_PP isamb_pp_open_x (ISAMB isamb, ISAMB_P pos, int *level, int scope)
1132 {
1133     ISAMB_PP pp = xmalloc (sizeof(*pp));
1134     int i;
1135
1136     assert(pos);
1137
1138     pp->isamb = isamb;
1139     pp->block = xmalloc (ISAMB_MAX_LEVEL * sizeof(*pp->block));
1140
1141     pp->pos = pos;
1142     pp->level = 0;
1143     pp->maxlevel=0;
1144     pp->total_size = 0;
1145     pp->no_blocks = 0;
1146     pp->skipped_numbers=0;
1147     pp->returned_numbers=0;
1148     pp->scope=scope;
1149     for (i=0;i<ISAMB_MAX_LEVEL;i++)
1150         pp->skipped_nodes[i] = pp->accessed_nodes[i]=0;
1151     while (1)
1152     {
1153         struct ISAMB_block *p = open_block (isamb, pos);
1154         const char *src = p->bytes + p->offset;
1155         pp->block[pp->level] = p;
1156
1157         pp->total_size += p->size;
1158         pp->no_blocks++;
1159         if (p->leaf)
1160             break;
1161         decode_ptr (&src, &pos);
1162         p->offset = src - p->bytes;
1163         pp->level++;
1164         pp->accessed_nodes[pp->level]++; 
1165     }
1166     pp->block[pp->level+1] = 0;
1167     pp->maxlevel=pp->level;
1168     if (level)
1169         *level = pp->level;
1170     return pp;
1171 }
1172
1173 ISAMB_PP isamb_pp_open (ISAMB isamb, ISAMB_P pos, int scope)
1174 {
1175     return isamb_pp_open_x (isamb, pos, 0, scope);
1176 }
1177
1178 void isamb_pp_close_x (ISAMB_PP pp, int *size, int *blocks)
1179 {
1180     int i;
1181     if (!pp)
1182         return;
1183     yaz_log(YLOG_DEBUG,"isamb_pp_close lev=%d returned "ZINT_FORMAT" values," 
1184                     "skipped "ZINT_FORMAT,
1185         pp->maxlevel, pp->skipped_numbers, pp->returned_numbers);
1186     for (i=pp->maxlevel;i>=0;i--)
1187         if ( pp->skipped_nodes[i] || pp->accessed_nodes[i])
1188             yaz_log(YLOG_DEBUG,"isamb_pp_close  level leaf-%d: "
1189                             ZINT_FORMAT" read, "ZINT_FORMAT" skipped", i,
1190                  pp->accessed_nodes[i], pp->skipped_nodes[i]);
1191     pp->isamb->skipped_numbers += pp->skipped_numbers;
1192     pp->isamb->returned_numbers += pp->returned_numbers;
1193     for (i=pp->maxlevel;i>=0;i--)
1194     {
1195         pp->isamb->accessed_nodes[i] += pp->accessed_nodes[i];
1196         pp->isamb->skipped_nodes[i] += pp->skipped_nodes[i];
1197     }
1198     if (size)
1199         *size = pp->total_size;
1200     if (blocks)
1201         *blocks = pp->no_blocks;
1202     for (i = 0; i <= pp->level; i++)
1203         close_block (pp->isamb, pp->block[i]);
1204     xfree (pp->block);
1205     xfree (pp);
1206 }
1207
1208 int isamb_block_info (ISAMB isamb, int cat)
1209 {
1210     if (cat >= 0 && cat < isamb->no_cat)
1211         return isamb->file[cat].head.block_size;
1212     return -1;
1213 }
1214
1215 void isamb_pp_close (ISAMB_PP pp)
1216 {
1217     isamb_pp_close_x (pp, 0, 0);
1218 }
1219
1220 /* simple recursive dumper .. */
1221 static void isamb_dump_r (ISAMB b, ISAMB_P pos, void (*pr)(const char *str),
1222                           int level)
1223 {
1224     char buf[1024];
1225     char prefix_str[1024];
1226     if (pos)
1227     {
1228         struct ISAMB_block *p = open_block (b, pos);
1229         sprintf(prefix_str, "%*s " ZINT_FORMAT " cat=%d size=%d max=%d items="
1230                 ZINT_FORMAT, level*2, "",
1231                 pos, p->cat, p->size, b->file[p->cat].head.block_max,
1232                 p->no_items);
1233         (*pr)(prefix_str);
1234         sprintf(prefix_str, "%*s " ZINT_FORMAT, level*2, "", pos);
1235         if (p->leaf)
1236         {
1237             while (p->offset < p->size)
1238             {
1239                 const char *src = p->bytes + p->offset;
1240                 char *dst = buf;
1241                 (*b->method->codec.decode)(p->decodeClientData, &dst, &src);
1242                 (*b->method->log_item)(YLOG_DEBUG, buf, prefix_str);
1243                 p->offset = src - (char*) p->bytes;
1244             }
1245             assert(p->offset == p->size);
1246         }
1247         else
1248         {
1249             const char *src = p->bytes + p->offset;
1250             ISAMB_P sub;
1251             zint item_len;
1252
1253             decode_ptr (&src, &sub);
1254             p->offset = src - (char*) p->bytes;
1255
1256             isamb_dump_r(b, sub, pr, level+1);
1257             
1258             while (p->offset < p->size)
1259             {
1260                 decode_ptr (&src, &item_len);
1261                 (*b->method->log_item)(YLOG_DEBUG, src, prefix_str);
1262                 src += item_len;
1263                 decode_ptr (&src, &sub);
1264                 
1265                 p->offset = src - (char*) p->bytes;
1266                 
1267                 isamb_dump_r(b, sub, pr, level+1);
1268             }           
1269         }
1270         close_block(b,p);
1271     }
1272 }
1273
1274 void isamb_dump (ISAMB b, ISAMB_P pos, void (*pr)(const char *str))
1275 {
1276     isamb_dump_r(b, pos, pr, 0);
1277 }
1278
1279 int isamb_pp_read (ISAMB_PP pp, void *buf)
1280 {
1281     return isamb_pp_forward(pp, buf, 0);
1282 }
1283
1284
1285 static int isamb_pp_on_right_node(ISAMB_PP pp, int level, const void *untilbuf)
1286 { /* looks one node higher to see if we should be on this node at all */
1287   /* useful in backing off quickly, and in avoiding tail descends */
1288   /* call with pp->level to begin with */
1289     struct ISAMB_block *p;
1290     int cmp;
1291     const char *src;
1292     zint item_len;
1293     assert(level>=0);
1294     if ( level == 0) {
1295 #if ISAMB_DEBUG
1296             yaz_log(YLOG_DEBUG,"isamb_pp_on_right returning true for root");
1297 #endif
1298         return 1; /* we can never skip the root node */
1299     }
1300     level--;
1301     p=pp->block[level];
1302     assert(p->offset <= p->size);
1303     if (p->offset < p->size )
1304     {
1305         assert(p->offset>0); 
1306         src=p->bytes + p->offset;
1307         decode_ptr(&src, &item_len);
1308 #if ISAMB_DEBUG
1309         (*pp->isamb->method->codec.log_item)(YLOG_DEBUG,untilbuf,"on_leaf: until");
1310         (*pp->isamb->method->codec.log_item)(YLOG_DEBUG,src,"on_leaf: value");
1311 #endif
1312         cmp=(*pp->isamb->method->compare_item)(untilbuf,src);
1313         if (cmp<pp->scope) {  /* cmp<2 */
1314 #if ISAMB_DEBUG
1315             yaz_log(YLOG_DEBUG,"isamb_pp_on_right returning true "
1316                             "cmp=%d lev=%d ofs=%d",cmp,level,p->offset);
1317 #endif
1318             return 1; 
1319         }
1320         else {
1321 #if ISAMB_DEBUG
1322             yaz_log(YLOG_DEBUG,"isamb_pp_on_right returning false "
1323                             "cmp=%d lev=%d ofs=%d",cmp,level,p->offset);
1324 #endif
1325             return 0; 
1326         }
1327     }
1328     else {
1329 #if ISAMB_DEBUG
1330         yaz_log(YLOG_DEBUG,"isamb_pp_on_right at tail, looking higher "
1331                         "lev=%d",level);
1332 #endif
1333         return isamb_pp_on_right_node(pp, level, untilbuf);
1334     }
1335 } /* isamb_pp_on_right_node */
1336
1337 static int isamb_pp_read_on_leaf(ISAMB_PP pp, void *buf)
1338 { /* reads the next item on the current leaf, returns 0 if end of leaf*/
1339     struct ISAMB_block *p = pp->block[pp->level];
1340     char *dst;
1341     const char *src;
1342     assert(pp);
1343     assert(buf);
1344     if (p->offset == p->size) {
1345 #if ISAMB_DEBUG
1346         yaz_log(YLOG_DEBUG,"isamb_pp_read_on_leaf returning 0 on node %d",p->pos);
1347 #endif
1348         return 0; /* at end of leaf */
1349     }
1350     src=p->bytes + p->offset;
1351     dst=buf;
1352     (*pp->isamb->method->codec.decode)(p->decodeClientData,&dst, &src);
1353     p->offset = src - (char*) p->bytes;
1354 #if ISAMB_DEBUG
1355     (*pp->isamb->method->codec.log_item)(YLOG_DEBUG, buf, "read_on_leaf returning 1");
1356 #endif
1357     pp->returned_numbers++;
1358     return 1;
1359 } /* read_on_leaf */
1360
1361 static int isamb_pp_forward_on_leaf(ISAMB_PP pp, void *buf, const void *untilbuf)
1362 { /* forwards on the current leaf, returns 0 if not found */
1363     int cmp;
1364     int skips=0;
1365     while (1){
1366         if (!isamb_pp_read_on_leaf(pp,buf))
1367             return 0;
1368         /* FIXME - this is an extra function call, inline the read? */
1369         cmp=(*pp->isamb->method->compare_item)(untilbuf,buf);
1370         if (cmp <pp->scope){  /* cmp<2  found a good one */
1371 #if ISAMB_DEBUG
1372             if (skips)
1373                 yaz_log(YLOG_DEBUG, "isam_pp_fwd_on_leaf skipped %d items",skips);
1374 #endif
1375             pp->returned_numbers++;
1376             return 1;
1377         }
1378         if (!skips)
1379             if (!isamb_pp_on_right_node(pp, pp->level, untilbuf))
1380                 return 0; /* never mind the rest of this leaf */
1381         pp->skipped_numbers++;
1382         skips++;
1383     }
1384 } /* forward_on_leaf */
1385
1386 static int isamb_pp_climb_level(ISAMB_PP pp, ISAMB_P *pos)
1387 { /* climbs higher in the tree, until finds a level with data left */
1388   /* returns the node to (consider to) descend to in *pos) */
1389     struct ISAMB_block *p = pp->block[pp->level];
1390     const char *src;
1391     zint item_len;
1392 #if ISAMB_DEBUG
1393     yaz_log(YLOG_DEBUG,"isamb_pp_climb_level starting "
1394                    "at level %d node %d ofs=%d sz=%d",
1395                     pp->level, p->pos, p->offset, p->size);
1396 #endif
1397     assert(pp->level >= 0);
1398     assert(p->offset <= p->size);
1399     if (pp->level==0)
1400     {
1401 #if ISAMB_DEBUG
1402         yaz_log(YLOG_DEBUG,"isamb_pp_climb_level returning 0 at root");
1403 #endif
1404         return 0;
1405     }
1406     assert(pp->level>0); 
1407     close_block(pp->isamb, pp->block[pp->level]);
1408     pp->block[pp->level]=0;
1409     (pp->level)--;
1410     p=pp->block[pp->level];
1411 #if ISAMB_DEBUG
1412     yaz_log(YLOG_DEBUG,"isamb_pp_climb_level climbed to level %d node %d ofs=%d",
1413                     pp->level, p->pos, p->offset);
1414 #endif
1415     assert(!p->leaf);
1416     assert(p->offset <= p->size);
1417     if (p->offset == p->size ) {
1418         /* we came from the last pointer, climb on */
1419         if (!isamb_pp_climb_level(pp,pos))
1420             return 0;
1421         p=pp->block[pp->level];
1422     }
1423     else
1424     {
1425         /* skip the child we just came from */
1426 #if ISAMB_DEBUG
1427         yaz_log(YLOG_DEBUG,"isam_pp_climb_level: skipping lev=%d ofs=%d sz=%d", 
1428                         pp->level, p->offset, p->size);
1429 #endif
1430         assert (p->offset < p->size );
1431         src=p->bytes + p->offset;
1432         decode_ptr(&src, &item_len);
1433         src += item_len;
1434         decode_ptr(&src, pos);
1435         p->offset=src - (char *)p->bytes;
1436             
1437     }
1438     return 1;
1439 } /* climb_level */
1440
1441
1442 static zint isamb_pp_forward_unode(ISAMB_PP pp, zint pos, const void *untilbuf)
1443 { /* scans a upper node until it finds a child <= untilbuf */
1444   /* pp points to the key value, as always. pos is the child read from */
1445   /* the buffer */
1446   /* if all values are too small, returns the last child in the node */
1447   /* FIXME - this can be detected, and avoided by looking at the */
1448   /* parent node, but that gets messy. Presumably the cost is */
1449   /* pretty low anyway */
1450     struct ISAMB_block *p = pp->block[pp->level];
1451     const char *src=p->bytes + p->offset;
1452     zint item_len;
1453     int cmp;
1454     zint nxtpos;
1455 #if ISAMB_DEBUG
1456     int skips=0;
1457     yaz_log(YLOG_DEBUG,"isamb_pp_forward_unode starting "
1458                    "at level %d node %d ofs=%di sz=%d",
1459                     pp->level, p->pos, p->offset, p->size);
1460 #endif
1461     assert(!p->leaf);
1462     assert(p->offset <= p->size);
1463     if (p->offset == p->size) {
1464 #if ISAMB_DEBUG
1465             yaz_log(YLOG_DEBUG,"isamb_pp_forward_unode returning at end "
1466                    "at level %d node %d ofs=%di sz=%d",
1467                     pp->level, p->pos, p->offset, p->size);
1468 #endif
1469         return pos; /* already at the end of it */
1470     }
1471     while(p->offset < p->size) {
1472         decode_ptr(&src,&item_len);
1473         cmp=(*pp->isamb->method->compare_item)(untilbuf,src);
1474         src+=item_len;
1475         decode_ptr(&src,&nxtpos);
1476         if (cmp<pp->scope) /* cmp<2 */
1477         {
1478 #if ISAMB_DEBUG
1479             yaz_log(YLOG_DEBUG,"isamb_pp_forward_unode returning a hit "
1480                    "at level %d node %d ofs=%d sz=%d",
1481                     pp->level, p->pos, p->offset, p->size);
1482 #endif
1483             return pos;
1484         } /* found one */
1485         pos=nxtpos;
1486         p->offset=src-(char*)p->bytes;
1487         (pp->skipped_nodes[pp->maxlevel - pp->level -1])++;
1488 #if ISAMB_DEBUG
1489         skips++;
1490 #endif
1491     }
1492 #if ISAMB_DEBUG
1493             yaz_log(YLOG_DEBUG,"isamb_pp_forward_unode returning at tail "
1494                    "at level %d node %d ofs=%d sz=%d skips=%d",
1495                     pp->level, p->pos, p->offset, p->size, skips);
1496 #endif
1497     return pos; /* that's the last one in the line */
1498     
1499 } /* forward_unode */
1500
1501 static void isamb_pp_descend_to_leaf(ISAMB_PP pp, ISAMB_P pos, const void *untilbuf)
1502 { /* climbs down the tree, from pos, to the leftmost leaf */
1503     struct ISAMB_block *p = pp->block[pp->level];
1504     const char *src;
1505     assert(!p->leaf);
1506 #if ISAMB_DEBUG
1507     yaz_log(YLOG_DEBUG,"isamb_pp_descend_to_leaf "
1508                    "starting at lev %d node %d ofs=%d lf=%d u=%p", 
1509                    pp->level, p->pos, p->offset, p->leaf, untilbuf);
1510 #endif
1511     if (untilbuf)
1512         pos=isamb_pp_forward_unode(pp,pos,untilbuf);
1513     ++(pp->level);
1514     assert(pos);
1515     p=open_block(pp->isamb, pos);
1516     pp->block[pp->level]=p;
1517     ++(pp->accessed_nodes[pp->maxlevel-pp->level]);
1518     ++(pp->no_blocks);
1519 #if ISAMB_DEBUG
1520     yaz_log(YLOG_DEBUG,"isamb_pp_descend_to_leaf "
1521                    "got lev %d node %d lf=%d", 
1522                    pp->level, p->pos, p->leaf);
1523 #endif
1524     if (p->leaf)
1525         return;
1526     assert (p->offset==0 );
1527     src=p->bytes + p->offset;
1528     decode_ptr(&src, &pos);
1529     p->offset=src-(char*)p->bytes;
1530     isamb_pp_descend_to_leaf(pp,pos,untilbuf);
1531 #if ISAMB_DEBUG
1532     yaz_log(YLOG_DEBUG,"isamb_pp_descend_to_leaf "
1533                    "returning at lev %d node %d ofs=%d lf=%d", 
1534                    pp->level, p->pos, p->offset, p->leaf);
1535 #endif
1536 } /* descend_to_leaf */
1537
1538 static int isamb_pp_find_next_leaf(ISAMB_PP pp)
1539 { /* finds the next leaf by climbing up and down */
1540     ISAMB_P pos;
1541     if (!isamb_pp_climb_level(pp,&pos))
1542         return 0;
1543     isamb_pp_descend_to_leaf(pp, pos,0);
1544     return 1;
1545 }
1546
1547 static int isamb_pp_climb_desc(ISAMB_PP pp,  const void *untilbuf)
1548 { /* climbs up and descends to a leaf where values >= *untilbuf are found */
1549     ISAMB_P pos;
1550 #if ISAMB_DEBUG
1551     struct ISAMB_block *p = pp->block[pp->level];
1552     yaz_log(YLOG_DEBUG,"isamb_pp_climb_desc starting "
1553                    "at level %d node %d ofs=%d sz=%d",
1554                     pp->level, p->pos, p->offset, p->size);
1555 #endif
1556     if (!isamb_pp_climb_level(pp,&pos))
1557         return 0;
1558     /* see if it would pay to climb one higher */
1559     if (!isamb_pp_on_right_node(pp, pp->level, untilbuf))
1560         if (!isamb_pp_climb_level(pp,&pos))
1561             return 0;
1562     isamb_pp_descend_to_leaf(pp, pos,untilbuf);
1563 #if ISAMB_DEBUG
1564     p = pp->block[pp->level];
1565     yaz_log(YLOG_DEBUG,"isamb_pp_climb_desc done "
1566                    "at level %d node %d ofs=%d sz=%d",
1567                     pp->level, p->pos, p->offset, p->size);
1568 #endif
1569     return 1;
1570 } /* climb_desc */
1571
1572 int isamb_pp_forward (ISAMB_PP pp, void *buf, const void *untilbuf)
1573 {
1574 #if ISAMB_DEBUG
1575     struct ISAMB_block *p = pp->block[pp->level];
1576     assert(p->leaf);
1577     yaz_log(YLOG_DEBUG,"isamb_pp_forward starting "
1578                    "at level %d node %d ofs=%d sz=%d u=%p sc=%d",
1579                     pp->level, p->pos, p->offset, p->size,untilbuf, scope);
1580 #endif
1581     if (untilbuf) {
1582         if (isamb_pp_forward_on_leaf( pp, buf, untilbuf)) {
1583 #if ISAMB_DEBUG
1584             yaz_log(YLOG_DEBUG,"isamb_pp_forward (f) returning (A) "
1585                    "at level %d node %d ofs=%d sz=%d",
1586                     pp->level, p->pos, p->offset, p->size);
1587 #endif
1588             return 1;
1589         }
1590         if (! isamb_pp_climb_desc( pp, untilbuf)) {
1591 #if ISAMB_DEBUG
1592             yaz_log(YLOG_DEBUG,"isamb_pp_forward (f) returning notfound (B) "
1593                    "at level %d node %d ofs=%d sz=%d",
1594                     pp->level, p->pos, p->offset, p->size);
1595 #endif
1596             return 0; /* could not find a leaf */
1597         }
1598         do{
1599             if (isamb_pp_forward_on_leaf( pp, buf, untilbuf)) {
1600 #if ISAMB_DEBUG
1601             yaz_log(YLOG_DEBUG,"isamb_pp_forward (f) returning (C) "
1602                    "at level %d node %d ofs=%d sz=%d",
1603                     pp->level, p->pos, p->offset, p->size);
1604 #endif
1605                 return 1;
1606             }
1607         }while ( isamb_pp_find_next_leaf(pp));
1608         return 0; /* could not find at all */
1609     }
1610     else { /* no untilbuf, a straight read */
1611         /* FIXME - this should be moved
1612          * directly into the pp_read */
1613         /* keeping here now, to keep same
1614          * interface as the old fwd */
1615         if (isamb_pp_read_on_leaf( pp, buf)) {
1616 #if ISAMB_DEBUG
1617             yaz_log(YLOG_DEBUG,"isamb_pp_forward (read) returning (D) "
1618                    "at level %d node %d ofs=%d sz=%d",
1619                     pp->level, p->pos, p->offset, p->size);
1620 #endif
1621             return 1;
1622         }
1623         if (isamb_pp_find_next_leaf(pp)) {
1624 #if ISAMB_DEBUG
1625             yaz_log(YLOG_DEBUG,"isamb_pp_forward (read) returning (E) "
1626                    "at level %d node %d ofs=%d sz=%d",
1627                     pp->level, p->pos, p->offset, p->size);
1628 #endif
1629             return isamb_pp_read_on_leaf(pp, buf);
1630         }
1631         else
1632             return 0;
1633     }
1634 } /* isam_pp_forward (new version) */
1635
1636 void isamb_pp_pos( ISAMB_PP pp, double *current, double *total )
1637 { /* return an estimate of the current position and of the total number of */
1638   /* occureences in the isam tree, based on the current leaf */
1639     struct ISAMB_block *p = pp->block[pp->level];
1640     assert(total);
1641     assert(current);
1642     assert(p->leaf);
1643
1644     *total = pp->block[0]->no_items;
1645     *current = (double) pp->returned_numbers;
1646 #if ISAMB_DEBUG
1647     yaz_log(YLOG_LOG, "isamb_pp_pos returning: cur= %0.1f tot=%0.1f rn="
1648          ZINT_FORMAT, *current, *total, pp->returned_numbers);
1649 #endif
1650 }