Added include stdlib.h a few places to get prototype for atoi/exit/..
[idzebra-moved-to-github.git] / isamb / isamb.c
1 /* $Id: isamb.c,v 1.71 2005-01-16 23:14:57 adam Exp $
2    Copyright (C) 1995-2005
3    Index Data ApS
4
5 This file is part of the Zebra server.
6
7 Zebra is free software; you can redistribute it and/or modify it under
8 the terms of the GNU General Public License as published by the Free
9 Software Foundation; either version 2, or (at your option) any later
10 version.
11
12 Zebra is distributed in the hope that it will be useful, but WITHOUT ANY
13 WARRANTY; without even the implied warranty of MERCHANTABILITY or
14 FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
15 for more details.
16
17 You should have received a copy of the GNU General Public License
18 along with Zebra; see the file LICENSE.zebra.  If not, write to the
19 Free Software Foundation, 59 Temple Place - Suite 330, Boston, MA
20 02111-1307, USA.
21 */
22
23 #include <stdlib.h>
24 #include <string.h>
25 #include <yaz/log.h>
26 #include <yaz/xmalloc.h>
27 #include <idzebra/isamb.h>
28 #include <assert.h>
29
30 #ifndef ISAMB_DEBUG
31 #define ISAMB_DEBUG 0
32 #endif
33
34
35 #define ISAMB_MAJOR_VERSION 3
36 #define ISAMB_MINOR_VERSION 0
37
38 struct ISAMB_head {
39     zint first_block;
40     zint last_block;
41     zint free_list;
42     zint no_items;
43     int block_size;
44     int block_max;
45     int block_offset;
46 };
47
48 /* if 1, upper nodes items are encoded; 0 if not encoded */
49 #define INT_ENCODE 1
50
51 /* maximum size of encoded buffer */
52 #define DST_ITEM_MAX 256
53
54 #define ISAMB_MAX_LEVEL 10
55 /* approx 2*max page + max size of item */
56 #define DST_BUF_SIZE 16840
57
58 #define ISAMB_CACHE_ENTRY_SIZE 4096
59
60 /* CAT_MAX: _must_ be power of 2 */
61 #define CAT_MAX 4
62 #define CAT_MASK (CAT_MAX-1)
63 /* CAT_NO: <= CAT_MAX */
64 #define CAT_NO 4
65
66 /* Smallest block size */
67 #define ISAMB_MIN_SIZE 32
68 /* Size factor */
69 #define ISAMB_FAC_SIZE 4
70
71 /* ISAMB_PTR_CODEC = 1 var, =0 fixed */
72 #define ISAMB_PTR_CODEC 1
73
74 struct ISAMB_cache_entry {
75     ISAMB_P pos;
76     unsigned char *buf;
77     int dirty;
78     int hits;
79     struct ISAMB_cache_entry *next;
80 };
81
82 struct ISAMB_file {
83     BFile bf;
84     int head_dirty;
85     struct ISAMB_head head;
86     struct ISAMB_cache_entry *cache_entries;
87 };
88
89 struct ISAMB_s {
90     BFiles bfs;
91     ISAMC_M *method;
92
93     struct ISAMB_file *file;
94     int no_cat;
95     int cache; /* 0 = no cache, 1 = use cache, -1 = dummy isam (for testing only) */
96     int log_io;        /* log level for bf_read/bf_write calls */
97     int log_freelist;  /* log level for freelist handling */
98     zint skipped_numbers; /* on a leaf node */
99     zint returned_numbers; 
100     zint skipped_nodes[ISAMB_MAX_LEVEL]; /* [0]=skipped leaves, 1 = higher etc */
101     zint accessed_nodes[ISAMB_MAX_LEVEL]; /* nodes we did not skip */
102 };
103
104 struct ISAMB_block {
105     ISAMB_P pos;
106     int cat;
107     int size;
108     int leaf;
109     int dirty;
110     int deleted;
111     int offset;
112     zint no_items;  /* number of nodes in this + children */
113     char *bytes;
114     char *cbuf;
115     unsigned char *buf;
116     void *decodeClientData;
117     int log_rw;
118 };
119
120 struct ISAMB_PP_s {
121     ISAMB isamb;
122     ISAMB_P pos;
123     int level;
124     int maxlevel; /* total depth */
125     zint total_size;
126     zint no_blocks;
127     zint skipped_numbers; /* on a leaf node */
128     zint returned_numbers; 
129     zint skipped_nodes[ISAMB_MAX_LEVEL]; /* [0]=skipped leaves, 1 = higher etc */
130     zint accessed_nodes[ISAMB_MAX_LEVEL]; /* nodes we did not skip */
131     struct ISAMB_block **block;
132     int scope;  /* on what level we forward */
133 };
134
135
136 #define encode_item_len encode_ptr
137 #if ISAMB_PTR_CODEC
138 static void encode_ptr(char **dst, zint pos)
139 {
140     unsigned char *bp = (unsigned char*) *dst;
141
142     while (pos > 127)
143     {
144          *bp++ = 128 | (pos & 127);
145          pos = pos >> 7;
146     }
147     *bp++ = pos;
148     *dst = (char *) bp;
149 }
150 #else
151 static void encode_ptr(char **dst, zint pos)
152 {
153     memcpy(*dst, &pos, sizeof(pos));
154     (*dst) += sizeof(pos);
155 }
156 #endif
157
158 #define decode_item_len decode_ptr
159 #if ISAMB_PTR_CODEC
160 static void decode_ptr(const char **src1, zint *pos)
161 {
162     const unsigned char **src = (const unsigned char **) src1;
163     zint d = 0;
164     unsigned char c;
165     unsigned r = 0;
166
167     while (((c = *(*src)++) & 128))
168     {
169         d += ((zint) (c & 127) << r);
170         r += 7;
171     }
172     d += ((zint) c << r);
173     *pos = d;
174 }
175 #else
176 static void decode_ptr(const char **src, zint *pos)
177 {
178      memcpy(pos, *src, sizeof(*pos));
179      (*src) += sizeof(*pos);
180 }
181 #endif
182
183 ISAMB isamb_open(BFiles bfs, const char *name, int writeflag, ISAMC_M *method,
184                   int cache)
185 {
186     ISAMB isamb = xmalloc(sizeof(*isamb));
187     int i, b_size = ISAMB_MIN_SIZE;
188
189     isamb->bfs = bfs;
190     isamb->method = (ISAMC_M *) xmalloc(sizeof(*method));
191     memcpy(isamb->method, method, sizeof(*method));
192     isamb->no_cat = CAT_NO;
193     isamb->log_io = 0;
194     isamb->log_freelist = 0;
195     isamb->cache = cache;
196     isamb->skipped_numbers = 0;
197     isamb->returned_numbers = 0;
198     for (i = 0; i<ISAMB_MAX_LEVEL; i++)
199       isamb->skipped_nodes[i] = isamb->accessed_nodes[i] = 0;
200
201     assert(cache == 0);
202     isamb->file = xmalloc(sizeof(*isamb->file) * isamb->no_cat);
203     for (i = 0; i < isamb->no_cat; i++)
204     {
205         char fname[DST_BUF_SIZE];
206         char hbuf[DST_BUF_SIZE];
207         isamb->file[i].cache_entries = 0;
208         isamb->file[i].head_dirty = 0;
209         sprintf(fname, "%s%c", name, i+'A');
210         if (cache)
211             isamb->file[i].bf = bf_open(bfs, fname, ISAMB_CACHE_ENTRY_SIZE,
212                                          writeflag);
213         else
214             isamb->file[i].bf = bf_open(bfs, fname, b_size, writeflag);
215
216         /* fill-in default values (for empty isamb) */
217         isamb->file[i].head.first_block = ISAMB_CACHE_ENTRY_SIZE/b_size+1;
218         isamb->file[i].head.last_block = isamb->file[i].head.first_block;
219         isamb->file[i].head.block_size = b_size;
220 #if ISAMB_PTR_CODEC
221         if (i == isamb->no_cat-1 || b_size > 128)
222             isamb->file[i].head.block_offset = 8;
223         else
224             isamb->file[i].head.block_offset = 4;
225 #else
226         isamb->file[i].head.block_offset = 11;
227 #endif
228         isamb->file[i].head.block_max =
229             b_size - isamb->file[i].head.block_offset;
230         isamb->file[i].head.free_list = 0;
231         if (bf_read(isamb->file[i].bf, 0, 0, 0, hbuf))
232         {
233             /* got header assume "isamb"major minor len can fit in 16 bytes */
234             zint zint_tmp;
235             int major, minor, len, pos = 0;
236             int left;
237             const char *src = 0;
238             if (memcmp(hbuf, "isamb", 5))
239             {
240                 yaz_log(YLOG_WARN, "bad isamb header for file %s", fname);
241                 return 0;
242             }
243             if (sscanf(hbuf+5, "%d %d %d", &major, &minor, &len) != 3)
244             {
245                 yaz_log(YLOG_WARN, "bad isamb header for file %s", fname);
246                 return 0;
247             }
248             if (major != ISAMB_MAJOR_VERSION)
249             {
250                 yaz_log(YLOG_WARN, "bad major version for file %s %d, must be %d",
251                      fname, major, ISAMB_MAJOR_VERSION);
252                 return 0;
253             }
254             for (left = len - b_size; left > 0; left = left - b_size)
255             {
256                 pos++;
257                 if (!bf_read(isamb->file[i].bf, pos, 0, 0, hbuf + pos*b_size))
258                 {
259                     yaz_log(YLOG_WARN, "truncated isamb header for " 
260                          "file=%s len=%d pos=%d",
261                          fname, len, pos);
262                     return 0;
263                 }
264             }
265             src = hbuf + 16;
266             decode_ptr(&src, &isamb->file[i].head.first_block);
267             decode_ptr(&src, &isamb->file[i].head.last_block);
268             decode_ptr(&src, &zint_tmp);
269             isamb->file[i].head.block_size = zint_tmp;
270             decode_ptr(&src, &zint_tmp);
271             isamb->file[i].head.block_max = zint_tmp;
272             decode_ptr(&src, &isamb->file[i].head.free_list);
273         }
274         assert (isamb->file[i].head.block_size >= isamb->file[i].head.block_offset);
275         isamb->file[i].head_dirty = 0;
276         assert(isamb->file[i].head.block_size == b_size);
277         b_size = b_size * ISAMB_FAC_SIZE;
278     }
279 #if ISAMB_DEBUG
280     yaz_log(YLOG_WARN, "isamb debug enabled. Things will be slower than usual");
281 #endif
282     return isamb;
283 }
284
285 static void flush_blocks (ISAMB b, int cat)
286 {
287     while (b->file[cat].cache_entries)
288     {
289         struct ISAMB_cache_entry *ce_this = b->file[cat].cache_entries;
290         b->file[cat].cache_entries = ce_this->next;
291
292         if (ce_this->dirty)
293         {
294             yaz_log(b->log_io, "bf_write: flush_blocks");
295             bf_write(b->file[cat].bf, ce_this->pos, 0, 0, ce_this->buf);
296         }
297         xfree(ce_this->buf);
298         xfree(ce_this);
299     }
300 }
301
302 static int get_block (ISAMB b, ISAMC_P pos, char *userbuf, int wr)
303 {
304     int cat = (int) (pos&CAT_MASK);
305     int off = (int) (((pos/CAT_MAX) & 
306                (ISAMB_CACHE_ENTRY_SIZE / b->file[cat].head.block_size - 1))
307         * b->file[cat].head.block_size);
308     zint norm = pos / (CAT_MASK*ISAMB_CACHE_ENTRY_SIZE / b->file[cat].head.block_size);
309     int no = 0;
310     struct ISAMB_cache_entry **ce, *ce_this = 0, **ce_last = 0;
311
312     if (!b->cache)
313         return 0;
314
315     assert (ISAMB_CACHE_ENTRY_SIZE >= b->file[cat].head.block_size);
316     for (ce = &b->file[cat].cache_entries; *ce; ce = &(*ce)->next, no++)
317     {
318         ce_last = ce;
319         if ((*ce)->pos == norm)
320         {
321             ce_this = *ce;
322             *ce = (*ce)->next;   /* remove from list */
323             
324             ce_this->next = b->file[cat].cache_entries;  /* move to front */
325             b->file[cat].cache_entries = ce_this;
326             
327             if (wr)
328             {
329                 memcpy (ce_this->buf + off, userbuf, 
330                         b->file[cat].head.block_size);
331                 ce_this->dirty = 1;
332             }
333             else
334                 memcpy (userbuf, ce_this->buf + off,
335                         b->file[cat].head.block_size);
336             return 1;
337         }
338     }
339     if (no >= 40)
340     {
341         assert (no == 40);
342         assert (ce_last && *ce_last);
343         ce_this = *ce_last;
344         *ce_last = 0;  /* remove the last entry from list */
345         if (ce_this->dirty)
346         {
347             yaz_log(b->log_io, "bf_write: get_block");
348             bf_write(b->file[cat].bf, ce_this->pos, 0, 0, ce_this->buf);
349         }
350         xfree(ce_this->buf);
351         xfree(ce_this);
352     }
353     ce_this = xmalloc(sizeof(*ce_this));
354     ce_this->next = b->file[cat].cache_entries;
355     b->file[cat].cache_entries = ce_this;
356     ce_this->buf = xmalloc(ISAMB_CACHE_ENTRY_SIZE);
357     ce_this->pos = norm;
358     yaz_log(b->log_io, "bf_read: get_block");
359     if (!bf_read(b->file[cat].bf, norm, 0, 0, ce_this->buf))
360         memset (ce_this->buf, 0, ISAMB_CACHE_ENTRY_SIZE);
361     if (wr)
362     {
363         memcpy (ce_this->buf + off, userbuf, b->file[cat].head.block_size);
364         ce_this->dirty = 1;
365     }
366     else
367     {
368         ce_this->dirty = 0;
369         memcpy (userbuf, ce_this->buf + off, b->file[cat].head.block_size);
370     }
371     return 1;
372 }
373
374
375 void isamb_close (ISAMB isamb)
376 {
377     int i;
378     for (i = 0; isamb->accessed_nodes[i]; i++)
379         yaz_log(YLOG_DEBUG, "isamb_close  level leaf-%d: "ZINT_FORMAT" read, "
380                         ZINT_FORMAT" skipped",
381              i, isamb->accessed_nodes[i], isamb->skipped_nodes[i]);
382     yaz_log(YLOG_DEBUG, "isamb_close returned "ZINT_FORMAT" values, "
383                    "skipped "ZINT_FORMAT,
384          isamb->skipped_numbers, isamb->returned_numbers);
385     for (i = 0; i<isamb->no_cat; i++)
386     {
387         flush_blocks (isamb, i);
388         if (isamb->file[i].head_dirty)
389         {
390             char hbuf[DST_BUF_SIZE];
391             int major = ISAMB_MAJOR_VERSION;
392             int minor = ISAMB_MINOR_VERSION;
393             int len = 16;
394             char *dst = hbuf + 16;
395             int pos = 0, left;
396             int b_size = isamb->file[i].head.block_size;
397
398             encode_ptr(&dst, isamb->file[i].head.first_block);
399             encode_ptr(&dst, isamb->file[i].head.last_block);
400             encode_ptr(&dst, isamb->file[i].head.block_size);
401             encode_ptr(&dst, isamb->file[i].head.block_max);
402             encode_ptr(&dst, isamb->file[i].head.free_list);
403             memset(dst, '\0', b_size); /* ensure no random bytes are written */
404
405             len = dst - hbuf;
406
407             /* print exactly 16 bytes (including trailing 0) */
408             sprintf(hbuf, "isamb%02d %02d %02d\r\n", major, minor, len);
409
410             bf_write(isamb->file[i].bf, pos, 0, 0, hbuf);
411
412             for (left = len - b_size; left > 0; left = left - b_size)
413             {
414                 pos++;
415                 bf_write(isamb->file[i].bf, pos, 0, 0, hbuf + pos*b_size);
416             }
417         }
418         bf_close (isamb->file[i].bf);
419     }
420     xfree(isamb->file);
421     xfree(isamb->method);
422     xfree(isamb);
423 }
424
425 /* open_block: read one block at pos.
426    Decode leading sys bytes .. consisting of
427    Offset:Meaning
428    0: leader byte, != 0 leaf, == 0, non-leaf
429    1-2: used size of block
430    3-7*: number of items and all children
431    
432    * Reserve 5 bytes for large block sizes. 1 for small ones .. Number
433    of items. We can thus have at most 2^40 nodes. 
434 */
435 static struct ISAMB_block *open_block(ISAMB b, ISAMC_P pos)
436 {
437     int cat = (int) (pos&CAT_MASK);
438     const char *src;
439     int offset = b->file[cat].head.block_offset;
440     struct ISAMB_block *p;
441     if (!pos)
442         return 0;
443     p = xmalloc(sizeof(*p));
444     p->pos = pos;
445     p->cat = (int) (pos & CAT_MASK);
446     p->buf = xmalloc(b->file[cat].head.block_size);
447     p->cbuf = 0;
448
449     if (!get_block (b, pos, p->buf, 0))
450     {
451         yaz_log(b->log_io, "bf_read: open_block");
452         if (!bf_read(b->file[cat].bf, pos/CAT_MAX, 0, 0, p->buf))
453         {
454             yaz_log(YLOG_FATAL, "isamb: read fail for pos=%ld block=%ld",
455                      (long) pos, (long) pos/CAT_MAX);
456             abort();
457         }
458     }
459     p->bytes = p->buf + offset;
460     p->leaf = p->buf[0];
461     p->size = (p->buf[1] + 256 * p->buf[2]) - offset;
462     if (p->size < 0)
463     {
464         yaz_log(YLOG_FATAL, "Bad block size %d in pos=" ZINT_FORMAT "\n",
465                  p->size, pos);
466     }
467     assert (p->size >= 0);
468     src = p->buf + 3;
469     decode_ptr(&src, &p->no_items);
470
471     p->offset = 0;
472     p->dirty = 0;
473     p->deleted = 0;
474     p->decodeClientData = (*b->method->codec.start)();
475     return p;
476 }
477
478 struct ISAMB_block *new_block (ISAMB b, int leaf, int cat)
479 {
480     struct ISAMB_block *p;
481
482     p = xmalloc(sizeof(*p));
483     p->buf = xmalloc(b->file[cat].head.block_size);
484
485     if (!b->file[cat].head.free_list)
486     {
487         zint block_no;
488         block_no = b->file[cat].head.last_block++;
489         p->pos = block_no * CAT_MAX + cat;
490     }
491     else
492     {
493         p->pos = b->file[cat].head.free_list;
494         assert((p->pos & CAT_MASK) == cat);
495         if (!get_block (b, p->pos, p->buf, 0))
496         {
497             yaz_log(b->log_io, "bf_read: new_block");
498             if (!bf_read(b->file[cat].bf, p->pos/CAT_MAX, 0, 0, p->buf))
499             {
500                 yaz_log(YLOG_FATAL, "isamb: read fail for pos=%ld block=%ld",
501                          (long) p->pos/CAT_MAX, (long) p->pos/CAT_MAX);
502                 abort ();
503             }
504         }
505         yaz_log(b->log_freelist, "got block " ZINT_FORMAT " from freelist %d:" ZINT_FORMAT, p->pos,
506                  cat, p->pos/CAT_MAX);
507         memcpy (&b->file[cat].head.free_list, p->buf, sizeof(zint));
508     }
509     p->cat = cat;
510     b->file[cat].head_dirty = 1;
511     memset (p->buf, 0, b->file[cat].head.block_size);
512     p->bytes = p->buf + b->file[cat].head.block_offset;
513     p->leaf = leaf;
514     p->size = 0;
515     p->dirty = 1;
516     p->deleted = 0;
517     p->offset = 0;
518     p->no_items = 0;
519     p->decodeClientData = (*b->method->codec.start)();
520     return p;
521 }
522
523 struct ISAMB_block *new_leaf (ISAMB b, int cat)
524 {
525     return new_block (b, 1, cat);
526 }
527
528
529 struct ISAMB_block *new_int (ISAMB b, int cat)
530 {
531     return new_block (b, 0, cat);
532 }
533
534 static void check_block (ISAMB b, struct ISAMB_block *p)
535 {
536     assert(b); /* mostly to make the compiler shut up about unused b */
537     if (p->leaf)
538     {
539         ;
540     }
541     else
542     {
543         /* sanity check */
544         char *startp = p->bytes;
545         const char *src = startp;
546         char *endp = p->bytes + p->size;
547         ISAMB_P pos;
548         void *c1 = (*b->method->codec.start)();
549             
550         decode_ptr(&src, &pos);
551         assert ((pos&CAT_MASK) == p->cat);
552         while (src != endp)
553         {
554 #if INT_ENCODE
555             char file_item_buf[DST_ITEM_MAX];
556             char *file_item = file_item_buf;
557             (*b->method->codec.reset)(c1);
558             (*b->method->codec.decode)(c1, &file_item, &src);
559 #else
560             zint item_len;
561             decode_item_len(&src, &item_len);
562             assert (item_len > 0 && item_len < 80);
563             src += item_len;
564 #endif
565             decode_ptr(&src, &pos);
566             if ((pos&CAT_MASK) != p->cat)
567             {
568                 assert ((pos&CAT_MASK) == p->cat);
569             }
570         }
571         (*b->method->codec.stop)(c1);
572     }
573 }
574
575 void close_block(ISAMB b, struct ISAMB_block *p)
576 {
577     if (!p)
578         return;
579     if (p->deleted)
580     {
581         yaz_log(b->log_freelist, "release block " ZINT_FORMAT " from freelist %d:" ZINT_FORMAT,
582                  p->pos, p->cat, p->pos/CAT_MAX);
583         memcpy (p->buf, &b->file[p->cat].head.free_list, sizeof(zint));
584         b->file[p->cat].head.free_list = p->pos;
585         if (!get_block (b, p->pos, p->buf, 1))
586         {
587             yaz_log(b->log_io, "bf_write: close_block (deleted)");
588             bf_write(b->file[p->cat].bf, p->pos/CAT_MAX, 0, 0, p->buf);
589         }
590     }
591     else if (p->dirty)
592     {
593         int offset = b->file[p->cat].head.block_offset;
594         int size = p->size + offset;
595         char *dst =  p->buf + 3;
596         assert (p->size >= 0);
597         
598         /* memset becuase encode_ptr usually does not write all bytes */
599         memset(p->buf, 0, b->file[p->cat].head.block_offset);
600         p->buf[0] = p->leaf;
601         p->buf[1] = size & 255;
602         p->buf[2] = size >> 8;
603         encode_ptr(&dst, p->no_items);
604         check_block(b, p);
605         if (!get_block (b, p->pos, p->buf, 1))
606         {
607             yaz_log(b->log_io, "bf_write: close_block");
608             bf_write(b->file[p->cat].bf, p->pos/CAT_MAX, 0, 0, p->buf);
609         }
610     }
611     (*b->method->codec.stop)(p->decodeClientData);
612     xfree(p->buf);
613     xfree(p);
614 }
615
616 int insert_sub (ISAMB b, struct ISAMB_block **p,
617                 void *new_item, int *mode,
618                 ISAMC_I *stream,
619                 struct ISAMB_block **sp,
620                 void *sub_item, int *sub_size,
621                 const void *max_item);
622
623 int insert_int (ISAMB b, struct ISAMB_block *p, void *lookahead_item,
624                 int *mode,
625                 ISAMC_I *stream, struct ISAMB_block **sp,
626                 void *split_item, int *split_size, const void *last_max_item)
627 {
628     char *startp = p->bytes;
629     const char *src = startp;
630     char *endp = p->bytes + p->size;
631     ISAMB_P pos;
632     struct ISAMB_block *sub_p1 = 0, *sub_p2 = 0;
633     char sub_item[DST_ITEM_MAX];
634     int sub_size;
635     int more = 0;
636     zint diff_terms = 0;
637     void *c1 = (*b->method->codec.start)();
638
639     *sp = 0;
640
641     assert(p->size >= 0);
642     decode_ptr(&src, &pos);
643     while (src != endp)
644     {
645         int d;
646         const char *src0 = src;
647 #if INT_ENCODE
648         char file_item_buf[DST_ITEM_MAX];
649         char *file_item = file_item_buf;
650         (*b->method->codec.reset)(c1);
651         (*b->method->codec.decode)(c1, &file_item, &src);
652         d = (*b->method->compare_item)(file_item_buf, lookahead_item);
653         if (d > 0)
654         {
655             sub_p1 = open_block(b, pos);
656             assert (sub_p1);
657             diff_terms -= sub_p1->no_items;
658             more = insert_sub (b, &sub_p1, lookahead_item, mode,
659                                stream, &sub_p2, 
660                                sub_item, &sub_size, file_item_buf);
661             diff_terms += sub_p1->no_items;
662             src = src0;
663             break;
664         }
665 #else
666         zint item_len;
667         decode_item_len(&src, &item_len);
668         d = (*b->method->compare_item)(src, lookahead_item);
669         if (d > 0)
670         {
671             sub_p1 = open_block(b, pos);
672             assert (sub_p1);
673             diff_terms -= sub_p1->no_items;
674             more = insert_sub (b, &sub_p1, lookahead_item, mode,
675                                stream, &sub_p2, 
676                                sub_item, &sub_size, src);
677             diff_terms += sub_p1->no_items;
678             src = src0;
679             break;
680         }
681         src += item_len;
682 #endif
683         decode_ptr(&src, &pos);
684     }
685     if (!sub_p1)
686     {
687         /* we reached the end. So lookahead > last item */
688         sub_p1 = open_block(b, pos);
689         assert (sub_p1);
690         diff_terms -= sub_p1->no_items;
691         more = insert_sub (b, &sub_p1, lookahead_item, mode, stream, &sub_p2, 
692                            sub_item, &sub_size, last_max_item);
693         diff_terms += sub_p1->no_items;
694     }
695     if (sub_p2)
696         diff_terms += sub_p2->no_items;
697     if (diff_terms)
698     {
699         p->dirty = 1;
700         p->no_items += diff_terms;
701     }
702     if (sub_p2)
703     {
704         /* there was a split - must insert pointer in this one */
705         char dst_buf[DST_BUF_SIZE];
706         char *dst = dst_buf;
707 #if INT_ENCODE
708         const char *sub_item_ptr = sub_item;
709 #endif
710         assert (sub_size < 80 && sub_size > 1);
711
712         memcpy (dst, startp, src - startp);
713                 
714         dst += src - startp;
715
716 #if INT_ENCODE
717         (*b->method->codec.reset)(c1);
718         (*b->method->codec.encode)(c1, &dst, &sub_item_ptr);
719 #else
720         encode_item_len (&dst, sub_size);      /* sub length and item */
721         memcpy (dst, sub_item, sub_size);
722         dst += sub_size;
723 #endif
724
725         encode_ptr(&dst, sub_p2->pos);   /* pos */
726
727         if (endp - src)                   /* remaining data */
728         {
729             memcpy (dst, src, endp - src);
730             dst += endp - src;
731         }
732         p->size = dst - dst_buf;
733         assert (p->size >= 0);
734
735
736         if (p->size <= b->file[p->cat].head.block_max)
737         {
738             /* it fits OK in this block */
739             memcpy (startp, dst_buf, dst - dst_buf);
740         }
741         else
742         {
743             /* must split _this_ block as well .. */
744             struct ISAMB_block *sub_p3;
745 #if INT_ENCODE
746             char file_item_buf[DST_ITEM_MAX];
747             char *file_item = file_item_buf;
748 #else
749             zint split_size_tmp;
750 #endif
751             zint no_items_first_half = 0;
752             int p_new_size;
753             const char *half;
754             src = dst_buf;
755             endp = dst;
756
757             half = src + b->file[p->cat].head.block_size/2;
758             decode_ptr(&src, &pos);
759
760             /* read sub block so we can get no_items for it */
761             sub_p3 = open_block(b, pos);
762             no_items_first_half += sub_p3->no_items;
763             close_block(b, sub_p3);
764
765             while (src <= half)
766             {
767 #if INT_ENCODE
768                 file_item = file_item_buf;
769                 (*b->method->codec.reset)(c1);
770                 (*b->method->codec.decode)(c1, &file_item, &src);
771 #else
772                 decode_item_len(&src, &split_size_tmp);
773                 *split_size = (int) split_size_tmp;
774                 src += *split_size;
775 #endif
776                 decode_ptr(&src, &pos);
777
778                 /* read sub block so we can get no_items for it */
779                 sub_p3 = open_block(b, pos);
780                 no_items_first_half += sub_p3->no_items;
781                 close_block(b, sub_p3);
782             }
783             /*  p is first half */
784             p_new_size = src - dst_buf;
785             memcpy (p->bytes, dst_buf, p_new_size);
786
787 #if INT_ENCODE
788             file_item = file_item_buf;
789             (*b->method->codec.reset)(c1);
790             (*b->method->codec.decode)(c1, &file_item, &src);
791             *split_size = file_item - file_item_buf;
792             memcpy(split_item, file_item_buf, *split_size);
793 #else
794             decode_item_len(&src, &split_size_tmp);
795             *split_size = (int) split_size_tmp;
796             memcpy (split_item, src, *split_size);
797             src += *split_size;
798 #endif
799             /*  *sp is second half */
800             *sp = new_int (b, p->cat);
801             (*sp)->size = endp - src;
802             memcpy ((*sp)->bytes, src, (*sp)->size);
803
804             p->size = p_new_size;
805
806             /*  adjust no_items in first&second half */
807             (*sp)->no_items = p->no_items - no_items_first_half;
808             p->no_items = no_items_first_half;
809         }
810         p->dirty = 1;
811         close_block(b, sub_p2);
812     }
813     close_block(b, sub_p1);
814     (*b->method->codec.stop)(c1);
815     return more;
816 }
817
818 int insert_leaf (ISAMB b, struct ISAMB_block **sp1, void *lookahead_item,
819                  int *lookahead_mode, ISAMC_I *stream,
820                  struct ISAMB_block **sp2,
821                  void *sub_item, int *sub_size,
822                  const void *max_item)
823 {
824     struct ISAMB_block *p = *sp1;
825     char *endp = 0;
826     const char *src = 0;
827     char dst_buf[DST_BUF_SIZE], *dst = dst_buf;
828     int new_size;
829     void *c1 = (*b->method->codec.start)();
830     void *c2 = (*b->method->codec.start)();
831     int more = 1;
832     int quater = b->file[b->no_cat-1].head.block_max / 4;
833     char *mid_cut = dst_buf + quater * 2;
834     char *tail_cut = dst_buf + quater * 3;
835     char *maxp = dst_buf + b->file[b->no_cat-1].head.block_max;
836     char *half1 = 0;
837     char *half2 = 0;
838     char cut_item_buf[DST_ITEM_MAX];
839     int cut_item_size = 0;
840     int no_items = 0;    /* number of items (total) */
841     int no_items_1 = 0;  /* number of items (first half) */
842
843     if (p && p->size)
844     {
845         char file_item_buf[DST_ITEM_MAX];
846         char *file_item = file_item_buf;
847             
848         src = p->bytes;
849         endp = p->bytes + p->size;
850         (*b->method->codec.decode)(c1, &file_item, &src);
851         while (1)
852         {
853             const char *dst_item = 0; /* resulting item to be inserted */
854             char *lookahead_next;
855             int d = -1;
856             
857             if (lookahead_item)
858                 d = (*b->method->compare_item)(file_item_buf, lookahead_item);
859             
860             /* d now holds comparison between existing file item and
861                lookahead item 
862                d = 0: equal
863                d > 0: lookahead before file
864                d < 0: lookahead after file
865             */
866             if (d > 0)
867             {
868                 /* lookahead must be inserted */
869                 dst_item = lookahead_item;
870                 /* if this is not an insertion, it's really bad .. */
871                 if (!*lookahead_mode)
872                 {
873                     yaz_log(YLOG_WARN, "isamb: Inconsistent register (1)");
874                     assert (*lookahead_mode);
875                 }
876             }
877             else
878                 dst_item = file_item_buf;
879
880                
881             if (!*lookahead_mode && d == 0)
882             {
883                 /* it's a deletion and they match so there is nothing to be
884                    inserted anyway .. But mark the thing bad (file item
885                    was part of input.. The item will not be part of output */
886                 p->dirty = 1;
887             }
888             else if (!half1 && dst > mid_cut)
889             {
890                 /* we have reached the splitting point for the first time */
891                 const char *dst_item_0 = dst_item;
892                 half1 = dst; /* candidate for splitting */
893
894                 /* encode the resulting item */
895                 (*b->method->codec.encode)(c2, &dst, &dst_item);
896                 
897                 cut_item_size = dst_item - dst_item_0;
898                 assert(cut_item_size > 0);
899                 memcpy (cut_item_buf, dst_item_0, cut_item_size);
900                 
901                 half2 = dst;
902                 no_items_1 = no_items;
903                 no_items++;
904             }
905             else
906             {
907                 /* encode the resulting item */
908                 (*b->method->codec.encode)(c2, &dst, &dst_item);
909                 no_items++;
910             }
911
912             /* now move "pointers" .. result has been encoded .. */
913             if (d > 0)  
914             {
915                 /* we must move the lookahead pointer */
916
917                 if (dst > maxp)
918                     /* no more room. Mark lookahead as "gone".. */
919                     lookahead_item = 0;
920                 else
921                 {
922                     /* move it really.. */
923                     lookahead_next = lookahead_item;
924                     if (!(*stream->read_item)(stream->clientData,
925                                               &lookahead_next,
926                                               lookahead_mode))
927                     {
928                         /* end of stream reached: no "more" and no lookahead */
929                         lookahead_item = 0;
930                         more = 0;
931                     }
932                     if (lookahead_item && max_item &&
933                         (*b->method->compare_item)(max_item, lookahead_item) <= 0)
934                     {
935                         /* the lookahead goes beyond what we allow in this
936                            leaf. Mark it as "gone" */
937                         lookahead_item = 0;
938                     }
939                     
940                     p->dirty = 1;
941                 }
942             }
943             else if (d == 0)
944             {
945                 /* exact match .. move both pointers */
946
947                 lookahead_next = lookahead_item;
948                 if (!(*stream->read_item)(stream->clientData,
949                                           &lookahead_next, lookahead_mode))
950                 {
951                     lookahead_item = 0;
952                     more = 0;
953                 }
954                 if (src == endp)
955                     break;  /* end of file stream reached .. */
956                 file_item = file_item_buf; /* move file pointer */
957                 (*b->method->codec.decode)(c1, &file_item, &src);
958             }
959             else
960             {
961                 /* file pointer must be moved */
962                 if (src == endp)
963                     break;
964                 file_item = file_item_buf;
965                 (*b->method->codec.decode)(c1, &file_item, &src);
966             }
967         }
968     }
969     maxp = dst_buf + b->file[b->no_cat-1].head.block_max + quater;
970     /* this loop runs when we are "appending" to a leaf page. That is
971        either it's empty (new) or all file items have been read in
972        previous loop */
973     while (lookahead_item)
974     {
975         char *dst_item;
976         const char *src = lookahead_item;
977         char *dst_0 = dst;
978         
979         /* compare lookahead with max item */
980         if (max_item &&
981             (*b->method->compare_item)(max_item, lookahead_item) <= 0)
982         {
983             /* stop if we have reached the value of max item */
984             break;
985         }
986         if (!*lookahead_mode)
987         {
988             /* this is append. So a delete is bad */
989             yaz_log(YLOG_WARN, "isamb: Inconsistent register (2)");
990             abort();
991         }
992         else if (!half1 && dst > tail_cut)
993         {
994             const char *src_0 = src;
995             half1 = dst; /* candidate for splitting */
996             
997             (*b->method->codec.encode)(c2, &dst, &src);
998             
999             cut_item_size = src - src_0;
1000             assert(cut_item_size > 0);
1001             memcpy (cut_item_buf, src_0, cut_item_size);
1002             
1003             no_items_1 = no_items;
1004             half2 = dst;
1005         }
1006         else
1007             (*b->method->codec.encode)(c2, &dst, &src);
1008
1009         if (dst > maxp)
1010         {
1011             dst = dst_0;
1012             break;
1013         }
1014         no_items++;
1015         if (p)
1016             p->dirty = 1;
1017         dst_item = lookahead_item;
1018         if (!(*stream->read_item)(stream->clientData, &dst_item,
1019                                   lookahead_mode))
1020         {
1021             lookahead_item = 0;
1022             more = 0;
1023         }
1024     }
1025     new_size = dst - dst_buf;
1026     if (p && p->cat != b->no_cat-1 && 
1027         new_size > b->file[p->cat].head.block_max)
1028     {
1029         /* non-btree block will be removed */
1030         p->deleted = 1;
1031         close_block(b, p);
1032         /* delete it too!! */
1033         p = 0; /* make a new one anyway */
1034     }
1035     if (!p)
1036     {   /* must create a new one */
1037         int i;
1038         for (i = 0; i < b->no_cat; i++)
1039             if (new_size <= b->file[i].head.block_max)
1040                 break;
1041         if (i == b->no_cat)
1042             i = b->no_cat - 1;
1043         p = new_leaf (b, i);
1044     }
1045     if (new_size > b->file[p->cat].head.block_max)
1046     {
1047         char *first_dst;
1048         const char *cut_item = cut_item_buf;
1049
1050         assert (half1);
1051         assert (half2);
1052
1053         assert(cut_item_size > 0);
1054         
1055         /* first half */
1056         p->size = half1 - dst_buf;
1057         memcpy (p->bytes, dst_buf, half1 - dst_buf);
1058         p->no_items = no_items_1;
1059
1060         /* second half */
1061         *sp2 = new_leaf (b, p->cat);
1062
1063         (*b->method->codec.reset)(c2);
1064
1065         first_dst = (*sp2)->bytes;
1066
1067         (*b->method->codec.encode)(c2, &first_dst, &cut_item);
1068
1069         memcpy (first_dst, half2, dst - half2);
1070
1071         (*sp2)->size = (first_dst - (*sp2)->bytes) + (dst - half2);
1072         (*sp2)->no_items = no_items - no_items_1;
1073         (*sp2)->dirty = 1;
1074         p->dirty = 1;
1075         memcpy (sub_item, cut_item_buf, cut_item_size);
1076         *sub_size = cut_item_size;
1077     }
1078     else
1079     {
1080         memcpy (p->bytes, dst_buf, dst - dst_buf);
1081         p->size = new_size;
1082         p->no_items = no_items;
1083     }
1084     (*b->method->codec.stop)(c1);
1085     (*b->method->codec.stop)(c2);
1086     *sp1 = p;
1087     return more;
1088 }
1089
1090 int insert_sub (ISAMB b, struct ISAMB_block **p, void *new_item,
1091                 int *mode,
1092                 ISAMC_I *stream,
1093                 struct ISAMB_block **sp,
1094                 void *sub_item, int *sub_size,
1095                 const void *max_item)
1096 {
1097     if (!*p || (*p)->leaf)
1098         return insert_leaf (b, p, new_item, mode, stream, sp, sub_item, 
1099                             sub_size, max_item);
1100     else
1101         return insert_int (b, *p, new_item, mode, stream, sp, sub_item,
1102                            sub_size, max_item);
1103 }
1104
1105 int isamb_unlink (ISAMB b, ISAMC_P pos)
1106 {
1107     struct ISAMB_block *p1;
1108
1109     if (!pos)
1110         return 0;
1111     p1 = open_block(b, pos);
1112     p1->deleted = 1;
1113     if (!p1->leaf)
1114     {
1115         zint sub_p;
1116         const char *src = p1->bytes + p1->offset;
1117 #if INT_ENCODE
1118         void *c1 = (*b->method->codec.start)();
1119 #endif
1120         decode_ptr(&src, &sub_p);
1121         isamb_unlink(b, sub_p);
1122         
1123         while (src != p1->bytes + p1->size)
1124         {
1125 #if INT_ENCODE
1126             char file_item_buf[DST_ITEM_MAX];
1127             char *file_item = file_item_buf;
1128             (*b->method->codec.reset)(c1);
1129             (*b->method->codec.decode)(c1, &file_item, &src);
1130 #else
1131             zint item_len;
1132             decode_item_len(&src, &item_len);
1133             src += item_len;
1134 #endif
1135             decode_ptr(&src, &sub_p);
1136             isamb_unlink(b, sub_p);
1137         }
1138 #if INT_ENCODE
1139         (*b->method->codec.stop)(c1);
1140 #endif
1141     }
1142     close_block(b, p1);
1143     return 0;
1144 }
1145
1146 ISAMB_P isamb_merge (ISAMB b, ISAMC_P pos, ISAMC_I *stream)
1147 {
1148     char item_buf[DST_ITEM_MAX];
1149     char *item_ptr;
1150     int i_mode;
1151     int more;
1152     int must_delete = 0;
1153
1154     if (b->cache < 0)
1155     {
1156         int more = 1;
1157         while (more)
1158         {
1159             item_ptr = item_buf;
1160             more =
1161                 (*stream->read_item)(stream->clientData, &item_ptr, &i_mode);
1162         }
1163         return 1;
1164     }
1165     item_ptr = item_buf;
1166     more = (*stream->read_item)(stream->clientData, &item_ptr, &i_mode);
1167     while (more)
1168     {
1169         struct ISAMB_block *p = 0, *sp = 0;
1170         char sub_item[DST_ITEM_MAX];
1171         int sub_size;
1172         
1173         if (pos)
1174             p = open_block(b, pos);
1175         more = insert_sub (b, &p, item_buf, &i_mode, stream, &sp,
1176                            sub_item, &sub_size, 0);
1177         if (sp)
1178         {    /* increase level of tree by one */
1179             struct ISAMB_block *p2 = new_int (b, p->cat);
1180             char *dst = p2->bytes + p2->size;
1181 #if INT_ENCODE
1182             void *c1 = (*b->method->codec.start)();
1183             const char *sub_item_ptr = sub_item;
1184 #endif
1185
1186             encode_ptr(&dst, p->pos);
1187             assert (sub_size < 80 && sub_size > 1);
1188 #if INT_ENCODE
1189             (*b->method->codec.reset)(c1);
1190             (*b->method->codec.encode)(c1, &dst, &sub_item_ptr);
1191 #else
1192             encode_item_len (&dst, sub_size);
1193             memcpy (dst, sub_item, sub_size);
1194             dst += sub_size;
1195 #endif
1196             encode_ptr(&dst, sp->pos);
1197             
1198             p2->size = dst - p2->bytes;
1199             p2->no_items = p->no_items + sp->no_items;
1200             pos = p2->pos;  /* return new super page */
1201             close_block(b, sp);
1202             close_block(b, p2);
1203 #if INT_ENCODE
1204             (*b->method->codec.stop)(c1);
1205 #endif
1206         }
1207         else
1208         {
1209             pos = p->pos;   /* return current one (again) */
1210         }
1211         if (p->no_items == 0)
1212             must_delete = 1;
1213         else
1214             must_delete = 0;
1215         close_block(b, p);
1216     }
1217     if (must_delete)
1218     {
1219         isamb_unlink(b, pos);
1220         return 0;
1221     }
1222     return pos;
1223 }
1224
1225 ISAMB_PP isamb_pp_open_x(ISAMB isamb, ISAMB_P pos, int *level, int scope)
1226 {
1227     ISAMB_PP pp = xmalloc(sizeof(*pp));
1228     int i;
1229
1230     assert(pos);
1231
1232     pp->isamb = isamb;
1233     pp->block = xmalloc(ISAMB_MAX_LEVEL * sizeof(*pp->block));
1234
1235     pp->pos = pos;
1236     pp->level = 0;
1237     pp->maxlevel = 0;
1238     pp->total_size = 0;
1239     pp->no_blocks = 0;
1240     pp->skipped_numbers = 0;
1241     pp->returned_numbers = 0;
1242     pp->scope = scope;
1243     for (i = 0; i<ISAMB_MAX_LEVEL; i++)
1244         pp->skipped_nodes[i] = pp->accessed_nodes[i] = 0;
1245     while (1)
1246     {
1247         struct ISAMB_block *p = open_block(isamb, pos);
1248         const char *src = p->bytes + p->offset;
1249         pp->block[pp->level] = p;
1250
1251         pp->total_size += p->size;
1252         pp->no_blocks++;
1253         if (p->leaf)
1254             break;
1255         decode_ptr(&src, &pos);
1256         p->offset = src - p->bytes;
1257         pp->level++;
1258         pp->accessed_nodes[pp->level]++; 
1259     }
1260     pp->block[pp->level+1] = 0;
1261     pp->maxlevel = pp->level;
1262     if (level)
1263         *level = pp->level;
1264     return pp;
1265 }
1266
1267 ISAMB_PP isamb_pp_open (ISAMB isamb, ISAMB_P pos, int scope)
1268 {
1269     return isamb_pp_open_x(isamb, pos, 0, scope);
1270 }
1271
1272 void isamb_pp_close_x(ISAMB_PP pp, int *size, int *blocks)
1273 {
1274     int i;
1275     if (!pp)
1276         return;
1277     yaz_log(YLOG_DEBUG, "isamb_pp_close lev=%d returned "ZINT_FORMAT" values, " 
1278                     "skipped "ZINT_FORMAT,
1279         pp->maxlevel, pp->skipped_numbers, pp->returned_numbers);
1280     for (i = pp->maxlevel; i>=0; i--)
1281         if (pp->skipped_nodes[i] || pp->accessed_nodes[i])
1282             yaz_log(YLOG_DEBUG, "isamb_pp_close  level leaf-%d: "
1283                             ZINT_FORMAT" read, "ZINT_FORMAT" skipped", i,
1284                  pp->accessed_nodes[i], pp->skipped_nodes[i]);
1285     pp->isamb->skipped_numbers += pp->skipped_numbers;
1286     pp->isamb->returned_numbers += pp->returned_numbers;
1287     for (i = pp->maxlevel; i>=0; i--)
1288     {
1289         pp->isamb->accessed_nodes[i] += pp->accessed_nodes[i];
1290         pp->isamb->skipped_nodes[i] += pp->skipped_nodes[i];
1291     }
1292     if (size)
1293         *size = pp->total_size;
1294     if (blocks)
1295         *blocks = pp->no_blocks;
1296     for (i = 0; i <= pp->level; i++)
1297         close_block(pp->isamb, pp->block[i]);
1298     xfree(pp->block);
1299     xfree(pp);
1300 }
1301
1302 int isamb_block_info (ISAMB isamb, int cat)
1303 {
1304     if (cat >= 0 && cat < isamb->no_cat)
1305         return isamb->file[cat].head.block_size;
1306     return -1;
1307 }
1308
1309 void isamb_pp_close (ISAMB_PP pp)
1310 {
1311     isamb_pp_close_x(pp, 0, 0);
1312 }
1313
1314 /* simple recursive dumper .. */
1315 static void isamb_dump_r (ISAMB b, ISAMB_P pos, void (*pr)(const char *str),
1316                           int level)
1317 {
1318     char buf[1024];
1319     char prefix_str[1024];
1320     if (pos)
1321     {
1322         struct ISAMB_block *p = open_block(b, pos);
1323         sprintf(prefix_str, "%*s " ZINT_FORMAT " cat=%d size=%d max=%d items="
1324                 ZINT_FORMAT, level*2, "",
1325                 pos, p->cat, p->size, b->file[p->cat].head.block_max,
1326                 p->no_items);
1327         (*pr)(prefix_str);
1328         sprintf(prefix_str, "%*s " ZINT_FORMAT, level*2, "", pos);
1329         if (p->leaf)
1330         {
1331             while (p->offset < p->size)
1332             {
1333                 const char *src = p->bytes + p->offset;
1334                 char *dst = buf;
1335                 (*b->method->codec.decode)(p->decodeClientData, &dst, &src);
1336                 (*b->method->log_item)(YLOG_DEBUG, buf, prefix_str);
1337                 p->offset = src - (char*) p->bytes;
1338             }
1339             assert(p->offset == p->size);
1340         }
1341         else
1342         {
1343             const char *src = p->bytes + p->offset;
1344             ISAMB_P sub;
1345
1346             decode_ptr(&src, &sub);
1347             p->offset = src - (char*) p->bytes;
1348
1349             isamb_dump_r(b, sub, pr, level+1);
1350             
1351             while (p->offset < p->size)
1352             {
1353 #if INT_ENCODE
1354                 char file_item_buf[DST_ITEM_MAX];
1355                 char *file_item = file_item_buf;
1356                 void *c1 = (*b->method->codec.start)();
1357                 (*b->method->codec.decode)(c1, &file_item, &src);
1358                 (*b->method->codec.stop)(c1);
1359                 (*b->method->log_item)(YLOG_DEBUG, file_item_buf, prefix_str);
1360 #else
1361                 zint item_len;
1362                 decode_item_len(&src, &item_len);
1363                 (*b->method->log_item)(YLOG_DEBUG, src, prefix_str);
1364                 src += item_len;
1365 #endif
1366                 decode_ptr(&src, &sub);
1367                 
1368                 p->offset = src - (char*) p->bytes;
1369                 
1370                 isamb_dump_r(b, sub, pr, level+1);
1371             }           
1372         }
1373         close_block(b, p);
1374     }
1375 }
1376
1377 void isamb_dump(ISAMB b, ISAMB_P pos, void (*pr)(const char *str))
1378 {
1379     isamb_dump_r(b, pos, pr, 0);
1380 }
1381
1382 int isamb_pp_read(ISAMB_PP pp, void *buf)
1383 {
1384     return isamb_pp_forward(pp, buf, 0);
1385 }
1386
1387
1388 static int isamb_pp_on_right_node(ISAMB_PP pp, int level, const void *untilbuf)
1389 { /* looks one node higher to see if we should be on this node at all */
1390   /* useful in backing off quickly, and in avoiding tail descends */
1391   /* call with pp->level to begin with */
1392     struct ISAMB_block *p;
1393     int cmp;
1394     const char *src;
1395     ISAMB b = pp->isamb;
1396
1397     assert(level >= 0);
1398     if (level == 0)
1399     {
1400 #if ISAMB_DEBUG
1401             yaz_log(YLOG_DEBUG, "isamb_pp_on_right returning true for root");
1402 #endif
1403         return 1; /* we can never skip the root node */
1404     }
1405     level--;
1406     p = pp->block[level];
1407     assert(p->offset <= p->size);
1408     if (p->offset < p->size)
1409     {
1410 #if INT_ENCODE
1411         char file_item_buf[DST_ITEM_MAX];
1412         char *file_item = file_item_buf;
1413         void *c1 = (*b->method->codec.start)();
1414         assert(p->offset > 0); 
1415         src = p->bytes + p->offset;
1416         (*b->method->codec.decode)(c1, &file_item, &src);
1417         (*b->method->codec.stop)(c1);
1418         cmp = (*b->method->compare_item)(untilbuf, file_item_buf);
1419 #else
1420         zint item_len;
1421         assert(p->offset > 0); 
1422         src = p->bytes + p->offset;
1423         decode_item_len(&src, &item_len);
1424 #if ISAMB_DEBUG
1425         (*b->method->codec.log_item)(YLOG_DEBUG, untilbuf, "on_leaf: until");
1426         (*b->method->codec.log_item)(YLOG_DEBUG, src, "on_leaf: value");
1427 #endif
1428         cmp = (*b->method->compare_item)(untilbuf, src);
1429 #endif
1430         if (cmp < pp->scope)
1431         {  /* cmp<2 */
1432 #if ISAMB_DEBUG
1433             yaz_log(YLOG_DEBUG, "isamb_pp_on_right returning true "
1434                             "cmp=%d lev=%d ofs=%d", cmp, level, p->offset);
1435 #endif
1436             return 1; 
1437         }
1438         else
1439         {
1440 #if ISAMB_DEBUG
1441             yaz_log(YLOG_DEBUG, "isamb_pp_on_right returning false "
1442                             "cmp=%d lev=%d ofs=%d", cmp, level, p->offset);
1443 #endif
1444             return 0; 
1445         }
1446     }
1447     else {
1448 #if ISAMB_DEBUG
1449         yaz_log(YLOG_DEBUG, "isamb_pp_on_right at tail, looking higher "
1450                         "lev=%d", level);
1451 #endif
1452         return isamb_pp_on_right_node(pp, level, untilbuf);
1453     }
1454 } /* isamb_pp_on_right_node */
1455
1456 static int isamb_pp_read_on_leaf(ISAMB_PP pp, void *buf)
1457
1458     /* reads the next item on the current leaf, returns 0 if end of leaf*/
1459     struct ISAMB_block *p = pp->block[pp->level];
1460     char *dst;
1461     const char *src;
1462     assert(pp);
1463     assert(buf);
1464     if (p->offset == p->size)
1465     {
1466 #if ISAMB_DEBUG
1467         yaz_log(YLOG_DEBUG, "isamb_pp_read_on_leaf returning 0 on "
1468                 "node %d", p->pos);
1469 #endif
1470         return 0; /* at end of leaf */
1471     }
1472     src = p->bytes + p->offset;
1473     dst = buf;
1474     (*pp->isamb->method->codec.decode)(p->decodeClientData, &dst, &src);
1475     p->offset = src - (char*) p->bytes;
1476 #if ISAMB_DEBUG
1477     (*pp->isamb->method->codec.log_item)(YLOG_DEBUG, buf,
1478                                          "read_on_leaf returning 1");
1479 #endif
1480     pp->returned_numbers++;
1481     return 1;
1482 } /* read_on_leaf */
1483
1484 static int isamb_pp_forward_on_leaf(ISAMB_PP pp, void *buf, const void *untilbuf)
1485 { /* forwards on the current leaf, returns 0 if not found */
1486     int cmp;
1487     int skips = 0;
1488     while (1)
1489     {
1490         if (!isamb_pp_read_on_leaf(pp, buf))
1491             return 0;
1492         /* FIXME - this is an extra function call, inline the read? */
1493         cmp=(*pp->isamb->method->compare_item)(untilbuf, buf);
1494         if (cmp <pp->scope)
1495         {  /* cmp<2  found a good one */
1496 #if ISAMB_DEBUG
1497             if (skips)
1498                 yaz_log(YLOG_DEBUG, "isam_pp_fwd_on_leaf skipped %d items", skips);
1499 #endif
1500             pp->returned_numbers++;
1501             return 1;
1502         }
1503         if (!skips)
1504             if (!isamb_pp_on_right_node(pp, pp->level, untilbuf))
1505                 return 0; /* never mind the rest of this leaf */
1506         pp->skipped_numbers++;
1507         skips++;
1508     }
1509 } /* forward_on_leaf */
1510
1511 static int isamb_pp_climb_level(ISAMB_PP pp, ISAMB_P *pos)
1512 { /* climbs higher in the tree, until finds a level with data left */
1513   /* returns the node to (consider to) descend to in *pos) */
1514     struct ISAMB_block *p = pp->block[pp->level];
1515     const char *src;
1516 #if ISAMB_DEBUG
1517     yaz_log(YLOG_DEBUG, "isamb_pp_climb_level starting "
1518                    "at level %d node %d ofs=%d sz=%d",
1519                     pp->level, p->pos, p->offset, p->size);
1520 #endif
1521     assert(pp->level >= 0);
1522     assert(p->offset <= p->size);
1523     if (pp->level==0)
1524     {
1525 #if ISAMB_DEBUG
1526         yaz_log(YLOG_DEBUG, "isamb_pp_climb_level returning 0 at root");
1527 #endif
1528         return 0;
1529     }
1530     assert(pp->level>0); 
1531     close_block(pp->isamb, pp->block[pp->level]);
1532     pp->block[pp->level] = 0;
1533     (pp->level)--;
1534     p = pp->block[pp->level];
1535 #if ISAMB_DEBUG
1536     yaz_log(YLOG_DEBUG, "isamb_pp_climb_level climbed to level %d node %d ofs=%d",
1537                     pp->level, p->pos, p->offset);
1538 #endif
1539     assert(!p->leaf);
1540     assert(p->offset <= p->size);
1541     if (p->offset == p->size)
1542     {
1543         /* we came from the last pointer, climb on */
1544         if (!isamb_pp_climb_level(pp, pos))
1545             return 0;
1546         p = pp->block[pp->level];
1547     }
1548     else
1549     {
1550 #if INT_ENCODE
1551         char file_item_buf[DST_ITEM_MAX];
1552         char *file_item = file_item_buf;
1553         ISAMB b = pp->isamb;
1554         void *c1 = (*b->method->codec.start)();
1555 #else
1556         zint item_len;
1557 #endif
1558         /* skip the child we just came from */
1559 #if ISAMB_DEBUG
1560         yaz_log(YLOG_DEBUG, "isam_pp_climb_level: skipping lev=%d ofs=%d sz=%d", 
1561                         pp->level, p->offset, p->size);
1562 #endif
1563         assert (p->offset < p->size);
1564         src = p->bytes + p->offset;
1565 #if INT_ENCODE
1566         (*b->method->codec.decode)(c1, &file_item, &src);
1567         (*b->method->codec.stop)(c1);
1568 #else
1569         decode_item_len(&src, &item_len);
1570         src += item_len;
1571 #endif
1572         decode_ptr(&src, pos);
1573         p->offset = src - (char *)p->bytes;
1574             
1575     }
1576     return 1;
1577 } /* climb_level */
1578
1579
1580 static zint isamb_pp_forward_unode(ISAMB_PP pp, zint pos, const void *untilbuf)
1581 { /* scans a upper node until it finds a child <= untilbuf */
1582   /* pp points to the key value, as always. pos is the child read from */
1583   /* the buffer */
1584   /* if all values are too small, returns the last child in the node */
1585   /* FIXME - this can be detected, and avoided by looking at the */
1586   /* parent node, but that gets messy. Presumably the cost is */
1587   /* pretty low anyway */
1588     ISAMB b = pp->isamb;
1589     struct ISAMB_block *p = pp->block[pp->level];
1590     const char *src = p->bytes + p->offset;
1591     int cmp;
1592     zint nxtpos;
1593 #if ISAMB_DEBUG
1594     int skips = 0;
1595     yaz_log(YLOG_DEBUG, "isamb_pp_forward_unode starting "
1596                    "at level %d node %d ofs=%di sz=%d",
1597                     pp->level, p->pos, p->offset, p->size);
1598 #endif
1599     assert(!p->leaf);
1600     assert(p->offset <= p->size);
1601     if (p->offset == p->size)
1602     {
1603 #if ISAMB_DEBUG
1604             yaz_log(YLOG_DEBUG, "isamb_pp_forward_unode returning at end "
1605                    "at level %d node %d ofs=%di sz=%d",
1606                     pp->level, p->pos, p->offset, p->size);
1607 #endif
1608         return pos; /* already at the end of it */
1609     }
1610     while(p->offset < p->size)
1611     {
1612 #if INT_ENCODE
1613         char file_item_buf[DST_ITEM_MAX];
1614         char *file_item = file_item_buf;
1615         void *c1 = (*b->method->codec.start)();
1616         (*b->method->codec.decode)(c1, &file_item, &src);
1617         (*b->method->codec.stop)(c1);
1618         cmp = (*b->method->compare_item)(untilbuf, file_item_buf);
1619 #else
1620         zint item_len;
1621         decode_item_len(&src, &item_len);
1622         cmp = (*b->method->compare_item)(untilbuf, src);
1623         src += item_len;
1624 #endif
1625         decode_ptr(&src, &nxtpos);
1626         if (cmp<pp->scope) /* cmp<2 */
1627         {
1628 #if ISAMB_DEBUG
1629             yaz_log(YLOG_DEBUG, "isamb_pp_forward_unode returning a hit "
1630                    "at level %d node %d ofs=%d sz=%d",
1631                     pp->level, p->pos, p->offset, p->size);
1632 #endif
1633             return pos;
1634         } /* found one */
1635         pos = nxtpos;
1636         p->offset = src-(char*)p->bytes;
1637         (pp->skipped_nodes[pp->maxlevel - pp->level -1])++;
1638 #if ISAMB_DEBUG
1639         skips++;
1640 #endif
1641     }
1642 #if ISAMB_DEBUG
1643             yaz_log(YLOG_DEBUG, "isamb_pp_forward_unode returning at tail "
1644                    "at level %d node %d ofs=%d sz=%d skips=%d",
1645                     pp->level, p->pos, p->offset, p->size, skips);
1646 #endif
1647     return pos; /* that's the last one in the line */
1648     
1649 } /* forward_unode */
1650
1651 static void isamb_pp_descend_to_leaf(ISAMB_PP pp, ISAMB_P pos,
1652                                      const void *untilbuf)
1653 { /* climbs down the tree, from pos, to the leftmost leaf */
1654     struct ISAMB_block *p = pp->block[pp->level];
1655     const char *src;
1656     assert(!p->leaf);
1657 #if ISAMB_DEBUG
1658     yaz_log(YLOG_DEBUG, "isamb_pp_descend_to_leaf "
1659                    "starting at lev %d node %d ofs=%d lf=%d u=%p", 
1660                    pp->level, p->pos, p->offset, p->leaf, untilbuf);
1661 #endif
1662     if (untilbuf)
1663         pos = isamb_pp_forward_unode(pp, pos, untilbuf);
1664     ++(pp->level);
1665     assert(pos);
1666     p = open_block(pp->isamb, pos);
1667     pp->block[pp->level] = p;
1668     ++(pp->accessed_nodes[pp->maxlevel-pp->level]);
1669     ++(pp->no_blocks);
1670 #if ISAMB_DEBUG
1671     yaz_log(YLOG_DEBUG, "isamb_pp_descend_to_leaf "
1672                    "got lev %d node %d lf=%d", 
1673                    pp->level, p->pos, p->leaf);
1674 #endif
1675     if (p->leaf)
1676         return;
1677     assert (p->offset==0);
1678     src = p->bytes + p->offset;
1679     decode_ptr(&src, &pos);
1680     p->offset = src-(char*)p->bytes;
1681     isamb_pp_descend_to_leaf(pp, pos, untilbuf);
1682 #if ISAMB_DEBUG
1683     yaz_log(YLOG_DEBUG, "isamb_pp_descend_to_leaf "
1684                    "returning at lev %d node %d ofs=%d lf=%d", 
1685                    pp->level, p->pos, p->offset, p->leaf);
1686 #endif
1687 } /* descend_to_leaf */
1688
1689 static int isamb_pp_find_next_leaf(ISAMB_PP pp)
1690 { /* finds the next leaf by climbing up and down */
1691     ISAMB_P pos;
1692     if (!isamb_pp_climb_level(pp, &pos))
1693         return 0;
1694     isamb_pp_descend_to_leaf(pp, pos, 0);
1695     return 1;
1696 }
1697
1698 static int isamb_pp_climb_desc(ISAMB_PP pp,  const void *untilbuf)
1699 { /* climbs up and descends to a leaf where values >= *untilbuf are found */
1700     ISAMB_P pos;
1701 #if ISAMB_DEBUG
1702     struct ISAMB_block *p = pp->block[pp->level];
1703     yaz_log(YLOG_DEBUG, "isamb_pp_climb_desc starting "
1704                    "at level %d node %d ofs=%d sz=%d",
1705                     pp->level, p->pos, p->offset, p->size);
1706 #endif
1707     if (!isamb_pp_climb_level(pp, &pos))
1708         return 0;
1709     /* see if it would pay to climb one higher */
1710     if (!isamb_pp_on_right_node(pp, pp->level, untilbuf))
1711         if (!isamb_pp_climb_level(pp, &pos))
1712             return 0;
1713     isamb_pp_descend_to_leaf(pp, pos, untilbuf);
1714 #if ISAMB_DEBUG
1715     p = pp->block[pp->level];
1716     yaz_log(YLOG_DEBUG, "isamb_pp_climb_desc done "
1717                    "at level %d node %d ofs=%d sz=%d",
1718                     pp->level, p->pos, p->offset, p->size);
1719 #endif
1720     return 1;
1721 } /* climb_desc */
1722
1723 int isamb_pp_forward (ISAMB_PP pp, void *buf, const void *untilbuf)
1724 {
1725 #if ISAMB_DEBUG
1726     struct ISAMB_block *p = pp->block[pp->level];
1727     assert(p->leaf);
1728     yaz_log(YLOG_DEBUG, "isamb_pp_forward starting "
1729                    "at level %d node %d ofs=%d sz=%d u=%p sc=%d",
1730                     pp->level, p->pos, p->offset, p->size, untilbuf, scope);
1731 #endif
1732     if (untilbuf)
1733     {
1734         if (isamb_pp_forward_on_leaf(pp, buf, untilbuf))
1735         {
1736 #if ISAMB_DEBUG
1737             yaz_log(YLOG_DEBUG, "isamb_pp_forward (f) returning (A) "
1738                    "at level %d node %d ofs=%d sz=%d",
1739                     pp->level, p->pos, p->offset, p->size);
1740 #endif
1741             return 1;
1742         }
1743         if (! isamb_pp_climb_desc(pp, untilbuf))
1744         {
1745 #if ISAMB_DEBUG
1746             yaz_log(YLOG_DEBUG, "isamb_pp_forward (f) returning notfound (B) "
1747                    "at level %d node %d ofs=%d sz=%d",
1748                     pp->level, p->pos, p->offset, p->size);
1749 #endif
1750             return 0; /* could not find a leaf */
1751         }
1752         do {
1753             if (isamb_pp_forward_on_leaf(pp, buf, untilbuf))
1754             {
1755 #if ISAMB_DEBUG
1756                 yaz_log(YLOG_DEBUG, "isamb_pp_forward (f) returning (c) "
1757                         "at level %d node %d ofs=%d sz=%d",
1758                         pp->level, p->pos, p->offset, p->size);
1759 #endif
1760                 return 1;
1761             }
1762         } while (isamb_pp_find_next_leaf(pp));
1763         return 0; /* could not find at all */
1764     }
1765     else { /* no untilbuf, a straight read */
1766         /* FIXME - this should be moved
1767          * directly into the pp_read */
1768         /* keeping here now, to keep same
1769          * interface as the old fwd */
1770         if (isamb_pp_read_on_leaf(pp, buf))
1771         {
1772 #if ISAMB_DEBUG
1773             yaz_log(YLOG_DEBUG, "isamb_pp_forward (read) returning (D) "
1774                    "at level %d node %d ofs=%d sz=%d",
1775                     pp->level, p->pos, p->offset, p->size);
1776 #endif
1777             return 1;
1778         }
1779         if (isamb_pp_find_next_leaf(pp))
1780         {
1781 #if ISAMB_DEBUG
1782             yaz_log(YLOG_DEBUG, "isamb_pp_forward (read) returning (E) "
1783                    "at level %d node %d ofs=%d sz=%d",
1784                     pp->level, p->pos, p->offset, p->size);
1785 #endif
1786             return isamb_pp_read_on_leaf(pp, buf);
1787         }
1788         else
1789             return 0;
1790     }
1791 } /* isam_pp_forward (new version) */
1792
1793 void isamb_pp_pos(ISAMB_PP pp, double *current, double *total)
1794 { /* return an estimate of the current position and of the total number of */
1795   /* occureences in the isam tree, based on the current leaf */
1796     struct ISAMB_block *p = pp->block[pp->level];
1797     assert(total);
1798     assert(current);
1799     assert(p->leaf);
1800
1801     *total = pp->block[0]->no_items;
1802     *current = (double) pp->returned_numbers;
1803 #if ISAMB_DEBUG
1804     yaz_log(YLOG_LOG, "isamb_pp_pos returning: cur= %0.1f tot=%0.1f rn="
1805          ZINT_FORMAT, *current, *total, pp->returned_numbers);
1806 #endif
1807 }
1808
1809 int isamb_pp_forward2(ISAMB_PP pp, void *buf, const void *untilb)
1810 {
1811     char *dst = buf;
1812     const char *src;
1813     struct ISAMB_block *p = pp->block[pp->level];
1814     ISAMB b = pp->isamb;
1815     if (!p)
1816         return 0;
1817 again:
1818     while (p->offset == p->size)
1819     {
1820         ISAMB_P pos;
1821 #if INT_ENCODE
1822         const char *src_0;
1823         void *c1;
1824         char file_item_buf[DST_ITEM_MAX];
1825         char *file_item = file_item_buf;
1826 #else
1827         zint item_len;
1828 #endif
1829         while (p->offset == p->size)
1830         {
1831             if (pp->level == 0)
1832                 return 0;
1833             close_block (pp->isamb, pp->block[pp->level]);
1834             pp->block[pp->level] = 0;
1835             (pp->level)--;
1836             p = pp->block[pp->level];
1837             assert (!p->leaf);  
1838         }
1839
1840         assert(!p->leaf);
1841         src = p->bytes + p->offset;
1842        
1843 #if INT_ENCODE
1844         c1 = (*b->method->codec.start)();
1845         (*b->method->codec.decode)(c1, &file_item, &src);
1846 #else
1847         decode_ptr (&src, &item_len);
1848         src += item_len;
1849 #endif        
1850         decode_ptr (&src, &pos);
1851         p->offset = src - (char*) p->bytes;
1852
1853         src = p->bytes + p->offset;
1854
1855         while(1)
1856         {
1857             if (!untilb || p->offset == p->size)
1858                 break;
1859             assert(p->offset < p->size);
1860 #if INT_ENCODE
1861             src_0 = src;
1862             file_item = file_item_buf;
1863             (*b->method->codec.reset)(c1);
1864             (*b->method->codec.decode)(c1, &file_item, &src);
1865             if ((*b->method->compare_item)(untilb, file_item_buf) <= 1)
1866             {
1867                 src = src_0;
1868                 break;
1869             }
1870 #else
1871             decode_item_len(&src, &item_len);
1872             if ((*b->method->compare_item)(untilb, src) <= 1)
1873                 break;
1874             src += item_len;
1875 #endif
1876             decode_ptr (&src, &pos);
1877             p->offset = src - (char*) p->bytes;
1878         }
1879
1880         pp->level++;
1881
1882         while (1)
1883         {
1884             pp->block[pp->level] = p = open_block (pp->isamb, pos);
1885
1886             pp->total_size += p->size;
1887             pp->no_blocks++;
1888             
1889             if (p->leaf) 
1890             {
1891                 break;
1892             }
1893             
1894             src = p->bytes + p->offset;
1895             while(1)
1896             {
1897                 decode_ptr (&src, &pos);
1898                 p->offset = src - (char*) p->bytes;
1899                 
1900                 if (!untilb || p->offset == p->size)
1901                     break;
1902                 assert(p->offset < p->size);
1903 #if INT_ENCODE
1904                 src_0 = src;
1905                 file_item = file_item_buf;
1906                 (*b->method->codec.reset)(c1);
1907                 (*b->method->codec.decode)(c1, &file_item, &src);
1908                 if ((*b->method->compare_item)(untilb, file_item_buf) <= 1)
1909                 {
1910                     src = src_0;
1911                     break;
1912                 }
1913 #else
1914                 decode_ptr (&src, &item_len);
1915                 if ((*b->method->compare_item)(untilb, src) <= 1)
1916                     break;
1917                 src += item_len;
1918 #endif
1919             }
1920             pp->level++;
1921         }
1922 #if INT_ENCODE
1923         (*b->method->codec.stop)(c1);
1924 #endif
1925     }
1926     assert (p->offset < p->size);
1927     assert (p->leaf);
1928     while(1)
1929     {
1930         char *dst0 = dst;
1931         src = p->bytes + p->offset;
1932         (*pp->isamb->method->codec.decode)(p->decodeClientData, &dst, &src);
1933         p->offset = src - (char*) p->bytes;
1934         if (!untilb || (*pp->isamb->method->compare_item)(untilb, dst0) <= 1)
1935             break;
1936         dst = dst0;
1937         if (p->offset == p->size) goto again;
1938     }
1939     return 1;
1940 }