Removed some FIXMEs. Mostly comments on things that had already been fixed.
[idzebra-moved-to-github.git] / isamb / isamb.c
1 /* $Id: isamb.c,v 1.57 2004-09-03 14:59:49 heikki Exp $
2    Copyright (C) 1995,1996,1997,1998,1999,2000,2001,2002,2003,2004
3    Index Data Aps
4
5 This file is part of the Zebra server.
6
7 Zebra is free software; you can redistribute it and/or modify it under
8 the terms of the GNU General Public License as published by the Free
9 Software Foundation; either version 2, or (at your option) any later
10 version.
11
12 Zebra is distributed in the hope that it will be useful, but WITHOUT ANY
13 WARRANTY; without even the implied warranty of MERCHANTABILITY or
14 FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
15 for more details.
16
17 You should have received a copy of the GNU General Public License
18 along with Zebra; see the file LICENSE.zebra.  If not, write to the
19 Free Software Foundation, 59 Temple Place - Suite 330, Boston, MA
20 02111-1307, USA.
21 */
22
23 #include <string.h>
24 #include <yaz/xmalloc.h>
25 #include <yaz/log.h>
26 #include <isamb.h>
27 #include <assert.h>
28
29 #ifndef ISAMB_DEBUG
30 #define ISAMB_DEBUG 0
31 #endif
32
33 #define ISAMB_MAJOR_VERSION 2
34 #define ISAMB_MINOR_VERSION 0
35
36 struct ISAMB_head {
37     zint first_block;
38     zint last_block;
39     zint free_list;
40     zint no_items;
41     int block_size;
42     int block_max;
43     int block_offset;
44 };
45
46 /* maximum size of encoded buffer */
47 #define DST_ITEM_MAX 256
48
49 #define ISAMB_MAX_LEVEL 10
50 /* approx 2*max page + max size of item */
51 #define DST_BUF_SIZE 16840
52
53 #define ISAMB_CACHE_ENTRY_SIZE 4096
54
55 /* CAT_MAX: _must_ be power of 2 */
56 #define CAT_MAX 4
57 #define CAT_MASK (CAT_MAX-1)
58 /* CAT_NO: <= CAT_MAX */
59 #define CAT_NO 4
60
61 /* ISAMB_PTR_CODEC=1 var, =0 fixed */
62 #define ISAMB_PTR_CODEC 1
63
64 struct ISAMB_cache_entry {
65     ISAMB_P pos;
66     unsigned char *buf;
67     int dirty;
68     int hits;
69     struct ISAMB_cache_entry *next;
70 };
71
72 struct ISAMB_file {
73     BFile bf;
74     int head_dirty;
75     struct ISAMB_head head;
76     struct ISAMB_cache_entry *cache_entries;
77 };
78
79 struct ISAMB_s {
80     BFiles bfs;
81     ISAMC_M *method;
82
83     struct ISAMB_file *file;
84     int no_cat;
85     int cache; /* 0=no cache, 1=use cache, -1=dummy isam (for testing only) */
86     int log_io;        /* log level for bf_read/bf_write calls */
87     int log_freelist;  /* log level for freelist handling */
88     zint skipped_numbers; /* on a leaf node */
89     zint returned_numbers; 
90     zint skipped_nodes[ISAMB_MAX_LEVEL]; /* [0]=skipped leaves, 1=higher etc */
91     zint accessed_nodes[ISAMB_MAX_LEVEL]; /* nodes we did not skip */
92 };
93
94 struct ISAMB_block {
95     ISAMB_P pos;
96     int cat;
97     int size;
98     int leaf;
99     int dirty;
100     int deleted;
101     int offset;
102     zint no_items;  /* number of nodes in this + children */
103     char *bytes;
104     char *cbuf;
105     unsigned char *buf;
106     void *decodeClientData;
107     int log_rw;
108 };
109
110 struct ISAMB_PP_s {
111     ISAMB isamb;
112     ISAMB_P pos;
113     int level;
114     int maxlevel; /* total depth */
115     zint total_size;
116     zint no_blocks;
117     zint skipped_numbers; /* on a leaf node */
118     zint returned_numbers; 
119     zint skipped_nodes[ISAMB_MAX_LEVEL]; /* [0]=skipped leaves, 1=higher etc */
120     zint accessed_nodes[ISAMB_MAX_LEVEL]; /* nodes we did not skip */
121     struct ISAMB_block **block;
122 };
123
124
125 #if ISAMB_PTR_CODEC
126 static void encode_ptr (char **dst, zint pos)
127 {
128     unsigned char *bp = (unsigned char*) *dst;
129
130     while (pos > 127)
131     {
132          *bp++ = 128 | (pos & 127);
133          pos = pos >> 7;
134     }
135     *bp++ = pos;
136     *dst = (char *) bp;
137 }
138 #else
139 static void encode_ptr (char **dst, zint pos)
140 {
141     memcpy(*dst, &pos, sizeof(pos));
142     (*dst) += sizeof(pos);
143 }
144 #endif
145
146 #if ISAMB_PTR_CODEC
147 static void decode_ptr (const char **src1, zint *pos)
148 {
149     const unsigned char **src = (const unsigned char **) src1;
150     zint d = 0;
151     unsigned char c;
152     unsigned r = 0;
153
154     while (((c = *(*src)++) & 128))
155     {
156         d += ((zint) (c & 127) << r);
157         r += 7;
158     }
159     d += ((zint) c << r);
160     *pos = d;
161 }
162 #else
163 static void decode_ptr (const char **src, zint *pos)
164 {
165      memcpy (pos, *src, sizeof(*pos));
166      (*src) += sizeof(*pos);
167 }
168 #endif
169
170 ISAMB isamb_open (BFiles bfs, const char *name, int writeflag, ISAMC_M *method,
171                   int cache)
172 {
173     ISAMB isamb = xmalloc (sizeof(*isamb));
174     int i, b_size = 32;
175
176     isamb->bfs = bfs;
177     isamb->method = (ISAMC_M *) xmalloc (sizeof(*method));
178     memcpy (isamb->method, method, sizeof(*method));
179     isamb->no_cat = CAT_NO;
180     isamb->log_io = 0;
181     isamb->log_freelist = 0;
182     isamb->cache = cache;
183     isamb->skipped_numbers=0;
184     isamb->returned_numbers=0;
185     for (i=0;i<ISAMB_MAX_LEVEL;i++)
186       isamb->skipped_nodes[i]= isamb->accessed_nodes[i]=0;
187
188     assert (cache == 0);
189     isamb->file = xmalloc (sizeof(*isamb->file) * isamb->no_cat);
190     for (i = 0; i < isamb->no_cat; i++)
191     {
192         char fname[DST_BUF_SIZE];
193         char hbuf[DST_BUF_SIZE];
194         isamb->file[i].cache_entries = 0;
195         isamb->file[i].head_dirty = 0;
196         sprintf (fname, "%s%c", name, i+'A');
197         if (cache)
198             isamb->file[i].bf = bf_open (bfs, fname, ISAMB_CACHE_ENTRY_SIZE,
199                                          writeflag);
200         else
201             isamb->file[i].bf = bf_open (bfs, fname, b_size, writeflag);
202
203         /* fill-in default values (for empty isamb) */
204         isamb->file[i].head.first_block = ISAMB_CACHE_ENTRY_SIZE/b_size+1;
205         isamb->file[i].head.last_block = isamb->file[i].head.first_block;
206         isamb->file[i].head.block_size = b_size;
207         if (i == isamb->no_cat-1 || b_size > 128)
208             isamb->file[i].head.block_offset = 8;
209         else
210             isamb->file[i].head.block_offset = 4;
211         isamb->file[i].head.block_max =
212             b_size - isamb->file[i].head.block_offset;
213         isamb->file[i].head.free_list = 0;
214         if (bf_read (isamb->file[i].bf, 0, 0, 0, hbuf))
215         {
216             /* got header assume "isamb"major minor len can fit in 16 bytes */
217             zint zint_tmp;
218             int major, minor, len, pos = 0;
219             int left;
220             const char *src = 0;
221             if (memcmp(hbuf, "isamb", 5))
222             {
223                 logf(LOG_WARN, "bad isamb header for file %s", fname);
224                 return 0;
225             }
226             if (sscanf(hbuf+5, "%d %d %d", &major, &minor, &len) != 3)
227             {
228                 logf(LOG_WARN, "bad isamb header for file %s", fname);
229                 return 0;
230             }
231             if (major != ISAMB_MAJOR_VERSION)
232             {
233                 logf(LOG_WARN, "bad major version for file %s %d, must be %d",
234                      fname, major, ISAMB_MAJOR_VERSION);
235                 return 0;
236             }
237             for (left = len - b_size; left > 0; left = left - b_size)
238             {
239                 pos++;
240                 if (!bf_read (isamb->file[i].bf, pos, 0, 0, hbuf + pos*b_size))
241                 {
242                     logf(LOG_WARN, "truncated isamb header for " 
243                          "file=%s len=%d pos=%d",
244                          fname, len, pos);
245                     return 0;
246                 }
247             }
248             src = hbuf + 16;
249             decode_ptr(&src, &isamb->file[i].head.first_block);
250             decode_ptr(&src, &isamb->file[i].head.last_block);
251             decode_ptr(&src, &zint_tmp);
252             isamb->file[i].head.block_size = zint_tmp;
253             decode_ptr(&src, &zint_tmp);
254             isamb->file[i].head.block_max = zint_tmp;
255             decode_ptr(&src, &isamb->file[i].head.free_list);
256         }
257         assert (isamb->file[i].head.block_size >= isamb->file[i].head.block_offset);
258         isamb->file[i].head_dirty = 0;
259         assert(isamb->file[i].head.block_size == b_size);
260         b_size = b_size * 4;
261     }
262 #if ISAMB_DEBUG
263     logf(LOG_WARN, "isamb debug enabled. Things will be slower than usual");
264 #endif
265     return isamb;
266 }
267
268 static void flush_blocks (ISAMB b, int cat)
269 {
270     while (b->file[cat].cache_entries)
271     {
272         struct ISAMB_cache_entry *ce_this = b->file[cat].cache_entries;
273         b->file[cat].cache_entries = ce_this->next;
274
275         if (ce_this->dirty)
276         {
277             yaz_log (b->log_io, "bf_write: flush_blocks");
278             bf_write (b->file[cat].bf, ce_this->pos, 0, 0, ce_this->buf);
279         }
280         xfree (ce_this->buf);
281         xfree (ce_this);
282     }
283 }
284
285 static int get_block (ISAMB b, ISAMC_P pos, char *userbuf, int wr)
286 {
287     int cat = (int) (pos&CAT_MASK);
288     int off = (int) (((pos/CAT_MAX) & 
289                (ISAMB_CACHE_ENTRY_SIZE / b->file[cat].head.block_size - 1))
290         * b->file[cat].head.block_size);
291     zint norm = pos / (CAT_MASK*ISAMB_CACHE_ENTRY_SIZE / b->file[cat].head.block_size);
292     int no = 0;
293     struct ISAMB_cache_entry **ce, *ce_this = 0, **ce_last = 0;
294
295     if (!b->cache)
296         return 0;
297
298     assert (ISAMB_CACHE_ENTRY_SIZE >= b->file[cat].head.block_size);
299     for (ce = &b->file[cat].cache_entries; *ce; ce = &(*ce)->next, no++)
300     {
301         ce_last = ce;
302         if ((*ce)->pos == norm)
303         {
304             ce_this = *ce;
305             *ce = (*ce)->next;   /* remove from list */
306             
307             ce_this->next = b->file[cat].cache_entries;  /* move to front */
308             b->file[cat].cache_entries = ce_this;
309             
310             if (wr)
311             {
312                 memcpy (ce_this->buf + off, userbuf, 
313                         b->file[cat].head.block_size);
314                 ce_this->dirty = 1;
315             }
316             else
317                 memcpy (userbuf, ce_this->buf + off,
318                         b->file[cat].head.block_size);
319             return 1;
320         }
321     }
322     if (no >= 40)
323     {
324         assert (no == 40);
325         assert (ce_last && *ce_last);
326         ce_this = *ce_last;
327         *ce_last = 0;  /* remove the last entry from list */
328         if (ce_this->dirty)
329         {
330             yaz_log (b->log_io, "bf_write: get_block");
331             bf_write (b->file[cat].bf, ce_this->pos, 0, 0, ce_this->buf);
332         }
333         xfree (ce_this->buf);
334         xfree (ce_this);
335     }
336     ce_this = xmalloc (sizeof(*ce_this));
337     ce_this->next = b->file[cat].cache_entries;
338     b->file[cat].cache_entries = ce_this;
339     ce_this->buf = xmalloc (ISAMB_CACHE_ENTRY_SIZE);
340     ce_this->pos = norm;
341     yaz_log (b->log_io, "bf_read: get_block");
342     if (!bf_read (b->file[cat].bf, norm, 0, 0, ce_this->buf))
343         memset (ce_this->buf, 0, ISAMB_CACHE_ENTRY_SIZE);
344     if (wr)
345     {
346         memcpy (ce_this->buf + off, userbuf, b->file[cat].head.block_size);
347         ce_this->dirty = 1;
348     }
349     else
350     {
351         ce_this->dirty = 0;
352         memcpy (userbuf, ce_this->buf + off, b->file[cat].head.block_size);
353     }
354     return 1;
355 }
356
357
358 void isamb_close (ISAMB isamb)
359 {
360     int i;
361     for (i=0;isamb->accessed_nodes[i];i++)
362         logf(LOG_DEBUG,"isamb_close  level leaf-%d: "ZINT_FORMAT" read, "
363                         ZINT_FORMAT" skipped",
364              i, isamb->accessed_nodes[i], isamb->skipped_nodes[i]);
365     logf(LOG_DEBUG,"isamb_close returned "ZINT_FORMAT" values, "
366                    "skipped "ZINT_FORMAT,
367          isamb->skipped_numbers, isamb->returned_numbers);
368     for (i = 0; i<isamb->no_cat; i++)
369     {
370         flush_blocks (isamb, i);
371         if (isamb->file[i].head_dirty)
372         {
373             char hbuf[DST_BUF_SIZE];
374             int major = ISAMB_MAJOR_VERSION;
375             int minor = ISAMB_MINOR_VERSION;
376             int len = 16;
377             char *dst = hbuf + 16;
378             int pos = 0, left;
379             int b_size = isamb->file[i].head.block_size;
380
381             encode_ptr(&dst, isamb->file[i].head.first_block);
382             encode_ptr(&dst, isamb->file[i].head.last_block);
383             encode_ptr(&dst, isamb->file[i].head.block_size);
384             encode_ptr(&dst, isamb->file[i].head.block_max);
385             encode_ptr(&dst, isamb->file[i].head.free_list);
386             memset(dst, '\0', 16); /* ensure no random bytes are written */
387
388             len = dst - hbuf;
389
390             /* print exactly 16 bytes (including trailing 0) */
391             sprintf(hbuf, "isamb%02d %02d %02d\r\n", major, minor, len);
392
393             bf_write (isamb->file[i].bf, pos, 0, 0, hbuf);
394
395             for (left = len - b_size; left > 0; left = left - b_size)
396             {
397                 pos++;
398                 bf_write (isamb->file[i].bf, pos, 0, 0, hbuf + pos*b_size);
399             }
400         }
401         bf_close (isamb->file[i].bf);
402     }
403     xfree (isamb->file);
404     xfree (isamb->method);
405     xfree (isamb);
406 }
407
408 /* open_block: read one block at pos.
409    Decode leading sys bytes .. consisting of
410    Offset:Meaning
411    0: leader byte, != 0 leaf, == 0, non-leaf
412    1-2: used size of block
413    3-7*: number of items and all children
414    
415    * Reserve 5 bytes for large block sizes. 1 for small ones .. Number
416    of items. We can thus have at most 2^40 nodes. 
417 */
418 static struct ISAMB_block *open_block (ISAMB b, ISAMC_P pos)
419 {
420     int cat = (int) (pos&CAT_MASK);
421     const char *src;
422     int offset = b->file[cat].head.block_offset;
423     struct ISAMB_block *p;
424     if (!pos)
425         return 0;
426     p = xmalloc (sizeof(*p));
427     p->pos = pos;
428     p->cat = (int) (pos & CAT_MASK);
429     p->buf = xmalloc (b->file[cat].head.block_size);
430     p->cbuf = 0;
431
432     if (!get_block (b, pos, p->buf, 0))
433     {
434         yaz_log (b->log_io, "bf_read: open_block");
435         if (!bf_read (b->file[cat].bf, pos/CAT_MAX, 0, 0, p->buf))
436         {
437             yaz_log (LOG_FATAL, "isamb: read fail for pos=%ld block=%ld",
438                      (long) pos, (long) pos/CAT_MAX);
439             abort();
440         }
441     }
442     p->bytes = p->buf + offset;
443     p->leaf = p->buf[0];
444     p->size = (p->buf[1] + 256 * p->buf[2]) - offset;
445     if (p->size < 0)
446     {
447         yaz_log (LOG_FATAL, "Bad block size %d in pos=" ZINT_FORMAT "\n",
448                  p->size, pos);
449     }
450     assert (p->size >= 0);
451     src = p->buf + 3;
452     decode_ptr(&src, &p->no_items);
453
454     p->offset = 0;
455     p->dirty = 0;
456     p->deleted = 0;
457     p->decodeClientData = (*b->method->codec.start)();
458     return p;
459 }
460
461 struct ISAMB_block *new_block (ISAMB b, int leaf, int cat)
462 {
463     struct ISAMB_block *p;
464
465     p = xmalloc (sizeof(*p));
466     p->buf = xmalloc (b->file[cat].head.block_size);
467
468     if (!b->file[cat].head.free_list)
469     {
470         zint block_no;
471         block_no = b->file[cat].head.last_block++;
472         p->pos = block_no * CAT_MAX + cat;
473     }
474     else
475     {
476         p->pos = b->file[cat].head.free_list;
477         assert((p->pos & CAT_MASK) == cat);
478         if (!get_block (b, p->pos, p->buf, 0))
479         {
480             yaz_log (b->log_io, "bf_read: new_block");
481             if (!bf_read (b->file[cat].bf, p->pos/CAT_MAX, 0, 0, p->buf))
482             {
483                 yaz_log (LOG_FATAL, "isamb: read fail for pos=%ld block=%ld",
484                          (long) p->pos/CAT_MAX, (long) p->pos/CAT_MAX);
485                 abort ();
486             }
487         }
488         yaz_log (b->log_freelist, "got block " ZINT_FORMAT " from freelist %d:" ZINT_FORMAT, p->pos,
489                  cat, p->pos/CAT_MAX);
490         memcpy (&b->file[cat].head.free_list, p->buf, sizeof(int));
491     }
492     p->cat = cat;
493     b->file[cat].head_dirty = 1;
494     memset (p->buf, 0, b->file[cat].head.block_size);
495     p->bytes = p->buf + b->file[cat].head.block_offset;
496     p->leaf = leaf;
497     p->size = 0;
498     p->dirty = 1;
499     p->deleted = 0;
500     p->offset = 0;
501     p->no_items = 0;
502     p->decodeClientData = (*b->method->codec.start)();
503     return p;
504 }
505
506 struct ISAMB_block *new_leaf (ISAMB b, int cat)
507 {
508     return new_block (b, 1, cat);
509 }
510
511
512 struct ISAMB_block *new_int (ISAMB b, int cat)
513 {
514     return new_block (b, 0, cat);
515 }
516
517 static void check_block (ISAMB b, struct ISAMB_block *p)
518 {
519     assert(b); /* mostly to make the compiler shut up about unused b */
520     if (p->leaf)
521     {
522         ;
523     }
524     else
525     {
526         /* sanity check */
527         char *startp = p->bytes;
528         const char *src = startp;
529         char *endp = p->bytes + p->size;
530         ISAMB_P pos;
531             
532         decode_ptr (&src, &pos);
533         assert ((pos&CAT_MASK) == p->cat);
534         while (src != endp)
535         {
536             zint item_len;
537             decode_ptr (&src, &item_len);
538             assert (item_len > 0 && item_len < 80);
539             src += item_len;
540             decode_ptr (&src, &pos);
541             if ((pos&CAT_MASK) != p->cat)
542             {
543                 assert ((pos&CAT_MASK) == p->cat);
544             }
545         }
546     }
547 }
548
549 void close_block (ISAMB b, struct ISAMB_block *p)
550 {
551     if (!p)
552         return;
553     if (p->deleted)
554     {
555         yaz_log (b->log_freelist, "release block " ZINT_FORMAT " from freelist %d:" ZINT_FORMAT,
556                  p->pos, p->cat, p->pos/CAT_MAX);
557         memcpy (p->buf, &b->file[p->cat].head.free_list, sizeof(int));
558         b->file[p->cat].head.free_list = p->pos;
559         if (!get_block (b, p->pos, p->buf, 1))
560         {
561             yaz_log (b->log_io, "bf_write: close_block (deleted)");
562             bf_write (b->file[p->cat].bf, p->pos/CAT_MAX, 0, 0, p->buf);
563         }
564     }
565     else if (p->dirty)
566     {
567         int offset = b->file[p->cat].head.block_offset;
568         int size = p->size + offset;
569         char *dst =  p->buf + 3;
570         assert (p->size >= 0);
571         
572         /* memset becuase encode_ptr usually does not write all bytes */
573         memset(p->buf, 0, b->file[p->cat].head.block_offset);
574         p->buf[0] = p->leaf;
575         p->buf[1] = size & 255;
576         p->buf[2] = size >> 8;
577         encode_ptr(&dst, p->no_items);
578         check_block(b, p);
579         if (!get_block (b, p->pos, p->buf, 1))
580         {
581             yaz_log (b->log_io, "bf_write: close_block");
582             bf_write (b->file[p->cat].bf, p->pos/CAT_MAX, 0, 0, p->buf);
583         }
584     }
585     (*b->method->codec.stop)(p->decodeClientData);
586     xfree (p->buf);
587     xfree (p);
588 }
589
590 int insert_sub (ISAMB b, struct ISAMB_block **p,
591                 void *new_item, int *mode,
592                 ISAMC_I *stream,
593                 struct ISAMB_block **sp,
594                 void *sub_item, int *sub_size,
595                 const void *max_item);
596
597 int insert_int (ISAMB b, struct ISAMB_block *p, void *lookahead_item,
598                 int *mode,
599                 ISAMC_I *stream, struct ISAMB_block **sp,
600                 void *split_item, int *split_size, const void *last_max_item)
601 {
602     char *startp = p->bytes;
603     const char *src = startp;
604     char *endp = p->bytes + p->size;
605     ISAMB_P pos;
606     struct ISAMB_block *sub_p1 = 0, *sub_p2 = 0;
607     char sub_item[DST_ITEM_MAX];
608     int sub_size;
609     int more = 0;
610     zint diff_terms = 0;
611
612     *sp = 0;
613
614     assert(p->size >= 0);
615     decode_ptr (&src, &pos);
616     while (src != endp)
617     {
618         zint item_len;
619         int d;
620         const char *src0 = src;
621         decode_ptr (&src, &item_len);
622         d = (*b->method->compare_item)(src, lookahead_item);
623         if (d > 0)
624         {
625             sub_p1 = open_block (b, pos);
626             assert (sub_p1);
627             diff_terms -= sub_p1->no_items;
628             more = insert_sub (b, &sub_p1, lookahead_item, mode,
629                                stream, &sub_p2, 
630                                sub_item, &sub_size, src);
631             diff_terms += sub_p1->no_items;
632             src = src0;
633             break;
634         }
635         src += item_len;
636         decode_ptr (&src, &pos);
637     }
638     if (!sub_p1)
639     {
640         sub_p1 = open_block (b, pos);
641         assert (sub_p1);
642         diff_terms -= sub_p1->no_items;
643         more = insert_sub (b, &sub_p1, lookahead_item, mode, stream, &sub_p2, 
644                            sub_item, &sub_size, last_max_item);
645         diff_terms += sub_p1->no_items;
646     }
647     if (sub_p2)
648         diff_terms += sub_p2->no_items;
649     if (diff_terms)
650     {
651         p->dirty = 1;
652         p->no_items += diff_terms;
653     }
654     if (sub_p2)
655     {
656         /* there was a split - must insert pointer in this one */
657         char dst_buf[DST_BUF_SIZE];
658         char *dst = dst_buf;
659
660         assert (sub_size < 80 && sub_size > 1);
661
662         memcpy (dst, startp, src - startp);
663                 
664         dst += src - startp;
665
666         encode_ptr (&dst, sub_size);      /* sub length and item */
667         memcpy (dst, sub_item, sub_size);
668         dst += sub_size;
669
670         encode_ptr (&dst, sub_p2->pos);   /* pos */
671
672         if (endp - src)                   /* remaining data */
673         {
674             memcpy (dst, src, endp - src);
675             dst += endp - src;
676         }
677         p->size = dst - dst_buf;
678         assert (p->size >= 0);
679
680
681         if (p->size <= b->file[p->cat].head.block_max)
682         {
683             memcpy (startp, dst_buf, dst - dst_buf);
684         }
685         else
686         {
687             struct ISAMB_block *sub_p3;
688             zint split_size_tmp;
689             zint no_items_first_half = 0;
690             int p_new_size;
691             const char *half;
692             src = dst_buf;
693             endp = dst;
694
695             half = src + b->file[p->cat].head.block_size/2;
696             decode_ptr (&src, &pos);
697
698             /* read sub block so we can get no_items for it */
699             sub_p3 = open_block(b, pos);
700             no_items_first_half += sub_p3->no_items;
701             close_block(b, sub_p3);
702
703             while (src <= half)
704             {
705                 decode_ptr (&src, &split_size_tmp);
706                 *split_size = (int) split_size_tmp;
707
708                 src += *split_size;
709                 decode_ptr (&src, &pos);
710
711                 /* read sub block so we can get no_items for it */
712                 sub_p3 = open_block(b, pos);
713                 no_items_first_half += sub_p3->no_items;
714                 close_block(b, sub_p3);
715             }
716             /*  p is first half */
717             p_new_size = src - dst_buf;
718             memcpy (p->bytes, dst_buf, p_new_size);
719
720             decode_ptr (&src, &split_size_tmp);
721             *split_size = (int) split_size_tmp;
722             memcpy (split_item, src, *split_size);
723             src += *split_size;
724
725             /*  *sp is second half */
726             *sp = new_int (b, p->cat);
727             (*sp)->size = endp - src;
728             memcpy ((*sp)->bytes, src, (*sp)->size);
729
730             p->size = p_new_size;
731
732             /*  adjust no_items in first&second half */
733             (*sp)->no_items = p->no_items - no_items_first_half;
734             p->no_items = no_items_first_half;
735         }
736         p->dirty = 1;
737         close_block (b, sub_p2);
738     }
739     close_block (b, sub_p1);
740     return more;
741 }
742
743 int insert_leaf (ISAMB b, struct ISAMB_block **sp1, void *lookahead_item,
744                  int *lookahead_mode, ISAMC_I *stream,
745                  struct ISAMB_block **sp2,
746                  void *sub_item, int *sub_size,
747                  const void *max_item)
748 {
749     struct ISAMB_block *p = *sp1;
750     char *endp = 0;
751     const char *src = 0;
752     char dst_buf[DST_BUF_SIZE], *dst = dst_buf;
753     int new_size;
754     void *c1 = (*b->method->codec.start)();
755     void *c2 = (*b->method->codec.start)();
756     int more = 1;
757     int quater = b->file[b->no_cat-1].head.block_max / 4;
758     char *mid_cut = dst_buf + quater * 2;
759     char *tail_cut = dst_buf + quater * 3;
760     char *maxp = dst_buf + b->file[b->no_cat-1].head.block_max;
761     char *half1 = 0;
762     char *half2 = 0;
763     char cut_item_buf[DST_ITEM_MAX];
764     int cut_item_size = 0;
765     int no_items = 0;    /* number of items (total) */
766     int no_items_1 = 0;  /* number of items (first half) */
767
768     if (p && p->size)
769     {
770         char file_item_buf[DST_ITEM_MAX];
771         char *file_item = file_item_buf;
772             
773         src = p->bytes;
774         endp = p->bytes + p->size;
775         (*b->method->codec.decode)(c1, &file_item, &src);
776         while (1)
777         {
778             const char *dst_item = 0; /* resulting item to be inserted */
779             char *lookahead_next;
780             int d = -1;
781             
782             if (lookahead_item)
783                 d = (*b->method->compare_item)(file_item_buf, lookahead_item);
784             
785             /* d now holds comparison between existing file item and
786                lookahead item 
787                d = 0: equal
788                d > 0: lookahead before file
789                d < 0: lookahead after file
790             */
791             if (d > 0)
792             {
793                 /* lookahead must be inserted */
794                 dst_item = lookahead_item;
795                 /* if this is not an insertion, it's really bad .. */
796                 if (!*lookahead_mode)
797                 {
798                     yaz_log (LOG_WARN, "isamb: Inconsistent register (1)");
799                     assert (*lookahead_mode);
800                 }
801             }
802             else
803                 dst_item = file_item_buf;
804
805                
806             if (!*lookahead_mode && d == 0)
807             {
808                 /* it's a deletion and they match so there is nothing to be
809                    inserted anyway .. But mark the thing bad (file item
810                    was part of input.. The item will not be part of output */
811                 p->dirty = 1;
812             }
813             else if (!half1 && dst > mid_cut)
814             {
815                 /* we have reached the splitting point for the first time */
816                 const char *dst_item_0 = dst_item;
817                 half1 = dst; /* candidate for splitting */
818
819                 /* encode the resulting item */
820                 (*b->method->codec.encode)(c2, &dst, &dst_item);
821                 
822                 cut_item_size = dst_item - dst_item_0;
823                 assert(cut_item_size > 0);
824                 memcpy (cut_item_buf, dst_item_0, cut_item_size);
825                 
826                 half2 = dst;
827                 no_items_1 = no_items;
828                 no_items++;
829             }
830             else
831             {
832                 /* encode the resulting item */
833                 (*b->method->codec.encode)(c2, &dst, &dst_item);
834                 no_items++;
835             }
836
837             /* now move "pointers" .. result has been encoded .. */
838             if (d > 0)  
839             {
840                 /* we must move the lookahead pointer */
841
842                 if (dst > maxp)
843                     /* no more room. Mark lookahead as "gone".. */
844                     lookahead_item = 0;
845                 else
846                 {
847                     /* move it really.. */
848                     lookahead_next = lookahead_item;
849                     if (!(*stream->read_item)(stream->clientData,
850                                               &lookahead_next,
851                                               lookahead_mode))
852                     {
853                         /* end of stream reached: no "more" and no lookahead */
854                         lookahead_item = 0;
855                         more = 0;
856                     }
857                     if (lookahead_item && max_item &&
858                         (*b->method->compare_item)(max_item, lookahead_item) <= 0)
859                     {
860                         /* the lookahead goes beyond what we allow in this
861                            leaf. Mark it as "gone" */
862                         lookahead_item = 0;
863                     }
864                     
865                     p->dirty = 1;
866                 }
867             }
868             else if (d == 0)
869             {
870                 /* exact match .. move both pointers */
871
872                 lookahead_next = lookahead_item;
873                 if (!(*stream->read_item)(stream->clientData,
874                                           &lookahead_next, lookahead_mode))
875                 {
876                     lookahead_item = 0;
877                     more = 0;
878                 }
879                 if (src == endp)
880                     break;  /* end of file stream reached .. */
881                 file_item = file_item_buf; /* move file pointer */
882                 (*b->method->codec.decode)(c1, &file_item, &src);
883             }
884             else
885             {
886                 /* file pointer must be moved */
887                 if (src == endp)
888                     break;
889                 file_item = file_item_buf;
890                 (*b->method->codec.decode)(c1, &file_item, &src);
891             }
892         }
893     }
894     maxp = dst_buf + b->file[b->no_cat-1].head.block_max + quater;
895     /* this loop runs when we are "appending" to a leaf page. That is
896        either it's empty (new) or all file items have been read in
897        previous loop */
898     while (lookahead_item)
899     {
900         char *dst_item;
901         const char *src = lookahead_item;
902         char *dst_0 = dst;
903         
904         /* compare lookahead with max item */
905         if (max_item &&
906             (*b->method->compare_item)(max_item, lookahead_item) <= 0)
907         {
908             /* stop if we have reached the value of max item */
909             break;
910         }
911         if (!*lookahead_mode)
912         {
913             /* this is append. So a delete is bad */
914             yaz_log (LOG_WARN, "isamb: Inconsistent register (2)");
915             abort();
916         }
917         else if (!half1 && dst > tail_cut)
918         {
919             const char *src_0 = src;
920             half1 = dst; /* candidate for splitting */
921             
922             (*b->method->codec.encode)(c2, &dst, &src);
923             
924             cut_item_size = src - src_0;
925             assert(cut_item_size > 0);
926             memcpy (cut_item_buf, src_0, cut_item_size);
927             
928             no_items_1 = no_items;
929             half2 = dst;
930         }
931         else
932             (*b->method->codec.encode)(c2, &dst, &src);
933
934         if (dst > maxp)
935         {
936             dst = dst_0;
937             break;
938         }
939         no_items++;
940         if (p)
941             p->dirty = 1;
942         dst_item = lookahead_item;
943         if (!(*stream->read_item)(stream->clientData, &dst_item,
944                                   lookahead_mode))
945         {
946             lookahead_item = 0;
947             more = 0;
948         }
949     }
950     new_size = dst - dst_buf;
951     if (p && p->cat != b->no_cat-1 && 
952         new_size > b->file[p->cat].head.block_max)
953     {
954         /* non-btree block will be removed */
955         p->deleted = 1;
956         close_block (b, p);
957         /* delete it too!! */
958         p = 0; /* make a new one anyway */
959     }
960     if (!p)
961     {   /* must create a new one */
962         int i;
963         for (i = 0; i < b->no_cat; i++)
964             if (new_size <= b->file[i].head.block_max)
965                 break;
966         if (i == b->no_cat)
967             i = b->no_cat - 1;
968         p = new_leaf (b, i);
969     }
970     if (new_size > b->file[p->cat].head.block_max)
971     {
972         char *first_dst;
973         const char *cut_item = cut_item_buf;
974
975         assert (half1);
976         assert (half2);
977
978         assert(cut_item_size > 0);
979         
980         /* first half */
981         p->size = half1 - dst_buf;
982         memcpy (p->bytes, dst_buf, half1 - dst_buf);
983         p->no_items = no_items_1;
984
985         /* second half */
986         *sp2 = new_leaf (b, p->cat);
987
988         (*b->method->codec.reset)(c2);
989
990         first_dst = (*sp2)->bytes;
991
992         (*b->method->codec.encode)(c2, &first_dst, &cut_item);
993
994         memcpy (first_dst, half2, dst - half2);
995
996         (*sp2)->size = (first_dst - (*sp2)->bytes) + (dst - half2);
997         (*sp2)->no_items = no_items - no_items_1;
998         (*sp2)->dirty = 1;
999         p->dirty = 1;
1000         memcpy (sub_item, cut_item_buf, cut_item_size);
1001         *sub_size = cut_item_size;
1002     }
1003     else
1004     {
1005         memcpy (p->bytes, dst_buf, dst - dst_buf);
1006         p->size = new_size;
1007         p->no_items = no_items;
1008     }
1009     (*b->method->codec.stop)(c1);
1010     (*b->method->codec.stop)(c2);
1011     *sp1 = p;
1012     return more;
1013 }
1014
1015 int insert_sub (ISAMB b, struct ISAMB_block **p, void *new_item,
1016                 int *mode,
1017                 ISAMC_I *stream,
1018                 struct ISAMB_block **sp,
1019                 void *sub_item, int *sub_size,
1020                 const void *max_item)
1021 {
1022     if (!*p || (*p)->leaf)
1023         return insert_leaf (b, p, new_item, mode, stream, sp, sub_item, 
1024                             sub_size, max_item);
1025     else
1026         return insert_int (b, *p, new_item, mode, stream, sp, sub_item,
1027                            sub_size, max_item);
1028 }
1029
1030 int isamb_unlink (ISAMB b, ISAMC_P pos)
1031 {
1032     struct ISAMB_block *p1;
1033
1034     if (!pos)
1035         return 0;
1036     p1 = open_block(b, pos);
1037     p1->deleted = 1;
1038     if (!p1->leaf)
1039     {
1040         zint sub_p;
1041         zint item_len;
1042         const char *src = p1->bytes + p1->offset;
1043
1044         decode_ptr(&src, &sub_p);
1045         isamb_unlink(b, sub_p);
1046         
1047         while (src != p1->bytes + p1->size)
1048         {
1049             decode_ptr(&src, &item_len);
1050             src += item_len;
1051             decode_ptr(&src, &sub_p);
1052             isamb_unlink(b, sub_p);
1053         }
1054     }
1055     close_block(b, p1);
1056     return 0;
1057 }
1058
1059 ISAMB_P isamb_merge (ISAMB b, ISAMC_P pos, ISAMC_I *stream)
1060 {
1061     char item_buf[DST_ITEM_MAX];
1062     char *item_ptr;
1063     int i_mode;
1064     int more;
1065     int must_delete = 0;
1066
1067     if (b->cache < 0)
1068     {
1069         int more = 1;
1070         while (more)
1071         {
1072             item_ptr = item_buf;
1073             more =
1074                 (*stream->read_item)(stream->clientData, &item_ptr, &i_mode);
1075         }
1076         return 1;
1077     }
1078     item_ptr = item_buf;
1079     more = (*stream->read_item)(stream->clientData, &item_ptr, &i_mode);
1080     while (more)
1081     {
1082         struct ISAMB_block *p = 0, *sp = 0;
1083         char sub_item[DST_ITEM_MAX];
1084         int sub_size;
1085         
1086         if (pos)
1087             p = open_block (b, pos);
1088         more = insert_sub (b, &p, item_buf, &i_mode, stream, &sp,
1089                            sub_item, &sub_size, 0);
1090         if (sp)
1091         {    /* increase level of tree by one */
1092             struct ISAMB_block *p2 = new_int (b, p->cat);
1093             char *dst = p2->bytes + p2->size;
1094             
1095             encode_ptr (&dst, p->pos);
1096             assert (sub_size < 40);
1097             encode_ptr (&dst, sub_size);
1098             memcpy (dst, sub_item, sub_size);
1099             dst += sub_size;
1100             encode_ptr (&dst, sp->pos);
1101             
1102             p2->size = dst - p2->bytes;
1103             p2->no_items = p->no_items + sp->no_items;
1104             pos = p2->pos;  /* return new super page */
1105             close_block (b, sp);
1106             close_block (b, p2);
1107         }
1108         else
1109         {
1110             pos = p->pos;   /* return current one (again) */
1111         }
1112         if (p->no_items == 0)
1113             must_delete = 1;
1114         else
1115             must_delete = 0;
1116         close_block (b, p);
1117     }
1118     if (must_delete)
1119     {
1120         isamb_unlink(b, pos);
1121         return 0;
1122     }
1123     return pos;
1124 }
1125
1126 ISAMB_PP isamb_pp_open_x (ISAMB isamb, ISAMB_P pos, int *level)
1127 {
1128     ISAMB_PP pp = xmalloc (sizeof(*pp));
1129     int i;
1130
1131     pp->isamb = isamb;
1132     pp->block = xmalloc (ISAMB_MAX_LEVEL * sizeof(*pp->block));
1133
1134     pp->pos = pos;
1135     pp->level = 0;
1136     pp->maxlevel=0;
1137     pp->total_size = 0;
1138     pp->no_blocks = 0;
1139     pp->skipped_numbers=0;
1140     pp->returned_numbers=0;
1141     for (i=0;i<ISAMB_MAX_LEVEL;i++)
1142         pp->skipped_nodes[i] = pp->accessed_nodes[i]=0;
1143     while (1)
1144     {
1145         struct ISAMB_block *p = open_block (isamb, pos);
1146         const char *src = p->bytes + p->offset;
1147         pp->block[pp->level] = p;
1148
1149         pp->total_size += p->size;
1150         pp->no_blocks++;
1151         if (p->leaf)
1152             break;
1153
1154                                         
1155         decode_ptr (&src, &pos);
1156         p->offset = src - p->bytes;
1157         pp->level++;
1158         pp->accessed_nodes[pp->level]++; 
1159     }
1160     pp->block[pp->level+1] = 0;
1161     pp->maxlevel=pp->level;
1162     if (level)
1163         *level = pp->level;
1164     return pp;
1165 }
1166
1167 ISAMB_PP isamb_pp_open (ISAMB isamb, ISAMB_P pos)
1168 {
1169     return isamb_pp_open_x (isamb, pos, 0);
1170 }
1171
1172 void isamb_pp_close_x (ISAMB_PP pp, int *size, int *blocks)
1173 {
1174     int i;
1175     if (!pp)
1176         return;
1177     logf(LOG_DEBUG,"isamb_pp_close lev=%d returned "ZINT_FORMAT" values," 
1178                     "skipped "ZINT_FORMAT,
1179         pp->maxlevel, pp->skipped_numbers, pp->returned_numbers);
1180     for (i=pp->maxlevel;i>=0;i--)
1181         if ( pp->skipped_nodes[i] || pp->accessed_nodes[i])
1182             logf(LOG_DEBUG,"isamb_pp_close  level leaf-%d: "
1183                             ZINT_FORMAT" read, "ZINT_FORMAT" skipped", i,
1184                  pp->accessed_nodes[i], pp->skipped_nodes[i]);
1185     pp->isamb->skipped_numbers += pp->skipped_numbers;
1186     pp->isamb->returned_numbers += pp->returned_numbers;
1187     for (i=pp->maxlevel;i>=0;i--)
1188     {
1189         pp->isamb->accessed_nodes[i] += pp->accessed_nodes[i];
1190         pp->isamb->skipped_nodes[i] += pp->skipped_nodes[i];
1191     }
1192     if (size)
1193         *size = pp->total_size;
1194     if (blocks)
1195         *blocks = pp->no_blocks;
1196     for (i = 0; i <= pp->level; i++)
1197         close_block (pp->isamb, pp->block[i]);
1198     xfree (pp->block);
1199     xfree (pp);
1200 }
1201
1202 int isamb_block_info (ISAMB isamb, int cat)
1203 {
1204     if (cat >= 0 && cat < isamb->no_cat)
1205         return isamb->file[cat].head.block_size;
1206     return -1;
1207 }
1208
1209 void isamb_pp_close (ISAMB_PP pp)
1210 {
1211     isamb_pp_close_x (pp, 0, 0);
1212 }
1213
1214 /* simple recursive dumper .. */
1215 static void isamb_dump_r (ISAMB b, ISAMB_P pos, void (*pr)(const char *str),
1216                           int level)
1217 {
1218     char buf[1024];
1219     char prefix_str[1024];
1220     if (pos)
1221     {
1222         struct ISAMB_block *p = open_block (b, pos);
1223         sprintf(prefix_str, "%*s " ZINT_FORMAT " cat=%d size=%d max=%d items="
1224                 ZINT_FORMAT, level*2, "",
1225                 pos, p->cat, p->size, b->file[p->cat].head.block_max,
1226                 p->no_items);
1227         (*pr)(prefix_str);
1228         sprintf(prefix_str, "%*s " ZINT_FORMAT, level*2, "", pos);
1229         if (p->leaf)
1230         {
1231             while (p->offset < p->size)
1232             {
1233                 const char *src = p->bytes + p->offset;
1234                 char *dst = buf;
1235                 (*b->method->codec.decode)(p->decodeClientData, &dst, &src);
1236                 (*b->method->log_item)(LOG_DEBUG, buf, prefix_str);
1237                 p->offset = src - (char*) p->bytes;
1238             }
1239             assert(p->offset == p->size);
1240         }
1241         else
1242         {
1243             const char *src = p->bytes + p->offset;
1244             ISAMB_P sub;
1245             zint item_len;
1246
1247             decode_ptr (&src, &sub);
1248             p->offset = src - (char*) p->bytes;
1249
1250             isamb_dump_r(b, sub, pr, level+1);
1251             
1252             while (p->offset < p->size)
1253             {
1254                 decode_ptr (&src, &item_len);
1255                 (*b->method->log_item)(LOG_DEBUG, src, prefix_str);
1256                 src += item_len;
1257                 decode_ptr (&src, &sub);
1258                 
1259                 p->offset = src - (char*) p->bytes;
1260                 
1261                 isamb_dump_r(b, sub, pr, level+1);
1262             }           
1263         }
1264         close_block(b,p);
1265     }
1266 }
1267
1268 void isamb_dump (ISAMB b, ISAMB_P pos, void (*pr)(const char *str))
1269 {
1270     isamb_dump_r(b, pos, pr, 0);
1271 }
1272
1273 #if 0
1274 /* Old isamb_pp_read that Adam wrote, kept as a reference in case we need to
1275    debug the more complex pp_read that also forwards. May be deleted near end
1276    of 2004, if it has not shown to be useful */
1277
1278
1279 int isamb_pp_read (ISAMB_PP pp, void *buf)
1280 {
1281     char *dst = buf;
1282     char *src;
1283     struct ISAMB_block *p = pp->block[pp->level];
1284     if (!p)
1285         return 0;
1286
1287     while (p->offset == p->size)
1288     {
1289         int pos, item_len;
1290         while (p->offset == p->size)
1291         {
1292             if (pp->level == 0)
1293                 return 0;
1294             close_block (pp->isamb, pp->block[pp->level]);
1295             pp->block[pp->level] = 0;
1296             (pp->level)--;
1297             p = pp->block[pp->level];
1298             assert (!p->leaf);  
1299         }
1300         src = p->bytes + p->offset;
1301         
1302         decode_ptr (&src, &item_len);
1303         src += item_len;
1304         decode_ptr (&src, &pos);
1305         
1306         p->offset = src - (char*) p->bytes;
1307
1308         ++(pp->level);
1309         
1310         while (1)
1311         {
1312             pp->block[pp->level] = p = open_block (pp->isamb, pos);
1313
1314             pp->total_size += p->size;
1315             pp->no_blocks++;
1316             
1317             if (p->leaf) 
1318             {
1319                 break;
1320             }
1321             src = p->bytes + p->offset;
1322             decode_ptr (&src, &pos);
1323             p->offset = src - (char*) p->bytes;
1324             pp->level++;
1325         }
1326     }
1327     assert (p->offset < p->size);
1328     assert (p->leaf);
1329     src = p->bytes + p->offset;
1330     (*pp->isamb->method->codec.code_item)(ISAMC_DECODE, p->decodeClientData,
1331                                     &dst, &src);
1332     p->offset = src - (char*) p->bytes;
1333     /* key_logdump_txt(LOG_DEBUG,buf, "isamb_pp_read returning 1"); */
1334     return 1;
1335 }
1336
1337 #else
1338 int isamb_pp_read (ISAMB_PP pp, void *buf)
1339 {
1340     return isamb_pp_forward(pp, buf, 0);
1341 }
1342 #endif
1343
1344 #define NEW_FORWARD 1
1345
1346 #if NEW_FORWARD == 1
1347
1348 static int isamb_pp_on_right_node(ISAMB_PP pp, int level, const void *untilbuf)
1349 { /* looks one node higher to see if we should be on this node at all */
1350   /* useful in backing off quickly, and in avoiding tail descends */
1351   /* call with pp->level to begin with */
1352     struct ISAMB_block *p;
1353     int cmp;
1354     const char *src;
1355     zint item_len;
1356     assert(level>=0);
1357     if ( level == 0) {
1358 #if ISAMB_DEBUG
1359             logf(LOG_DEBUG,"isamb_pp_on_right returning true for root");
1360 #endif
1361         return 1; /* we can never skip the root node */
1362     }
1363     level--;
1364     p=pp->block[level];
1365     assert(p->offset <= p->size);
1366     if (p->offset < p->size )
1367     {
1368         assert(p->offset>0); 
1369         src=p->bytes + p->offset;
1370         decode_ptr(&src, &item_len);
1371 #if ISAMB_DEBUG
1372         (*pp->isamb->method->codec.log_item)(LOG_DEBUG,untilbuf,"on_leaf: until");
1373         (*pp->isamb->method->codec.log_item)(LOG_DEBUG,src,"on_leaf: value");
1374 #endif
1375         cmp=(*pp->isamb->method->compare_item)(untilbuf,src);
1376         if (cmp<2) {
1377 #if ISAMB_DEBUG
1378             logf(LOG_DEBUG,"isamb_pp_on_right returning true "
1379                             "cmp=%d lev=%d ofs=%d",cmp,level,p->offset);
1380 #endif
1381             return 1; 
1382         }
1383         else {
1384 #if ISAMB_DEBUG
1385             logf(LOG_DEBUG,"isamb_pp_on_right returning false "
1386                             "cmp=%d lev=%d ofs=%d",cmp,level,p->offset);
1387 #endif
1388             return 0; 
1389         }
1390     }
1391     else {
1392 #if ISAMB_DEBUG
1393         logf(LOG_DEBUG,"isamb_pp_on_right at tail, looking higher "
1394                         "lev=%d",level);
1395 #endif
1396         return isamb_pp_on_right_node(pp, level, untilbuf);
1397     }
1398 } /* isamb_pp_on_right_node */
1399
1400 static int isamb_pp_read_on_leaf(ISAMB_PP pp, void *buf)
1401 { /* reads the next item on the current leaf, returns 0 if end of leaf*/
1402     struct ISAMB_block *p = pp->block[pp->level];
1403     char *dst;
1404     const char *src;
1405     assert(pp);
1406     assert(buf);
1407     if (p->offset == p->size) {
1408 #if ISAMB_DEBUG
1409         logf(LOG_DEBUG,"isamb_pp_read_on_leaf returning 0 on node %d",p->pos);
1410 #endif
1411         return 0; /* at end of leaf */
1412     }
1413     src=p->bytes + p->offset;
1414     dst=buf;
1415     (*pp->isamb->method->codec.decode)(p->decodeClientData,&dst, &src);
1416     p->offset = src - (char*) p->bytes;
1417     /*
1418 #if ISAMB_DEBUG
1419     (*pp->isamb->method->codec.log_item)(LOG_DEBUG, buf, "read_on_leaf returning 1");
1420 #endif
1421 */
1422     pp->returned_numbers++;
1423     return 1;
1424 } /* read_on_leaf */
1425
1426 static int isamb_pp_forward_on_leaf(ISAMB_PP pp, void *buf, const void *untilbuf)
1427 { /* forwards on the current leaf, returns 0 if not found */
1428     int cmp;
1429     int skips=0;
1430     while (1){
1431         if (!isamb_pp_read_on_leaf(pp,buf))
1432             return 0;
1433         /* FIXME - this is an extra function call, inline the read? */
1434         cmp=(*pp->isamb->method->compare_item)(untilbuf,buf);
1435         if (cmp <2){  /* found a good one */
1436 #if ISAMB_DEBUG
1437             if (skips)
1438                 logf(LOG_DEBUG, "isam_pp_fwd_on_leaf skipped %d items",skips);
1439 #endif
1440             pp->returned_numbers++;
1441             return 1;
1442         }
1443         if (!skips)
1444             if (!isamb_pp_on_right_node(pp, pp->level, untilbuf))
1445                 return 0; /* never mind the rest of this leaf */
1446         pp->skipped_numbers++;
1447         skips++;
1448     }
1449 } /* forward_on_leaf */
1450
1451 static int isamb_pp_climb_level(ISAMB_PP pp, ISAMB_P *pos)
1452 { /* climbs higher in the tree, until finds a level with data left */
1453   /* returns the node to (consider to) descend to in *pos) */
1454     struct ISAMB_block *p = pp->block[pp->level];
1455     const char *src;
1456     zint item_len;
1457 #if ISAMB_DEBUG
1458     logf(LOG_DEBUG,"isamb_pp_climb_level starting "
1459                    "at level %d node %d ofs=%d sz=%d",
1460                     pp->level, p->pos, p->offset, p->size);
1461 #endif
1462     assert(pp->level >= 0);
1463     assert(p->offset <= p->size);
1464     if (pp->level==0)
1465     {
1466 #if ISAMB_DEBUG
1467         logf(LOG_DEBUG,"isamb_pp_climb_level returning 0 at root");
1468 #endif
1469         return 0;
1470     }
1471     assert(pp->level>0); 
1472     close_block(pp->isamb, pp->block[pp->level]);
1473     pp->block[pp->level]=0;
1474     (pp->level)--;
1475     p=pp->block[pp->level];
1476 #if ISAMB_DEBUG
1477     logf(LOG_DEBUG,"isamb_pp_climb_level climbed to level %d node %d ofs=%d",
1478                     pp->level, p->pos, p->offset);
1479 #endif
1480     assert(!p->leaf);
1481     assert(p->offset <= p->size);
1482     if (p->offset == p->size ) {
1483         /* we came from the last pointer, climb on */
1484         if (!isamb_pp_climb_level(pp,pos))
1485             return 0;
1486         p=pp->block[pp->level];
1487     }
1488     else
1489     {
1490         /* skip the child we just came from */
1491 #if ISAMB_DEBUG
1492         logf(LOG_DEBUG,"isam_pp_climb_level: skipping lev=%d ofs=%d sz=%d", 
1493                         pp->level, p->offset, p->size);
1494 #endif
1495         assert (p->offset < p->size );
1496         src=p->bytes + p->offset;
1497         decode_ptr(&src, &item_len);
1498         src += item_len;
1499         decode_ptr(&src, pos);
1500         p->offset=src - (char *)p->bytes;
1501             
1502     }
1503     return 1;
1504 } /* climb_level */
1505
1506
1507 static zint isamb_pp_forward_unode(ISAMB_PP pp, zint pos, const void *untilbuf)
1508 { /* scans a upper node until it finds a child <= untilbuf */
1509   /* pp points to the key value, as always. pos is the child read from */
1510   /* the buffer */
1511   /* if all values are too small, returns the last child in the node */
1512   /* FIXME - this can be detected, and avoided by looking at the */
1513   /* parent node, but that gets messy. Presumably the cost is */
1514   /* pretty low anyway */
1515     struct ISAMB_block *p = pp->block[pp->level];
1516     const char *src=p->bytes + p->offset;
1517     zint item_len;
1518     int cmp;
1519     zint nxtpos;
1520 #if ISAMB_DEBUG
1521     int skips=0;
1522     logf(LOG_DEBUG,"isamb_pp_forward_unode starting "
1523                    "at level %d node %d ofs=%di sz=%d",
1524                     pp->level, p->pos, p->offset, p->size);
1525 #endif
1526     assert(!p->leaf);
1527     assert(p->offset <= p->size);
1528     if (p->offset == p->size) {
1529 #if ISAMB_DEBUG
1530             logf(LOG_DEBUG,"isamb_pp_forward_unode returning at end "
1531                    "at level %d node %d ofs=%di sz=%d",
1532                     pp->level, p->pos, p->offset, p->size);
1533 #endif
1534         return pos; /* already at the end of it */
1535     }
1536     while(p->offset < p->size) {
1537         decode_ptr(&src,&item_len);
1538         cmp=(*pp->isamb->method->compare_item)(untilbuf,src);
1539         src+=item_len;
1540         decode_ptr(&src,&nxtpos);
1541         if (cmp<2)
1542         {
1543 #if ISAMB_DEBUG
1544             logf(LOG_DEBUG,"isamb_pp_forward_unode returning a hit "
1545                    "at level %d node %d ofs=%d sz=%d",
1546                     pp->level, p->pos, p->offset, p->size);
1547 #endif
1548             return pos;
1549         } /* found one */
1550         pos=nxtpos;
1551         p->offset=src-(char*)p->bytes;
1552         (pp->skipped_nodes[pp->maxlevel - pp->level -1])++;
1553 #if ISAMB_DEBUG
1554         skips++;
1555 #endif
1556     }
1557 #if ISAMB_DEBUG
1558             logf(LOG_DEBUG,"isamb_pp_forward_unode returning at tail "
1559                    "at level %d node %d ofs=%d sz=%d skips=%d",
1560                     pp->level, p->pos, p->offset, p->size, skips);
1561 #endif
1562     return pos; /* that's the last one in the line */
1563     
1564 } /* forward_unode */
1565
1566 static void isamb_pp_descend_to_leaf(ISAMB_PP pp, ISAMB_P pos, const void *untilbuf)
1567 { /* climbs down the tree, from pos, to the leftmost leaf */
1568     struct ISAMB_block *p = pp->block[pp->level];
1569     const char *src;
1570     assert(!p->leaf);
1571 #if ISAMB_DEBUG
1572     logf(LOG_DEBUG,"isamb_pp_descend_to_leaf "
1573                    "starting at lev %d node %d ofs=%d lf=%d u=%p", 
1574                    pp->level, p->pos, p->offset, p->leaf, untilbuf);
1575 #endif
1576     if (untilbuf)
1577         pos=isamb_pp_forward_unode(pp,pos,untilbuf);
1578     ++(pp->level);
1579     assert(pos);
1580     p=open_block(pp->isamb, pos);
1581     pp->block[pp->level]=p;
1582     ++(pp->accessed_nodes[pp->maxlevel-pp->level]);
1583     ++(pp->no_blocks);
1584 #if ISAMB_DEBUG
1585     logf(LOG_DEBUG,"isamb_pp_descend_to_leaf "
1586                    "got lev %d node %d lf=%d", 
1587                    pp->level, p->pos, p->leaf);
1588 #endif
1589     if (p->leaf)
1590         return;
1591     assert (p->offset==0 );
1592     src=p->bytes + p->offset;
1593     decode_ptr(&src, &pos);
1594     p->offset=src-(char*)p->bytes;
1595     isamb_pp_descend_to_leaf(pp,pos,untilbuf);
1596 #if ISAMB_DEBUG
1597     logf(LOG_DEBUG,"isamb_pp_descend_to_leaf "
1598                    "returning at lev %d node %d ofs=%d lf=%d", 
1599                    pp->level, p->pos, p->offset, p->leaf);
1600 #endif
1601 } /* descend_to_leaf */
1602
1603 static int isamb_pp_find_next_leaf(ISAMB_PP pp)
1604 { /* finds the next leaf by climbing up and down */
1605     ISAMB_P pos;
1606     if (!isamb_pp_climb_level(pp,&pos))
1607         return 0;
1608     isamb_pp_descend_to_leaf(pp, pos,0);
1609     return 1;
1610 }
1611
1612 static int isamb_pp_climb_desc(ISAMB_PP pp,  const void *untilbuf)
1613 { /* climbs up and descends to a leaf where values >= *untilbuf are found */
1614     ISAMB_P pos;
1615 #if ISAMB_DEBUG
1616     struct ISAMB_block *p = pp->block[pp->level];
1617     logf(LOG_DEBUG,"isamb_pp_climb_desc starting "
1618                    "at level %d node %d ofs=%d sz=%d",
1619                     pp->level, p->pos, p->offset, p->size);
1620 #endif
1621     if (!isamb_pp_climb_level(pp,&pos))
1622         return 0;
1623     /* see if it would pay to climb one higher */
1624     if (!isamb_pp_on_right_node(pp, pp->level, untilbuf))
1625         if (!isamb_pp_climb_level(pp,&pos))
1626             return 0;
1627     isamb_pp_descend_to_leaf(pp, pos,untilbuf);
1628 #if ISAMB_DEBUG
1629     p = pp->block[pp->level];
1630     logf(LOG_DEBUG,"isamb_pp_climb_desc done "
1631                    "at level %d node %d ofs=%d sz=%d",
1632                     pp->level, p->pos, p->offset, p->size);
1633 #endif
1634     return 1;
1635 } /* climb_desc */
1636
1637 int isamb_pp_forward (ISAMB_PP pp, void *buf, const void *untilbuf)
1638 {
1639 #if ISAMB_DEBUG
1640     struct ISAMB_block *p = pp->block[pp->level];
1641     assert(p->leaf);
1642     logf(LOG_DEBUG,"isamb_pp_forward starting "
1643                    "at level %d node %d ofs=%d sz=%d u=%p",
1644                     pp->level, p->pos, p->offset, p->size,untilbuf);
1645 #endif
1646     if (untilbuf) {
1647         if (isamb_pp_forward_on_leaf( pp, buf, untilbuf)) {
1648 #if ISAMB_DEBUG
1649             logf(LOG_DEBUG,"isamb_pp_forward (f) returning (A) "
1650                    "at level %d node %d ofs=%d sz=%d",
1651                     pp->level, p->pos, p->offset, p->size);
1652 #endif
1653             return 1;
1654         }
1655         if (! isamb_pp_climb_desc( pp, untilbuf)) {
1656 #if ISAMB_DEBUG
1657             logf(LOG_DEBUG,"isamb_pp_forward (f) returning notfound (B) "
1658                    "at level %d node %d ofs=%d sz=%d",
1659                     pp->level, p->pos, p->offset, p->size);
1660 #endif
1661             return 0; /* could not find a leaf */
1662         }
1663         do{
1664             if (isamb_pp_forward_on_leaf( pp, buf, untilbuf)) {
1665 #if ISAMB_DEBUG
1666             logf(LOG_DEBUG,"isamb_pp_forward (f) returning (C) "
1667                    "at level %d node %d ofs=%d sz=%d",
1668                     pp->level, p->pos, p->offset, p->size);
1669 #endif
1670                 return 1;
1671             }
1672         }while ( isamb_pp_find_next_leaf(pp));
1673         return 0; /* could not find at all */
1674     }
1675     else { /* no untilbuf, a straight read */
1676         /* FIXME - this should be moved
1677          * directly into the pp_read */
1678         /* keeping here now, to keep same
1679          * interface as the old fwd */
1680         if (isamb_pp_read_on_leaf( pp, buf)) {
1681 #if ISAMB_DEBUG
1682             logf(LOG_DEBUG,"isamb_pp_forward (read) returning (D) "
1683                    "at level %d node %d ofs=%d sz=%d",
1684                     pp->level, p->pos, p->offset, p->size);
1685 #endif
1686             return 1;
1687         }
1688         if (isamb_pp_find_next_leaf(pp)) {
1689 #if ISAMB_DEBUG
1690             logf(LOG_DEBUG,"isamb_pp_forward (read) returning (E) "
1691                    "at level %d node %d ofs=%d sz=%d",
1692                     pp->level, p->pos, p->offset, p->size);
1693 #endif
1694             return isamb_pp_read_on_leaf(pp, buf);
1695         }
1696         else
1697             return 0;
1698     }
1699 } /* isam_pp_forward (new version) */
1700
1701 #elif NEW_FORWARD == 0
1702
1703 int isamb_pp_forward (ISAMB_PP pp, void *buf, const void *untilbuf)
1704 {
1705     /* pseudocode:
1706      *   while 1
1707      *     while at end of node
1708      *       climb higher. If out, return 0
1709      *     while not on a leaf (and not at its end)
1710      *       decode next
1711      *       if cmp
1712      *         descend to node
1713      *     decode next
1714      *     if cmp
1715      *       return 1
1716      */
1717      /* 
1718       * The upper nodes consist of a sequence of nodenumbers and keys
1719       * When opening a block,  the first node number is read in, and
1720       * offset points to the first key, which is the upper limit of keys
1721       * in the node just read.
1722       */
1723     char *dst = buf;
1724     const char *src;
1725     struct ISAMB_block *p = pp->block[pp->level];
1726     int cmp;
1727     int item_len;
1728     int pos;
1729     int nxtpos;
1730     int descending=0; /* used to prevent a border condition error */
1731     if (!p)
1732         return 0;
1733 #if ISAMB_DEBUG
1734     logf(LOG_DEBUG,"isamb_pp_forward starting [%p] p=%d",pp,p->pos);
1735     
1736     (*pp->isamb->method->codec.log_item)(LOG_DEBUG, untilbuf, "until");
1737     (*pp->isamb->method->codec.log_item)(LOG_DEBUG, buf, "buf");
1738 #endif
1739
1740     while (1)
1741     {
1742         while ( (p->offset == p->size) && !descending )
1743         {  /* end of this block - climb higher */
1744             assert (p->offset <= p->size);
1745 #if ISAMB_DEBUG
1746             logf(LOG_DEBUG,"isamb_pp_forward climbing from l=%d",
1747                             pp->level);
1748 #endif
1749             if (pp->level == 0)
1750             {
1751 #if ISAMB_DEBUG
1752                 logf(LOG_DEBUG,"isamb_pp_forward returning 0 at root");
1753 #endif
1754                 return 0; /* at end of the root, nothing left */
1755             }
1756             close_block(pp->isamb, pp->block[pp->level]);
1757             pp->block[pp->level]=0;
1758             (pp->level)--;
1759             p=pp->block[pp->level];
1760 #if ISAMB_DEBUG
1761             logf(LOG_DEBUG,"isamb_pp_forward climbed to node %d off=%d",
1762                             p->pos, p->offset);
1763 #endif
1764             assert(!p->leaf);
1765             assert(p->offset <= p->size);
1766             /* skip the child we have handled */
1767             if (p->offset != p->size)
1768             { 
1769                 src = p->bytes + p->offset;
1770                 decode_ptr(&src, &item_len);
1771 #if ISAMB_DEBUG         
1772                 (*pp->isamb->method->codec.log_item)(LOG_DEBUG, src,
1773                                                " isamb_pp_forward "
1774                                                "climb skipping old key");
1775 #endif
1776                 src += item_len;
1777                 decode_ptr(&src,&pos);
1778                 p->offset = src - (char*) p->bytes;
1779                 break; /* even if this puts us at the end of the block, we
1780                           need to descend to the last pos. UGLY coding,
1781                           clean up some day */
1782             }
1783         }
1784         if (!p->leaf)
1785         { 
1786             src = p->bytes + p->offset;
1787             if (p->offset == p->size)
1788                 cmp=-2 ; /* descend to the last node, as we have
1789                             no value to cmp */
1790             else
1791             {
1792                 decode_ptr(&src, &item_len);
1793 #if ISAMB_DEBUG
1794                 logf(LOG_DEBUG,"isamb_pp_forward (B) on a high node. "
1795                      "ofs=%d sz=%d nxtpos=%d ",
1796                         p->offset,p->size,pos);
1797                 (*pp->isamb->method->codec.log_item)(LOG_DEBUG, src, "");
1798 #endif
1799                 if (untilbuf)
1800                     cmp=(*pp->isamb->method->compare_item)(untilbuf,src);
1801                 else
1802                     cmp=-2;
1803                 src += item_len;
1804                 decode_ptr(&src,&nxtpos);
1805             }
1806             if (cmp<2)
1807             { 
1808 #if ISAMB_DEBUG
1809                 logf(LOG_DEBUG,"isambb_pp_forward descending l=%d p=%d ",
1810                             pp->level, pos);
1811 #endif
1812                 descending=1; /* prevent climbing for a while */
1813                 ++(pp->level);
1814                 p = open_block(pp->isamb,pos);
1815                 pp->block[pp->level] = p ;
1816                 pp->total_size += p->size;
1817                 (pp->accessed_nodes[pp->maxlevel - pp->level])++;
1818                 pp->no_blocks++;
1819                 if ( !p->leaf)
1820                 { /* block starts with a pos */
1821                     src = p->bytes + p->offset;
1822                     decode_ptr(&src,&pos);
1823                     p->offset=src-(char*) p->bytes;
1824 #if ISAMB_DEBUG
1825                     logf(LOG_DEBUG,"isamb_pp_forward: block %d starts with %d",
1826                                     p->pos, pos);
1827 #endif
1828                 } 
1829             } /* descend to the node */
1830             else
1831             { /* skip the node */
1832                 p->offset = src - (char*) p->bytes;
1833                 pos=nxtpos;
1834                 (pp->skipped_nodes[pp->maxlevel - pp->level -1])++;
1835 #if ISAMB_DEBUG
1836                 logf(LOG_DEBUG,
1837                     "isamb_pp_forward: skipping block on level %d, noting "
1838                      "on %d (%d)",
1839                     pp->level, pp->maxlevel - pp->level-1 , 
1840                     pp->skipped_nodes[pp->maxlevel - pp->level-1 ]);
1841 #endif
1842                 /* 0 is always leafs, 1 is one level above leafs etc, no
1843                  * matter how high tree */
1844             }
1845         } /* not on a leaf */
1846         else
1847         { /* on a leaf */
1848             if (p->offset == p->size) { 
1849                 descending = 0;
1850             }
1851             else
1852             {
1853                 assert (p->offset < p->size);
1854                 src = p->bytes + p->offset;
1855                 dst=buf;
1856                 (*pp->isamb->method->codec.decode)(p->decodeClientData,
1857                                                 &dst, &src);
1858                 p->offset = src - (char*) p->bytes;
1859                 if (untilbuf)
1860                     cmp=(*pp->isamb->method->compare_item)(untilbuf,buf);
1861                 else
1862                     cmp=-2;
1863 #if ISAMB_DEBUG
1864                 logf(LOG_DEBUG,"isamb_pp_forward on a leaf. cmp=%d", 
1865                      cmp);
1866                 (*pp->isamb->method->codec.log_item)(LOG_DEBUG, buf, "");
1867 #endif
1868                 if (cmp <2)
1869                 {
1870 #if ISAMB_DEBUG
1871                     if (untilbuf)
1872                     {
1873                         (*pp->isamb->method->codec.log_item)(
1874                             LOG_DEBUG, buf,  "isamb_pp_forward returning 1");
1875                     }
1876                     else
1877                     {
1878                         (*pp->isamb->method->codec.log_item)(
1879                             LOG_DEBUG, buf, "isamb_pp_read returning 1 (fwd)");
1880                     }
1881 #endif
1882                     pp->returned_numbers++;
1883                     return 1;
1884                 }
1885                 else
1886                     pp->skipped_numbers++;
1887             }
1888         } /* leaf */
1889     } /* main loop */
1890 }
1891
1892 #elif NEW_FORWARD == 2
1893
1894 int isamb_pp_forward (ISAMB_PP pp, void *buf, const void *untilb)
1895 {
1896     char *dst = buf;
1897     const char *src;
1898     struct ISAMB_block *p = pp->block[pp->level];
1899     if (!p)
1900         return 0;
1901
1902 again:
1903     while (p->offset == p->size)
1904     {
1905         int pos, item_len;
1906         while (p->offset == p->size)
1907         {
1908             if (pp->level == 0)
1909                 return 0;
1910             close_block (pp->isamb, pp->block[pp->level]);
1911             pp->block[pp->level] = 0;
1912             (pp->level)--;
1913             p = pp->block[pp->level];
1914             assert (!p->leaf);  
1915         }
1916
1917         assert(!p->leaf);
1918         src = p->bytes + p->offset;
1919         
1920         decode_ptr (&src, &item_len);
1921         src += item_len;
1922         decode_ptr (&src, &pos);
1923         
1924         p->offset = src - (char*) p->bytes;
1925
1926         src = p->bytes + p->offset;
1927
1928         while(1)
1929         {
1930             if (!untilb || p->offset == p->size)
1931                 break;
1932             assert(p->offset < p->size);
1933             decode_ptr (&src, &item_len);
1934             if ((*pp->isamb->method->compare_item)(untilb, src) <= 1)
1935                 break;
1936             src += item_len;
1937             decode_ptr (&src, &pos);
1938             p->offset = src - (char*) p->bytes;
1939         }
1940
1941         pp->level++;
1942
1943         while (1)
1944         {
1945             pp->block[pp->level] = p = open_block (pp->isamb, pos);
1946
1947             pp->total_size += p->size;
1948             pp->no_blocks++;
1949             
1950             if (p->leaf) 
1951             {
1952                 break;
1953             }
1954             
1955             src = p->bytes + p->offset;
1956             while(1)
1957             {
1958                 decode_ptr (&src, &pos);
1959                 p->offset = src - (char*) p->bytes;
1960                 
1961                 if (!untilb || p->offset == p->size)
1962                     break;
1963                 assert(p->offset < p->size);
1964                 decode_ptr (&src, &item_len);
1965                 if ((*pp->isamb->method->compare_item)(untilb, src) <= 1)
1966                     break;
1967                 src += item_len;
1968             }
1969             pp->level++;
1970         }
1971     }
1972     assert (p->offset < p->size);
1973     assert (p->leaf);
1974     while(1)
1975     {
1976         char *dst0 = dst;
1977         src = p->bytes + p->offset;
1978         (*pp->isamb->method->codec.decode)(p->decodeClientData, &dst, &src);
1979         p->offset = src - (char*) p->bytes;
1980         if (!untilb || (*pp->isamb->method->compare_item)(untilb, dst0) <= 1)
1981             break;
1982         dst = dst0;
1983         if (p->offset == p->size) goto again;
1984     }
1985     /* key_logdump_txt(LOG_DEBUG,buf, "isamb_pp_read returning 1"); */
1986     return 1;
1987 }
1988
1989 #endif
1990
1991 void isamb_pp_pos( ISAMB_PP pp, double *current, double *total )
1992 { /* return an estimate of the current position and of the total number of */
1993   /* occureences in the isam tree, based on the current leaf */
1994     struct ISAMB_block *p = pp->block[pp->level];
1995     assert(total);
1996     assert(current);
1997     assert(p->leaf);
1998
1999     *total = pp->block[0]->no_items;
2000     *current = (double) pp->returned_numbers;
2001 #if ISAMB_DEBUG
2002     logf(LOG_LOG, "isamb_pp_pos returning: cur= %0.1f tot=%0.1f rn="
2003          ZINT_FORMAT, *current, *total, pp->returned_numbers);
2004 #endif
2005 }