Fixed bug #113: Compress upper nodes in isam-b.
[idzebra-moved-to-github.git] / isamb / isamb.c
1 /* $Id: isamb.c,v 1.67 2005-01-15 18:43:05 adam Exp $
2    Copyright (C) 1995-2005
3    Index Data Aps
4
5 This file is part of the Zebra server.
6
7 Zebra is free software; you can redistribute it and/or modify it under
8 the terms of the GNU General Public License as published by the Free
9 Software Foundation; either version 2, or (at your option) any later
10 version.
11
12 Zebra is distributed in the hope that it will be useful, but WITHOUT ANY
13 WARRANTY; without even the implied warranty of MERCHANTABILITY or
14 FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
15 for more details.
16
17 You should have received a copy of the GNU General Public License
18 along with Zebra; see the file LICENSE.zebra.  If not, write to the
19 Free Software Foundation, 59 Temple Place - Suite 330, Boston, MA
20 02111-1307, USA.
21 */
22
23 #include <string.h>
24 #include <yaz/log.h>
25 #include <yaz/xmalloc.h>
26 #include <idzebra/isamb.h>
27 #include <assert.h>
28
29 #ifndef ISAMB_DEBUG
30 #define ISAMB_DEBUG 0
31 #endif
32
33
34 #define ISAMB_MAJOR_VERSION 3
35 #define ISAMB_MINOR_VERSION 0
36
37 struct ISAMB_head {
38     zint first_block;
39     zint last_block;
40     zint free_list;
41     zint no_items;
42     int block_size;
43     int block_max;
44     int block_offset;
45 };
46
47 /* if 1, interior nodes items are encoded; 0 if not encoded */
48 #define INT_ENCODE 1
49
50 /* maximum size of encoded buffer */
51 #define DST_ITEM_MAX 256
52
53 #define ISAMB_MAX_LEVEL 10
54 /* approx 2*max page + max size of item */
55 #define DST_BUF_SIZE 16840
56
57 #define ISAMB_CACHE_ENTRY_SIZE 4096
58
59 /* CAT_MAX: _must_ be power of 2 */
60 #define CAT_MAX 4
61 #define CAT_MASK (CAT_MAX-1)
62 /* CAT_NO: <= CAT_MAX */
63 #define CAT_NO 4
64
65 /* Smallest block size */
66 #define ISAMB_MIN_SIZE 32
67 /* Size factor */
68 #define ISAMB_FAC_SIZE 4
69
70 /* ISAMB_PTR_CODEC = 1 var, =0 fixed */
71 #define ISAMB_PTR_CODEC 1
72
73 struct ISAMB_cache_entry {
74     ISAMB_P pos;
75     unsigned char *buf;
76     int dirty;
77     int hits;
78     struct ISAMB_cache_entry *next;
79 };
80
81 struct ISAMB_file {
82     BFile bf;
83     int head_dirty;
84     struct ISAMB_head head;
85     struct ISAMB_cache_entry *cache_entries;
86 };
87
88 struct ISAMB_s {
89     BFiles bfs;
90     ISAMC_M *method;
91
92     struct ISAMB_file *file;
93     int no_cat;
94     int cache; /* 0 = no cache, 1 = use cache, -1 = dummy isam (for testing only) */
95     int log_io;        /* log level for bf_read/bf_write calls */
96     int log_freelist;  /* log level for freelist handling */
97     zint skipped_numbers; /* on a leaf node */
98     zint returned_numbers; 
99     zint skipped_nodes[ISAMB_MAX_LEVEL]; /* [0]=skipped leaves, 1 = higher etc */
100     zint accessed_nodes[ISAMB_MAX_LEVEL]; /* nodes we did not skip */
101 };
102
103 struct ISAMB_block {
104     ISAMB_P pos;
105     int cat;
106     int size;
107     int leaf;
108     int dirty;
109     int deleted;
110     int offset;
111     zint no_items;  /* number of nodes in this + children */
112     char *bytes;
113     char *cbuf;
114     unsigned char *buf;
115     void *decodeClientData;
116     int log_rw;
117 };
118
119 struct ISAMB_PP_s {
120     ISAMB isamb;
121     ISAMB_P pos;
122     int level;
123     int maxlevel; /* total depth */
124     zint total_size;
125     zint no_blocks;
126     zint skipped_numbers; /* on a leaf node */
127     zint returned_numbers; 
128     zint skipped_nodes[ISAMB_MAX_LEVEL]; /* [0]=skipped leaves, 1 = higher etc */
129     zint accessed_nodes[ISAMB_MAX_LEVEL]; /* nodes we did not skip */
130     struct ISAMB_block **block;
131     int scope;  /* on what level we forward */
132 };
133
134
135 #define encode_item_len encode_ptr
136 #if ISAMB_PTR_CODEC
137 static void encode_ptr(char **dst, zint pos)
138 {
139     unsigned char *bp = (unsigned char*) *dst;
140
141     while (pos > 127)
142     {
143          *bp++ = 128 | (pos & 127);
144          pos = pos >> 7;
145     }
146     *bp++ = pos;
147     *dst = (char *) bp;
148 }
149 #else
150 static void encode_ptr(char **dst, zint pos)
151 {
152     memcpy(*dst, &pos, sizeof(pos));
153     (*dst) += sizeof(pos);
154 }
155 #endif
156
157 #define decode_item_len decode_ptr
158 #if ISAMB_PTR_CODEC
159 static void decode_ptr(const char **src1, zint *pos)
160 {
161     const unsigned char **src = (const unsigned char **) src1;
162     zint d = 0;
163     unsigned char c;
164     unsigned r = 0;
165
166     while (((c = *(*src)++) & 128))
167     {
168         d += ((zint) (c & 127) << r);
169         r += 7;
170     }
171     d += ((zint) c << r);
172     *pos = d;
173 }
174 #else
175 static void decode_ptr(const char **src, zint *pos)
176 {
177      memcpy(pos, *src, sizeof(*pos));
178      (*src) += sizeof(*pos);
179 }
180 #endif
181
182 ISAMB isamb_open(BFiles bfs, const char *name, int writeflag, ISAMC_M *method,
183                   int cache)
184 {
185     ISAMB isamb = xmalloc(sizeof(*isamb));
186     int i, b_size = ISAMB_MIN_SIZE;
187
188     isamb->bfs = bfs;
189     isamb->method = (ISAMC_M *) xmalloc(sizeof(*method));
190     memcpy (isamb->method, method, sizeof(*method));
191     isamb->no_cat = CAT_NO;
192     isamb->log_io = 0;
193     isamb->log_freelist = 0;
194     isamb->cache = cache;
195     isamb->skipped_numbers = 0;
196     isamb->returned_numbers = 0;
197     for (i = 0;i<ISAMB_MAX_LEVEL;i++)
198       isamb->skipped_nodes[i]= isamb->accessed_nodes[i]=0;
199
200     assert(cache == 0);
201     isamb->file = xmalloc(sizeof(*isamb->file) * isamb->no_cat);
202     for (i = 0; i < isamb->no_cat; i++)
203     {
204         char fname[DST_BUF_SIZE];
205         char hbuf[DST_BUF_SIZE];
206         isamb->file[i].cache_entries = 0;
207         isamb->file[i].head_dirty = 0;
208         sprintf(fname, "%s%c", name, i+'A');
209         if (cache)
210             isamb->file[i].bf = bf_open(bfs, fname, ISAMB_CACHE_ENTRY_SIZE,
211                                          writeflag);
212         else
213             isamb->file[i].bf = bf_open(bfs, fname, b_size, writeflag);
214
215         /* fill-in default values (for empty isamb) */
216         isamb->file[i].head.first_block = ISAMB_CACHE_ENTRY_SIZE/b_size+1;
217         isamb->file[i].head.last_block = isamb->file[i].head.first_block;
218         isamb->file[i].head.block_size = b_size;
219 #if ISAMB_PTR_CODEC
220         if (i == isamb->no_cat-1 || b_size > 128)
221             isamb->file[i].head.block_offset = 8;
222         else
223             isamb->file[i].head.block_offset = 4;
224 #else
225         isamb->file[i].head.block_offset = 11;
226 #endif
227         isamb->file[i].head.block_max =
228             b_size - isamb->file[i].head.block_offset;
229         isamb->file[i].head.free_list = 0;
230         if (bf_read(isamb->file[i].bf, 0, 0, 0, hbuf))
231         {
232             /* got header assume "isamb"major minor len can fit in 16 bytes */
233             zint zint_tmp;
234             int major, minor, len, pos = 0;
235             int left;
236             const char *src = 0;
237             if (memcmp(hbuf, "isamb", 5))
238             {
239                 yaz_log(YLOG_WARN, "bad isamb header for file %s", fname);
240                 return 0;
241             }
242             if (sscanf(hbuf+5, "%d %d %d", &major, &minor, &len) != 3)
243             {
244                 yaz_log(YLOG_WARN, "bad isamb header for file %s", fname);
245                 return 0;
246             }
247             if (major != ISAMB_MAJOR_VERSION)
248             {
249                 yaz_log(YLOG_WARN, "bad major version for file %s %d, must be %d",
250                      fname, major, ISAMB_MAJOR_VERSION);
251                 return 0;
252             }
253             for (left = len - b_size; left > 0; left = left - b_size)
254             {
255                 pos++;
256                 if (!bf_read(isamb->file[i].bf, pos, 0, 0, hbuf + pos*b_size))
257                 {
258                     yaz_log(YLOG_WARN, "truncated isamb header for " 
259                          "file=%s len=%d pos=%d",
260                          fname, len, pos);
261                     return 0;
262                 }
263             }
264             src = hbuf + 16;
265             decode_ptr(&src, &isamb->file[i].head.first_block);
266             decode_ptr(&src, &isamb->file[i].head.last_block);
267             decode_ptr(&src, &zint_tmp);
268             isamb->file[i].head.block_size = zint_tmp;
269             decode_ptr(&src, &zint_tmp);
270             isamb->file[i].head.block_max = zint_tmp;
271             decode_ptr(&src, &isamb->file[i].head.free_list);
272         }
273         assert (isamb->file[i].head.block_size >= isamb->file[i].head.block_offset);
274         isamb->file[i].head_dirty = 0;
275         assert(isamb->file[i].head.block_size == b_size);
276         b_size = b_size * ISAMB_FAC_SIZE;
277     }
278 #if ISAMB_DEBUG
279     yaz_log(YLOG_WARN, "isamb debug enabled. Things will be slower than usual");
280 #endif
281     return isamb;
282 }
283
284 static void flush_blocks (ISAMB b, int cat)
285 {
286     while (b->file[cat].cache_entries)
287     {
288         struct ISAMB_cache_entry *ce_this = b->file[cat].cache_entries;
289         b->file[cat].cache_entries = ce_this->next;
290
291         if (ce_this->dirty)
292         {
293             yaz_log(b->log_io, "bf_write: flush_blocks");
294             bf_write(b->file[cat].bf, ce_this->pos, 0, 0, ce_this->buf);
295         }
296         xfree(ce_this->buf);
297         xfree(ce_this);
298     }
299 }
300
301 static int get_block (ISAMB b, ISAMC_P pos, char *userbuf, int wr)
302 {
303     int cat = (int) (pos&CAT_MASK);
304     int off = (int) (((pos/CAT_MAX) & 
305                (ISAMB_CACHE_ENTRY_SIZE / b->file[cat].head.block_size - 1))
306         * b->file[cat].head.block_size);
307     zint norm = pos / (CAT_MASK*ISAMB_CACHE_ENTRY_SIZE / b->file[cat].head.block_size);
308     int no = 0;
309     struct ISAMB_cache_entry **ce, *ce_this = 0, **ce_last = 0;
310
311     if (!b->cache)
312         return 0;
313
314     assert (ISAMB_CACHE_ENTRY_SIZE >= b->file[cat].head.block_size);
315     for (ce = &b->file[cat].cache_entries; *ce; ce = &(*ce)->next, no++)
316     {
317         ce_last = ce;
318         if ((*ce)->pos == norm)
319         {
320             ce_this = *ce;
321             *ce = (*ce)->next;   /* remove from list */
322             
323             ce_this->next = b->file[cat].cache_entries;  /* move to front */
324             b->file[cat].cache_entries = ce_this;
325             
326             if (wr)
327             {
328                 memcpy (ce_this->buf + off, userbuf, 
329                         b->file[cat].head.block_size);
330                 ce_this->dirty = 1;
331             }
332             else
333                 memcpy (userbuf, ce_this->buf + off,
334                         b->file[cat].head.block_size);
335             return 1;
336         }
337     }
338     if (no >= 40)
339     {
340         assert (no == 40);
341         assert (ce_last && *ce_last);
342         ce_this = *ce_last;
343         *ce_last = 0;  /* remove the last entry from list */
344         if (ce_this->dirty)
345         {
346             yaz_log(b->log_io, "bf_write: get_block");
347             bf_write(b->file[cat].bf, ce_this->pos, 0, 0, ce_this->buf);
348         }
349         xfree(ce_this->buf);
350         xfree(ce_this);
351     }
352     ce_this = xmalloc(sizeof(*ce_this));
353     ce_this->next = b->file[cat].cache_entries;
354     b->file[cat].cache_entries = ce_this;
355     ce_this->buf = xmalloc(ISAMB_CACHE_ENTRY_SIZE);
356     ce_this->pos = norm;
357     yaz_log(b->log_io, "bf_read: get_block");
358     if (!bf_read(b->file[cat].bf, norm, 0, 0, ce_this->buf))
359         memset (ce_this->buf, 0, ISAMB_CACHE_ENTRY_SIZE);
360     if (wr)
361     {
362         memcpy (ce_this->buf + off, userbuf, b->file[cat].head.block_size);
363         ce_this->dirty = 1;
364     }
365     else
366     {
367         ce_this->dirty = 0;
368         memcpy (userbuf, ce_this->buf + off, b->file[cat].head.block_size);
369     }
370     return 1;
371 }
372
373
374 void isamb_close (ISAMB isamb)
375 {
376     int i;
377     for (i = 0;isamb->accessed_nodes[i];i++)
378         yaz_log(YLOG_DEBUG, "isamb_close  level leaf-%d: "ZINT_FORMAT" read, "
379                         ZINT_FORMAT" skipped",
380              i, isamb->accessed_nodes[i], isamb->skipped_nodes[i]);
381     yaz_log(YLOG_DEBUG, "isamb_close returned "ZINT_FORMAT" values, "
382                    "skipped "ZINT_FORMAT,
383          isamb->skipped_numbers, isamb->returned_numbers);
384     for (i = 0; i<isamb->no_cat; i++)
385     {
386         flush_blocks (isamb, i);
387         if (isamb->file[i].head_dirty)
388         {
389             char hbuf[DST_BUF_SIZE];
390             int major = ISAMB_MAJOR_VERSION;
391             int minor = ISAMB_MINOR_VERSION;
392             int len = 16;
393             char *dst = hbuf + 16;
394             int pos = 0, left;
395             int b_size = isamb->file[i].head.block_size;
396
397             encode_ptr(&dst, isamb->file[i].head.first_block);
398             encode_ptr(&dst, isamb->file[i].head.last_block);
399             encode_ptr(&dst, isamb->file[i].head.block_size);
400             encode_ptr(&dst, isamb->file[i].head.block_max);
401             encode_ptr(&dst, isamb->file[i].head.free_list);
402             memset(dst, '\0', b_size); /* ensure no random bytes are written */
403
404             len = dst - hbuf;
405
406             /* print exactly 16 bytes (including trailing 0) */
407             sprintf(hbuf, "isamb%02d %02d %02d\r\n", major, minor, len);
408
409             bf_write(isamb->file[i].bf, pos, 0, 0, hbuf);
410
411             for (left = len - b_size; left > 0; left = left - b_size)
412             {
413                 pos++;
414                 bf_write(isamb->file[i].bf, pos, 0, 0, hbuf + pos*b_size);
415             }
416         }
417         bf_close (isamb->file[i].bf);
418     }
419     xfree(isamb->file);
420     xfree(isamb->method);
421     xfree(isamb);
422 }
423
424 /* open_block: read one block at pos.
425    Decode leading sys bytes .. consisting of
426    Offset:Meaning
427    0: leader byte, != 0 leaf, == 0, non-leaf
428    1-2: used size of block
429    3-7*: number of items and all children
430    
431    * Reserve 5 bytes for large block sizes. 1 for small ones .. Number
432    of items. We can thus have at most 2^40 nodes. 
433 */
434 static struct ISAMB_block *open_block(ISAMB b, ISAMC_P pos)
435 {
436     int cat = (int) (pos&CAT_MASK);
437     const char *src;
438     int offset = b->file[cat].head.block_offset;
439     struct ISAMB_block *p;
440     if (!pos)
441         return 0;
442     p = xmalloc(sizeof(*p));
443     p->pos = pos;
444     p->cat = (int) (pos & CAT_MASK);
445     p->buf = xmalloc(b->file[cat].head.block_size);
446     p->cbuf = 0;
447
448     if (!get_block (b, pos, p->buf, 0))
449     {
450         yaz_log(b->log_io, "bf_read: open_block");
451         if (!bf_read(b->file[cat].bf, pos/CAT_MAX, 0, 0, p->buf))
452         {
453             yaz_log(YLOG_FATAL, "isamb: read fail for pos=%ld block=%ld",
454                      (long) pos, (long) pos/CAT_MAX);
455             abort();
456         }
457     }
458     p->bytes = p->buf + offset;
459     p->leaf = p->buf[0];
460     p->size = (p->buf[1] + 256 * p->buf[2]) - offset;
461     if (p->size < 0)
462     {
463         yaz_log(YLOG_FATAL, "Bad block size %d in pos=" ZINT_FORMAT "\n",
464                  p->size, pos);
465     }
466     assert (p->size >= 0);
467     src = p->buf + 3;
468     decode_ptr(&src, &p->no_items);
469
470     p->offset = 0;
471     p->dirty = 0;
472     p->deleted = 0;
473     p->decodeClientData = (*b->method->codec.start)();
474     return p;
475 }
476
477 struct ISAMB_block *new_block (ISAMB b, int leaf, int cat)
478 {
479     struct ISAMB_block *p;
480
481     p = xmalloc(sizeof(*p));
482     p->buf = xmalloc(b->file[cat].head.block_size);
483
484     if (!b->file[cat].head.free_list)
485     {
486         zint block_no;
487         block_no = b->file[cat].head.last_block++;
488         p->pos = block_no * CAT_MAX + cat;
489     }
490     else
491     {
492         p->pos = b->file[cat].head.free_list;
493         assert((p->pos & CAT_MASK) == cat);
494         if (!get_block (b, p->pos, p->buf, 0))
495         {
496             yaz_log(b->log_io, "bf_read: new_block");
497             if (!bf_read(b->file[cat].bf, p->pos/CAT_MAX, 0, 0, p->buf))
498             {
499                 yaz_log(YLOG_FATAL, "isamb: read fail for pos=%ld block=%ld",
500                          (long) p->pos/CAT_MAX, (long) p->pos/CAT_MAX);
501                 abort ();
502             }
503         }
504         yaz_log(b->log_freelist, "got block " ZINT_FORMAT " from freelist %d:" ZINT_FORMAT, p->pos,
505                  cat, p->pos/CAT_MAX);
506         memcpy (&b->file[cat].head.free_list, p->buf, sizeof(zint));
507     }
508     p->cat = cat;
509     b->file[cat].head_dirty = 1;
510     memset (p->buf, 0, b->file[cat].head.block_size);
511     p->bytes = p->buf + b->file[cat].head.block_offset;
512     p->leaf = leaf;
513     p->size = 0;
514     p->dirty = 1;
515     p->deleted = 0;
516     p->offset = 0;
517     p->no_items = 0;
518     p->decodeClientData = (*b->method->codec.start)();
519     return p;
520 }
521
522 struct ISAMB_block *new_leaf (ISAMB b, int cat)
523 {
524     return new_block (b, 1, cat);
525 }
526
527
528 struct ISAMB_block *new_int (ISAMB b, int cat)
529 {
530     return new_block (b, 0, cat);
531 }
532
533 static void check_block (ISAMB b, struct ISAMB_block *p)
534 {
535     assert(b); /* mostly to make the compiler shut up about unused b */
536     if (p->leaf)
537     {
538         ;
539     }
540     else
541     {
542         /* sanity check */
543         char *startp = p->bytes;
544         const char *src = startp;
545         char *endp = p->bytes + p->size;
546         ISAMB_P pos;
547         void *c1 = (*b->method->codec.start)();
548             
549         decode_ptr(&src, &pos);
550         assert ((pos&CAT_MASK) == p->cat);
551         while (src != endp)
552         {
553 #if INT_ENCODE
554             char file_item_buf[DST_ITEM_MAX];
555             char *file_item = file_item_buf;
556             (*b->method->codec.reset)(c1);
557             (*b->method->codec.decode)(c1, &file_item, &src);
558 #else
559             zint item_len;
560             decode_item_len(&src, &item_len);
561             assert (item_len > 0 && item_len < 80);
562             src += item_len;
563 #endif
564             decode_ptr(&src, &pos);
565             if ((pos&CAT_MASK) != p->cat)
566             {
567                 assert ((pos&CAT_MASK) == p->cat);
568             }
569         }
570         (*b->method->codec.stop)(c1);
571     }
572 }
573
574 void close_block(ISAMB b, struct ISAMB_block *p)
575 {
576     if (!p)
577         return;
578     if (p->deleted)
579     {
580         yaz_log(b->log_freelist, "release block " ZINT_FORMAT " from freelist %d:" ZINT_FORMAT,
581                  p->pos, p->cat, p->pos/CAT_MAX);
582         memcpy (p->buf, &b->file[p->cat].head.free_list, sizeof(zint));
583         b->file[p->cat].head.free_list = p->pos;
584         if (!get_block (b, p->pos, p->buf, 1))
585         {
586             yaz_log(b->log_io, "bf_write: close_block (deleted)");
587             bf_write(b->file[p->cat].bf, p->pos/CAT_MAX, 0, 0, p->buf);
588         }
589     }
590     else if (p->dirty)
591     {
592         int offset = b->file[p->cat].head.block_offset;
593         int size = p->size + offset;
594         char *dst =  p->buf + 3;
595         assert (p->size >= 0);
596         
597         /* memset becuase encode_ptr usually does not write all bytes */
598         memset(p->buf, 0, b->file[p->cat].head.block_offset);
599         p->buf[0] = p->leaf;
600         p->buf[1] = size & 255;
601         p->buf[2] = size >> 8;
602         encode_ptr(&dst, p->no_items);
603         check_block(b, p);
604         if (!get_block (b, p->pos, p->buf, 1))
605         {
606             yaz_log(b->log_io, "bf_write: close_block");
607             bf_write(b->file[p->cat].bf, p->pos/CAT_MAX, 0, 0, p->buf);
608         }
609     }
610     (*b->method->codec.stop)(p->decodeClientData);
611     xfree(p->buf);
612     xfree(p);
613 }
614
615 int insert_sub (ISAMB b, struct ISAMB_block **p,
616                 void *new_item, int *mode,
617                 ISAMC_I *stream,
618                 struct ISAMB_block **sp,
619                 void *sub_item, int *sub_size,
620                 const void *max_item);
621
622 int insert_int (ISAMB b, struct ISAMB_block *p, void *lookahead_item,
623                 int *mode,
624                 ISAMC_I *stream, struct ISAMB_block **sp,
625                 void *split_item, int *split_size, const void *last_max_item)
626 {
627     char *startp = p->bytes;
628     const char *src = startp;
629     char *endp = p->bytes + p->size;
630     ISAMB_P pos;
631     struct ISAMB_block *sub_p1 = 0, *sub_p2 = 0;
632     char sub_item[DST_ITEM_MAX];
633     int sub_size;
634     int more = 0;
635     zint diff_terms = 0;
636     void *c1 = (*b->method->codec.start)();
637
638     *sp = 0;
639
640     assert(p->size >= 0);
641     decode_ptr(&src, &pos);
642     while (src != endp)
643     {
644         int d;
645         const char *src0 = src;
646 #if INT_ENCODE
647         char file_item_buf[DST_ITEM_MAX];
648         char *file_item = file_item_buf;
649         (*b->method->codec.reset)(c1);
650         (*b->method->codec.decode)(c1, &file_item, &src);
651         d = (*b->method->compare_item)(file_item_buf, lookahead_item);
652         if (d > 0)
653         {
654             sub_p1 = open_block(b, pos);
655             assert (sub_p1);
656             diff_terms -= sub_p1->no_items;
657             more = insert_sub (b, &sub_p1, lookahead_item, mode,
658                                stream, &sub_p2, 
659                                sub_item, &sub_size, file_item_buf);
660             diff_terms += sub_p1->no_items;
661             src = src0;
662             break;
663         }
664 #else
665         zint item_len;
666         decode_item_len(&src, &item_len);
667         d = (*b->method->compare_item)(src, lookahead_item);
668         if (d > 0)
669         {
670             sub_p1 = open_block(b, pos);
671             assert (sub_p1);
672             diff_terms -= sub_p1->no_items;
673             more = insert_sub (b, &sub_p1, lookahead_item, mode,
674                                stream, &sub_p2, 
675                                sub_item, &sub_size, src);
676             diff_terms += sub_p1->no_items;
677             src = src0;
678             break;
679         }
680         src += item_len;
681 #endif
682         decode_ptr(&src, &pos);
683     }
684     if (!sub_p1)
685     {
686         /* we reached the end. So lookahead > last item */
687         sub_p1 = open_block(b, pos);
688         assert (sub_p1);
689         diff_terms -= sub_p1->no_items;
690         more = insert_sub (b, &sub_p1, lookahead_item, mode, stream, &sub_p2, 
691                            sub_item, &sub_size, last_max_item);
692         diff_terms += sub_p1->no_items;
693     }
694     if (sub_p2)
695         diff_terms += sub_p2->no_items;
696     if (diff_terms)
697     {
698         p->dirty = 1;
699         p->no_items += diff_terms;
700     }
701     if (sub_p2)
702     {
703         /* there was a split - must insert pointer in this one */
704         char dst_buf[DST_BUF_SIZE];
705         char *dst = dst_buf;
706
707         assert (sub_size < 80 && sub_size > 1);
708
709         memcpy (dst, startp, src - startp);
710                 
711         dst += src - startp;
712
713 #if INT_ENCODE
714         const char *sub_item_ptr = sub_item;
715         (*b->method->codec.reset)(c1);
716         (*b->method->codec.encode)(c1, &dst, &sub_item_ptr);
717 #else
718         encode_item_len (&dst, sub_size);      /* sub length and item */
719         memcpy (dst, sub_item, sub_size);
720         dst += sub_size;
721 #endif
722
723         encode_ptr(&dst, sub_p2->pos);   /* pos */
724
725         if (endp - src)                   /* remaining data */
726         {
727             memcpy (dst, src, endp - src);
728             dst += endp - src;
729         }
730         p->size = dst - dst_buf;
731         assert (p->size >= 0);
732
733
734         if (p->size <= b->file[p->cat].head.block_max)
735         {
736             /* it fits OK in this block */
737             memcpy (startp, dst_buf, dst - dst_buf);
738         }
739         else
740         {
741             /* must split _this_ block as well .. */
742             struct ISAMB_block *sub_p3;
743 #if INT_ENCODE
744 #else
745             zint split_size_tmp;
746 #endif
747             zint no_items_first_half = 0;
748             int p_new_size;
749             const char *half;
750             src = dst_buf;
751             endp = dst;
752
753             half = src + b->file[p->cat].head.block_size/2;
754             decode_ptr(&src, &pos);
755
756             /* read sub block so we can get no_items for it */
757             sub_p3 = open_block(b, pos);
758             no_items_first_half += sub_p3->no_items;
759             close_block(b, sub_p3);
760
761             while (src <= half)
762             {
763 #if INT_ENCODE
764                 char file_item_buf[DST_ITEM_MAX];
765                 char *file_item = file_item_buf;
766                 (*b->method->codec.reset)(c1);
767                 (*b->method->codec.decode)(c1, &file_item, &src);
768 #else
769                 decode_item_len(&src, &split_size_tmp);
770                 *split_size = (int) split_size_tmp;
771                 src += *split_size;
772 #endif
773                 decode_ptr(&src, &pos);
774
775                 /* read sub block so we can get no_items for it */
776                 sub_p3 = open_block(b, pos);
777                 no_items_first_half += sub_p3->no_items;
778                 close_block(b, sub_p3);
779             }
780             /*  p is first half */
781             p_new_size = src - dst_buf;
782             memcpy (p->bytes, dst_buf, p_new_size);
783
784 #if INT_ENCODE
785             char file_item_buf[DST_ITEM_MAX];
786             char *file_item = file_item_buf;
787             (*b->method->codec.reset)(c1);
788             (*b->method->codec.decode)(c1, &file_item, &src);
789             *split_size = file_item - file_item_buf;
790             memcpy(split_item, file_item_buf, *split_size);
791 #else
792             decode_item_len(&src, &split_size_tmp);
793             *split_size = (int) split_size_tmp;
794             memcpy (split_item, src, *split_size);
795             src += *split_size;
796 #endif
797             /*  *sp is second half */
798             *sp = new_int (b, p->cat);
799             (*sp)->size = endp - src;
800             memcpy ((*sp)->bytes, src, (*sp)->size);
801
802             p->size = p_new_size;
803
804             /*  adjust no_items in first&second half */
805             (*sp)->no_items = p->no_items - no_items_first_half;
806             p->no_items = no_items_first_half;
807         }
808         p->dirty = 1;
809         close_block(b, sub_p2);
810     }
811     close_block(b, sub_p1);
812     (*b->method->codec.stop)(c1);
813     return more;
814 }
815
816 int insert_leaf (ISAMB b, struct ISAMB_block **sp1, void *lookahead_item,
817                  int *lookahead_mode, ISAMC_I *stream,
818                  struct ISAMB_block **sp2,
819                  void *sub_item, int *sub_size,
820                  const void *max_item)
821 {
822     struct ISAMB_block *p = *sp1;
823     char *endp = 0;
824     const char *src = 0;
825     char dst_buf[DST_BUF_SIZE], *dst = dst_buf;
826     int new_size;
827     void *c1 = (*b->method->codec.start)();
828     void *c2 = (*b->method->codec.start)();
829     int more = 1;
830     int quater = b->file[b->no_cat-1].head.block_max / 4;
831     char *mid_cut = dst_buf + quater * 2;
832     char *tail_cut = dst_buf + quater * 3;
833     char *maxp = dst_buf + b->file[b->no_cat-1].head.block_max;
834     char *half1 = 0;
835     char *half2 = 0;
836     char cut_item_buf[DST_ITEM_MAX];
837     int cut_item_size = 0;
838     int no_items = 0;    /* number of items (total) */
839     int no_items_1 = 0;  /* number of items (first half) */
840
841     if (p && p->size)
842     {
843         char file_item_buf[DST_ITEM_MAX];
844         char *file_item = file_item_buf;
845             
846         src = p->bytes;
847         endp = p->bytes + p->size;
848         (*b->method->codec.decode)(c1, &file_item, &src);
849         while (1)
850         {
851             const char *dst_item = 0; /* resulting item to be inserted */
852             char *lookahead_next;
853             int d = -1;
854             
855             if (lookahead_item)
856                 d = (*b->method->compare_item)(file_item_buf, lookahead_item);
857             
858             /* d now holds comparison between existing file item and
859                lookahead item 
860                d = 0: equal
861                d > 0: lookahead before file
862                d < 0: lookahead after file
863             */
864             if (d > 0)
865             {
866                 /* lookahead must be inserted */
867                 dst_item = lookahead_item;
868                 /* if this is not an insertion, it's really bad .. */
869                 if (!*lookahead_mode)
870                 {
871                     yaz_log(YLOG_WARN, "isamb: Inconsistent register (1)");
872                     assert (*lookahead_mode);
873                 }
874             }
875             else
876                 dst_item = file_item_buf;
877
878                
879             if (!*lookahead_mode && d == 0)
880             {
881                 /* it's a deletion and they match so there is nothing to be
882                    inserted anyway .. But mark the thing bad (file item
883                    was part of input.. The item will not be part of output */
884                 p->dirty = 1;
885             }
886             else if (!half1 && dst > mid_cut)
887             {
888                 /* we have reached the splitting point for the first time */
889                 const char *dst_item_0 = dst_item;
890                 half1 = dst; /* candidate for splitting */
891
892                 /* encode the resulting item */
893                 (*b->method->codec.encode)(c2, &dst, &dst_item);
894                 
895                 cut_item_size = dst_item - dst_item_0;
896                 assert(cut_item_size > 0);
897                 memcpy (cut_item_buf, dst_item_0, cut_item_size);
898                 
899                 half2 = dst;
900                 no_items_1 = no_items;
901                 no_items++;
902             }
903             else
904             {
905                 /* encode the resulting item */
906                 (*b->method->codec.encode)(c2, &dst, &dst_item);
907                 no_items++;
908             }
909
910             /* now move "pointers" .. result has been encoded .. */
911             if (d > 0)  
912             {
913                 /* we must move the lookahead pointer */
914
915                 if (dst > maxp)
916                     /* no more room. Mark lookahead as "gone".. */
917                     lookahead_item = 0;
918                 else
919                 {
920                     /* move it really.. */
921                     lookahead_next = lookahead_item;
922                     if (!(*stream->read_item)(stream->clientData,
923                                               &lookahead_next,
924                                               lookahead_mode))
925                     {
926                         /* end of stream reached: no "more" and no lookahead */
927                         lookahead_item = 0;
928                         more = 0;
929                     }
930                     if (lookahead_item && max_item &&
931                         (*b->method->compare_item)(max_item, lookahead_item) <= 0)
932                     {
933                         /* the lookahead goes beyond what we allow in this
934                            leaf. Mark it as "gone" */
935                         lookahead_item = 0;
936                     }
937                     
938                     p->dirty = 1;
939                 }
940             }
941             else if (d == 0)
942             {
943                 /* exact match .. move both pointers */
944
945                 lookahead_next = lookahead_item;
946                 if (!(*stream->read_item)(stream->clientData,
947                                           &lookahead_next, lookahead_mode))
948                 {
949                     lookahead_item = 0;
950                     more = 0;
951                 }
952                 if (src == endp)
953                     break;  /* end of file stream reached .. */
954                 file_item = file_item_buf; /* move file pointer */
955                 (*b->method->codec.decode)(c1, &file_item, &src);
956             }
957             else
958             {
959                 /* file pointer must be moved */
960                 if (src == endp)
961                     break;
962                 file_item = file_item_buf;
963                 (*b->method->codec.decode)(c1, &file_item, &src);
964             }
965         }
966     }
967     maxp = dst_buf + b->file[b->no_cat-1].head.block_max + quater;
968     /* this loop runs when we are "appending" to a leaf page. That is
969        either it's empty (new) or all file items have been read in
970        previous loop */
971     while (lookahead_item)
972     {
973         char *dst_item;
974         const char *src = lookahead_item;
975         char *dst_0 = dst;
976         
977         /* compare lookahead with max item */
978         if (max_item &&
979             (*b->method->compare_item)(max_item, lookahead_item) <= 0)
980         {
981             /* stop if we have reached the value of max item */
982             break;
983         }
984         if (!*lookahead_mode)
985         {
986             /* this is append. So a delete is bad */
987             yaz_log(YLOG_WARN, "isamb: Inconsistent register (2)");
988             abort();
989         }
990         else if (!half1 && dst > tail_cut)
991         {
992             const char *src_0 = src;
993             half1 = dst; /* candidate for splitting */
994             
995             (*b->method->codec.encode)(c2, &dst, &src);
996             
997             cut_item_size = src - src_0;
998             assert(cut_item_size > 0);
999             memcpy (cut_item_buf, src_0, cut_item_size);
1000             
1001             no_items_1 = no_items;
1002             half2 = dst;
1003         }
1004         else
1005             (*b->method->codec.encode)(c2, &dst, &src);
1006
1007         if (dst > maxp)
1008         {
1009             dst = dst_0;
1010             break;
1011         }
1012         no_items++;
1013         if (p)
1014             p->dirty = 1;
1015         dst_item = lookahead_item;
1016         if (!(*stream->read_item)(stream->clientData, &dst_item,
1017                                   lookahead_mode))
1018         {
1019             lookahead_item = 0;
1020             more = 0;
1021         }
1022     }
1023     new_size = dst - dst_buf;
1024     if (p && p->cat != b->no_cat-1 && 
1025         new_size > b->file[p->cat].head.block_max)
1026     {
1027         /* non-btree block will be removed */
1028         p->deleted = 1;
1029         close_block(b, p);
1030         /* delete it too!! */
1031         p = 0; /* make a new one anyway */
1032     }
1033     if (!p)
1034     {   /* must create a new one */
1035         int i;
1036         for (i = 0; i < b->no_cat; i++)
1037             if (new_size <= b->file[i].head.block_max)
1038                 break;
1039         if (i == b->no_cat)
1040             i = b->no_cat - 1;
1041         p = new_leaf (b, i);
1042     }
1043     if (new_size > b->file[p->cat].head.block_max)
1044     {
1045         char *first_dst;
1046         const char *cut_item = cut_item_buf;
1047
1048         assert (half1);
1049         assert (half2);
1050
1051         assert(cut_item_size > 0);
1052         
1053         /* first half */
1054         p->size = half1 - dst_buf;
1055         memcpy (p->bytes, dst_buf, half1 - dst_buf);
1056         p->no_items = no_items_1;
1057
1058         /* second half */
1059         *sp2 = new_leaf (b, p->cat);
1060
1061         (*b->method->codec.reset)(c2);
1062
1063         first_dst = (*sp2)->bytes;
1064
1065         (*b->method->codec.encode)(c2, &first_dst, &cut_item);
1066
1067         memcpy (first_dst, half2, dst - half2);
1068
1069         (*sp2)->size = (first_dst - (*sp2)->bytes) + (dst - half2);
1070         (*sp2)->no_items = no_items - no_items_1;
1071         (*sp2)->dirty = 1;
1072         p->dirty = 1;
1073         memcpy (sub_item, cut_item_buf, cut_item_size);
1074         *sub_size = cut_item_size;
1075     }
1076     else
1077     {
1078         memcpy (p->bytes, dst_buf, dst - dst_buf);
1079         p->size = new_size;
1080         p->no_items = no_items;
1081     }
1082     (*b->method->codec.stop)(c1);
1083     (*b->method->codec.stop)(c2);
1084     *sp1 = p;
1085     return more;
1086 }
1087
1088 int insert_sub (ISAMB b, struct ISAMB_block **p, void *new_item,
1089                 int *mode,
1090                 ISAMC_I *stream,
1091                 struct ISAMB_block **sp,
1092                 void *sub_item, int *sub_size,
1093                 const void *max_item)
1094 {
1095     if (!*p || (*p)->leaf)
1096         return insert_leaf (b, p, new_item, mode, stream, sp, sub_item, 
1097                             sub_size, max_item);
1098     else
1099         return insert_int (b, *p, new_item, mode, stream, sp, sub_item,
1100                            sub_size, max_item);
1101 }
1102
1103 int isamb_unlink (ISAMB b, ISAMC_P pos)
1104 {
1105     struct ISAMB_block *p1;
1106
1107     if (!pos)
1108         return 0;
1109     p1 = open_block(b, pos);
1110     p1->deleted = 1;
1111     if (!p1->leaf)
1112     {
1113         zint sub_p;
1114         const char *src = p1->bytes + p1->offset;
1115
1116         decode_ptr(&src, &sub_p);
1117         isamb_unlink(b, sub_p);
1118         void *c1 = (*b->method->codec.start)();
1119         
1120         while (src != p1->bytes + p1->size)
1121         {
1122 #if INT_ENCODE
1123             char file_item_buf[DST_ITEM_MAX];
1124             char *file_item = file_item_buf;
1125             (*b->method->codec.reset)(c1);
1126             (*b->method->codec.decode)(c1, &file_item, &src);
1127 #else
1128             zint item_len;
1129             decode_item_len(&src, &item_len);
1130             src += item_len;
1131 #endif
1132             decode_ptr(&src, &sub_p);
1133             isamb_unlink(b, sub_p);
1134         }
1135         (*b->method->codec.stop)(c1);
1136     }
1137     close_block(b, p1);
1138     return 0;
1139 }
1140
1141 ISAMB_P isamb_merge (ISAMB b, ISAMC_P pos, ISAMC_I *stream)
1142 {
1143     char item_buf[DST_ITEM_MAX];
1144     char *item_ptr;
1145     int i_mode;
1146     int more;
1147     int must_delete = 0;
1148
1149     if (b->cache < 0)
1150     {
1151         int more = 1;
1152         while (more)
1153         {
1154             item_ptr = item_buf;
1155             more =
1156                 (*stream->read_item)(stream->clientData, &item_ptr, &i_mode);
1157         }
1158         return 1;
1159     }
1160     item_ptr = item_buf;
1161     more = (*stream->read_item)(stream->clientData, &item_ptr, &i_mode);
1162     while (more)
1163     {
1164         struct ISAMB_block *p = 0, *sp = 0;
1165         char sub_item[DST_ITEM_MAX];
1166         int sub_size;
1167         
1168         if (pos)
1169             p = open_block(b, pos);
1170         more = insert_sub (b, &p, item_buf, &i_mode, stream, &sp,
1171                            sub_item, &sub_size, 0);
1172         if (sp)
1173         {    /* increase level of tree by one */
1174             struct ISAMB_block *p2 = new_int (b, p->cat);
1175             char *dst = p2->bytes + p2->size;
1176             void *c1 = (*b->method->codec.start)();
1177             
1178             encode_ptr(&dst, p->pos);
1179             assert (sub_size < 80 && sub_size > 1);
1180 #if INT_ENCODE
1181             const char *sub_item_ptr = sub_item;
1182             (*b->method->codec.reset)(c1);
1183             (*b->method->codec.encode)(c1, &dst, &sub_item_ptr);
1184 #else
1185             encode_item_len (&dst, sub_size);
1186             memcpy (dst, sub_item, sub_size);
1187             dst += sub_size;
1188 #endif
1189             encode_ptr(&dst, sp->pos);
1190             
1191             p2->size = dst - p2->bytes;
1192             p2->no_items = p->no_items + sp->no_items;
1193             pos = p2->pos;  /* return new super page */
1194             close_block(b, sp);
1195             close_block(b, p2);
1196             (*b->method->codec.stop)(c1);
1197         }
1198         else
1199         {
1200             pos = p->pos;   /* return current one (again) */
1201         }
1202         if (p->no_items == 0)
1203             must_delete = 1;
1204         else
1205             must_delete = 0;
1206         close_block(b, p);
1207     }
1208     if (must_delete)
1209     {
1210         isamb_unlink(b, pos);
1211         return 0;
1212     }
1213     return pos;
1214 }
1215
1216 ISAMB_PP isamb_pp_open_x(ISAMB isamb, ISAMB_P pos, int *level, int scope)
1217 {
1218     ISAMB_PP pp = xmalloc(sizeof(*pp));
1219     int i;
1220
1221     assert(pos);
1222
1223     pp->isamb = isamb;
1224     pp->block = xmalloc(ISAMB_MAX_LEVEL * sizeof(*pp->block));
1225
1226     pp->pos = pos;
1227     pp->level = 0;
1228     pp->maxlevel = 0;
1229     pp->total_size = 0;
1230     pp->no_blocks = 0;
1231     pp->skipped_numbers = 0;
1232     pp->returned_numbers = 0;
1233     pp->scope = scope;
1234     for (i = 0;i<ISAMB_MAX_LEVEL;i++)
1235         pp->skipped_nodes[i] = pp->accessed_nodes[i]=0;
1236     while (1)
1237     {
1238         struct ISAMB_block *p = open_block(isamb, pos);
1239         const char *src = p->bytes + p->offset;
1240         pp->block[pp->level] = p;
1241
1242         pp->total_size += p->size;
1243         pp->no_blocks++;
1244         if (p->leaf)
1245             break;
1246         decode_ptr(&src, &pos);
1247         p->offset = src - p->bytes;
1248         pp->level++;
1249         pp->accessed_nodes[pp->level]++; 
1250     }
1251     pp->block[pp->level+1] = 0;
1252     pp->maxlevel = pp->level;
1253     if (level)
1254         *level = pp->level;
1255     return pp;
1256 }
1257
1258 ISAMB_PP isamb_pp_open (ISAMB isamb, ISAMB_P pos, int scope)
1259 {
1260     return isamb_pp_open_x(isamb, pos, 0, scope);
1261 }
1262
1263 void isamb_pp_close_x(ISAMB_PP pp, int *size, int *blocks)
1264 {
1265     int i;
1266     if (!pp)
1267         return;
1268     yaz_log(YLOG_DEBUG, "isamb_pp_close lev=%d returned "ZINT_FORMAT" values, " 
1269                     "skipped "ZINT_FORMAT,
1270         pp->maxlevel, pp->skipped_numbers, pp->returned_numbers);
1271     for (i = pp->maxlevel;i>=0;i--)
1272         if (pp->skipped_nodes[i] || pp->accessed_nodes[i])
1273             yaz_log(YLOG_DEBUG, "isamb_pp_close  level leaf-%d: "
1274                             ZINT_FORMAT" read, "ZINT_FORMAT" skipped", i,
1275                  pp->accessed_nodes[i], pp->skipped_nodes[i]);
1276     pp->isamb->skipped_numbers += pp->skipped_numbers;
1277     pp->isamb->returned_numbers += pp->returned_numbers;
1278     for (i = pp->maxlevel;i>=0;i--)
1279     {
1280         pp->isamb->accessed_nodes[i] += pp->accessed_nodes[i];
1281         pp->isamb->skipped_nodes[i] += pp->skipped_nodes[i];
1282     }
1283     if (size)
1284         *size = pp->total_size;
1285     if (blocks)
1286         *blocks = pp->no_blocks;
1287     for (i = 0; i <= pp->level; i++)
1288         close_block(pp->isamb, pp->block[i]);
1289     xfree(pp->block);
1290     xfree(pp);
1291 }
1292
1293 int isamb_block_info (ISAMB isamb, int cat)
1294 {
1295     if (cat >= 0 && cat < isamb->no_cat)
1296         return isamb->file[cat].head.block_size;
1297     return -1;
1298 }
1299
1300 void isamb_pp_close (ISAMB_PP pp)
1301 {
1302     isamb_pp_close_x(pp, 0, 0);
1303 }
1304
1305 /* simple recursive dumper .. */
1306 static void isamb_dump_r (ISAMB b, ISAMB_P pos, void (*pr)(const char *str),
1307                           int level)
1308 {
1309     char buf[1024];
1310     char prefix_str[1024];
1311     if (pos)
1312     {
1313         struct ISAMB_block *p = open_block(b, pos);
1314         sprintf(prefix_str, "%*s " ZINT_FORMAT " cat=%d size=%d max=%d items="
1315                 ZINT_FORMAT, level*2, "",
1316                 pos, p->cat, p->size, b->file[p->cat].head.block_max,
1317                 p->no_items);
1318         (*pr)(prefix_str);
1319         sprintf(prefix_str, "%*s " ZINT_FORMAT, level*2, "", pos);
1320         if (p->leaf)
1321         {
1322             while (p->offset < p->size)
1323             {
1324                 const char *src = p->bytes + p->offset;
1325                 char *dst = buf;
1326                 (*b->method->codec.decode)(p->decodeClientData, &dst, &src);
1327                 (*b->method->log_item)(YLOG_DEBUG, buf, prefix_str);
1328                 p->offset = src - (char*) p->bytes;
1329             }
1330             assert(p->offset == p->size);
1331         }
1332         else
1333         {
1334             const char *src = p->bytes + p->offset;
1335             ISAMB_P sub;
1336
1337             decode_ptr(&src, &sub);
1338             p->offset = src - (char*) p->bytes;
1339
1340             isamb_dump_r(b, sub, pr, level+1);
1341             
1342             while (p->offset < p->size)
1343             {
1344 #if INT_ENCODE
1345                 char file_item_buf[DST_ITEM_MAX];
1346                 char *file_item = file_item_buf;
1347                 void *c1 = (*b->method->codec.start)();
1348                 (*b->method->codec.decode)(c1, &file_item, &src);
1349                 (*b->method->codec.stop)(c1);
1350                 (*b->method->log_item)(YLOG_DEBUG, file_item_buf, prefix_str);
1351 #else
1352                 zint item_len;
1353                 decode_item_len(&src, &item_len);
1354                 (*b->method->log_item)(YLOG_DEBUG, src, prefix_str);
1355                 src += item_len;
1356 #endif
1357                 decode_ptr(&src, &sub);
1358                 
1359                 p->offset = src - (char*) p->bytes;
1360                 
1361                 isamb_dump_r(b, sub, pr, level+1);
1362             }           
1363         }
1364         close_block(b, p);
1365     }
1366 }
1367
1368 void isamb_dump(ISAMB b, ISAMB_P pos, void (*pr)(const char *str))
1369 {
1370     isamb_dump_r(b, pos, pr, 0);
1371 }
1372
1373 int isamb_pp_read(ISAMB_PP pp, void *buf)
1374 {
1375     return isamb_pp_forward(pp, buf, 0);
1376 }
1377
1378
1379 static int isamb_pp_on_right_node(ISAMB_PP pp, int level, const void *untilbuf)
1380 { /* looks one node higher to see if we should be on this node at all */
1381   /* useful in backing off quickly, and in avoiding tail descends */
1382   /* call with pp->level to begin with */
1383     struct ISAMB_block *p;
1384     int cmp;
1385     const char *src;
1386     ISAMB b = pp->isamb;
1387
1388     assert(level >= 0);
1389     if (level == 0)
1390     {
1391 #if ISAMB_DEBUG
1392             yaz_log(YLOG_DEBUG, "isamb_pp_on_right returning true for root");
1393 #endif
1394         return 1; /* we can never skip the root node */
1395     }
1396     level--;
1397     p = pp->block[level];
1398     assert(p->offset <= p->size);
1399     if (p->offset < p->size)
1400     {
1401 #if INT_ENCODE
1402         char file_item_buf[DST_ITEM_MAX];
1403         char *file_item = file_item_buf;
1404         void *c1 = (*b->method->codec.start)();
1405         assert(p->offset > 0); 
1406         src = p->bytes + p->offset;
1407         (*b->method->codec.decode)(c1, &file_item, &src);
1408         (*b->method->codec.stop)(c1);
1409         cmp = (*b->method->compare_item)(untilbuf, file_item_buf);
1410 #else
1411         zint item_len;
1412         assert(p->offset > 0); 
1413         src = p->bytes + p->offset;
1414         decode_item_len(&src, &item_len);
1415 #if ISAMB_DEBUG
1416         (*b->method->codec.log_item)(YLOG_DEBUG, untilbuf, "on_leaf: until");
1417         (*b->method->codec.log_item)(YLOG_DEBUG, src, "on_leaf: value");
1418 #endif
1419         cmp = (*b->method->compare_item)(untilbuf, src);
1420 #endif
1421         if (cmp < pp->scope)
1422         {  /* cmp<2 */
1423 #if ISAMB_DEBUG
1424             yaz_log(YLOG_DEBUG, "isamb_pp_on_right returning true "
1425                             "cmp=%d lev=%d ofs=%d", cmp, level, p->offset);
1426 #endif
1427             return 1; 
1428         }
1429         else
1430         {
1431 #if ISAMB_DEBUG
1432             yaz_log(YLOG_DEBUG, "isamb_pp_on_right returning false "
1433                             "cmp=%d lev=%d ofs=%d", cmp, level, p->offset);
1434 #endif
1435             return 0; 
1436         }
1437     }
1438     else {
1439 #if ISAMB_DEBUG
1440         yaz_log(YLOG_DEBUG, "isamb_pp_on_right at tail, looking higher "
1441                         "lev=%d", level);
1442 #endif
1443         return isamb_pp_on_right_node(pp, level, untilbuf);
1444     }
1445 } /* isamb_pp_on_right_node */
1446
1447 static int isamb_pp_read_on_leaf(ISAMB_PP pp, void *buf)
1448
1449     /* reads the next item on the current leaf, returns 0 if end of leaf*/
1450     struct ISAMB_block *p = pp->block[pp->level];
1451     char *dst;
1452     const char *src;
1453     assert(pp);
1454     assert(buf);
1455     if (p->offset == p->size)
1456     {
1457 #if ISAMB_DEBUG
1458         yaz_log(YLOG_DEBUG, "isamb_pp_read_on_leaf returning 0 on "
1459                 "node %d", p->pos);
1460 #endif
1461         return 0; /* at end of leaf */
1462     }
1463     src = p->bytes + p->offset;
1464     dst = buf;
1465     (*pp->isamb->method->codec.decode)(p->decodeClientData, &dst, &src);
1466     p->offset = src - (char*) p->bytes;
1467 #if ISAMB_DEBUG
1468     (*pp->isamb->method->codec.log_item)(YLOG_DEBUG, buf,
1469                                          "read_on_leaf returning 1");
1470 #endif
1471     pp->returned_numbers++;
1472     return 1;
1473 } /* read_on_leaf */
1474
1475 static int isamb_pp_forward_on_leaf(ISAMB_PP pp, void *buf, const void *untilbuf)
1476 { /* forwards on the current leaf, returns 0 if not found */
1477     int cmp;
1478     int skips = 0;
1479     while (1)
1480     {
1481         if (!isamb_pp_read_on_leaf(pp, buf))
1482             return 0;
1483         /* FIXME - this is an extra function call, inline the read? */
1484         cmp=(*pp->isamb->method->compare_item)(untilbuf, buf);
1485         if (cmp <pp->scope)
1486         {  /* cmp<2  found a good one */
1487 #if ISAMB_DEBUG
1488             if (skips)
1489                 yaz_log(YLOG_DEBUG, "isam_pp_fwd_on_leaf skipped %d items", skips);
1490 #endif
1491             pp->returned_numbers++;
1492             return 1;
1493         }
1494         if (!skips)
1495             if (!isamb_pp_on_right_node(pp, pp->level, untilbuf))
1496                 return 0; /* never mind the rest of this leaf */
1497         pp->skipped_numbers++;
1498         skips++;
1499     }
1500 } /* forward_on_leaf */
1501
1502 static int isamb_pp_climb_level(ISAMB_PP pp, ISAMB_P *pos)
1503 { /* climbs higher in the tree, until finds a level with data left */
1504   /* returns the node to (consider to) descend to in *pos) */
1505     struct ISAMB_block *p = pp->block[pp->level];
1506     const char *src;
1507     ISAMB b = pp->isamb;
1508 #if ISAMB_DEBUG
1509     yaz_log(YLOG_DEBUG, "isamb_pp_climb_level starting "
1510                    "at level %d node %d ofs=%d sz=%d",
1511                     pp->level, p->pos, p->offset, p->size);
1512 #endif
1513     assert(pp->level >= 0);
1514     assert(p->offset <= p->size);
1515     if (pp->level==0)
1516     {
1517 #if ISAMB_DEBUG
1518         yaz_log(YLOG_DEBUG, "isamb_pp_climb_level returning 0 at root");
1519 #endif
1520         return 0;
1521     }
1522     assert(pp->level>0); 
1523     close_block(pp->isamb, pp->block[pp->level]);
1524     pp->block[pp->level]=0;
1525     (pp->level)--;
1526     p = pp->block[pp->level];
1527 #if ISAMB_DEBUG
1528     yaz_log(YLOG_DEBUG, "isamb_pp_climb_level climbed to level %d node %d ofs=%d",
1529                     pp->level, p->pos, p->offset);
1530 #endif
1531     assert(!p->leaf);
1532     assert(p->offset <= p->size);
1533     if (p->offset == p->size)
1534     {
1535         /* we came from the last pointer, climb on */
1536         if (!isamb_pp_climb_level(pp, pos))
1537             return 0;
1538         p = pp->block[pp->level];
1539     }
1540     else
1541     {
1542         /* skip the child we just came from */
1543 #if ISAMB_DEBUG
1544         yaz_log(YLOG_DEBUG, "isam_pp_climb_level: skipping lev=%d ofs=%d sz=%d", 
1545                         pp->level, p->offset, p->size);
1546 #endif
1547         assert (p->offset < p->size);
1548         src = p->bytes + p->offset;
1549 #if INT_ENCODE
1550         char file_item_buf[DST_ITEM_MAX];
1551         char *file_item = file_item_buf;
1552         void *c1 = (*b->method->codec.start)();
1553         (*b->method->codec.decode)(c1, &file_item, &src);
1554         (*b->method->codec.stop)(c1);
1555 #else
1556         zint item_len;
1557         decode_item_len(&src, &item_len);
1558         src += item_len;
1559 #endif
1560         decode_ptr(&src, pos);
1561         p->offset = src - (char *)p->bytes;
1562             
1563     }
1564     return 1;
1565 } /* climb_level */
1566
1567
1568 static zint isamb_pp_forward_unode(ISAMB_PP pp, zint pos, const void *untilbuf)
1569 { /* scans a upper node until it finds a child <= untilbuf */
1570   /* pp points to the key value, as always. pos is the child read from */
1571   /* the buffer */
1572   /* if all values are too small, returns the last child in the node */
1573   /* FIXME - this can be detected, and avoided by looking at the */
1574   /* parent node, but that gets messy. Presumably the cost is */
1575   /* pretty low anyway */
1576     ISAMB b = pp->isamb;
1577     struct ISAMB_block *p = pp->block[pp->level];
1578     const char *src = p->bytes + p->offset;
1579     int cmp;
1580     zint nxtpos;
1581 #if ISAMB_DEBUG
1582     int skips = 0;
1583     yaz_log(YLOG_DEBUG, "isamb_pp_forward_unode starting "
1584                    "at level %d node %d ofs=%di sz=%d",
1585                     pp->level, p->pos, p->offset, p->size);
1586 #endif
1587     assert(!p->leaf);
1588     assert(p->offset <= p->size);
1589     if (p->offset == p->size)
1590     {
1591 #if ISAMB_DEBUG
1592             yaz_log(YLOG_DEBUG, "isamb_pp_forward_unode returning at end "
1593                    "at level %d node %d ofs=%di sz=%d",
1594                     pp->level, p->pos, p->offset, p->size);
1595 #endif
1596         return pos; /* already at the end of it */
1597     }
1598     while(p->offset < p->size)
1599     {
1600 #if INT_ENCODE
1601         char file_item_buf[DST_ITEM_MAX];
1602         char *file_item = file_item_buf;
1603         void *c1 = (*b->method->codec.start)();
1604         (*b->method->codec.decode)(c1, &file_item, &src);
1605         (*b->method->codec.stop)(c1);
1606         cmp = (*b->method->compare_item)(untilbuf, file_item_buf);
1607 #else
1608         zint item_len;
1609         decode_item_len(&src, &item_len);
1610         cmp = (*b->method->compare_item)(untilbuf, src);
1611         src += item_len;
1612 #endif
1613         decode_ptr(&src, &nxtpos);
1614         if (cmp<pp->scope) /* cmp<2 */
1615         {
1616 #if ISAMB_DEBUG
1617             yaz_log(YLOG_DEBUG, "isamb_pp_forward_unode returning a hit "
1618                    "at level %d node %d ofs=%d sz=%d",
1619                     pp->level, p->pos, p->offset, p->size);
1620 #endif
1621             return pos;
1622         } /* found one */
1623         pos = nxtpos;
1624         p->offset = src-(char*)p->bytes;
1625         (pp->skipped_nodes[pp->maxlevel - pp->level -1])++;
1626 #if ISAMB_DEBUG
1627         skips++;
1628 #endif
1629     }
1630 #if ISAMB_DEBUG
1631             yaz_log(YLOG_DEBUG, "isamb_pp_forward_unode returning at tail "
1632                    "at level %d node %d ofs=%d sz=%d skips=%d",
1633                     pp->level, p->pos, p->offset, p->size, skips);
1634 #endif
1635     return pos; /* that's the last one in the line */
1636     
1637 } /* forward_unode */
1638
1639 static void isamb_pp_descend_to_leaf(ISAMB_PP pp, ISAMB_P pos,
1640                                      const void *untilbuf)
1641 { /* climbs down the tree, from pos, to the leftmost leaf */
1642     struct ISAMB_block *p = pp->block[pp->level];
1643     const char *src;
1644     assert(!p->leaf);
1645 #if ISAMB_DEBUG
1646     yaz_log(YLOG_DEBUG, "isamb_pp_descend_to_leaf "
1647                    "starting at lev %d node %d ofs=%d lf=%d u=%p", 
1648                    pp->level, p->pos, p->offset, p->leaf, untilbuf);
1649 #endif
1650     if (untilbuf)
1651         pos = isamb_pp_forward_unode(pp, pos, untilbuf);
1652     ++(pp->level);
1653     assert(pos);
1654     p = open_block(pp->isamb, pos);
1655     pp->block[pp->level] = p;
1656     ++(pp->accessed_nodes[pp->maxlevel-pp->level]);
1657     ++(pp->no_blocks);
1658 #if ISAMB_DEBUG
1659     yaz_log(YLOG_DEBUG, "isamb_pp_descend_to_leaf "
1660                    "got lev %d node %d lf=%d", 
1661                    pp->level, p->pos, p->leaf);
1662 #endif
1663     if (p->leaf)
1664         return;
1665     assert (p->offset==0);
1666     src = p->bytes + p->offset;
1667     decode_ptr(&src, &pos);
1668     p->offset = src-(char*)p->bytes;
1669     isamb_pp_descend_to_leaf(pp, pos, untilbuf);
1670 #if ISAMB_DEBUG
1671     yaz_log(YLOG_DEBUG, "isamb_pp_descend_to_leaf "
1672                    "returning at lev %d node %d ofs=%d lf=%d", 
1673                    pp->level, p->pos, p->offset, p->leaf);
1674 #endif
1675 } /* descend_to_leaf */
1676
1677 static int isamb_pp_find_next_leaf(ISAMB_PP pp)
1678 { /* finds the next leaf by climbing up and down */
1679     ISAMB_P pos;
1680     if (!isamb_pp_climb_level(pp, &pos))
1681         return 0;
1682     isamb_pp_descend_to_leaf(pp, pos, 0);
1683     return 1;
1684 }
1685
1686 static int isamb_pp_climb_desc(ISAMB_PP pp,  const void *untilbuf)
1687 { /* climbs up and descends to a leaf where values >= *untilbuf are found */
1688     ISAMB_P pos;
1689 #if ISAMB_DEBUG
1690     struct ISAMB_block *p = pp->block[pp->level];
1691     yaz_log(YLOG_DEBUG, "isamb_pp_climb_desc starting "
1692                    "at level %d node %d ofs=%d sz=%d",
1693                     pp->level, p->pos, p->offset, p->size);
1694 #endif
1695     if (!isamb_pp_climb_level(pp, &pos))
1696         return 0;
1697     /* see if it would pay to climb one higher */
1698     if (!isamb_pp_on_right_node(pp, pp->level, untilbuf))
1699         if (!isamb_pp_climb_level(pp, &pos))
1700             return 0;
1701     isamb_pp_descend_to_leaf(pp, pos, untilbuf);
1702 #if ISAMB_DEBUG
1703     p = pp->block[pp->level];
1704     yaz_log(YLOG_DEBUG, "isamb_pp_climb_desc done "
1705                    "at level %d node %d ofs=%d sz=%d",
1706                     pp->level, p->pos, p->offset, p->size);
1707 #endif
1708     return 1;
1709 } /* climb_desc */
1710
1711 int isamb_pp_forward (ISAMB_PP pp, void *buf, const void *untilbuf)
1712 {
1713 #if ISAMB_DEBUG
1714     struct ISAMB_block *p = pp->block[pp->level];
1715     assert(p->leaf);
1716     yaz_log(YLOG_DEBUG, "isamb_pp_forward starting "
1717                    "at level %d node %d ofs=%d sz=%d u=%p sc=%d",
1718                     pp->level, p->pos, p->offset, p->size, untilbuf, scope);
1719 #endif
1720     if (untilbuf)
1721     {
1722         if (isamb_pp_forward_on_leaf(pp, buf, untilbuf))
1723         {
1724 #if ISAMB_DEBUG
1725             yaz_log(YLOG_DEBUG, "isamb_pp_forward (f) returning (A) "
1726                    "at level %d node %d ofs=%d sz=%d",
1727                     pp->level, p->pos, p->offset, p->size);
1728 #endif
1729             return 1;
1730         }
1731         if (! isamb_pp_climb_desc(pp, untilbuf))
1732         {
1733 #if ISAMB_DEBUG
1734             yaz_log(YLOG_DEBUG, "isamb_pp_forward (f) returning notfound (B) "
1735                    "at level %d node %d ofs=%d sz=%d",
1736                     pp->level, p->pos, p->offset, p->size);
1737 #endif
1738             return 0; /* could not find a leaf */
1739         }
1740         do {
1741             if (isamb_pp_forward_on_leaf(pp, buf, untilbuf))
1742             {
1743 #if ISAMB_DEBUG
1744             yaz_log(YLOG_DEBUG, "isamb_pp_forward (f) returning (C) "
1745                    "at level %d node %d ofs=%d sz=%d",
1746                     pp->level, p->pos, p->offset, p->size);
1747 #endif
1748                 return 1;
1749             }
1750         } while (isamb_pp_find_next_leaf(pp));
1751         return 0; /* could not find at all */
1752     }
1753     else { /* no untilbuf, a straight read */
1754         /* FIXME - this should be moved
1755          * directly into the pp_read */
1756         /* keeping here now, to keep same
1757          * interface as the old fwd */
1758         if (isamb_pp_read_on_leaf(pp, buf))
1759         {
1760 #if ISAMB_DEBUG
1761             yaz_log(YLOG_DEBUG, "isamb_pp_forward (read) returning (D) "
1762                    "at level %d node %d ofs=%d sz=%d",
1763                     pp->level, p->pos, p->offset, p->size);
1764 #endif
1765             return 1;
1766         }
1767         if (isamb_pp_find_next_leaf(pp))
1768         {
1769 #if ISAMB_DEBUG
1770             yaz_log(YLOG_DEBUG, "isamb_pp_forward (read) returning (E) "
1771                    "at level %d node %d ofs=%d sz=%d",
1772                     pp->level, p->pos, p->offset, p->size);
1773 #endif
1774             return isamb_pp_read_on_leaf(pp, buf);
1775         }
1776         else
1777             return 0;
1778     }
1779 } /* isam_pp_forward (new version) */
1780
1781 void isamb_pp_pos(ISAMB_PP pp, double *current, double *total)
1782 { /* return an estimate of the current position and of the total number of */
1783   /* occureences in the isam tree, based on the current leaf */
1784     struct ISAMB_block *p = pp->block[pp->level];
1785     assert(total);
1786     assert(current);
1787     assert(p->leaf);
1788
1789     *total = pp->block[0]->no_items;
1790     *current = (double) pp->returned_numbers;
1791 #if ISAMB_DEBUG
1792     yaz_log(YLOG_LOG, "isamb_pp_pos returning: cur= %0.1f tot=%0.1f rn="
1793          ZINT_FORMAT, *current, *total, pp->returned_numbers);
1794 #endif
1795 }
1796
1797 int isamb_pp_forward2(ISAMB_PP pp, void *buf, const void *untilb)
1798 {
1799     char *dst = buf;
1800     const char *src;
1801     struct ISAMB_block *p = pp->block[pp->level];
1802     ISAMB b = pp->isamb;
1803     if (!p)
1804         return 0;
1805 again:
1806     while (p->offset == p->size)
1807     {
1808         ISAMB_P pos;
1809 #if INT_ENCODE
1810         const char *src_0;
1811         void *c1;
1812         char file_item_buf[DST_ITEM_MAX];
1813         char *file_item = file_item_buf;
1814 #else
1815         zint item_len;
1816 #endif
1817         while (p->offset == p->size)
1818         {
1819             if (pp->level == 0)
1820                 return 0;
1821             close_block (pp->isamb, pp->block[pp->level]);
1822             pp->block[pp->level] = 0;
1823             (pp->level)--;
1824             p = pp->block[pp->level];
1825             assert (!p->leaf);  
1826         }
1827
1828         assert(!p->leaf);
1829         src = p->bytes + p->offset;
1830        
1831 #if INT_ENCODE
1832         c1 = (*b->method->codec.start)();
1833         (*b->method->codec.decode)(c1, &file_item, &src);
1834 #else
1835         decode_ptr (&src, &item_len);
1836         src += item_len;
1837 #endif        
1838         decode_ptr (&src, &pos);
1839         p->offset = src - (char*) p->bytes;
1840
1841         src = p->bytes + p->offset;
1842
1843         while(1)
1844         {
1845             if (!untilb || p->offset == p->size)
1846                 break;
1847             assert(p->offset < p->size);
1848 #if INT_ENCODE
1849             src_0 = src;
1850             file_item = file_item_buf;
1851             (*b->method->codec.reset)(c1);
1852             (*b->method->codec.decode)(c1, &file_item, &src);
1853             if ((*b->method->compare_item)(untilb, file_item_buf) <= 1)
1854             {
1855                 src = src_0;
1856                 break;
1857             }
1858 #else
1859             decode_item_len(&src, &item_len);
1860             if ((*b->method->compare_item)(untilb, src) <= 1)
1861                 break;
1862             src += item_len;
1863 #endif
1864             decode_ptr (&src, &pos);
1865             p->offset = src - (char*) p->bytes;
1866         }
1867
1868         pp->level++;
1869
1870         while (1)
1871         {
1872             pp->block[pp->level] = p = open_block (pp->isamb, pos);
1873
1874             pp->total_size += p->size;
1875             pp->no_blocks++;
1876             
1877             if (p->leaf) 
1878             {
1879                 break;
1880             }
1881             
1882             src = p->bytes + p->offset;
1883             while(1)
1884             {
1885                 decode_ptr (&src, &pos);
1886                 p->offset = src - (char*) p->bytes;
1887                 
1888                 if (!untilb || p->offset == p->size)
1889                     break;
1890                 assert(p->offset < p->size);
1891 #if INT_ENCODE
1892                 src_0 = src;
1893                 file_item = file_item_buf;
1894                 (*b->method->codec.reset)(c1);
1895                 (*b->method->codec.decode)(c1, &file_item, &src);
1896                 if ((*b->method->compare_item)(untilb, file_item_buf) <= 1)
1897                 {
1898                     src = src_0;
1899                     break;
1900                 }
1901 #else
1902                 decode_ptr (&src, &item_len);
1903                 if ((*b->method->compare_item)(untilb, src) <= 1)
1904                     break;
1905                 src += item_len;
1906 #endif
1907             }
1908             pp->level++;
1909         }
1910 #if INT_ENCODE
1911         (*b->method->codec.stop)(c1);
1912 #endif
1913     }
1914     assert (p->offset < p->size);
1915     assert (p->leaf);
1916     while(1)
1917     {
1918         char *dst0 = dst;
1919         src = p->bytes + p->offset;
1920         (*pp->isamb->method->codec.decode)(p->decodeClientData, &dst, &src);
1921         p->offset = src - (char*) p->bytes;
1922         if (!untilb || (*pp->isamb->method->compare_item)(untilb, dst0) <= 1)
1923             break;
1924         dst = dst0;
1925         if (p->offset == p->size) goto again;
1926     }
1927     return 1;
1928 }