57e05bdc4d498e446aab3d858aac850de1225133
[idzebra-moved-to-github.git] / isamb / isamb.c
1 /* $Id: isamb.c,v 1.81 2005-08-19 12:58:01 adam Exp $
2    Copyright (C) 1995-2005
3    Index Data ApS
4
5 This file is part of the Zebra server.
6
7 Zebra is free software; you can redistribute it and/or modify it under
8 the terms of the GNU General Public License as published by the Free
9 Software Foundation; either version 2, or (at your option) any later
10 version.
11
12 Zebra is distributed in the hope that it will be useful, but WITHOUT ANY
13 WARRANTY; without even the implied warranty of MERCHANTABILITY or
14 FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
15 for more details.
16
17 You should have received a copy of the GNU General Public License
18 along with Zebra; see the file LICENSE.zebra.  If not, write to the
19 Free Software Foundation, 59 Temple Place - Suite 330, Boston, MA
20 02111-1307, USA.
21 */
22
23 #include <stdlib.h>
24 #include <string.h>
25 #include <yaz/log.h>
26 #include <yaz/xmalloc.h>
27 #include <idzebra/isamb.h>
28 #include <assert.h>
29
30 #ifndef ISAMB_DEBUG
31 #define ISAMB_DEBUG 0
32 #endif
33
34
35 #define ISAMB_MAJOR_VERSION 3
36 #define ISAMB_MINOR_VERSION 0
37
38 struct ISAMB_head {
39     zint first_block;
40     zint last_block;
41     zint free_list;
42     zint no_items;
43     int block_size;
44     int block_max;
45     int block_offset;
46 };
47
48 /* if 1, upper nodes items are encoded; 0 if not encoded */
49 #define INT_ENCODE 1
50
51 /* maximum size of encoded buffer */
52 #define DST_ITEM_MAX 256
53
54 #define ISAMB_MAX_LEVEL 10
55 /* approx 2*max page + max size of item */
56 #define DST_BUF_SIZE 16840
57
58 #define ISAMB_CACHE_ENTRY_SIZE 4096
59
60 /* CAT_MAX: _must_ be power of 2 */
61 #define CAT_MAX 4
62 #define CAT_MASK (CAT_MAX-1)
63 /* CAT_NO: <= CAT_MAX */
64 #define CAT_NO 4
65
66 /* Smallest block size */
67 #define ISAMB_MIN_SIZE 32
68 /* Size factor */
69 #define ISAMB_FAC_SIZE 4
70
71 /* ISAMB_PTR_CODEC = 1 var, =0 fixed */
72 #define ISAMB_PTR_CODEC 1
73
74 struct ISAMB_cache_entry {
75     ISAM_P pos;
76     unsigned char *buf;
77     int dirty;
78     int hits;
79     struct ISAMB_cache_entry *next;
80 };
81
82 struct ISAMB_file {
83     BFile bf;
84     int head_dirty;
85     struct ISAMB_head head;
86     struct ISAMB_cache_entry *cache_entries;
87 };
88
89 struct ISAMB_s {
90     BFiles bfs;
91     ISAMC_M *method;
92
93     struct ISAMB_file *file;
94     int no_cat;
95     int cache; /* 0 = no cache, 1 = use cache, -1 = dummy isam (for testing only) */
96     int log_io;        /* log level for bf_read/bf_write calls */
97     int log_freelist;  /* log level for freelist handling */
98     zint skipped_numbers; /* on a leaf node */
99     zint returned_numbers; 
100     zint skipped_nodes[ISAMB_MAX_LEVEL]; /* [0]=skipped leaves, 1 = higher etc */
101     zint accessed_nodes[ISAMB_MAX_LEVEL]; /* nodes we did not skip */
102 };
103
104 struct ISAMB_block {
105     ISAM_P pos;
106     int cat;
107     int size;
108     int leaf;
109     int dirty;
110     int deleted;
111     int offset;
112     zint no_items;  /* number of nodes in this + children */
113     char *bytes;
114     char *cbuf;
115     unsigned char *buf;
116     void *decodeClientData;
117     int log_rw;
118 };
119
120 struct ISAMB_PP_s {
121     ISAMB isamb;
122     ISAM_P pos;
123     int level;
124     int maxlevel; /* total depth */
125     zint total_size;
126     zint no_blocks;
127     zint skipped_numbers; /* on a leaf node */
128     zint returned_numbers; 
129     zint skipped_nodes[ISAMB_MAX_LEVEL]; /* [0]=skipped leaves, 1 = higher etc */
130     zint accessed_nodes[ISAMB_MAX_LEVEL]; /* nodes we did not skip */
131     struct ISAMB_block **block;
132     int scope;  /* on what level we forward */
133 };
134
135
136 #define encode_item_len encode_ptr
137 #if ISAMB_PTR_CODEC
138 static void encode_ptr(char **dst, zint pos)
139 {
140     unsigned char *bp = (unsigned char*) *dst;
141
142     while (pos > 127)
143     {
144          *bp++ = (unsigned char) (128 | (pos & 127));
145          pos = pos >> 7;
146     }
147     *bp++ = (unsigned char) pos;
148     *dst = (char *) bp;
149 }
150 #else
151 static void encode_ptr(char **dst, zint pos)
152 {
153     memcpy(*dst, &pos, sizeof(pos));
154     (*dst) += sizeof(pos);
155 }
156 #endif
157
158 #define decode_item_len decode_ptr
159 #if ISAMB_PTR_CODEC
160 static void decode_ptr(const char **src1, zint *pos)
161 {
162     const unsigned char **src = (const unsigned char **) src1;
163     zint d = 0;
164     unsigned char c;
165     unsigned r = 0;
166
167     while (((c = *(*src)++) & 128))
168     {
169         d += ((zint) (c & 127) << r);
170         r += 7;
171     }
172     d += ((zint) c << r);
173     *pos = d;
174 }
175 #else
176 static void decode_ptr(const char **src, zint *pos)
177 {
178      memcpy(pos, *src, sizeof(*pos));
179      (*src) += sizeof(*pos);
180 }
181 #endif
182
183 ISAMB isamb_open(BFiles bfs, const char *name, int writeflag, ISAMC_M *method,
184                  int cache)
185 {
186     ISAMB isamb = xmalloc(sizeof(*isamb));
187     int i, b_size = ISAMB_MIN_SIZE;
188
189     isamb->bfs = bfs;
190     isamb->method = (ISAMC_M *) xmalloc(sizeof(*method));
191     memcpy(isamb->method, method, sizeof(*method));
192     isamb->no_cat = CAT_NO;
193     isamb->log_io = 0;
194     isamb->log_freelist = 0;
195     isamb->cache = cache;
196     isamb->skipped_numbers = 0;
197     isamb->returned_numbers = 0;
198     for (i = 0; i<ISAMB_MAX_LEVEL; i++)
199         isamb->skipped_nodes[i] = isamb->accessed_nodes[i] = 0;
200
201     assert(cache == 0);
202     isamb->file = xmalloc(sizeof(*isamb->file) * isamb->no_cat);
203     for (i = 0; i < isamb->no_cat; i++)
204     {
205         char fname[DST_BUF_SIZE];
206         char hbuf[DST_BUF_SIZE];
207         isamb->file[i].cache_entries = 0;
208         isamb->file[i].head_dirty = 0;
209         sprintf(fname, "%s%c", name, i+'A');
210         if (cache)
211             isamb->file[i].bf = bf_open(bfs, fname, ISAMB_CACHE_ENTRY_SIZE,
212                                          writeflag);
213         else
214             isamb->file[i].bf = bf_open(bfs, fname, b_size, writeflag);
215
216         /* fill-in default values (for empty isamb) */
217         isamb->file[i].head.first_block = ISAMB_CACHE_ENTRY_SIZE/b_size+1;
218         isamb->file[i].head.last_block = isamb->file[i].head.first_block;
219         isamb->file[i].head.block_size = b_size;
220 #if ISAMB_PTR_CODEC
221         if (i == isamb->no_cat-1 || b_size > 128)
222             isamb->file[i].head.block_offset = 8;
223         else
224             isamb->file[i].head.block_offset = 4;
225 #else
226         isamb->file[i].head.block_offset = 11;
227 #endif
228         isamb->file[i].head.block_max =
229             b_size - isamb->file[i].head.block_offset;
230         isamb->file[i].head.free_list = 0;
231         if (bf_read(isamb->file[i].bf, 0, 0, 0, hbuf))
232         {
233             /* got header assume "isamb"major minor len can fit in 16 bytes */
234             zint zint_tmp;
235             int major, minor, len, pos = 0;
236             int left;
237             const char *src = 0;
238             if (memcmp(hbuf, "isamb", 5))
239             {
240                 yaz_log(YLOG_WARN, "bad isamb header for file %s", fname);
241                 return 0;
242             }
243             if (sscanf(hbuf+5, "%d %d %d", &major, &minor, &len) != 3)
244             {
245                 yaz_log(YLOG_WARN, "bad isamb header for file %s", fname);
246                 return 0;
247             }
248             if (major != ISAMB_MAJOR_VERSION)
249             {
250                 yaz_log(YLOG_WARN, "bad major version for file %s %d, must be %d",
251                      fname, major, ISAMB_MAJOR_VERSION);
252                 return 0;
253             }
254             for (left = len - b_size; left > 0; left = left - b_size)
255             {
256                 pos++;
257                 if (!bf_read(isamb->file[i].bf, pos, 0, 0, hbuf + pos*b_size))
258                 {
259                     yaz_log(YLOG_WARN, "truncated isamb header for " 
260                          "file=%s len=%d pos=%d",
261                          fname, len, pos);
262                     return 0;
263                 }
264             }
265             src = hbuf + 16;
266             decode_ptr(&src, &isamb->file[i].head.first_block);
267             decode_ptr(&src, &isamb->file[i].head.last_block);
268             decode_ptr(&src, &zint_tmp);
269             isamb->file[i].head.block_size = (int) zint_tmp;
270             decode_ptr(&src, &zint_tmp);
271             isamb->file[i].head.block_max = (int) zint_tmp;
272             decode_ptr(&src, &isamb->file[i].head.free_list);
273         }
274         assert (isamb->file[i].head.block_size >= isamb->file[i].head.block_offset);
275         isamb->file[i].head_dirty = 0;
276         assert(isamb->file[i].head.block_size == b_size);
277         b_size = b_size * ISAMB_FAC_SIZE;
278     }
279 #if ISAMB_DEBUG
280     yaz_log(YLOG_WARN, "isamb debug enabled. Things will be slower than usual");
281 #endif
282     return isamb;
283 }
284
285 static void flush_blocks (ISAMB b, int cat)
286 {
287     while (b->file[cat].cache_entries)
288     {
289         struct ISAMB_cache_entry *ce_this = b->file[cat].cache_entries;
290         b->file[cat].cache_entries = ce_this->next;
291
292         if (ce_this->dirty)
293         {
294             yaz_log(b->log_io, "bf_write: flush_blocks");
295             bf_write(b->file[cat].bf, ce_this->pos, 0, 0, ce_this->buf);
296         }
297         xfree(ce_this->buf);
298         xfree(ce_this);
299     }
300 }
301
302 static int cache_block (ISAMB b, ISAM_P pos, unsigned char *userbuf, int wr)
303 {
304     int cat = (int) (pos&CAT_MASK);
305     int off = (int) (((pos/CAT_MAX) & 
306                (ISAMB_CACHE_ENTRY_SIZE / b->file[cat].head.block_size - 1))
307         * b->file[cat].head.block_size);
308     zint norm = pos / (CAT_MASK*ISAMB_CACHE_ENTRY_SIZE / b->file[cat].head.block_size);
309     int no = 0;
310     struct ISAMB_cache_entry **ce, *ce_this = 0, **ce_last = 0;
311
312     if (!b->cache)
313         return 0;
314
315     assert (ISAMB_CACHE_ENTRY_SIZE >= b->file[cat].head.block_size);
316     for (ce = &b->file[cat].cache_entries; *ce; ce = &(*ce)->next, no++)
317     {
318         ce_last = ce;
319         if ((*ce)->pos == norm)
320         {
321             ce_this = *ce;
322             *ce = (*ce)->next;   /* remove from list */
323             
324             ce_this->next = b->file[cat].cache_entries;  /* move to front */
325             b->file[cat].cache_entries = ce_this;
326             
327             if (wr)
328             {
329                 memcpy (ce_this->buf + off, userbuf, 
330                         b->file[cat].head.block_size);
331                 ce_this->dirty = 1;
332             }
333             else
334                 memcpy (userbuf, ce_this->buf + off,
335                         b->file[cat].head.block_size);
336             return 1;
337         }
338     }
339     if (no >= 40)
340     {
341         assert (no == 40);
342         assert (ce_last && *ce_last);
343         ce_this = *ce_last;
344         *ce_last = 0;  /* remove the last entry from list */
345         if (ce_this->dirty)
346         {
347             yaz_log(b->log_io, "bf_write: cache_block");
348             bf_write(b->file[cat].bf, ce_this->pos, 0, 0, ce_this->buf);
349         }
350         xfree(ce_this->buf);
351         xfree(ce_this);
352     }
353     ce_this = xmalloc(sizeof(*ce_this));
354     ce_this->next = b->file[cat].cache_entries;
355     b->file[cat].cache_entries = ce_this;
356     ce_this->buf = xmalloc(ISAMB_CACHE_ENTRY_SIZE);
357     ce_this->pos = norm;
358     yaz_log(b->log_io, "bf_read: cache_block");
359     if (!bf_read(b->file[cat].bf, norm, 0, 0, ce_this->buf))
360         memset (ce_this->buf, 0, ISAMB_CACHE_ENTRY_SIZE);
361     if (wr)
362     {
363         memcpy (ce_this->buf + off, userbuf, b->file[cat].head.block_size);
364         ce_this->dirty = 1;
365     }
366     else
367     {
368         ce_this->dirty = 0;
369         memcpy (userbuf, ce_this->buf + off, b->file[cat].head.block_size);
370     }
371     return 1;
372 }
373
374
375 void isamb_close (ISAMB isamb)
376 {
377     int i;
378     for (i = 0; isamb->accessed_nodes[i]; i++)
379         yaz_log(YLOG_DEBUG, "isamb_close  level leaf-%d: "ZINT_FORMAT" read, "
380                         ZINT_FORMAT" skipped",
381              i, isamb->accessed_nodes[i], isamb->skipped_nodes[i]);
382     yaz_log(YLOG_DEBUG, "isamb_close returned "ZINT_FORMAT" values, "
383                    "skipped "ZINT_FORMAT,
384          isamb->skipped_numbers, isamb->returned_numbers);
385     for (i = 0; i<isamb->no_cat; i++)
386     {
387         flush_blocks (isamb, i);
388         if (isamb->file[i].head_dirty)
389         {
390             char hbuf[DST_BUF_SIZE];
391             int major = ISAMB_MAJOR_VERSION;
392             int minor = ISAMB_MINOR_VERSION;
393             int len = 16;
394             char *dst = hbuf + 16;
395             int pos = 0, left;
396             int b_size = isamb->file[i].head.block_size;
397
398             encode_ptr(&dst, isamb->file[i].head.first_block);
399             encode_ptr(&dst, isamb->file[i].head.last_block);
400             encode_ptr(&dst, isamb->file[i].head.block_size);
401             encode_ptr(&dst, isamb->file[i].head.block_max);
402             encode_ptr(&dst, isamb->file[i].head.free_list);
403             memset(dst, '\0', b_size); /* ensure no random bytes are written */
404
405             len = dst - hbuf;
406
407             /* print exactly 16 bytes (including trailing 0) */
408             sprintf(hbuf, "isamb%02d %02d %02d\r\n", major, minor, len);
409
410             bf_write(isamb->file[i].bf, pos, 0, 0, hbuf);
411
412             for (left = len - b_size; left > 0; left = left - b_size)
413             {
414                 pos++;
415                 bf_write(isamb->file[i].bf, pos, 0, 0, hbuf + pos*b_size);
416             }
417         }
418         bf_close (isamb->file[i].bf);
419     }
420     xfree(isamb->file);
421     xfree(isamb->method);
422     xfree(isamb);
423 }
424
425 /* open_block: read one block at pos.
426    Decode leading sys bytes .. consisting of
427    Offset:Meaning
428    0: leader byte, != 0 leaf, == 0, non-leaf
429    1-2: used size of block
430    3-7*: number of items and all children
431    
432    * Reserve 5 bytes for large block sizes. 1 for small ones .. Number
433    of items. We can thus have at most 2^40 nodes. 
434 */
435 static struct ISAMB_block *open_block(ISAMB b, ISAM_P pos)
436 {
437     int cat = (int) (pos&CAT_MASK);
438     const char *src;
439     int offset = b->file[cat].head.block_offset;
440     struct ISAMB_block *p;
441     if (!pos)
442         return 0;
443     p = xmalloc(sizeof(*p));
444     p->pos = pos;
445     p->cat = (int) (pos & CAT_MASK);
446     p->buf = xmalloc(b->file[cat].head.block_size);
447     p->cbuf = 0;
448
449     if (!cache_block (b, pos, p->buf, 0))
450     {
451         yaz_log(b->log_io, "bf_read: open_block");
452         if (!bf_read(b->file[cat].bf, pos/CAT_MAX, 0, 0, p->buf))
453         {
454             yaz_log(YLOG_FATAL, "isamb: read fail for pos=%ld block=%ld",
455                      (long) pos, (long) pos/CAT_MAX);
456             abort();
457         }
458     }
459     p->bytes = (char *)p->buf + offset;
460     p->leaf = p->buf[0];
461     p->size = (p->buf[1] + 256 * p->buf[2]) - offset;
462     if (p->size < 0)
463     {
464         yaz_log(YLOG_FATAL, "Bad block size %d in pos=" ZINT_FORMAT "\n",
465                  p->size, pos);
466     }
467     assert (p->size >= 0);
468     src = (char*) p->buf + 3;
469     decode_ptr(&src, &p->no_items);
470
471     p->offset = 0;
472     p->dirty = 0;
473     p->deleted = 0;
474     p->decodeClientData = (*b->method->codec.start)();
475     return p;
476 }
477
478 struct ISAMB_block *new_block (ISAMB b, int leaf, int cat)
479 {
480     struct ISAMB_block *p;
481
482     p = xmalloc(sizeof(*p));
483     p->buf = xmalloc(b->file[cat].head.block_size);
484
485     if (!b->file[cat].head.free_list)
486     {
487         zint block_no;
488         block_no = b->file[cat].head.last_block++;
489         p->pos = block_no * CAT_MAX + cat;
490     }
491     else
492     {
493         p->pos = b->file[cat].head.free_list;
494         assert((p->pos & CAT_MASK) == cat);
495         if (!cache_block (b, p->pos, p->buf, 0))
496         {
497             yaz_log(b->log_io, "bf_read: new_block");
498             if (!bf_read(b->file[cat].bf, p->pos/CAT_MAX, 0, 0, p->buf))
499             {
500                 yaz_log(YLOG_FATAL, "isamb: read fail for pos=%ld block=%ld",
501                          (long) p->pos/CAT_MAX, (long) p->pos/CAT_MAX);
502                 abort ();
503             }
504         }
505         yaz_log(b->log_freelist, "got block " ZINT_FORMAT " from freelist %d:" ZINT_FORMAT, p->pos,
506                  cat, p->pos/CAT_MAX);
507         memcpy (&b->file[cat].head.free_list, p->buf, sizeof(zint));
508     }
509     p->cat = cat;
510     b->file[cat].head_dirty = 1;
511     memset (p->buf, 0, b->file[cat].head.block_size);
512     p->bytes = (char*)p->buf + b->file[cat].head.block_offset;
513     p->leaf = leaf;
514     p->size = 0;
515     p->dirty = 1;
516     p->deleted = 0;
517     p->offset = 0;
518     p->no_items = 0;
519     p->decodeClientData = (*b->method->codec.start)();
520     return p;
521 }
522
523 struct ISAMB_block *new_leaf (ISAMB b, int cat)
524 {
525     return new_block (b, 1, cat);
526 }
527
528
529 struct ISAMB_block *new_int (ISAMB b, int cat)
530 {
531     return new_block (b, 0, cat);
532 }
533
534 static void check_block (ISAMB b, struct ISAMB_block *p)
535 {
536     assert(b); /* mostly to make the compiler shut up about unused b */
537     if (p->leaf)
538     {
539         ;
540     }
541     else
542     {
543         /* sanity check */
544         char *startp = p->bytes;
545         const char *src = startp;
546         char *endp = p->bytes + p->size;
547         ISAM_P pos;
548         void *c1 = (*b->method->codec.start)();
549             
550         decode_ptr(&src, &pos);
551         assert ((pos&CAT_MASK) == p->cat);
552         while (src != endp)
553         {
554 #if INT_ENCODE
555             char file_item_buf[DST_ITEM_MAX];
556             char *file_item = file_item_buf;
557             (*b->method->codec.reset)(c1);
558             (*b->method->codec.decode)(c1, &file_item, &src);
559 #else
560             zint item_len;
561             decode_item_len(&src, &item_len);
562             assert (item_len > 0 && item_len < 80);
563             src += item_len;
564 #endif
565             decode_ptr(&src, &pos);
566             if ((pos&CAT_MASK) != p->cat)
567             {
568                 assert ((pos&CAT_MASK) == p->cat);
569             }
570         }
571         (*b->method->codec.stop)(c1);
572     }
573 }
574
575 void close_block(ISAMB b, struct ISAMB_block *p)
576 {
577     if (!p)
578         return;
579     if (p->deleted)
580     {
581         yaz_log(b->log_freelist, "release block " ZINT_FORMAT " from freelist %d:" ZINT_FORMAT,
582                  p->pos, p->cat, p->pos/CAT_MAX);
583         memcpy (p->buf, &b->file[p->cat].head.free_list, sizeof(zint));
584         b->file[p->cat].head.free_list = p->pos;
585         if (!cache_block (b, p->pos, p->buf, 1))
586         {
587             yaz_log(b->log_io, "bf_write: close_block (deleted)");
588             bf_write(b->file[p->cat].bf, p->pos/CAT_MAX, 0, 0, p->buf);
589         }
590     }
591     else if (p->dirty)
592     {
593         int offset = b->file[p->cat].head.block_offset;
594         int size = p->size + offset;
595         char *dst =  (char*)p->buf + 3;
596         assert (p->size >= 0);
597         
598         /* memset becuase encode_ptr usually does not write all bytes */
599         memset(p->buf, 0, b->file[p->cat].head.block_offset);
600         p->buf[0] = p->leaf;
601         p->buf[1] = size & 255;
602         p->buf[2] = size >> 8;
603         encode_ptr(&dst, p->no_items);
604         check_block(b, p);
605         if (!cache_block (b, p->pos, p->buf, 1))
606         {
607             yaz_log(b->log_io, "bf_write: close_block");
608             bf_write(b->file[p->cat].bf, p->pos/CAT_MAX, 0, 0, p->buf);
609         }
610     }
611     (*b->method->codec.stop)(p->decodeClientData);
612     xfree(p->buf);
613     xfree(p);
614 }
615
616 int insert_sub (ISAMB b, struct ISAMB_block **p,
617                 void *new_item, int *mode,
618                 ISAMC_I *stream,
619                 struct ISAMB_block **sp,
620                 void *sub_item, int *sub_size,
621                 const void *max_item);
622
623 int insert_int (ISAMB b, struct ISAMB_block *p, void *lookahead_item,
624                 int *mode,
625                 ISAMC_I *stream, struct ISAMB_block **sp,
626                 void *split_item, int *split_size, const void *last_max_item)
627 {
628     char *startp = p->bytes;
629     const char *src = startp;
630     char *endp = p->bytes + p->size;
631     ISAM_P pos;
632     struct ISAMB_block *sub_p1 = 0, *sub_p2 = 0;
633     char sub_item[DST_ITEM_MAX];
634     int sub_size;
635     int more = 0;
636     zint diff_terms = 0;
637     void *c1 = (*b->method->codec.start)();
638
639     *sp = 0;
640
641     assert(p->size >= 0);
642     decode_ptr(&src, &pos);
643     while (src != endp)
644     {
645         int d;
646         const char *src0 = src;
647 #if INT_ENCODE
648         char file_item_buf[DST_ITEM_MAX];
649         char *file_item = file_item_buf;
650         (*b->method->codec.reset)(c1);
651         (*b->method->codec.decode)(c1, &file_item, &src);
652         d = (*b->method->compare_item)(file_item_buf, lookahead_item);
653         if (d > 0)
654         {
655             sub_p1 = open_block(b, pos);
656             assert (sub_p1);
657             diff_terms -= sub_p1->no_items;
658             more = insert_sub (b, &sub_p1, lookahead_item, mode,
659                                stream, &sub_p2, 
660                                sub_item, &sub_size, file_item_buf);
661             diff_terms += sub_p1->no_items;
662             src = src0;
663             break;
664         }
665 #else
666         zint item_len;
667         decode_item_len(&src, &item_len);
668         d = (*b->method->compare_item)(src, lookahead_item);
669         if (d > 0)
670         {
671             sub_p1 = open_block(b, pos);
672             assert (sub_p1);
673             diff_terms -= sub_p1->no_items;
674             more = insert_sub (b, &sub_p1, lookahead_item, mode,
675                                stream, &sub_p2, 
676                                sub_item, &sub_size, src);
677             diff_terms += sub_p1->no_items;
678             src = src0;
679             break;
680         }
681         src += item_len;
682 #endif
683         decode_ptr(&src, &pos);
684     }
685     if (!sub_p1)
686     {
687         /* we reached the end. So lookahead > last item */
688         sub_p1 = open_block(b, pos);
689         assert (sub_p1);
690         diff_terms -= sub_p1->no_items;
691         more = insert_sub (b, &sub_p1, lookahead_item, mode, stream, &sub_p2, 
692                            sub_item, &sub_size, last_max_item);
693         diff_terms += sub_p1->no_items;
694     }
695     if (sub_p2)
696         diff_terms += sub_p2->no_items;
697     if (diff_terms)
698     {
699         p->dirty = 1;
700         p->no_items += diff_terms;
701     }
702     if (sub_p2)
703     {
704         /* there was a split - must insert pointer in this one */
705         char dst_buf[DST_BUF_SIZE];
706         char *dst = dst_buf;
707 #if INT_ENCODE
708         const char *sub_item_ptr = sub_item;
709 #endif
710         assert (sub_size < 80 && sub_size > 1);
711
712         memcpy (dst, startp, src - startp);
713                 
714         dst += src - startp;
715
716 #if INT_ENCODE
717         (*b->method->codec.reset)(c1);
718         (*b->method->codec.encode)(c1, &dst, &sub_item_ptr);
719 #else
720         encode_item_len (&dst, sub_size);      /* sub length and item */
721         memcpy (dst, sub_item, sub_size);
722         dst += sub_size;
723 #endif
724
725         encode_ptr(&dst, sub_p2->pos);   /* pos */
726
727         if (endp - src)                   /* remaining data */
728         {
729             memcpy (dst, src, endp - src);
730             dst += endp - src;
731         }
732         p->size = dst - dst_buf;
733         assert (p->size >= 0);
734
735         if (p->size <= b->file[p->cat].head.block_max)
736         {
737             /* it fits OK in this block */
738             memcpy (startp, dst_buf, dst - dst_buf);
739
740             close_block(b, sub_p2);
741         }
742         else
743         {
744             /* must split _this_ block as well .. */
745             struct ISAMB_block *sub_p3;
746 #if INT_ENCODE
747             char file_item_buf[DST_ITEM_MAX];
748             char *file_item = file_item_buf;
749 #else
750             zint split_size_tmp;
751 #endif
752             zint no_items_first_half = 0;
753             int p_new_size;
754             const char *half;
755             src = dst_buf;
756             endp = dst;
757
758             p->dirty = 1;
759             close_block(b, sub_p2);
760
761             half = src + b->file[p->cat].head.block_size/2;
762             decode_ptr(&src, &pos);
763
764             /* read sub block so we can get no_items for it */
765             sub_p3 = open_block(b, pos);
766             no_items_first_half += sub_p3->no_items;
767             close_block(b, sub_p3);
768
769             while (src <= half)
770             {
771 #if INT_ENCODE
772                 file_item = file_item_buf;
773                 (*b->method->codec.reset)(c1);
774                 (*b->method->codec.decode)(c1, &file_item, &src);
775 #else
776                 decode_item_len(&src, &split_size_tmp);
777                 *split_size = (int) split_size_tmp;
778                 src += *split_size;
779 #endif
780                 decode_ptr(&src, &pos);
781
782                 /* read sub block so we can get no_items for it */
783                 sub_p3 = open_block(b, pos);
784                 no_items_first_half += sub_p3->no_items;
785                 close_block(b, sub_p3);
786             }
787             /*  p is first half */
788             p_new_size = src - dst_buf;
789             memcpy (p->bytes, dst_buf, p_new_size);
790
791 #if INT_ENCODE
792             file_item = file_item_buf;
793             (*b->method->codec.reset)(c1);
794             (*b->method->codec.decode)(c1, &file_item, &src);
795             *split_size = file_item - file_item_buf;
796             memcpy(split_item, file_item_buf, *split_size);
797 #else
798             decode_item_len(&src, &split_size_tmp);
799             *split_size = (int) split_size_tmp;
800             memcpy (split_item, src, *split_size);
801             src += *split_size;
802 #endif
803             /*  *sp is second half */
804             *sp = new_int (b, p->cat);
805             (*sp)->size = endp - src;
806             memcpy ((*sp)->bytes, src, (*sp)->size);
807
808             p->size = p_new_size;
809
810             /*  adjust no_items in first&second half */
811             (*sp)->no_items = p->no_items - no_items_first_half;
812             p->no_items = no_items_first_half;
813         }
814         p->dirty = 1;
815     }
816     close_block(b, sub_p1);
817     (*b->method->codec.stop)(c1);
818     return more;
819 }
820
821 int insert_leaf (ISAMB b, struct ISAMB_block **sp1, void *lookahead_item,
822                  int *lookahead_mode, ISAMC_I *stream,
823                  struct ISAMB_block **sp2,
824                  void *sub_item, int *sub_size,
825                  const void *max_item)
826 {
827     struct ISAMB_block *p = *sp1;
828     char *endp = 0;
829     const char *src = 0;
830     char dst_buf[DST_BUF_SIZE], *dst = dst_buf;
831     int new_size;
832     void *c1 = (*b->method->codec.start)();
833     void *c2 = (*b->method->codec.start)();
834     int more = 1;
835     int quater = b->file[b->no_cat-1].head.block_max / 4;
836     char *mid_cut = dst_buf + quater * 2;
837     char *tail_cut = dst_buf + quater * 3;
838     char *maxp = dst_buf + b->file[b->no_cat-1].head.block_max;
839     char *half1 = 0;
840     char *half2 = 0;
841     char cut_item_buf[DST_ITEM_MAX];
842     int cut_item_size = 0;
843     int no_items = 0;    /* number of items (total) */
844     int no_items_1 = 0;  /* number of items (first half) */
845     int inserted_dst_bytes = 0;
846
847     if (p && p->size)
848     {
849         char file_item_buf[DST_ITEM_MAX];
850         char *file_item = file_item_buf;
851             
852         src = p->bytes;
853         endp = p->bytes + p->size;
854         (*b->method->codec.decode)(c1, &file_item, &src);
855         while (1)
856         {
857             const char *dst_item = 0; /* resulting item to be inserted */
858             char *lookahead_next;
859             char *dst_0 = dst;
860             int d = -1;
861             
862             if (lookahead_item)
863                 d = (*b->method->compare_item)(file_item_buf, lookahead_item);
864             
865             /* d now holds comparison between existing file item and
866                lookahead item 
867                d = 0: equal
868                d > 0: lookahead before file
869                d < 0: lookahead after file
870             */
871             if (d > 0)
872             {
873                 /* lookahead must be inserted */
874                 dst_item = lookahead_item;
875                 /* if this is not an insertion, it's really bad .. */
876                 if (!*lookahead_mode)
877                 {
878                     yaz_log(YLOG_WARN, "isamb: Inconsistent register (1)");
879                     assert (*lookahead_mode);
880                 }
881             }
882             else
883                 dst_item = file_item_buf;
884
885             if (!*lookahead_mode && d == 0)
886             {
887                 /* it's a deletion and they match so there is nothing to be
888                    inserted anyway .. But mark the thing bad (file item
889                    was part of input.. The item will not be part of output */
890                 p->dirty = 1;
891             }
892             else if (!half1 && dst > mid_cut)
893             {
894                 /* we have reached the splitting point for the first time */
895                 const char *dst_item_0 = dst_item;
896                 half1 = dst; /* candidate for splitting */
897
898                 /* encode the resulting item */
899                 (*b->method->codec.encode)(c2, &dst, &dst_item);
900                 
901                 cut_item_size = dst_item - dst_item_0;
902                 assert(cut_item_size > 0);
903                 memcpy (cut_item_buf, dst_item_0, cut_item_size);
904                 
905                 half2 = dst;
906                 no_items_1 = no_items;
907                 no_items++;
908             }
909             else
910             {
911                 /* encode the resulting item */
912                 (*b->method->codec.encode)(c2, &dst, &dst_item);
913                 no_items++;
914             }
915
916             /* now move "pointers" .. result has been encoded .. */
917             if (d > 0)  
918             {
919                 /* we must move the lookahead pointer */
920
921                 inserted_dst_bytes += (dst - dst_0);
922                 if (inserted_dst_bytes >=  quater)
923                     /* no more room. Mark lookahead as "gone".. */
924                     lookahead_item = 0;
925                 else
926                 {
927                     /* move it really.. */
928                     lookahead_next = lookahead_item;
929                     if (!(*stream->read_item)(stream->clientData,
930                                               &lookahead_next,
931                                               lookahead_mode))
932                     {
933                         /* end of stream reached: no "more" and no lookahead */
934                         lookahead_item = 0;
935                         more = 0;
936                     }
937                     if (lookahead_item && max_item &&
938                         (*b->method->compare_item)(max_item, lookahead_item) <= 0)
939                     {
940                         /* the lookahead goes beyond what we allow in this
941                            leaf. Mark it as "gone" */
942                         lookahead_item = 0;
943                     }
944                     
945                     p->dirty = 1;
946                 }
947             }
948             else if (d == 0)
949             {
950                 /* exact match .. move both pointers */
951
952                 lookahead_next = lookahead_item;
953                 if (!(*stream->read_item)(stream->clientData,
954                                           &lookahead_next, lookahead_mode))
955                 {
956                     lookahead_item = 0;
957                     more = 0;
958                 }
959                 if (src == endp)
960                     break;  /* end of file stream reached .. */
961                 file_item = file_item_buf; /* move file pointer */
962                 (*b->method->codec.decode)(c1, &file_item, &src);
963             }
964             else
965             {
966                 /* file pointer must be moved */
967                 if (src == endp)
968                     break;
969                 file_item = file_item_buf;
970                 (*b->method->codec.decode)(c1, &file_item, &src);
971             }
972         }
973     }
974
975     /* this loop runs when we are "appending" to a leaf page. That is
976        either it's empty (new) or all file items have been read in
977        previous loop */
978
979     maxp = dst_buf + b->file[b->no_cat-1].head.block_max + quater;
980     while (lookahead_item)
981     {
982         char *dst_item;
983         const char *src = lookahead_item;
984         char *dst_0 = dst;
985         
986         /* if we have a lookahead item, we stop if we exceed the value of it */
987         if (max_item &&
988             (*b->method->compare_item)(max_item, lookahead_item) <= 0)
989         {
990             /* stop if we have reached the value of max item */
991             break;
992         }
993         if (!*lookahead_mode)
994         {
995             /* this is append. So a delete is bad */
996             yaz_log(YLOG_WARN, "isamb: Inconsistent register (2)");
997             abort();
998         }
999         else if (!half1 && dst > tail_cut)
1000         {
1001             const char *src_0 = src;
1002             half1 = dst; /* candidate for splitting */
1003             
1004             (*b->method->codec.encode)(c2, &dst, &src);
1005             
1006             cut_item_size = src - src_0;
1007             assert(cut_item_size > 0);
1008             memcpy (cut_item_buf, src_0, cut_item_size);
1009             
1010             no_items_1 = no_items;
1011             half2 = dst;
1012         }
1013         else
1014             (*b->method->codec.encode)(c2, &dst, &src);
1015
1016         if (dst > maxp)
1017         {
1018             dst = dst_0;
1019             break;
1020         }
1021         no_items++;
1022         if (p)
1023             p->dirty = 1;
1024         dst_item = lookahead_item;
1025         if (!(*stream->read_item)(stream->clientData, &dst_item,
1026                                   lookahead_mode))
1027         {
1028             lookahead_item = 0;
1029             more = 0;
1030         }
1031     }
1032     new_size = dst - dst_buf;
1033     if (p && p->cat != b->no_cat-1 && 
1034         new_size > b->file[p->cat].head.block_max)
1035     {
1036         /* non-btree block will be removed */
1037         p->deleted = 1;
1038         close_block(b, p);
1039         /* delete it too!! */
1040         p = 0; /* make a new one anyway */
1041     }
1042     if (!p)
1043     {   /* must create a new one */
1044         int i;
1045         for (i = 0; i < b->no_cat; i++)
1046             if (new_size <= b->file[i].head.block_max)
1047                 break;
1048         if (i == b->no_cat)
1049             i = b->no_cat - 1;
1050         p = new_leaf (b, i);
1051     }
1052     if (new_size > b->file[p->cat].head.block_max)
1053     {
1054         char *first_dst;
1055         const char *cut_item = cut_item_buf;
1056
1057         assert (half1);
1058         assert (half2);
1059
1060         assert(cut_item_size > 0);
1061         
1062         /* first half */
1063         p->size = half1 - dst_buf;
1064         assert(p->size <=  b->file[p->cat].head.block_max);
1065         memcpy (p->bytes, dst_buf, half1 - dst_buf);
1066         p->no_items = no_items_1;
1067
1068         /* second half */
1069         *sp2 = new_leaf (b, p->cat);
1070
1071         (*b->method->codec.reset)(c2);
1072
1073         first_dst = (*sp2)->bytes;
1074
1075         (*b->method->codec.encode)(c2, &first_dst, &cut_item);
1076
1077         memcpy (first_dst, half2, dst - half2);
1078
1079         (*sp2)->size = (first_dst - (*sp2)->bytes) + (dst - half2);
1080         assert((*sp2)->size <=  b->file[p->cat].head.block_max);
1081         (*sp2)->no_items = no_items - no_items_1;
1082         (*sp2)->dirty = 1;
1083         p->dirty = 1;
1084         memcpy (sub_item, cut_item_buf, cut_item_size);
1085         *sub_size = cut_item_size;
1086     }
1087     else
1088     {
1089         memcpy (p->bytes, dst_buf, dst - dst_buf);
1090         p->size = new_size;
1091         p->no_items = no_items;
1092     }
1093     (*b->method->codec.stop)(c1);
1094     (*b->method->codec.stop)(c2);
1095     *sp1 = p;
1096     return more;
1097 }
1098
1099 int insert_sub (ISAMB b, struct ISAMB_block **p, void *new_item,
1100                 int *mode,
1101                 ISAMC_I *stream,
1102                 struct ISAMB_block **sp,
1103                 void *sub_item, int *sub_size,
1104                 const void *max_item)
1105 {
1106     if (!*p || (*p)->leaf)
1107         return insert_leaf (b, p, new_item, mode, stream, sp, sub_item, 
1108                             sub_size, max_item);
1109     else
1110         return insert_int (b, *p, new_item, mode, stream, sp, sub_item,
1111                            sub_size, max_item);
1112 }
1113
1114 int isamb_unlink (ISAMB b, ISAM_P pos)
1115 {
1116     struct ISAMB_block *p1;
1117
1118     if (!pos)
1119         return 0;
1120     p1 = open_block(b, pos);
1121     p1->deleted = 1;
1122     if (!p1->leaf)
1123     {
1124         zint sub_p;
1125         const char *src = p1->bytes + p1->offset;
1126 #if INT_ENCODE
1127         void *c1 = (*b->method->codec.start)();
1128 #endif
1129         decode_ptr(&src, &sub_p);
1130         isamb_unlink(b, sub_p);
1131         
1132         while (src != p1->bytes + p1->size)
1133         {
1134 #if INT_ENCODE
1135             char file_item_buf[DST_ITEM_MAX];
1136             char *file_item = file_item_buf;
1137             (*b->method->codec.reset)(c1);
1138             (*b->method->codec.decode)(c1, &file_item, &src);
1139 #else
1140             zint item_len;
1141             decode_item_len(&src, &item_len);
1142             src += item_len;
1143 #endif
1144             decode_ptr(&src, &sub_p);
1145             isamb_unlink(b, sub_p);
1146         }
1147 #if INT_ENCODE
1148         (*b->method->codec.stop)(c1);
1149 #endif
1150     }
1151     close_block(b, p1);
1152     return 0;
1153 }
1154
1155 void isamb_merge(ISAMB b, ISAM_P *pos, ISAMC_I *stream)
1156 {
1157     char item_buf[DST_ITEM_MAX];
1158     char *item_ptr;
1159     int i_mode;
1160     int more;
1161     int must_delete = 0;
1162
1163     if (b->cache < 0)
1164     {
1165         int more = 1;
1166         while (more)
1167         {
1168             item_ptr = item_buf;
1169             more =
1170                 (*stream->read_item)(stream->clientData, &item_ptr, &i_mode);
1171         }
1172         *pos = 1;
1173         return;
1174     }
1175     item_ptr = item_buf;
1176     more = (*stream->read_item)(stream->clientData, &item_ptr, &i_mode);
1177     while (more)
1178     {
1179         struct ISAMB_block *p = 0, *sp = 0;
1180         char sub_item[DST_ITEM_MAX];
1181         int sub_size;
1182         
1183         if (*pos)
1184             p = open_block(b, *pos);
1185         more = insert_sub (b, &p, item_buf, &i_mode, stream, &sp,
1186                            sub_item, &sub_size, 0);
1187         if (sp)
1188         {    /* increase level of tree by one */
1189             struct ISAMB_block *p2 = new_int (b, p->cat);
1190             char *dst = p2->bytes + p2->size;
1191 #if INT_ENCODE
1192             void *c1 = (*b->method->codec.start)();
1193             const char *sub_item_ptr = sub_item;
1194 #endif
1195
1196             encode_ptr(&dst, p->pos);
1197             assert (sub_size < 80 && sub_size > 1);
1198 #if INT_ENCODE
1199             (*b->method->codec.reset)(c1);
1200             (*b->method->codec.encode)(c1, &dst, &sub_item_ptr);
1201 #else
1202             encode_item_len (&dst, sub_size);
1203             memcpy (dst, sub_item, sub_size);
1204             dst += sub_size;
1205 #endif
1206             encode_ptr(&dst, sp->pos);
1207             
1208             p2->size = dst - p2->bytes;
1209             p2->no_items = p->no_items + sp->no_items;
1210             *pos = p2->pos;  /* return new super page */
1211             close_block(b, sp);
1212             close_block(b, p2);
1213 #if INT_ENCODE
1214             (*b->method->codec.stop)(c1);
1215 #endif
1216         }
1217         else
1218         {
1219             *pos = p->pos;   /* return current one (again) */
1220         }
1221         if (p->no_items == 0)
1222             must_delete = 1;
1223         else
1224             must_delete = 0;
1225         close_block(b, p);
1226     }
1227     if (must_delete)
1228     {
1229         isamb_unlink(b, *pos);
1230         *pos = 0;
1231     }
1232 }
1233
1234 ISAMB_PP isamb_pp_open_x(ISAMB isamb, ISAM_P pos, int *level, int scope)
1235 {
1236     ISAMB_PP pp = xmalloc(sizeof(*pp));
1237     int i;
1238
1239     assert(pos);
1240
1241     pp->isamb = isamb;
1242     pp->block = xmalloc(ISAMB_MAX_LEVEL * sizeof(*pp->block));
1243
1244     pp->pos = pos;
1245     pp->level = 0;
1246     pp->maxlevel = 0;
1247     pp->total_size = 0;
1248     pp->no_blocks = 0;
1249     pp->skipped_numbers = 0;
1250     pp->returned_numbers = 0;
1251     pp->scope = scope;
1252     for (i = 0; i<ISAMB_MAX_LEVEL; i++)
1253         pp->skipped_nodes[i] = pp->accessed_nodes[i] = 0;
1254     while (1)
1255     {
1256         struct ISAMB_block *p = open_block(isamb, pos);
1257         const char *src = p->bytes + p->offset;
1258         pp->block[pp->level] = p;
1259
1260         pp->total_size += p->size;
1261         pp->no_blocks++;
1262         if (p->leaf)
1263             break;
1264         decode_ptr(&src, &pos);
1265         p->offset = src - p->bytes;
1266         pp->level++;
1267         pp->accessed_nodes[pp->level]++; 
1268     }
1269     pp->block[pp->level+1] = 0;
1270     pp->maxlevel = pp->level;
1271     if (level)
1272         *level = pp->level;
1273     return pp;
1274 }
1275
1276 ISAMB_PP isamb_pp_open (ISAMB isamb, ISAM_P pos, int scope)
1277 {
1278     return isamb_pp_open_x(isamb, pos, 0, scope);
1279 }
1280
1281 void isamb_pp_close_x(ISAMB_PP pp, zint *size, zint *blocks)
1282 {
1283     int i;
1284     if (!pp)
1285         return;
1286     yaz_log(YLOG_DEBUG, "isamb_pp_close lev=%d returned "ZINT_FORMAT" values, " 
1287                     "skipped "ZINT_FORMAT,
1288         pp->maxlevel, pp->skipped_numbers, pp->returned_numbers);
1289     for (i = pp->maxlevel; i>=0; i--)
1290         if (pp->skipped_nodes[i] || pp->accessed_nodes[i])
1291             yaz_log(YLOG_DEBUG, "isamb_pp_close  level leaf-%d: "
1292                             ZINT_FORMAT" read, "ZINT_FORMAT" skipped", i,
1293                  pp->accessed_nodes[i], pp->skipped_nodes[i]);
1294     pp->isamb->skipped_numbers += pp->skipped_numbers;
1295     pp->isamb->returned_numbers += pp->returned_numbers;
1296     for (i = pp->maxlevel; i>=0; i--)
1297     {
1298         pp->isamb->accessed_nodes[i] += pp->accessed_nodes[i];
1299         pp->isamb->skipped_nodes[i] += pp->skipped_nodes[i];
1300     }
1301     if (size)
1302         *size = pp->total_size;
1303     if (blocks)
1304         *blocks = pp->no_blocks;
1305     for (i = 0; i <= pp->level; i++)
1306         close_block(pp->isamb, pp->block[i]);
1307     xfree(pp->block);
1308     xfree(pp);
1309 }
1310
1311 int isamb_block_info (ISAMB isamb, int cat)
1312 {
1313     if (cat >= 0 && cat < isamb->no_cat)
1314         return isamb->file[cat].head.block_size;
1315     return -1;
1316 }
1317
1318 void isamb_pp_close (ISAMB_PP pp)
1319 {
1320     isamb_pp_close_x(pp, 0, 0);
1321 }
1322
1323 /* simple recursive dumper .. */
1324 static void isamb_dump_r (ISAMB b, ISAM_P pos, void (*pr)(const char *str),
1325                           int level)
1326 {
1327     char buf[1024];
1328     char prefix_str[1024];
1329     if (pos)
1330     {
1331         struct ISAMB_block *p = open_block(b, pos);
1332         sprintf(prefix_str, "%*s " ZINT_FORMAT " cat=%d size=%d max=%d items="
1333                 ZINT_FORMAT, level*2, "",
1334                 pos, p->cat, p->size, b->file[p->cat].head.block_max,
1335                 p->no_items);
1336         (*pr)(prefix_str);
1337         sprintf(prefix_str, "%*s " ZINT_FORMAT, level*2, "", pos);
1338         if (p->leaf)
1339         {
1340             while (p->offset < p->size)
1341             {
1342                 const char *src = p->bytes + p->offset;
1343                 char *dst = buf;
1344                 (*b->method->codec.decode)(p->decodeClientData, &dst, &src);
1345                 (*b->method->log_item)(YLOG_DEBUG, buf, prefix_str);
1346                 p->offset = src - (char*) p->bytes;
1347             }
1348             assert(p->offset == p->size);
1349         }
1350         else
1351         {
1352             const char *src = p->bytes + p->offset;
1353             ISAM_P sub;
1354
1355             decode_ptr(&src, &sub);
1356             p->offset = src - (char*) p->bytes;
1357
1358             isamb_dump_r(b, sub, pr, level+1);
1359             
1360             while (p->offset < p->size)
1361             {
1362 #if INT_ENCODE
1363                 char file_item_buf[DST_ITEM_MAX];
1364                 char *file_item = file_item_buf;
1365                 void *c1 = (*b->method->codec.start)();
1366                 (*b->method->codec.decode)(c1, &file_item, &src);
1367                 (*b->method->codec.stop)(c1);
1368                 (*b->method->log_item)(YLOG_DEBUG, file_item_buf, prefix_str);
1369 #else
1370                 zint item_len;
1371                 decode_item_len(&src, &item_len);
1372                 (*b->method->log_item)(YLOG_DEBUG, src, prefix_str);
1373                 src += item_len;
1374 #endif
1375                 decode_ptr(&src, &sub);
1376                 
1377                 p->offset = src - (char*) p->bytes;
1378                 
1379                 isamb_dump_r(b, sub, pr, level+1);
1380             }           
1381         }
1382         close_block(b, p);
1383     }
1384 }
1385
1386 void isamb_dump(ISAMB b, ISAM_P pos, void (*pr)(const char *str))
1387 {
1388     isamb_dump_r(b, pos, pr, 0);
1389 }
1390
1391 int isamb_pp_read(ISAMB_PP pp, void *buf)
1392 {
1393     return isamb_pp_forward(pp, buf, 0);
1394 }
1395
1396
1397 static int isamb_pp_on_right_node(ISAMB_PP pp, int level, const void *untilbuf)
1398 { /* looks one node higher to see if we should be on this node at all */
1399   /* useful in backing off quickly, and in avoiding tail descends */
1400   /* call with pp->level to begin with */
1401     struct ISAMB_block *p;
1402     int cmp;
1403     const char *src;
1404     ISAMB b = pp->isamb;
1405
1406     assert(level >= 0);
1407     if (level == 0)
1408     {
1409 #if ISAMB_DEBUG
1410             yaz_log(YLOG_DEBUG, "isamb_pp_on_right returning true for root");
1411 #endif
1412         return 1; /* we can never skip the root node */
1413     }
1414     level--;
1415     p = pp->block[level];
1416     assert(p->offset <= p->size);
1417     if (p->offset < p->size)
1418     {
1419 #if INT_ENCODE
1420         char file_item_buf[DST_ITEM_MAX];
1421         char *file_item = file_item_buf;
1422         void *c1 = (*b->method->codec.start)();
1423         assert(p->offset > 0); 
1424         src = p->bytes + p->offset;
1425         (*b->method->codec.decode)(c1, &file_item, &src);
1426         (*b->method->codec.stop)(c1);
1427         cmp = (*b->method->compare_item)(untilbuf, file_item_buf);
1428 #else
1429         zint item_len;
1430         assert(p->offset > 0); 
1431         src = p->bytes + p->offset;
1432         decode_item_len(&src, &item_len);
1433 #if ISAMB_DEBUG
1434         (*b->method->codec.log_item)(YLOG_DEBUG, untilbuf, "on_leaf: until");
1435         (*b->method->codec.log_item)(YLOG_DEBUG, src, "on_leaf: value");
1436 #endif
1437         cmp = (*b->method->compare_item)(untilbuf, src);
1438 #endif
1439         if (cmp < pp->scope)
1440         {  /* cmp<2 */
1441 #if ISAMB_DEBUG
1442             yaz_log(YLOG_DEBUG, "isamb_pp_on_right returning true "
1443                             "cmp=%d lev=%d ofs=%d", cmp, level, p->offset);
1444 #endif
1445             return 1; 
1446         }
1447         else
1448         {
1449 #if ISAMB_DEBUG
1450             yaz_log(YLOG_DEBUG, "isamb_pp_on_right returning false "
1451                             "cmp=%d lev=%d ofs=%d", cmp, level, p->offset);
1452 #endif
1453             return 0; 
1454         }
1455     }
1456     else {
1457 #if ISAMB_DEBUG
1458         yaz_log(YLOG_DEBUG, "isamb_pp_on_right at tail, looking higher "
1459                         "lev=%d", level);
1460 #endif
1461         return isamb_pp_on_right_node(pp, level, untilbuf);
1462     }
1463 } /* isamb_pp_on_right_node */
1464
1465 static int isamb_pp_read_on_leaf(ISAMB_PP pp, void *buf)
1466
1467     /* reads the next item on the current leaf, returns 0 if end of leaf*/
1468     struct ISAMB_block *p = pp->block[pp->level];
1469     char *dst;
1470     const char *src;
1471     assert(pp);
1472     assert(buf);
1473     if (p->offset == p->size)
1474     {
1475 #if ISAMB_DEBUG
1476         yaz_log(YLOG_DEBUG, "isamb_pp_read_on_leaf returning 0 on "
1477                 "node %d", p->pos);
1478 #endif
1479         return 0; /* at end of leaf */
1480     }
1481     src = p->bytes + p->offset;
1482     dst = buf;
1483     (*pp->isamb->method->codec.decode)(p->decodeClientData, &dst, &src);
1484     p->offset = src - (char*) p->bytes;
1485 #if ISAMB_DEBUG
1486     (*pp->isamb->method->codec.log_item)(YLOG_DEBUG, buf,
1487                                          "read_on_leaf returning 1");
1488 #endif
1489     pp->returned_numbers++;
1490     return 1;
1491 } /* read_on_leaf */
1492
1493 static int isamb_pp_forward_on_leaf(ISAMB_PP pp, void *buf, const void *untilbuf)
1494 { /* forwards on the current leaf, returns 0 if not found */
1495     int cmp;
1496     int skips = 0;
1497     while (1)
1498     {
1499         if (!isamb_pp_read_on_leaf(pp, buf))
1500             return 0;
1501         /* FIXME - this is an extra function call, inline the read? */
1502         cmp=(*pp->isamb->method->compare_item)(untilbuf, buf);
1503         if (cmp <pp->scope)
1504         {  /* cmp<2  found a good one */
1505 #if ISAMB_DEBUG
1506             if (skips)
1507                 yaz_log(YLOG_DEBUG, "isam_pp_fwd_on_leaf skipped %d items", skips);
1508 #endif
1509             pp->returned_numbers++;
1510             return 1;
1511         }
1512         if (!skips)
1513             if (!isamb_pp_on_right_node(pp, pp->level, untilbuf))
1514                 return 0; /* never mind the rest of this leaf */
1515         pp->skipped_numbers++;
1516         skips++;
1517     }
1518 } /* forward_on_leaf */
1519
1520 static int isamb_pp_climb_level(ISAMB_PP pp, ISAM_P *pos)
1521 { /* climbs higher in the tree, until finds a level with data left */
1522   /* returns the node to (consider to) descend to in *pos) */
1523     struct ISAMB_block *p = pp->block[pp->level];
1524     const char *src;
1525 #if ISAMB_DEBUG
1526     yaz_log(YLOG_DEBUG, "isamb_pp_climb_level starting "
1527                    "at level %d node %d ofs=%d sz=%d",
1528                     pp->level, p->pos, p->offset, p->size);
1529 #endif
1530     assert(pp->level >= 0);
1531     assert(p->offset <= p->size);
1532     if (pp->level==0)
1533     {
1534 #if ISAMB_DEBUG
1535         yaz_log(YLOG_DEBUG, "isamb_pp_climb_level returning 0 at root");
1536 #endif
1537         return 0;
1538     }
1539     assert(pp->level>0); 
1540     close_block(pp->isamb, pp->block[pp->level]);
1541     pp->block[pp->level] = 0;
1542     (pp->level)--;
1543     p = pp->block[pp->level];
1544 #if ISAMB_DEBUG
1545     yaz_log(YLOG_DEBUG, "isamb_pp_climb_level climbed to level %d node %d ofs=%d",
1546                     pp->level, p->pos, p->offset);
1547 #endif
1548     assert(!p->leaf);
1549     assert(p->offset <= p->size);
1550     if (p->offset == p->size)
1551     {
1552         /* we came from the last pointer, climb on */
1553         if (!isamb_pp_climb_level(pp, pos))
1554             return 0;
1555         p = pp->block[pp->level];
1556     }
1557     else
1558     {
1559 #if INT_ENCODE
1560         char file_item_buf[DST_ITEM_MAX];
1561         char *file_item = file_item_buf;
1562         ISAMB b = pp->isamb;
1563         void *c1 = (*b->method->codec.start)();
1564 #else
1565         zint item_len;
1566 #endif
1567         /* skip the child we just came from */
1568 #if ISAMB_DEBUG
1569         yaz_log(YLOG_DEBUG, "isam_pp_climb_level: skipping lev=%d ofs=%d sz=%d", 
1570                         pp->level, p->offset, p->size);
1571 #endif
1572         assert (p->offset < p->size);
1573         src = p->bytes + p->offset;
1574 #if INT_ENCODE
1575         (*b->method->codec.decode)(c1, &file_item, &src);
1576         (*b->method->codec.stop)(c1);
1577 #else
1578         decode_item_len(&src, &item_len);
1579         src += item_len;
1580 #endif
1581         decode_ptr(&src, pos);
1582         p->offset = src - (char *)p->bytes;
1583             
1584     }
1585     return 1;
1586 } /* climb_level */
1587
1588
1589 static zint isamb_pp_forward_unode(ISAMB_PP pp, zint pos, const void *untilbuf)
1590 { /* scans a upper node until it finds a child <= untilbuf */
1591   /* pp points to the key value, as always. pos is the child read from */
1592   /* the buffer */
1593   /* if all values are too small, returns the last child in the node */
1594   /* FIXME - this can be detected, and avoided by looking at the */
1595   /* parent node, but that gets messy. Presumably the cost is */
1596   /* pretty low anyway */
1597     ISAMB b = pp->isamb;
1598     struct ISAMB_block *p = pp->block[pp->level];
1599     const char *src = p->bytes + p->offset;
1600     int cmp;
1601     zint nxtpos;
1602 #if ISAMB_DEBUG
1603     int skips = 0;
1604     yaz_log(YLOG_DEBUG, "isamb_pp_forward_unode starting "
1605                    "at level %d node %d ofs=%di sz=%d",
1606                     pp->level, p->pos, p->offset, p->size);
1607 #endif
1608     assert(!p->leaf);
1609     assert(p->offset <= p->size);
1610     if (p->offset == p->size)
1611     {
1612 #if ISAMB_DEBUG
1613             yaz_log(YLOG_DEBUG, "isamb_pp_forward_unode returning at end "
1614                    "at level %d node %d ofs=%di sz=%d",
1615                     pp->level, p->pos, p->offset, p->size);
1616 #endif
1617         return pos; /* already at the end of it */
1618     }
1619     while(p->offset < p->size)
1620     {
1621 #if INT_ENCODE
1622         char file_item_buf[DST_ITEM_MAX];
1623         char *file_item = file_item_buf;
1624         void *c1 = (*b->method->codec.start)();
1625         (*b->method->codec.decode)(c1, &file_item, &src);
1626         (*b->method->codec.stop)(c1);
1627         cmp = (*b->method->compare_item)(untilbuf, file_item_buf);
1628 #else
1629         zint item_len;
1630         decode_item_len(&src, &item_len);
1631         cmp = (*b->method->compare_item)(untilbuf, src);
1632         src += item_len;
1633 #endif
1634         decode_ptr(&src, &nxtpos);
1635         if (cmp<pp->scope) /* cmp<2 */
1636         {
1637 #if ISAMB_DEBUG
1638             yaz_log(YLOG_DEBUG, "isamb_pp_forward_unode returning a hit "
1639                    "at level %d node %d ofs=%d sz=%d",
1640                     pp->level, p->pos, p->offset, p->size);
1641 #endif
1642             return pos;
1643         } /* found one */
1644         pos = nxtpos;
1645         p->offset = src-(char*)p->bytes;
1646         (pp->skipped_nodes[pp->maxlevel - pp->level -1])++;
1647 #if ISAMB_DEBUG
1648         skips++;
1649 #endif
1650     }
1651 #if ISAMB_DEBUG
1652             yaz_log(YLOG_DEBUG, "isamb_pp_forward_unode returning at tail "
1653                    "at level %d node %d ofs=%d sz=%d skips=%d",
1654                     pp->level, p->pos, p->offset, p->size, skips);
1655 #endif
1656     return pos; /* that's the last one in the line */
1657     
1658 } /* forward_unode */
1659
1660 static void isamb_pp_descend_to_leaf(ISAMB_PP pp, ISAM_P pos,
1661                                      const void *untilbuf)
1662 { /* climbs down the tree, from pos, to the leftmost leaf */
1663     struct ISAMB_block *p = pp->block[pp->level];
1664     const char *src;
1665     assert(!p->leaf);
1666 #if ISAMB_DEBUG
1667     yaz_log(YLOG_DEBUG, "isamb_pp_descend_to_leaf "
1668                    "starting at lev %d node %d ofs=%d lf=%d u=%p", 
1669                    pp->level, p->pos, p->offset, p->leaf, untilbuf);
1670 #endif
1671     if (untilbuf)
1672         pos = isamb_pp_forward_unode(pp, pos, untilbuf);
1673     ++(pp->level);
1674     assert(pos);
1675     p = open_block(pp->isamb, pos);
1676     pp->block[pp->level] = p;
1677     ++(pp->accessed_nodes[pp->maxlevel-pp->level]);
1678     ++(pp->no_blocks);
1679 #if ISAMB_DEBUG
1680     yaz_log(YLOG_DEBUG, "isamb_pp_descend_to_leaf "
1681                    "got lev %d node %d lf=%d", 
1682                    pp->level, p->pos, p->leaf);
1683 #endif
1684     if (p->leaf)
1685         return;
1686     assert (p->offset==0);
1687     src = p->bytes + p->offset;
1688     decode_ptr(&src, &pos);
1689     p->offset = src-(char*)p->bytes;
1690     isamb_pp_descend_to_leaf(pp, pos, untilbuf);
1691 #if ISAMB_DEBUG
1692     yaz_log(YLOG_DEBUG, "isamb_pp_descend_to_leaf "
1693                    "returning at lev %d node %d ofs=%d lf=%d", 
1694                    pp->level, p->pos, p->offset, p->leaf);
1695 #endif
1696 } /* descend_to_leaf */
1697
1698 static int isamb_pp_find_next_leaf(ISAMB_PP pp)
1699 { /* finds the next leaf by climbing up and down */
1700     ISAM_P pos;
1701     if (!isamb_pp_climb_level(pp, &pos))
1702         return 0;
1703     isamb_pp_descend_to_leaf(pp, pos, 0);
1704     return 1;
1705 }
1706
1707 static int isamb_pp_climb_desc(ISAMB_PP pp,  const void *untilbuf)
1708 { /* climbs up and descends to a leaf where values >= *untilbuf are found */
1709     ISAM_P pos;
1710 #if ISAMB_DEBUG
1711     struct ISAMB_block *p = pp->block[pp->level];
1712     yaz_log(YLOG_DEBUG, "isamb_pp_climb_desc starting "
1713                    "at level %d node %d ofs=%d sz=%d",
1714                     pp->level, p->pos, p->offset, p->size);
1715 #endif
1716     if (!isamb_pp_climb_level(pp, &pos))
1717         return 0;
1718     /* see if it would pay to climb one higher */
1719     if (!isamb_pp_on_right_node(pp, pp->level, untilbuf))
1720         if (!isamb_pp_climb_level(pp, &pos))
1721             return 0;
1722     isamb_pp_descend_to_leaf(pp, pos, untilbuf);
1723 #if ISAMB_DEBUG
1724     p = pp->block[pp->level];
1725     yaz_log(YLOG_DEBUG, "isamb_pp_climb_desc done "
1726                    "at level %d node %d ofs=%d sz=%d",
1727                     pp->level, p->pos, p->offset, p->size);
1728 #endif
1729     return 1;
1730 } /* climb_desc */
1731
1732 int isamb_pp_forward (ISAMB_PP pp, void *buf, const void *untilbuf)
1733 {
1734 #if ISAMB_DEBUG
1735     struct ISAMB_block *p = pp->block[pp->level];
1736     assert(p->leaf);
1737     yaz_log(YLOG_DEBUG, "isamb_pp_forward starting "
1738                    "at level %d node %d ofs=%d sz=%d u=%p sc=%d",
1739                     pp->level, p->pos, p->offset, p->size, untilbuf, scope);
1740 #endif
1741     if (untilbuf)
1742     {
1743         if (isamb_pp_forward_on_leaf(pp, buf, untilbuf))
1744         {
1745 #if ISAMB_DEBUG
1746             yaz_log(YLOG_DEBUG, "isamb_pp_forward (f) returning (A) "
1747                    "at level %d node %d ofs=%d sz=%d",
1748                     pp->level, p->pos, p->offset, p->size);
1749 #endif
1750             return 1;
1751         }
1752         if (! isamb_pp_climb_desc(pp, untilbuf))
1753         {
1754 #if ISAMB_DEBUG
1755             yaz_log(YLOG_DEBUG, "isamb_pp_forward (f) returning notfound (B) "
1756                    "at level %d node %d ofs=%d sz=%d",
1757                     pp->level, p->pos, p->offset, p->size);
1758 #endif
1759             return 0; /* could not find a leaf */
1760         }
1761         do {
1762             if (isamb_pp_forward_on_leaf(pp, buf, untilbuf))
1763             {
1764 #if ISAMB_DEBUG
1765                 yaz_log(YLOG_DEBUG, "isamb_pp_forward (f) returning (c) "
1766                         "at level %d node %d ofs=%d sz=%d",
1767                         pp->level, p->pos, p->offset, p->size);
1768 #endif
1769                 return 1;
1770             }
1771         } while (isamb_pp_find_next_leaf(pp));
1772         return 0; /* could not find at all */
1773     }
1774     else { /* no untilbuf, a straight read */
1775         /* FIXME - this should be moved
1776          * directly into the pp_read */
1777         /* keeping here now, to keep same
1778          * interface as the old fwd */
1779         if (isamb_pp_read_on_leaf(pp, buf))
1780         {
1781 #if ISAMB_DEBUG
1782             yaz_log(YLOG_DEBUG, "isamb_pp_forward (read) returning (D) "
1783                    "at level %d node %d ofs=%d sz=%d",
1784                     pp->level, p->pos, p->offset, p->size);
1785 #endif
1786             return 1;
1787         }
1788         if (isamb_pp_find_next_leaf(pp))
1789         {
1790 #if ISAMB_DEBUG
1791             yaz_log(YLOG_DEBUG, "isamb_pp_forward (read) returning (E) "
1792                    "at level %d node %d ofs=%d sz=%d",
1793                     pp->level, p->pos, p->offset, p->size);
1794 #endif
1795             return isamb_pp_read_on_leaf(pp, buf);
1796         }
1797         else
1798             return 0;
1799     }
1800 } /* isam_pp_forward (new version) */
1801
1802 void isamb_pp_pos(ISAMB_PP pp, double *current, double *total)
1803 { /* return an estimate of the current position and of the total number of */
1804   /* occureences in the isam tree, based on the current leaf */
1805     assert(total);
1806     assert(current);
1807     
1808     /* if end-of-stream PP may not be leaf */
1809
1810     *total = (double) (pp->block[0]->no_items);
1811     *current = (double) pp->returned_numbers;
1812 #if ISAMB_DEBUG
1813     yaz_log(YLOG_LOG, "isamb_pp_pos returning: cur= %0.1f tot=%0.1f rn="
1814          ZINT_FORMAT, *current, *total, pp->returned_numbers);
1815 #endif
1816 }
1817
1818 int isamb_pp_forward2(ISAMB_PP pp, void *buf, const void *untilb)
1819 {
1820     char *dst = buf;
1821     const char *src;
1822     struct ISAMB_block *p = pp->block[pp->level];
1823     ISAMB b = pp->isamb;
1824     if (!p)
1825         return 0;
1826 again:
1827     while (p->offset == p->size)
1828     {
1829         ISAM_P pos;
1830 #if INT_ENCODE
1831         const char *src_0;
1832         void *c1;
1833         char file_item_buf[DST_ITEM_MAX];
1834         char *file_item = file_item_buf;
1835 #else
1836         zint item_len;
1837 #endif
1838         while (p->offset == p->size)
1839         {
1840             if (pp->level == 0)
1841                 return 0;
1842             close_block (pp->isamb, pp->block[pp->level]);
1843             pp->block[pp->level] = 0;
1844             (pp->level)--;
1845             p = pp->block[pp->level];
1846             assert (!p->leaf);  
1847         }
1848
1849         assert(!p->leaf);
1850         src = p->bytes + p->offset;
1851        
1852 #if INT_ENCODE
1853         c1 = (*b->method->codec.start)();
1854         (*b->method->codec.decode)(c1, &file_item, &src);
1855 #else
1856         decode_ptr (&src, &item_len);
1857         src += item_len;
1858 #endif        
1859         decode_ptr (&src, &pos);
1860         p->offset = src - (char*) p->bytes;
1861
1862         src = p->bytes + p->offset;
1863
1864         while(1)
1865         {
1866             if (!untilb || p->offset == p->size)
1867                 break;
1868             assert(p->offset < p->size);
1869 #if INT_ENCODE
1870             src_0 = src;
1871             file_item = file_item_buf;
1872             (*b->method->codec.reset)(c1);
1873             (*b->method->codec.decode)(c1, &file_item, &src);
1874             if ((*b->method->compare_item)(untilb, file_item_buf) <= 1)
1875             {
1876                 src = src_0;
1877                 break;
1878             }
1879 #else
1880             decode_item_len(&src, &item_len);
1881             if ((*b->method->compare_item)(untilb, src) <= 1)
1882                 break;
1883             src += item_len;
1884 #endif
1885             decode_ptr (&src, &pos);
1886             p->offset = src - (char*) p->bytes;
1887         }
1888
1889         pp->level++;
1890
1891         while (1)
1892         {
1893             pp->block[pp->level] = p = open_block (pp->isamb, pos);
1894
1895             pp->total_size += p->size;
1896             pp->no_blocks++;
1897             
1898             if (p->leaf) 
1899             {
1900                 break;
1901             }
1902             
1903             src = p->bytes + p->offset;
1904             while(1)
1905             {
1906                 decode_ptr (&src, &pos);
1907                 p->offset = src - (char*) p->bytes;
1908                 
1909                 if (!untilb || p->offset == p->size)
1910                     break;
1911                 assert(p->offset < p->size);
1912 #if INT_ENCODE
1913                 src_0 = src;
1914                 file_item = file_item_buf;
1915                 (*b->method->codec.reset)(c1);
1916                 (*b->method->codec.decode)(c1, &file_item, &src);
1917                 if ((*b->method->compare_item)(untilb, file_item_buf) <= 1)
1918                 {
1919                     src = src_0;
1920                     break;
1921                 }
1922 #else
1923                 decode_ptr (&src, &item_len);
1924                 if ((*b->method->compare_item)(untilb, src) <= 1)
1925                     break;
1926                 src += item_len;
1927 #endif
1928             }
1929             pp->level++;
1930         }
1931 #if INT_ENCODE
1932         (*b->method->codec.stop)(c1);
1933 #endif
1934     }
1935     assert (p->offset < p->size);
1936     assert (p->leaf);
1937     while(1)
1938     {
1939         char *dst0 = dst;
1940         src = p->bytes + p->offset;
1941         (*pp->isamb->method->codec.decode)(p->decodeClientData, &dst, &src);
1942         p->offset = src - (char*) p->bytes;
1943         if (!untilb || (*pp->isamb->method->compare_item)(untilb, dst0) <= 1)
1944             break;
1945         dst = dst0;
1946         if (p->offset == p->size) goto again;
1947     }
1948     return 1;
1949 }