7bdab0febd8ea537234177c7c2e5ec4b66b618dc
[idzebra-moved-to-github.git] / isamb / isamb.c
1 /* $Id: isamb.c,v 1.70 2005-01-16 01:22:14 adam Exp $
2    Copyright (C) 1995-2005
3    Index Data ApS
4
5 This file is part of the Zebra server.
6
7 Zebra is free software; you can redistribute it and/or modify it under
8 the terms of the GNU General Public License as published by the Free
9 Software Foundation; either version 2, or (at your option) any later
10 version.
11
12 Zebra is distributed in the hope that it will be useful, but WITHOUT ANY
13 WARRANTY; without even the implied warranty of MERCHANTABILITY or
14 FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
15 for more details.
16
17 You should have received a copy of the GNU General Public License
18 along with Zebra; see the file LICENSE.zebra.  If not, write to the
19 Free Software Foundation, 59 Temple Place - Suite 330, Boston, MA
20 02111-1307, USA.
21 */
22
23 #include <string.h>
24 #include <yaz/log.h>
25 #include <yaz/xmalloc.h>
26 #include <idzebra/isamb.h>
27 #include <assert.h>
28
29 #ifndef ISAMB_DEBUG
30 #define ISAMB_DEBUG 0
31 #endif
32
33
34 #define ISAMB_MAJOR_VERSION 3
35 #define ISAMB_MINOR_VERSION 0
36
37 struct ISAMB_head {
38     zint first_block;
39     zint last_block;
40     zint free_list;
41     zint no_items;
42     int block_size;
43     int block_max;
44     int block_offset;
45 };
46
47 /* if 1, upper nodes items are encoded; 0 if not encoded */
48 #define INT_ENCODE 1
49
50 /* maximum size of encoded buffer */
51 #define DST_ITEM_MAX 256
52
53 #define ISAMB_MAX_LEVEL 10
54 /* approx 2*max page + max size of item */
55 #define DST_BUF_SIZE 16840
56
57 #define ISAMB_CACHE_ENTRY_SIZE 4096
58
59 /* CAT_MAX: _must_ be power of 2 */
60 #define CAT_MAX 4
61 #define CAT_MASK (CAT_MAX-1)
62 /* CAT_NO: <= CAT_MAX */
63 #define CAT_NO 4
64
65 /* Smallest block size */
66 #define ISAMB_MIN_SIZE 32
67 /* Size factor */
68 #define ISAMB_FAC_SIZE 4
69
70 /* ISAMB_PTR_CODEC = 1 var, =0 fixed */
71 #define ISAMB_PTR_CODEC 1
72
73 struct ISAMB_cache_entry {
74     ISAMB_P pos;
75     unsigned char *buf;
76     int dirty;
77     int hits;
78     struct ISAMB_cache_entry *next;
79 };
80
81 struct ISAMB_file {
82     BFile bf;
83     int head_dirty;
84     struct ISAMB_head head;
85     struct ISAMB_cache_entry *cache_entries;
86 };
87
88 struct ISAMB_s {
89     BFiles bfs;
90     ISAMC_M *method;
91
92     struct ISAMB_file *file;
93     int no_cat;
94     int cache; /* 0 = no cache, 1 = use cache, -1 = dummy isam (for testing only) */
95     int log_io;        /* log level for bf_read/bf_write calls */
96     int log_freelist;  /* log level for freelist handling */
97     zint skipped_numbers; /* on a leaf node */
98     zint returned_numbers; 
99     zint skipped_nodes[ISAMB_MAX_LEVEL]; /* [0]=skipped leaves, 1 = higher etc */
100     zint accessed_nodes[ISAMB_MAX_LEVEL]; /* nodes we did not skip */
101 };
102
103 struct ISAMB_block {
104     ISAMB_P pos;
105     int cat;
106     int size;
107     int leaf;
108     int dirty;
109     int deleted;
110     int offset;
111     zint no_items;  /* number of nodes in this + children */
112     char *bytes;
113     char *cbuf;
114     unsigned char *buf;
115     void *decodeClientData;
116     int log_rw;
117 };
118
119 struct ISAMB_PP_s {
120     ISAMB isamb;
121     ISAMB_P pos;
122     int level;
123     int maxlevel; /* total depth */
124     zint total_size;
125     zint no_blocks;
126     zint skipped_numbers; /* on a leaf node */
127     zint returned_numbers; 
128     zint skipped_nodes[ISAMB_MAX_LEVEL]; /* [0]=skipped leaves, 1 = higher etc */
129     zint accessed_nodes[ISAMB_MAX_LEVEL]; /* nodes we did not skip */
130     struct ISAMB_block **block;
131     int scope;  /* on what level we forward */
132 };
133
134
135 #define encode_item_len encode_ptr
136 #if ISAMB_PTR_CODEC
137 static void encode_ptr(char **dst, zint pos)
138 {
139     unsigned char *bp = (unsigned char*) *dst;
140
141     while (pos > 127)
142     {
143          *bp++ = 128 | (pos & 127);
144          pos = pos >> 7;
145     }
146     *bp++ = pos;
147     *dst = (char *) bp;
148 }
149 #else
150 static void encode_ptr(char **dst, zint pos)
151 {
152     memcpy(*dst, &pos, sizeof(pos));
153     (*dst) += sizeof(pos);
154 }
155 #endif
156
157 #define decode_item_len decode_ptr
158 #if ISAMB_PTR_CODEC
159 static void decode_ptr(const char **src1, zint *pos)
160 {
161     const unsigned char **src = (const unsigned char **) src1;
162     zint d = 0;
163     unsigned char c;
164     unsigned r = 0;
165
166     while (((c = *(*src)++) & 128))
167     {
168         d += ((zint) (c & 127) << r);
169         r += 7;
170     }
171     d += ((zint) c << r);
172     *pos = d;
173 }
174 #else
175 static void decode_ptr(const char **src, zint *pos)
176 {
177      memcpy(pos, *src, sizeof(*pos));
178      (*src) += sizeof(*pos);
179 }
180 #endif
181
182 ISAMB isamb_open(BFiles bfs, const char *name, int writeflag, ISAMC_M *method,
183                   int cache)
184 {
185     ISAMB isamb = xmalloc(sizeof(*isamb));
186     int i, b_size = ISAMB_MIN_SIZE;
187
188     isamb->bfs = bfs;
189     isamb->method = (ISAMC_M *) xmalloc(sizeof(*method));
190     memcpy(isamb->method, method, sizeof(*method));
191     isamb->no_cat = CAT_NO;
192     isamb->log_io = 0;
193     isamb->log_freelist = 0;
194     isamb->cache = cache;
195     isamb->skipped_numbers = 0;
196     isamb->returned_numbers = 0;
197     for (i = 0; i<ISAMB_MAX_LEVEL; i++)
198       isamb->skipped_nodes[i] = isamb->accessed_nodes[i] = 0;
199
200     assert(cache == 0);
201     isamb->file = xmalloc(sizeof(*isamb->file) * isamb->no_cat);
202     for (i = 0; i < isamb->no_cat; i++)
203     {
204         char fname[DST_BUF_SIZE];
205         char hbuf[DST_BUF_SIZE];
206         isamb->file[i].cache_entries = 0;
207         isamb->file[i].head_dirty = 0;
208         sprintf(fname, "%s%c", name, i+'A');
209         if (cache)
210             isamb->file[i].bf = bf_open(bfs, fname, ISAMB_CACHE_ENTRY_SIZE,
211                                          writeflag);
212         else
213             isamb->file[i].bf = bf_open(bfs, fname, b_size, writeflag);
214
215         /* fill-in default values (for empty isamb) */
216         isamb->file[i].head.first_block = ISAMB_CACHE_ENTRY_SIZE/b_size+1;
217         isamb->file[i].head.last_block = isamb->file[i].head.first_block;
218         isamb->file[i].head.block_size = b_size;
219 #if ISAMB_PTR_CODEC
220         if (i == isamb->no_cat-1 || b_size > 128)
221             isamb->file[i].head.block_offset = 8;
222         else
223             isamb->file[i].head.block_offset = 4;
224 #else
225         isamb->file[i].head.block_offset = 11;
226 #endif
227         isamb->file[i].head.block_max =
228             b_size - isamb->file[i].head.block_offset;
229         isamb->file[i].head.free_list = 0;
230         if (bf_read(isamb->file[i].bf, 0, 0, 0, hbuf))
231         {
232             /* got header assume "isamb"major minor len can fit in 16 bytes */
233             zint zint_tmp;
234             int major, minor, len, pos = 0;
235             int left;
236             const char *src = 0;
237             if (memcmp(hbuf, "isamb", 5))
238             {
239                 yaz_log(YLOG_WARN, "bad isamb header for file %s", fname);
240                 return 0;
241             }
242             if (sscanf(hbuf+5, "%d %d %d", &major, &minor, &len) != 3)
243             {
244                 yaz_log(YLOG_WARN, "bad isamb header for file %s", fname);
245                 return 0;
246             }
247             if (major != ISAMB_MAJOR_VERSION)
248             {
249                 yaz_log(YLOG_WARN, "bad major version for file %s %d, must be %d",
250                      fname, major, ISAMB_MAJOR_VERSION);
251                 return 0;
252             }
253             for (left = len - b_size; left > 0; left = left - b_size)
254             {
255                 pos++;
256                 if (!bf_read(isamb->file[i].bf, pos, 0, 0, hbuf + pos*b_size))
257                 {
258                     yaz_log(YLOG_WARN, "truncated isamb header for " 
259                          "file=%s len=%d pos=%d",
260                          fname, len, pos);
261                     return 0;
262                 }
263             }
264             src = hbuf + 16;
265             decode_ptr(&src, &isamb->file[i].head.first_block);
266             decode_ptr(&src, &isamb->file[i].head.last_block);
267             decode_ptr(&src, &zint_tmp);
268             isamb->file[i].head.block_size = zint_tmp;
269             decode_ptr(&src, &zint_tmp);
270             isamb->file[i].head.block_max = zint_tmp;
271             decode_ptr(&src, &isamb->file[i].head.free_list);
272         }
273         assert (isamb->file[i].head.block_size >= isamb->file[i].head.block_offset);
274         isamb->file[i].head_dirty = 0;
275         assert(isamb->file[i].head.block_size == b_size);
276         b_size = b_size * ISAMB_FAC_SIZE;
277     }
278 #if ISAMB_DEBUG
279     yaz_log(YLOG_WARN, "isamb debug enabled. Things will be slower than usual");
280 #endif
281     return isamb;
282 }
283
284 static void flush_blocks (ISAMB b, int cat)
285 {
286     while (b->file[cat].cache_entries)
287     {
288         struct ISAMB_cache_entry *ce_this = b->file[cat].cache_entries;
289         b->file[cat].cache_entries = ce_this->next;
290
291         if (ce_this->dirty)
292         {
293             yaz_log(b->log_io, "bf_write: flush_blocks");
294             bf_write(b->file[cat].bf, ce_this->pos, 0, 0, ce_this->buf);
295         }
296         xfree(ce_this->buf);
297         xfree(ce_this);
298     }
299 }
300
301 static int get_block (ISAMB b, ISAMC_P pos, char *userbuf, int wr)
302 {
303     int cat = (int) (pos&CAT_MASK);
304     int off = (int) (((pos/CAT_MAX) & 
305                (ISAMB_CACHE_ENTRY_SIZE / b->file[cat].head.block_size - 1))
306         * b->file[cat].head.block_size);
307     zint norm = pos / (CAT_MASK*ISAMB_CACHE_ENTRY_SIZE / b->file[cat].head.block_size);
308     int no = 0;
309     struct ISAMB_cache_entry **ce, *ce_this = 0, **ce_last = 0;
310
311     if (!b->cache)
312         return 0;
313
314     assert (ISAMB_CACHE_ENTRY_SIZE >= b->file[cat].head.block_size);
315     for (ce = &b->file[cat].cache_entries; *ce; ce = &(*ce)->next, no++)
316     {
317         ce_last = ce;
318         if ((*ce)->pos == norm)
319         {
320             ce_this = *ce;
321             *ce = (*ce)->next;   /* remove from list */
322             
323             ce_this->next = b->file[cat].cache_entries;  /* move to front */
324             b->file[cat].cache_entries = ce_this;
325             
326             if (wr)
327             {
328                 memcpy (ce_this->buf + off, userbuf, 
329                         b->file[cat].head.block_size);
330                 ce_this->dirty = 1;
331             }
332             else
333                 memcpy (userbuf, ce_this->buf + off,
334                         b->file[cat].head.block_size);
335             return 1;
336         }
337     }
338     if (no >= 40)
339     {
340         assert (no == 40);
341         assert (ce_last && *ce_last);
342         ce_this = *ce_last;
343         *ce_last = 0;  /* remove the last entry from list */
344         if (ce_this->dirty)
345         {
346             yaz_log(b->log_io, "bf_write: get_block");
347             bf_write(b->file[cat].bf, ce_this->pos, 0, 0, ce_this->buf);
348         }
349         xfree(ce_this->buf);
350         xfree(ce_this);
351     }
352     ce_this = xmalloc(sizeof(*ce_this));
353     ce_this->next = b->file[cat].cache_entries;
354     b->file[cat].cache_entries = ce_this;
355     ce_this->buf = xmalloc(ISAMB_CACHE_ENTRY_SIZE);
356     ce_this->pos = norm;
357     yaz_log(b->log_io, "bf_read: get_block");
358     if (!bf_read(b->file[cat].bf, norm, 0, 0, ce_this->buf))
359         memset (ce_this->buf, 0, ISAMB_CACHE_ENTRY_SIZE);
360     if (wr)
361     {
362         memcpy (ce_this->buf + off, userbuf, b->file[cat].head.block_size);
363         ce_this->dirty = 1;
364     }
365     else
366     {
367         ce_this->dirty = 0;
368         memcpy (userbuf, ce_this->buf + off, b->file[cat].head.block_size);
369     }
370     return 1;
371 }
372
373
374 void isamb_close (ISAMB isamb)
375 {
376     int i;
377     for (i = 0; isamb->accessed_nodes[i]; i++)
378         yaz_log(YLOG_DEBUG, "isamb_close  level leaf-%d: "ZINT_FORMAT" read, "
379                         ZINT_FORMAT" skipped",
380              i, isamb->accessed_nodes[i], isamb->skipped_nodes[i]);
381     yaz_log(YLOG_DEBUG, "isamb_close returned "ZINT_FORMAT" values, "
382                    "skipped "ZINT_FORMAT,
383          isamb->skipped_numbers, isamb->returned_numbers);
384     for (i = 0; i<isamb->no_cat; i++)
385     {
386         flush_blocks (isamb, i);
387         if (isamb->file[i].head_dirty)
388         {
389             char hbuf[DST_BUF_SIZE];
390             int major = ISAMB_MAJOR_VERSION;
391             int minor = ISAMB_MINOR_VERSION;
392             int len = 16;
393             char *dst = hbuf + 16;
394             int pos = 0, left;
395             int b_size = isamb->file[i].head.block_size;
396
397             encode_ptr(&dst, isamb->file[i].head.first_block);
398             encode_ptr(&dst, isamb->file[i].head.last_block);
399             encode_ptr(&dst, isamb->file[i].head.block_size);
400             encode_ptr(&dst, isamb->file[i].head.block_max);
401             encode_ptr(&dst, isamb->file[i].head.free_list);
402             memset(dst, '\0', b_size); /* ensure no random bytes are written */
403
404             len = dst - hbuf;
405
406             /* print exactly 16 bytes (including trailing 0) */
407             sprintf(hbuf, "isamb%02d %02d %02d\r\n", major, minor, len);
408
409             bf_write(isamb->file[i].bf, pos, 0, 0, hbuf);
410
411             for (left = len - b_size; left > 0; left = left - b_size)
412             {
413                 pos++;
414                 bf_write(isamb->file[i].bf, pos, 0, 0, hbuf + pos*b_size);
415             }
416         }
417         bf_close (isamb->file[i].bf);
418     }
419     xfree(isamb->file);
420     xfree(isamb->method);
421     xfree(isamb);
422 }
423
424 /* open_block: read one block at pos.
425    Decode leading sys bytes .. consisting of
426    Offset:Meaning
427    0: leader byte, != 0 leaf, == 0, non-leaf
428    1-2: used size of block
429    3-7*: number of items and all children
430    
431    * Reserve 5 bytes for large block sizes. 1 for small ones .. Number
432    of items. We can thus have at most 2^40 nodes. 
433 */
434 static struct ISAMB_block *open_block(ISAMB b, ISAMC_P pos)
435 {
436     int cat = (int) (pos&CAT_MASK);
437     const char *src;
438     int offset = b->file[cat].head.block_offset;
439     struct ISAMB_block *p;
440     if (!pos)
441         return 0;
442     p = xmalloc(sizeof(*p));
443     p->pos = pos;
444     p->cat = (int) (pos & CAT_MASK);
445     p->buf = xmalloc(b->file[cat].head.block_size);
446     p->cbuf = 0;
447
448     if (!get_block (b, pos, p->buf, 0))
449     {
450         yaz_log(b->log_io, "bf_read: open_block");
451         if (!bf_read(b->file[cat].bf, pos/CAT_MAX, 0, 0, p->buf))
452         {
453             yaz_log(YLOG_FATAL, "isamb: read fail for pos=%ld block=%ld",
454                      (long) pos, (long) pos/CAT_MAX);
455             abort();
456         }
457     }
458     p->bytes = p->buf + offset;
459     p->leaf = p->buf[0];
460     p->size = (p->buf[1] + 256 * p->buf[2]) - offset;
461     if (p->size < 0)
462     {
463         yaz_log(YLOG_FATAL, "Bad block size %d in pos=" ZINT_FORMAT "\n",
464                  p->size, pos);
465     }
466     assert (p->size >= 0);
467     src = p->buf + 3;
468     decode_ptr(&src, &p->no_items);
469
470     p->offset = 0;
471     p->dirty = 0;
472     p->deleted = 0;
473     p->decodeClientData = (*b->method->codec.start)();
474     return p;
475 }
476
477 struct ISAMB_block *new_block (ISAMB b, int leaf, int cat)
478 {
479     struct ISAMB_block *p;
480
481     p = xmalloc(sizeof(*p));
482     p->buf = xmalloc(b->file[cat].head.block_size);
483
484     if (!b->file[cat].head.free_list)
485     {
486         zint block_no;
487         block_no = b->file[cat].head.last_block++;
488         p->pos = block_no * CAT_MAX + cat;
489     }
490     else
491     {
492         p->pos = b->file[cat].head.free_list;
493         assert((p->pos & CAT_MASK) == cat);
494         if (!get_block (b, p->pos, p->buf, 0))
495         {
496             yaz_log(b->log_io, "bf_read: new_block");
497             if (!bf_read(b->file[cat].bf, p->pos/CAT_MAX, 0, 0, p->buf))
498             {
499                 yaz_log(YLOG_FATAL, "isamb: read fail for pos=%ld block=%ld",
500                          (long) p->pos/CAT_MAX, (long) p->pos/CAT_MAX);
501                 abort ();
502             }
503         }
504         yaz_log(b->log_freelist, "got block " ZINT_FORMAT " from freelist %d:" ZINT_FORMAT, p->pos,
505                  cat, p->pos/CAT_MAX);
506         memcpy (&b->file[cat].head.free_list, p->buf, sizeof(zint));
507     }
508     p->cat = cat;
509     b->file[cat].head_dirty = 1;
510     memset (p->buf, 0, b->file[cat].head.block_size);
511     p->bytes = p->buf + b->file[cat].head.block_offset;
512     p->leaf = leaf;
513     p->size = 0;
514     p->dirty = 1;
515     p->deleted = 0;
516     p->offset = 0;
517     p->no_items = 0;
518     p->decodeClientData = (*b->method->codec.start)();
519     return p;
520 }
521
522 struct ISAMB_block *new_leaf (ISAMB b, int cat)
523 {
524     return new_block (b, 1, cat);
525 }
526
527
528 struct ISAMB_block *new_int (ISAMB b, int cat)
529 {
530     return new_block (b, 0, cat);
531 }
532
533 static void check_block (ISAMB b, struct ISAMB_block *p)
534 {
535     assert(b); /* mostly to make the compiler shut up about unused b */
536     if (p->leaf)
537     {
538         ;
539     }
540     else
541     {
542         /* sanity check */
543         char *startp = p->bytes;
544         const char *src = startp;
545         char *endp = p->bytes + p->size;
546         ISAMB_P pos;
547         void *c1 = (*b->method->codec.start)();
548             
549         decode_ptr(&src, &pos);
550         assert ((pos&CAT_MASK) == p->cat);
551         while (src != endp)
552         {
553 #if INT_ENCODE
554             char file_item_buf[DST_ITEM_MAX];
555             char *file_item = file_item_buf;
556             (*b->method->codec.reset)(c1);
557             (*b->method->codec.decode)(c1, &file_item, &src);
558 #else
559             zint item_len;
560             decode_item_len(&src, &item_len);
561             assert (item_len > 0 && item_len < 80);
562             src += item_len;
563 #endif
564             decode_ptr(&src, &pos);
565             if ((pos&CAT_MASK) != p->cat)
566             {
567                 assert ((pos&CAT_MASK) == p->cat);
568             }
569         }
570         (*b->method->codec.stop)(c1);
571     }
572 }
573
574 void close_block(ISAMB b, struct ISAMB_block *p)
575 {
576     if (!p)
577         return;
578     if (p->deleted)
579     {
580         yaz_log(b->log_freelist, "release block " ZINT_FORMAT " from freelist %d:" ZINT_FORMAT,
581                  p->pos, p->cat, p->pos/CAT_MAX);
582         memcpy (p->buf, &b->file[p->cat].head.free_list, sizeof(zint));
583         b->file[p->cat].head.free_list = p->pos;
584         if (!get_block (b, p->pos, p->buf, 1))
585         {
586             yaz_log(b->log_io, "bf_write: close_block (deleted)");
587             bf_write(b->file[p->cat].bf, p->pos/CAT_MAX, 0, 0, p->buf);
588         }
589     }
590     else if (p->dirty)
591     {
592         int offset = b->file[p->cat].head.block_offset;
593         int size = p->size + offset;
594         char *dst =  p->buf + 3;
595         assert (p->size >= 0);
596         
597         /* memset becuase encode_ptr usually does not write all bytes */
598         memset(p->buf, 0, b->file[p->cat].head.block_offset);
599         p->buf[0] = p->leaf;
600         p->buf[1] = size & 255;
601         p->buf[2] = size >> 8;
602         encode_ptr(&dst, p->no_items);
603         check_block(b, p);
604         if (!get_block (b, p->pos, p->buf, 1))
605         {
606             yaz_log(b->log_io, "bf_write: close_block");
607             bf_write(b->file[p->cat].bf, p->pos/CAT_MAX, 0, 0, p->buf);
608         }
609     }
610     (*b->method->codec.stop)(p->decodeClientData);
611     xfree(p->buf);
612     xfree(p);
613 }
614
615 int insert_sub (ISAMB b, struct ISAMB_block **p,
616                 void *new_item, int *mode,
617                 ISAMC_I *stream,
618                 struct ISAMB_block **sp,
619                 void *sub_item, int *sub_size,
620                 const void *max_item);
621
622 int insert_int (ISAMB b, struct ISAMB_block *p, void *lookahead_item,
623                 int *mode,
624                 ISAMC_I *stream, struct ISAMB_block **sp,
625                 void *split_item, int *split_size, const void *last_max_item)
626 {
627     char *startp = p->bytes;
628     const char *src = startp;
629     char *endp = p->bytes + p->size;
630     ISAMB_P pos;
631     struct ISAMB_block *sub_p1 = 0, *sub_p2 = 0;
632     char sub_item[DST_ITEM_MAX];
633     int sub_size;
634     int more = 0;
635     zint diff_terms = 0;
636     void *c1 = (*b->method->codec.start)();
637
638     *sp = 0;
639
640     assert(p->size >= 0);
641     decode_ptr(&src, &pos);
642     while (src != endp)
643     {
644         int d;
645         const char *src0 = src;
646 #if INT_ENCODE
647         char file_item_buf[DST_ITEM_MAX];
648         char *file_item = file_item_buf;
649         (*b->method->codec.reset)(c1);
650         (*b->method->codec.decode)(c1, &file_item, &src);
651         d = (*b->method->compare_item)(file_item_buf, lookahead_item);
652         if (d > 0)
653         {
654             sub_p1 = open_block(b, pos);
655             assert (sub_p1);
656             diff_terms -= sub_p1->no_items;
657             more = insert_sub (b, &sub_p1, lookahead_item, mode,
658                                stream, &sub_p2, 
659                                sub_item, &sub_size, file_item_buf);
660             diff_terms += sub_p1->no_items;
661             src = src0;
662             break;
663         }
664 #else
665         zint item_len;
666         decode_item_len(&src, &item_len);
667         d = (*b->method->compare_item)(src, lookahead_item);
668         if (d > 0)
669         {
670             sub_p1 = open_block(b, pos);
671             assert (sub_p1);
672             diff_terms -= sub_p1->no_items;
673             more = insert_sub (b, &sub_p1, lookahead_item, mode,
674                                stream, &sub_p2, 
675                                sub_item, &sub_size, src);
676             diff_terms += sub_p1->no_items;
677             src = src0;
678             break;
679         }
680         src += item_len;
681 #endif
682         decode_ptr(&src, &pos);
683     }
684     if (!sub_p1)
685     {
686         /* we reached the end. So lookahead > last item */
687         sub_p1 = open_block(b, pos);
688         assert (sub_p1);
689         diff_terms -= sub_p1->no_items;
690         more = insert_sub (b, &sub_p1, lookahead_item, mode, stream, &sub_p2, 
691                            sub_item, &sub_size, last_max_item);
692         diff_terms += sub_p1->no_items;
693     }
694     if (sub_p2)
695         diff_terms += sub_p2->no_items;
696     if (diff_terms)
697     {
698         p->dirty = 1;
699         p->no_items += diff_terms;
700     }
701     if (sub_p2)
702     {
703         /* there was a split - must insert pointer in this one */
704         char dst_buf[DST_BUF_SIZE];
705         char *dst = dst_buf;
706 #if INT_ENCODE
707         const char *sub_item_ptr = sub_item;
708 #endif
709         assert (sub_size < 80 && sub_size > 1);
710
711         memcpy (dst, startp, src - startp);
712                 
713         dst += src - startp;
714
715 #if INT_ENCODE
716         (*b->method->codec.reset)(c1);
717         (*b->method->codec.encode)(c1, &dst, &sub_item_ptr);
718 #else
719         encode_item_len (&dst, sub_size);      /* sub length and item */
720         memcpy (dst, sub_item, sub_size);
721         dst += sub_size;
722 #endif
723
724         encode_ptr(&dst, sub_p2->pos);   /* pos */
725
726         if (endp - src)                   /* remaining data */
727         {
728             memcpy (dst, src, endp - src);
729             dst += endp - src;
730         }
731         p->size = dst - dst_buf;
732         assert (p->size >= 0);
733
734
735         if (p->size <= b->file[p->cat].head.block_max)
736         {
737             /* it fits OK in this block */
738             memcpy (startp, dst_buf, dst - dst_buf);
739         }
740         else
741         {
742             /* must split _this_ block as well .. */
743             struct ISAMB_block *sub_p3;
744 #if INT_ENCODE
745             char file_item_buf[DST_ITEM_MAX];
746             char *file_item = file_item_buf;
747 #else
748             zint split_size_tmp;
749 #endif
750             zint no_items_first_half = 0;
751             int p_new_size;
752             const char *half;
753             src = dst_buf;
754             endp = dst;
755
756             half = src + b->file[p->cat].head.block_size/2;
757             decode_ptr(&src, &pos);
758
759             /* read sub block so we can get no_items for it */
760             sub_p3 = open_block(b, pos);
761             no_items_first_half += sub_p3->no_items;
762             close_block(b, sub_p3);
763
764             while (src <= half)
765             {
766 #if INT_ENCODE
767                 file_item = file_item_buf;
768                 (*b->method->codec.reset)(c1);
769                 (*b->method->codec.decode)(c1, &file_item, &src);
770 #else
771                 decode_item_len(&src, &split_size_tmp);
772                 *split_size = (int) split_size_tmp;
773                 src += *split_size;
774 #endif
775                 decode_ptr(&src, &pos);
776
777                 /* read sub block so we can get no_items for it */
778                 sub_p3 = open_block(b, pos);
779                 no_items_first_half += sub_p3->no_items;
780                 close_block(b, sub_p3);
781             }
782             /*  p is first half */
783             p_new_size = src - dst_buf;
784             memcpy (p->bytes, dst_buf, p_new_size);
785
786 #if INT_ENCODE
787             file_item = file_item_buf;
788             (*b->method->codec.reset)(c1);
789             (*b->method->codec.decode)(c1, &file_item, &src);
790             *split_size = file_item - file_item_buf;
791             memcpy(split_item, file_item_buf, *split_size);
792 #else
793             decode_item_len(&src, &split_size_tmp);
794             *split_size = (int) split_size_tmp;
795             memcpy (split_item, src, *split_size);
796             src += *split_size;
797 #endif
798             /*  *sp is second half */
799             *sp = new_int (b, p->cat);
800             (*sp)->size = endp - src;
801             memcpy ((*sp)->bytes, src, (*sp)->size);
802
803             p->size = p_new_size;
804
805             /*  adjust no_items in first&second half */
806             (*sp)->no_items = p->no_items - no_items_first_half;
807             p->no_items = no_items_first_half;
808         }
809         p->dirty = 1;
810         close_block(b, sub_p2);
811     }
812     close_block(b, sub_p1);
813     (*b->method->codec.stop)(c1);
814     return more;
815 }
816
817 int insert_leaf (ISAMB b, struct ISAMB_block **sp1, void *lookahead_item,
818                  int *lookahead_mode, ISAMC_I *stream,
819                  struct ISAMB_block **sp2,
820                  void *sub_item, int *sub_size,
821                  const void *max_item)
822 {
823     struct ISAMB_block *p = *sp1;
824     char *endp = 0;
825     const char *src = 0;
826     char dst_buf[DST_BUF_SIZE], *dst = dst_buf;
827     int new_size;
828     void *c1 = (*b->method->codec.start)();
829     void *c2 = (*b->method->codec.start)();
830     int more = 1;
831     int quater = b->file[b->no_cat-1].head.block_max / 4;
832     char *mid_cut = dst_buf + quater * 2;
833     char *tail_cut = dst_buf + quater * 3;
834     char *maxp = dst_buf + b->file[b->no_cat-1].head.block_max;
835     char *half1 = 0;
836     char *half2 = 0;
837     char cut_item_buf[DST_ITEM_MAX];
838     int cut_item_size = 0;
839     int no_items = 0;    /* number of items (total) */
840     int no_items_1 = 0;  /* number of items (first half) */
841
842     if (p && p->size)
843     {
844         char file_item_buf[DST_ITEM_MAX];
845         char *file_item = file_item_buf;
846             
847         src = p->bytes;
848         endp = p->bytes + p->size;
849         (*b->method->codec.decode)(c1, &file_item, &src);
850         while (1)
851         {
852             const char *dst_item = 0; /* resulting item to be inserted */
853             char *lookahead_next;
854             int d = -1;
855             
856             if (lookahead_item)
857                 d = (*b->method->compare_item)(file_item_buf, lookahead_item);
858             
859             /* d now holds comparison between existing file item and
860                lookahead item 
861                d = 0: equal
862                d > 0: lookahead before file
863                d < 0: lookahead after file
864             */
865             if (d > 0)
866             {
867                 /* lookahead must be inserted */
868                 dst_item = lookahead_item;
869                 /* if this is not an insertion, it's really bad .. */
870                 if (!*lookahead_mode)
871                 {
872                     yaz_log(YLOG_WARN, "isamb: Inconsistent register (1)");
873                     assert (*lookahead_mode);
874                 }
875             }
876             else
877                 dst_item = file_item_buf;
878
879                
880             if (!*lookahead_mode && d == 0)
881             {
882                 /* it's a deletion and they match so there is nothing to be
883                    inserted anyway .. But mark the thing bad (file item
884                    was part of input.. The item will not be part of output */
885                 p->dirty = 1;
886             }
887             else if (!half1 && dst > mid_cut)
888             {
889                 /* we have reached the splitting point for the first time */
890                 const char *dst_item_0 = dst_item;
891                 half1 = dst; /* candidate for splitting */
892
893                 /* encode the resulting item */
894                 (*b->method->codec.encode)(c2, &dst, &dst_item);
895                 
896                 cut_item_size = dst_item - dst_item_0;
897                 assert(cut_item_size > 0);
898                 memcpy (cut_item_buf, dst_item_0, cut_item_size);
899                 
900                 half2 = dst;
901                 no_items_1 = no_items;
902                 no_items++;
903             }
904             else
905             {
906                 /* encode the resulting item */
907                 (*b->method->codec.encode)(c2, &dst, &dst_item);
908                 no_items++;
909             }
910
911             /* now move "pointers" .. result has been encoded .. */
912             if (d > 0)  
913             {
914                 /* we must move the lookahead pointer */
915
916                 if (dst > maxp)
917                     /* no more room. Mark lookahead as "gone".. */
918                     lookahead_item = 0;
919                 else
920                 {
921                     /* move it really.. */
922                     lookahead_next = lookahead_item;
923                     if (!(*stream->read_item)(stream->clientData,
924                                               &lookahead_next,
925                                               lookahead_mode))
926                     {
927                         /* end of stream reached: no "more" and no lookahead */
928                         lookahead_item = 0;
929                         more = 0;
930                     }
931                     if (lookahead_item && max_item &&
932                         (*b->method->compare_item)(max_item, lookahead_item) <= 0)
933                     {
934                         /* the lookahead goes beyond what we allow in this
935                            leaf. Mark it as "gone" */
936                         lookahead_item = 0;
937                     }
938                     
939                     p->dirty = 1;
940                 }
941             }
942             else if (d == 0)
943             {
944                 /* exact match .. move both pointers */
945
946                 lookahead_next = lookahead_item;
947                 if (!(*stream->read_item)(stream->clientData,
948                                           &lookahead_next, lookahead_mode))
949                 {
950                     lookahead_item = 0;
951                     more = 0;
952                 }
953                 if (src == endp)
954                     break;  /* end of file stream reached .. */
955                 file_item = file_item_buf; /* move file pointer */
956                 (*b->method->codec.decode)(c1, &file_item, &src);
957             }
958             else
959             {
960                 /* file pointer must be moved */
961                 if (src == endp)
962                     break;
963                 file_item = file_item_buf;
964                 (*b->method->codec.decode)(c1, &file_item, &src);
965             }
966         }
967     }
968     maxp = dst_buf + b->file[b->no_cat-1].head.block_max + quater;
969     /* this loop runs when we are "appending" to a leaf page. That is
970        either it's empty (new) or all file items have been read in
971        previous loop */
972     while (lookahead_item)
973     {
974         char *dst_item;
975         const char *src = lookahead_item;
976         char *dst_0 = dst;
977         
978         /* compare lookahead with max item */
979         if (max_item &&
980             (*b->method->compare_item)(max_item, lookahead_item) <= 0)
981         {
982             /* stop if we have reached the value of max item */
983             break;
984         }
985         if (!*lookahead_mode)
986         {
987             /* this is append. So a delete is bad */
988             yaz_log(YLOG_WARN, "isamb: Inconsistent register (2)");
989             abort();
990         }
991         else if (!half1 && dst > tail_cut)
992         {
993             const char *src_0 = src;
994             half1 = dst; /* candidate for splitting */
995             
996             (*b->method->codec.encode)(c2, &dst, &src);
997             
998             cut_item_size = src - src_0;
999             assert(cut_item_size > 0);
1000             memcpy (cut_item_buf, src_0, cut_item_size);
1001             
1002             no_items_1 = no_items;
1003             half2 = dst;
1004         }
1005         else
1006             (*b->method->codec.encode)(c2, &dst, &src);
1007
1008         if (dst > maxp)
1009         {
1010             dst = dst_0;
1011             break;
1012         }
1013         no_items++;
1014         if (p)
1015             p->dirty = 1;
1016         dst_item = lookahead_item;
1017         if (!(*stream->read_item)(stream->clientData, &dst_item,
1018                                   lookahead_mode))
1019         {
1020             lookahead_item = 0;
1021             more = 0;
1022         }
1023     }
1024     new_size = dst - dst_buf;
1025     if (p && p->cat != b->no_cat-1 && 
1026         new_size > b->file[p->cat].head.block_max)
1027     {
1028         /* non-btree block will be removed */
1029         p->deleted = 1;
1030         close_block(b, p);
1031         /* delete it too!! */
1032         p = 0; /* make a new one anyway */
1033     }
1034     if (!p)
1035     {   /* must create a new one */
1036         int i;
1037         for (i = 0; i < b->no_cat; i++)
1038             if (new_size <= b->file[i].head.block_max)
1039                 break;
1040         if (i == b->no_cat)
1041             i = b->no_cat - 1;
1042         p = new_leaf (b, i);
1043     }
1044     if (new_size > b->file[p->cat].head.block_max)
1045     {
1046         char *first_dst;
1047         const char *cut_item = cut_item_buf;
1048
1049         assert (half1);
1050         assert (half2);
1051
1052         assert(cut_item_size > 0);
1053         
1054         /* first half */
1055         p->size = half1 - dst_buf;
1056         memcpy (p->bytes, dst_buf, half1 - dst_buf);
1057         p->no_items = no_items_1;
1058
1059         /* second half */
1060         *sp2 = new_leaf (b, p->cat);
1061
1062         (*b->method->codec.reset)(c2);
1063
1064         first_dst = (*sp2)->bytes;
1065
1066         (*b->method->codec.encode)(c2, &first_dst, &cut_item);
1067
1068         memcpy (first_dst, half2, dst - half2);
1069
1070         (*sp2)->size = (first_dst - (*sp2)->bytes) + (dst - half2);
1071         (*sp2)->no_items = no_items - no_items_1;
1072         (*sp2)->dirty = 1;
1073         p->dirty = 1;
1074         memcpy (sub_item, cut_item_buf, cut_item_size);
1075         *sub_size = cut_item_size;
1076     }
1077     else
1078     {
1079         memcpy (p->bytes, dst_buf, dst - dst_buf);
1080         p->size = new_size;
1081         p->no_items = no_items;
1082     }
1083     (*b->method->codec.stop)(c1);
1084     (*b->method->codec.stop)(c2);
1085     *sp1 = p;
1086     return more;
1087 }
1088
1089 int insert_sub (ISAMB b, struct ISAMB_block **p, void *new_item,
1090                 int *mode,
1091                 ISAMC_I *stream,
1092                 struct ISAMB_block **sp,
1093                 void *sub_item, int *sub_size,
1094                 const void *max_item)
1095 {
1096     if (!*p || (*p)->leaf)
1097         return insert_leaf (b, p, new_item, mode, stream, sp, sub_item, 
1098                             sub_size, max_item);
1099     else
1100         return insert_int (b, *p, new_item, mode, stream, sp, sub_item,
1101                            sub_size, max_item);
1102 }
1103
1104 int isamb_unlink (ISAMB b, ISAMC_P pos)
1105 {
1106     struct ISAMB_block *p1;
1107
1108     if (!pos)
1109         return 0;
1110     p1 = open_block(b, pos);
1111     p1->deleted = 1;
1112     if (!p1->leaf)
1113     {
1114         zint sub_p;
1115         const char *src = p1->bytes + p1->offset;
1116 #if INT_ENCODE
1117         void *c1 = (*b->method->codec.start)();
1118 #endif
1119         decode_ptr(&src, &sub_p);
1120         isamb_unlink(b, sub_p);
1121         
1122         while (src != p1->bytes + p1->size)
1123         {
1124 #if INT_ENCODE
1125             char file_item_buf[DST_ITEM_MAX];
1126             char *file_item = file_item_buf;
1127             (*b->method->codec.reset)(c1);
1128             (*b->method->codec.decode)(c1, &file_item, &src);
1129 #else
1130             zint item_len;
1131             decode_item_len(&src, &item_len);
1132             src += item_len;
1133 #endif
1134             decode_ptr(&src, &sub_p);
1135             isamb_unlink(b, sub_p);
1136         }
1137 #if INT_ENCODE
1138         (*b->method->codec.stop)(c1);
1139 #endif
1140     }
1141     close_block(b, p1);
1142     return 0;
1143 }
1144
1145 ISAMB_P isamb_merge (ISAMB b, ISAMC_P pos, ISAMC_I *stream)
1146 {
1147     char item_buf[DST_ITEM_MAX];
1148     char *item_ptr;
1149     int i_mode;
1150     int more;
1151     int must_delete = 0;
1152
1153     if (b->cache < 0)
1154     {
1155         int more = 1;
1156         while (more)
1157         {
1158             item_ptr = item_buf;
1159             more =
1160                 (*stream->read_item)(stream->clientData, &item_ptr, &i_mode);
1161         }
1162         return 1;
1163     }
1164     item_ptr = item_buf;
1165     more = (*stream->read_item)(stream->clientData, &item_ptr, &i_mode);
1166     while (more)
1167     {
1168         struct ISAMB_block *p = 0, *sp = 0;
1169         char sub_item[DST_ITEM_MAX];
1170         int sub_size;
1171         
1172         if (pos)
1173             p = open_block(b, pos);
1174         more = insert_sub (b, &p, item_buf, &i_mode, stream, &sp,
1175                            sub_item, &sub_size, 0);
1176         if (sp)
1177         {    /* increase level of tree by one */
1178             struct ISAMB_block *p2 = new_int (b, p->cat);
1179             char *dst = p2->bytes + p2->size;
1180 #if INT_ENCODE
1181             void *c1 = (*b->method->codec.start)();
1182             const char *sub_item_ptr = sub_item;
1183 #endif
1184
1185             encode_ptr(&dst, p->pos);
1186             assert (sub_size < 80 && sub_size > 1);
1187 #if INT_ENCODE
1188             (*b->method->codec.reset)(c1);
1189             (*b->method->codec.encode)(c1, &dst, &sub_item_ptr);
1190 #else
1191             encode_item_len (&dst, sub_size);
1192             memcpy (dst, sub_item, sub_size);
1193             dst += sub_size;
1194 #endif
1195             encode_ptr(&dst, sp->pos);
1196             
1197             p2->size = dst - p2->bytes;
1198             p2->no_items = p->no_items + sp->no_items;
1199             pos = p2->pos;  /* return new super page */
1200             close_block(b, sp);
1201             close_block(b, p2);
1202 #if INT_ENCODE
1203             (*b->method->codec.stop)(c1);
1204 #endif
1205         }
1206         else
1207         {
1208             pos = p->pos;   /* return current one (again) */
1209         }
1210         if (p->no_items == 0)
1211             must_delete = 1;
1212         else
1213             must_delete = 0;
1214         close_block(b, p);
1215     }
1216     if (must_delete)
1217     {
1218         isamb_unlink(b, pos);
1219         return 0;
1220     }
1221     return pos;
1222 }
1223
1224 ISAMB_PP isamb_pp_open_x(ISAMB isamb, ISAMB_P pos, int *level, int scope)
1225 {
1226     ISAMB_PP pp = xmalloc(sizeof(*pp));
1227     int i;
1228
1229     assert(pos);
1230
1231     pp->isamb = isamb;
1232     pp->block = xmalloc(ISAMB_MAX_LEVEL * sizeof(*pp->block));
1233
1234     pp->pos = pos;
1235     pp->level = 0;
1236     pp->maxlevel = 0;
1237     pp->total_size = 0;
1238     pp->no_blocks = 0;
1239     pp->skipped_numbers = 0;
1240     pp->returned_numbers = 0;
1241     pp->scope = scope;
1242     for (i = 0; i<ISAMB_MAX_LEVEL; i++)
1243         pp->skipped_nodes[i] = pp->accessed_nodes[i] = 0;
1244     while (1)
1245     {
1246         struct ISAMB_block *p = open_block(isamb, pos);
1247         const char *src = p->bytes + p->offset;
1248         pp->block[pp->level] = p;
1249
1250         pp->total_size += p->size;
1251         pp->no_blocks++;
1252         if (p->leaf)
1253             break;
1254         decode_ptr(&src, &pos);
1255         p->offset = src - p->bytes;
1256         pp->level++;
1257         pp->accessed_nodes[pp->level]++; 
1258     }
1259     pp->block[pp->level+1] = 0;
1260     pp->maxlevel = pp->level;
1261     if (level)
1262         *level = pp->level;
1263     return pp;
1264 }
1265
1266 ISAMB_PP isamb_pp_open (ISAMB isamb, ISAMB_P pos, int scope)
1267 {
1268     return isamb_pp_open_x(isamb, pos, 0, scope);
1269 }
1270
1271 void isamb_pp_close_x(ISAMB_PP pp, int *size, int *blocks)
1272 {
1273     int i;
1274     if (!pp)
1275         return;
1276     yaz_log(YLOG_DEBUG, "isamb_pp_close lev=%d returned "ZINT_FORMAT" values, " 
1277                     "skipped "ZINT_FORMAT,
1278         pp->maxlevel, pp->skipped_numbers, pp->returned_numbers);
1279     for (i = pp->maxlevel; i>=0; i--)
1280         if (pp->skipped_nodes[i] || pp->accessed_nodes[i])
1281             yaz_log(YLOG_DEBUG, "isamb_pp_close  level leaf-%d: "
1282                             ZINT_FORMAT" read, "ZINT_FORMAT" skipped", i,
1283                  pp->accessed_nodes[i], pp->skipped_nodes[i]);
1284     pp->isamb->skipped_numbers += pp->skipped_numbers;
1285     pp->isamb->returned_numbers += pp->returned_numbers;
1286     for (i = pp->maxlevel; i>=0; i--)
1287     {
1288         pp->isamb->accessed_nodes[i] += pp->accessed_nodes[i];
1289         pp->isamb->skipped_nodes[i] += pp->skipped_nodes[i];
1290     }
1291     if (size)
1292         *size = pp->total_size;
1293     if (blocks)
1294         *blocks = pp->no_blocks;
1295     for (i = 0; i <= pp->level; i++)
1296         close_block(pp->isamb, pp->block[i]);
1297     xfree(pp->block);
1298     xfree(pp);
1299 }
1300
1301 int isamb_block_info (ISAMB isamb, int cat)
1302 {
1303     if (cat >= 0 && cat < isamb->no_cat)
1304         return isamb->file[cat].head.block_size;
1305     return -1;
1306 }
1307
1308 void isamb_pp_close (ISAMB_PP pp)
1309 {
1310     isamb_pp_close_x(pp, 0, 0);
1311 }
1312
1313 /* simple recursive dumper .. */
1314 static void isamb_dump_r (ISAMB b, ISAMB_P pos, void (*pr)(const char *str),
1315                           int level)
1316 {
1317     char buf[1024];
1318     char prefix_str[1024];
1319     if (pos)
1320     {
1321         struct ISAMB_block *p = open_block(b, pos);
1322         sprintf(prefix_str, "%*s " ZINT_FORMAT " cat=%d size=%d max=%d items="
1323                 ZINT_FORMAT, level*2, "",
1324                 pos, p->cat, p->size, b->file[p->cat].head.block_max,
1325                 p->no_items);
1326         (*pr)(prefix_str);
1327         sprintf(prefix_str, "%*s " ZINT_FORMAT, level*2, "", pos);
1328         if (p->leaf)
1329         {
1330             while (p->offset < p->size)
1331             {
1332                 const char *src = p->bytes + p->offset;
1333                 char *dst = buf;
1334                 (*b->method->codec.decode)(p->decodeClientData, &dst, &src);
1335                 (*b->method->log_item)(YLOG_DEBUG, buf, prefix_str);
1336                 p->offset = src - (char*) p->bytes;
1337             }
1338             assert(p->offset == p->size);
1339         }
1340         else
1341         {
1342             const char *src = p->bytes + p->offset;
1343             ISAMB_P sub;
1344
1345             decode_ptr(&src, &sub);
1346             p->offset = src - (char*) p->bytes;
1347
1348             isamb_dump_r(b, sub, pr, level+1);
1349             
1350             while (p->offset < p->size)
1351             {
1352 #if INT_ENCODE
1353                 char file_item_buf[DST_ITEM_MAX];
1354                 char *file_item = file_item_buf;
1355                 void *c1 = (*b->method->codec.start)();
1356                 (*b->method->codec.decode)(c1, &file_item, &src);
1357                 (*b->method->codec.stop)(c1);
1358                 (*b->method->log_item)(YLOG_DEBUG, file_item_buf, prefix_str);
1359 #else
1360                 zint item_len;
1361                 decode_item_len(&src, &item_len);
1362                 (*b->method->log_item)(YLOG_DEBUG, src, prefix_str);
1363                 src += item_len;
1364 #endif
1365                 decode_ptr(&src, &sub);
1366                 
1367                 p->offset = src - (char*) p->bytes;
1368                 
1369                 isamb_dump_r(b, sub, pr, level+1);
1370             }           
1371         }
1372         close_block(b, p);
1373     }
1374 }
1375
1376 void isamb_dump(ISAMB b, ISAMB_P pos, void (*pr)(const char *str))
1377 {
1378     isamb_dump_r(b, pos, pr, 0);
1379 }
1380
1381 int isamb_pp_read(ISAMB_PP pp, void *buf)
1382 {
1383     return isamb_pp_forward(pp, buf, 0);
1384 }
1385
1386
1387 static int isamb_pp_on_right_node(ISAMB_PP pp, int level, const void *untilbuf)
1388 { /* looks one node higher to see if we should be on this node at all */
1389   /* useful in backing off quickly, and in avoiding tail descends */
1390   /* call with pp->level to begin with */
1391     struct ISAMB_block *p;
1392     int cmp;
1393     const char *src;
1394     ISAMB b = pp->isamb;
1395
1396     assert(level >= 0);
1397     if (level == 0)
1398     {
1399 #if ISAMB_DEBUG
1400             yaz_log(YLOG_DEBUG, "isamb_pp_on_right returning true for root");
1401 #endif
1402         return 1; /* we can never skip the root node */
1403     }
1404     level--;
1405     p = pp->block[level];
1406     assert(p->offset <= p->size);
1407     if (p->offset < p->size)
1408     {
1409 #if INT_ENCODE
1410         char file_item_buf[DST_ITEM_MAX];
1411         char *file_item = file_item_buf;
1412         void *c1 = (*b->method->codec.start)();
1413         assert(p->offset > 0); 
1414         src = p->bytes + p->offset;
1415         (*b->method->codec.decode)(c1, &file_item, &src);
1416         (*b->method->codec.stop)(c1);
1417         cmp = (*b->method->compare_item)(untilbuf, file_item_buf);
1418 #else
1419         zint item_len;
1420         assert(p->offset > 0); 
1421         src = p->bytes + p->offset;
1422         decode_item_len(&src, &item_len);
1423 #if ISAMB_DEBUG
1424         (*b->method->codec.log_item)(YLOG_DEBUG, untilbuf, "on_leaf: until");
1425         (*b->method->codec.log_item)(YLOG_DEBUG, src, "on_leaf: value");
1426 #endif
1427         cmp = (*b->method->compare_item)(untilbuf, src);
1428 #endif
1429         if (cmp < pp->scope)
1430         {  /* cmp<2 */
1431 #if ISAMB_DEBUG
1432             yaz_log(YLOG_DEBUG, "isamb_pp_on_right returning true "
1433                             "cmp=%d lev=%d ofs=%d", cmp, level, p->offset);
1434 #endif
1435             return 1; 
1436         }
1437         else
1438         {
1439 #if ISAMB_DEBUG
1440             yaz_log(YLOG_DEBUG, "isamb_pp_on_right returning false "
1441                             "cmp=%d lev=%d ofs=%d", cmp, level, p->offset);
1442 #endif
1443             return 0; 
1444         }
1445     }
1446     else {
1447 #if ISAMB_DEBUG
1448         yaz_log(YLOG_DEBUG, "isamb_pp_on_right at tail, looking higher "
1449                         "lev=%d", level);
1450 #endif
1451         return isamb_pp_on_right_node(pp, level, untilbuf);
1452     }
1453 } /* isamb_pp_on_right_node */
1454
1455 static int isamb_pp_read_on_leaf(ISAMB_PP pp, void *buf)
1456
1457     /* reads the next item on the current leaf, returns 0 if end of leaf*/
1458     struct ISAMB_block *p = pp->block[pp->level];
1459     char *dst;
1460     const char *src;
1461     assert(pp);
1462     assert(buf);
1463     if (p->offset == p->size)
1464     {
1465 #if ISAMB_DEBUG
1466         yaz_log(YLOG_DEBUG, "isamb_pp_read_on_leaf returning 0 on "
1467                 "node %d", p->pos);
1468 #endif
1469         return 0; /* at end of leaf */
1470     }
1471     src = p->bytes + p->offset;
1472     dst = buf;
1473     (*pp->isamb->method->codec.decode)(p->decodeClientData, &dst, &src);
1474     p->offset = src - (char*) p->bytes;
1475 #if ISAMB_DEBUG
1476     (*pp->isamb->method->codec.log_item)(YLOG_DEBUG, buf,
1477                                          "read_on_leaf returning 1");
1478 #endif
1479     pp->returned_numbers++;
1480     return 1;
1481 } /* read_on_leaf */
1482
1483 static int isamb_pp_forward_on_leaf(ISAMB_PP pp, void *buf, const void *untilbuf)
1484 { /* forwards on the current leaf, returns 0 if not found */
1485     int cmp;
1486     int skips = 0;
1487     while (1)
1488     {
1489         if (!isamb_pp_read_on_leaf(pp, buf))
1490             return 0;
1491         /* FIXME - this is an extra function call, inline the read? */
1492         cmp=(*pp->isamb->method->compare_item)(untilbuf, buf);
1493         if (cmp <pp->scope)
1494         {  /* cmp<2  found a good one */
1495 #if ISAMB_DEBUG
1496             if (skips)
1497                 yaz_log(YLOG_DEBUG, "isam_pp_fwd_on_leaf skipped %d items", skips);
1498 #endif
1499             pp->returned_numbers++;
1500             return 1;
1501         }
1502         if (!skips)
1503             if (!isamb_pp_on_right_node(pp, pp->level, untilbuf))
1504                 return 0; /* never mind the rest of this leaf */
1505         pp->skipped_numbers++;
1506         skips++;
1507     }
1508 } /* forward_on_leaf */
1509
1510 static int isamb_pp_climb_level(ISAMB_PP pp, ISAMB_P *pos)
1511 { /* climbs higher in the tree, until finds a level with data left */
1512   /* returns the node to (consider to) descend to in *pos) */
1513     struct ISAMB_block *p = pp->block[pp->level];
1514     const char *src;
1515 #if ISAMB_DEBUG
1516     yaz_log(YLOG_DEBUG, "isamb_pp_climb_level starting "
1517                    "at level %d node %d ofs=%d sz=%d",
1518                     pp->level, p->pos, p->offset, p->size);
1519 #endif
1520     assert(pp->level >= 0);
1521     assert(p->offset <= p->size);
1522     if (pp->level==0)
1523     {
1524 #if ISAMB_DEBUG
1525         yaz_log(YLOG_DEBUG, "isamb_pp_climb_level returning 0 at root");
1526 #endif
1527         return 0;
1528     }
1529     assert(pp->level>0); 
1530     close_block(pp->isamb, pp->block[pp->level]);
1531     pp->block[pp->level] = 0;
1532     (pp->level)--;
1533     p = pp->block[pp->level];
1534 #if ISAMB_DEBUG
1535     yaz_log(YLOG_DEBUG, "isamb_pp_climb_level climbed to level %d node %d ofs=%d",
1536                     pp->level, p->pos, p->offset);
1537 #endif
1538     assert(!p->leaf);
1539     assert(p->offset <= p->size);
1540     if (p->offset == p->size)
1541     {
1542         /* we came from the last pointer, climb on */
1543         if (!isamb_pp_climb_level(pp, pos))
1544             return 0;
1545         p = pp->block[pp->level];
1546     }
1547     else
1548     {
1549 #if INT_ENCODE
1550         char file_item_buf[DST_ITEM_MAX];
1551         char *file_item = file_item_buf;
1552         ISAMB b = pp->isamb;
1553         void *c1 = (*b->method->codec.start)();
1554 #else
1555         zint item_len;
1556 #endif
1557         /* skip the child we just came from */
1558 #if ISAMB_DEBUG
1559         yaz_log(YLOG_DEBUG, "isam_pp_climb_level: skipping lev=%d ofs=%d sz=%d", 
1560                         pp->level, p->offset, p->size);
1561 #endif
1562         assert (p->offset < p->size);
1563         src = p->bytes + p->offset;
1564 #if INT_ENCODE
1565         (*b->method->codec.decode)(c1, &file_item, &src);
1566         (*b->method->codec.stop)(c1);
1567 #else
1568         decode_item_len(&src, &item_len);
1569         src += item_len;
1570 #endif
1571         decode_ptr(&src, pos);
1572         p->offset = src - (char *)p->bytes;
1573             
1574     }
1575     return 1;
1576 } /* climb_level */
1577
1578
1579 static zint isamb_pp_forward_unode(ISAMB_PP pp, zint pos, const void *untilbuf)
1580 { /* scans a upper node until it finds a child <= untilbuf */
1581   /* pp points to the key value, as always. pos is the child read from */
1582   /* the buffer */
1583   /* if all values are too small, returns the last child in the node */
1584   /* FIXME - this can be detected, and avoided by looking at the */
1585   /* parent node, but that gets messy. Presumably the cost is */
1586   /* pretty low anyway */
1587     ISAMB b = pp->isamb;
1588     struct ISAMB_block *p = pp->block[pp->level];
1589     const char *src = p->bytes + p->offset;
1590     int cmp;
1591     zint nxtpos;
1592 #if ISAMB_DEBUG
1593     int skips = 0;
1594     yaz_log(YLOG_DEBUG, "isamb_pp_forward_unode starting "
1595                    "at level %d node %d ofs=%di sz=%d",
1596                     pp->level, p->pos, p->offset, p->size);
1597 #endif
1598     assert(!p->leaf);
1599     assert(p->offset <= p->size);
1600     if (p->offset == p->size)
1601     {
1602 #if ISAMB_DEBUG
1603             yaz_log(YLOG_DEBUG, "isamb_pp_forward_unode returning at end "
1604                    "at level %d node %d ofs=%di sz=%d",
1605                     pp->level, p->pos, p->offset, p->size);
1606 #endif
1607         return pos; /* already at the end of it */
1608     }
1609     while(p->offset < p->size)
1610     {
1611 #if INT_ENCODE
1612         char file_item_buf[DST_ITEM_MAX];
1613         char *file_item = file_item_buf;
1614         void *c1 = (*b->method->codec.start)();
1615         (*b->method->codec.decode)(c1, &file_item, &src);
1616         (*b->method->codec.stop)(c1);
1617         cmp = (*b->method->compare_item)(untilbuf, file_item_buf);
1618 #else
1619         zint item_len;
1620         decode_item_len(&src, &item_len);
1621         cmp = (*b->method->compare_item)(untilbuf, src);
1622         src += item_len;
1623 #endif
1624         decode_ptr(&src, &nxtpos);
1625         if (cmp<pp->scope) /* cmp<2 */
1626         {
1627 #if ISAMB_DEBUG
1628             yaz_log(YLOG_DEBUG, "isamb_pp_forward_unode returning a hit "
1629                    "at level %d node %d ofs=%d sz=%d",
1630                     pp->level, p->pos, p->offset, p->size);
1631 #endif
1632             return pos;
1633         } /* found one */
1634         pos = nxtpos;
1635         p->offset = src-(char*)p->bytes;
1636         (pp->skipped_nodes[pp->maxlevel - pp->level -1])++;
1637 #if ISAMB_DEBUG
1638         skips++;
1639 #endif
1640     }
1641 #if ISAMB_DEBUG
1642             yaz_log(YLOG_DEBUG, "isamb_pp_forward_unode returning at tail "
1643                    "at level %d node %d ofs=%d sz=%d skips=%d",
1644                     pp->level, p->pos, p->offset, p->size, skips);
1645 #endif
1646     return pos; /* that's the last one in the line */
1647     
1648 } /* forward_unode */
1649
1650 static void isamb_pp_descend_to_leaf(ISAMB_PP pp, ISAMB_P pos,
1651                                      const void *untilbuf)
1652 { /* climbs down the tree, from pos, to the leftmost leaf */
1653     struct ISAMB_block *p = pp->block[pp->level];
1654     const char *src;
1655     assert(!p->leaf);
1656 #if ISAMB_DEBUG
1657     yaz_log(YLOG_DEBUG, "isamb_pp_descend_to_leaf "
1658                    "starting at lev %d node %d ofs=%d lf=%d u=%p", 
1659                    pp->level, p->pos, p->offset, p->leaf, untilbuf);
1660 #endif
1661     if (untilbuf)
1662         pos = isamb_pp_forward_unode(pp, pos, untilbuf);
1663     ++(pp->level);
1664     assert(pos);
1665     p = open_block(pp->isamb, pos);
1666     pp->block[pp->level] = p;
1667     ++(pp->accessed_nodes[pp->maxlevel-pp->level]);
1668     ++(pp->no_blocks);
1669 #if ISAMB_DEBUG
1670     yaz_log(YLOG_DEBUG, "isamb_pp_descend_to_leaf "
1671                    "got lev %d node %d lf=%d", 
1672                    pp->level, p->pos, p->leaf);
1673 #endif
1674     if (p->leaf)
1675         return;
1676     assert (p->offset==0);
1677     src = p->bytes + p->offset;
1678     decode_ptr(&src, &pos);
1679     p->offset = src-(char*)p->bytes;
1680     isamb_pp_descend_to_leaf(pp, pos, untilbuf);
1681 #if ISAMB_DEBUG
1682     yaz_log(YLOG_DEBUG, "isamb_pp_descend_to_leaf "
1683                    "returning at lev %d node %d ofs=%d lf=%d", 
1684                    pp->level, p->pos, p->offset, p->leaf);
1685 #endif
1686 } /* descend_to_leaf */
1687
1688 static int isamb_pp_find_next_leaf(ISAMB_PP pp)
1689 { /* finds the next leaf by climbing up and down */
1690     ISAMB_P pos;
1691     if (!isamb_pp_climb_level(pp, &pos))
1692         return 0;
1693     isamb_pp_descend_to_leaf(pp, pos, 0);
1694     return 1;
1695 }
1696
1697 static int isamb_pp_climb_desc(ISAMB_PP pp,  const void *untilbuf)
1698 { /* climbs up and descends to a leaf where values >= *untilbuf are found */
1699     ISAMB_P pos;
1700 #if ISAMB_DEBUG
1701     struct ISAMB_block *p = pp->block[pp->level];
1702     yaz_log(YLOG_DEBUG, "isamb_pp_climb_desc starting "
1703                    "at level %d node %d ofs=%d sz=%d",
1704                     pp->level, p->pos, p->offset, p->size);
1705 #endif
1706     if (!isamb_pp_climb_level(pp, &pos))
1707         return 0;
1708     /* see if it would pay to climb one higher */
1709     if (!isamb_pp_on_right_node(pp, pp->level, untilbuf))
1710         if (!isamb_pp_climb_level(pp, &pos))
1711             return 0;
1712     isamb_pp_descend_to_leaf(pp, pos, untilbuf);
1713 #if ISAMB_DEBUG
1714     p = pp->block[pp->level];
1715     yaz_log(YLOG_DEBUG, "isamb_pp_climb_desc done "
1716                    "at level %d node %d ofs=%d sz=%d",
1717                     pp->level, p->pos, p->offset, p->size);
1718 #endif
1719     return 1;
1720 } /* climb_desc */
1721
1722 int isamb_pp_forward (ISAMB_PP pp, void *buf, const void *untilbuf)
1723 {
1724 #if ISAMB_DEBUG
1725     struct ISAMB_block *p = pp->block[pp->level];
1726     assert(p->leaf);
1727     yaz_log(YLOG_DEBUG, "isamb_pp_forward starting "
1728                    "at level %d node %d ofs=%d sz=%d u=%p sc=%d",
1729                     pp->level, p->pos, p->offset, p->size, untilbuf, scope);
1730 #endif
1731     if (untilbuf)
1732     {
1733         if (isamb_pp_forward_on_leaf(pp, buf, untilbuf))
1734         {
1735 #if ISAMB_DEBUG
1736             yaz_log(YLOG_DEBUG, "isamb_pp_forward (f) returning (A) "
1737                    "at level %d node %d ofs=%d sz=%d",
1738                     pp->level, p->pos, p->offset, p->size);
1739 #endif
1740             return 1;
1741         }
1742         if (! isamb_pp_climb_desc(pp, untilbuf))
1743         {
1744 #if ISAMB_DEBUG
1745             yaz_log(YLOG_DEBUG, "isamb_pp_forward (f) returning notfound (B) "
1746                    "at level %d node %d ofs=%d sz=%d",
1747                     pp->level, p->pos, p->offset, p->size);
1748 #endif
1749             return 0; /* could not find a leaf */
1750         }
1751         do {
1752             if (isamb_pp_forward_on_leaf(pp, buf, untilbuf))
1753             {
1754 #if ISAMB_DEBUG
1755                 yaz_log(YLOG_DEBUG, "isamb_pp_forward (f) returning (c) "
1756                         "at level %d node %d ofs=%d sz=%d",
1757                         pp->level, p->pos, p->offset, p->size);
1758 #endif
1759                 return 1;
1760             }
1761         } while (isamb_pp_find_next_leaf(pp));
1762         return 0; /* could not find at all */
1763     }
1764     else { /* no untilbuf, a straight read */
1765         /* FIXME - this should be moved
1766          * directly into the pp_read */
1767         /* keeping here now, to keep same
1768          * interface as the old fwd */
1769         if (isamb_pp_read_on_leaf(pp, buf))
1770         {
1771 #if ISAMB_DEBUG
1772             yaz_log(YLOG_DEBUG, "isamb_pp_forward (read) returning (D) "
1773                    "at level %d node %d ofs=%d sz=%d",
1774                     pp->level, p->pos, p->offset, p->size);
1775 #endif
1776             return 1;
1777         }
1778         if (isamb_pp_find_next_leaf(pp))
1779         {
1780 #if ISAMB_DEBUG
1781             yaz_log(YLOG_DEBUG, "isamb_pp_forward (read) returning (E) "
1782                    "at level %d node %d ofs=%d sz=%d",
1783                     pp->level, p->pos, p->offset, p->size);
1784 #endif
1785             return isamb_pp_read_on_leaf(pp, buf);
1786         }
1787         else
1788             return 0;
1789     }
1790 } /* isam_pp_forward (new version) */
1791
1792 void isamb_pp_pos(ISAMB_PP pp, double *current, double *total)
1793 { /* return an estimate of the current position and of the total number of */
1794   /* occureences in the isam tree, based on the current leaf */
1795     struct ISAMB_block *p = pp->block[pp->level];
1796     assert(total);
1797     assert(current);
1798     assert(p->leaf);
1799
1800     *total = pp->block[0]->no_items;
1801     *current = (double) pp->returned_numbers;
1802 #if ISAMB_DEBUG
1803     yaz_log(YLOG_LOG, "isamb_pp_pos returning: cur= %0.1f tot=%0.1f rn="
1804          ZINT_FORMAT, *current, *total, pp->returned_numbers);
1805 #endif
1806 }
1807
1808 int isamb_pp_forward2(ISAMB_PP pp, void *buf, const void *untilb)
1809 {
1810     char *dst = buf;
1811     const char *src;
1812     struct ISAMB_block *p = pp->block[pp->level];
1813     ISAMB b = pp->isamb;
1814     if (!p)
1815         return 0;
1816 again:
1817     while (p->offset == p->size)
1818     {
1819         ISAMB_P pos;
1820 #if INT_ENCODE
1821         const char *src_0;
1822         void *c1;
1823         char file_item_buf[DST_ITEM_MAX];
1824         char *file_item = file_item_buf;
1825 #else
1826         zint item_len;
1827 #endif
1828         while (p->offset == p->size)
1829         {
1830             if (pp->level == 0)
1831                 return 0;
1832             close_block (pp->isamb, pp->block[pp->level]);
1833             pp->block[pp->level] = 0;
1834             (pp->level)--;
1835             p = pp->block[pp->level];
1836             assert (!p->leaf);  
1837         }
1838
1839         assert(!p->leaf);
1840         src = p->bytes + p->offset;
1841        
1842 #if INT_ENCODE
1843         c1 = (*b->method->codec.start)();
1844         (*b->method->codec.decode)(c1, &file_item, &src);
1845 #else
1846         decode_ptr (&src, &item_len);
1847         src += item_len;
1848 #endif        
1849         decode_ptr (&src, &pos);
1850         p->offset = src - (char*) p->bytes;
1851
1852         src = p->bytes + p->offset;
1853
1854         while(1)
1855         {
1856             if (!untilb || p->offset == p->size)
1857                 break;
1858             assert(p->offset < p->size);
1859 #if INT_ENCODE
1860             src_0 = src;
1861             file_item = file_item_buf;
1862             (*b->method->codec.reset)(c1);
1863             (*b->method->codec.decode)(c1, &file_item, &src);
1864             if ((*b->method->compare_item)(untilb, file_item_buf) <= 1)
1865             {
1866                 src = src_0;
1867                 break;
1868             }
1869 #else
1870             decode_item_len(&src, &item_len);
1871             if ((*b->method->compare_item)(untilb, src) <= 1)
1872                 break;
1873             src += item_len;
1874 #endif
1875             decode_ptr (&src, &pos);
1876             p->offset = src - (char*) p->bytes;
1877         }
1878
1879         pp->level++;
1880
1881         while (1)
1882         {
1883             pp->block[pp->level] = p = open_block (pp->isamb, pos);
1884
1885             pp->total_size += p->size;
1886             pp->no_blocks++;
1887             
1888             if (p->leaf) 
1889             {
1890                 break;
1891             }
1892             
1893             src = p->bytes + p->offset;
1894             while(1)
1895             {
1896                 decode_ptr (&src, &pos);
1897                 p->offset = src - (char*) p->bytes;
1898                 
1899                 if (!untilb || p->offset == p->size)
1900                     break;
1901                 assert(p->offset < p->size);
1902 #if INT_ENCODE
1903                 src_0 = src;
1904                 file_item = file_item_buf;
1905                 (*b->method->codec.reset)(c1);
1906                 (*b->method->codec.decode)(c1, &file_item, &src);
1907                 if ((*b->method->compare_item)(untilb, file_item_buf) <= 1)
1908                 {
1909                     src = src_0;
1910                     break;
1911                 }
1912 #else
1913                 decode_ptr (&src, &item_len);
1914                 if ((*b->method->compare_item)(untilb, src) <= 1)
1915                     break;
1916                 src += item_len;
1917 #endif
1918             }
1919             pp->level++;
1920         }
1921 #if INT_ENCODE
1922         (*b->method->codec.stop)(c1);
1923 #endif
1924     }
1925     assert (p->offset < p->size);
1926     assert (p->leaf);
1927     while(1)
1928     {
1929         char *dst0 = dst;
1930         src = p->bytes + p->offset;
1931         (*pp->isamb->method->codec.decode)(p->decodeClientData, &dst, &src);
1932         p->offset = src - (char*) p->bytes;
1933         if (!untilb || (*pp->isamb->method->compare_item)(untilb, dst0) <= 1)
1934             break;
1935         dst = dst0;
1936         if (p->offset == p->size) goto again;
1937     }
1938     return 1;
1939 }