Many public functions returns ZEBRA_RES rather than int to avoid
[idzebra-moved-to-github.git] / isamb / isamb.c
1 /* $Id: isamb.c,v 1.77 2005-04-15 10:47:49 adam Exp $
2    Copyright (C) 1995-2005
3    Index Data ApS
4
5 This file is part of the Zebra server.
6
7 Zebra is free software; you can redistribute it and/or modify it under
8 the terms of the GNU General Public License as published by the Free
9 Software Foundation; either version 2, or (at your option) any later
10 version.
11
12 Zebra is distributed in the hope that it will be useful, but WITHOUT ANY
13 WARRANTY; without even the implied warranty of MERCHANTABILITY or
14 FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
15 for more details.
16
17 You should have received a copy of the GNU General Public License
18 along with Zebra; see the file LICENSE.zebra.  If not, write to the
19 Free Software Foundation, 59 Temple Place - Suite 330, Boston, MA
20 02111-1307, USA.
21 */
22
23 #include <stdlib.h>
24 #include <string.h>
25 #include <yaz/log.h>
26 #include <yaz/xmalloc.h>
27 #include <idzebra/isamb.h>
28 #include <assert.h>
29
30 #ifndef ISAMB_DEBUG
31 #define ISAMB_DEBUG 0
32 #endif
33
34
35 #define ISAMB_MAJOR_VERSION 3
36 #define ISAMB_MINOR_VERSION 0
37
38 struct ISAMB_head {
39     zint first_block;
40     zint last_block;
41     zint free_list;
42     zint no_items;
43     int block_size;
44     int block_max;
45     int block_offset;
46 };
47
48 /* if 1, upper nodes items are encoded; 0 if not encoded */
49 #define INT_ENCODE 1
50
51 /* maximum size of encoded buffer */
52 #define DST_ITEM_MAX 256
53
54 #define ISAMB_MAX_LEVEL 10
55 /* approx 2*max page + max size of item */
56 #define DST_BUF_SIZE 16840
57
58 #define ISAMB_CACHE_ENTRY_SIZE 4096
59
60 /* CAT_MAX: _must_ be power of 2 */
61 #define CAT_MAX 4
62 #define CAT_MASK (CAT_MAX-1)
63 /* CAT_NO: <= CAT_MAX */
64 #define CAT_NO 4
65
66 /* Smallest block size */
67 #define ISAMB_MIN_SIZE 32
68 /* Size factor */
69 #define ISAMB_FAC_SIZE 4
70
71 /* ISAMB_PTR_CODEC = 1 var, =0 fixed */
72 #define ISAMB_PTR_CODEC 1
73
74 struct ISAMB_cache_entry {
75     ISAM_P pos;
76     unsigned char *buf;
77     int dirty;
78     int hits;
79     struct ISAMB_cache_entry *next;
80 };
81
82 struct ISAMB_file {
83     BFile bf;
84     int head_dirty;
85     struct ISAMB_head head;
86     struct ISAMB_cache_entry *cache_entries;
87 };
88
89 struct ISAMB_s {
90     BFiles bfs;
91     ISAMC_M *method;
92
93     struct ISAMB_file *file;
94     int no_cat;
95     int cache; /* 0 = no cache, 1 = use cache, -1 = dummy isam (for testing only) */
96     int log_io;        /* log level for bf_read/bf_write calls */
97     int log_freelist;  /* log level for freelist handling */
98     zint skipped_numbers; /* on a leaf node */
99     zint returned_numbers; 
100     zint skipped_nodes[ISAMB_MAX_LEVEL]; /* [0]=skipped leaves, 1 = higher etc */
101     zint accessed_nodes[ISAMB_MAX_LEVEL]; /* nodes we did not skip */
102 };
103
104 struct ISAMB_block {
105     ISAM_P pos;
106     int cat;
107     int size;
108     int leaf;
109     int dirty;
110     int deleted;
111     int offset;
112     zint no_items;  /* number of nodes in this + children */
113     char *bytes;
114     char *cbuf;
115     unsigned char *buf;
116     void *decodeClientData;
117     int log_rw;
118 };
119
120 struct ISAMB_PP_s {
121     ISAMB isamb;
122     ISAM_P pos;
123     int level;
124     int maxlevel; /* total depth */
125     zint total_size;
126     zint no_blocks;
127     zint skipped_numbers; /* on a leaf node */
128     zint returned_numbers; 
129     zint skipped_nodes[ISAMB_MAX_LEVEL]; /* [0]=skipped leaves, 1 = higher etc */
130     zint accessed_nodes[ISAMB_MAX_LEVEL]; /* nodes we did not skip */
131     struct ISAMB_block **block;
132     int scope;  /* on what level we forward */
133 };
134
135
136 #define encode_item_len encode_ptr
137 #if ISAMB_PTR_CODEC
138 static void encode_ptr(char **dst, zint pos)
139 {
140     unsigned char *bp = (unsigned char*) *dst;
141
142     while (pos > 127)
143     {
144          *bp++ = (unsigned char) (128 | (pos & 127));
145          pos = pos >> 7;
146     }
147     *bp++ = (unsigned char) pos;
148     *dst = (char *) bp;
149 }
150 #else
151 static void encode_ptr(char **dst, zint pos)
152 {
153     memcpy(*dst, &pos, sizeof(pos));
154     (*dst) += sizeof(pos);
155 }
156 #endif
157
158 #define decode_item_len decode_ptr
159 #if ISAMB_PTR_CODEC
160 static void decode_ptr(const char **src1, zint *pos)
161 {
162     const unsigned char **src = (const unsigned char **) src1;
163     zint d = 0;
164     unsigned char c;
165     unsigned r = 0;
166
167     while (((c = *(*src)++) & 128))
168     {
169         d += ((zint) (c & 127) << r);
170         r += 7;
171     }
172     d += ((zint) c << r);
173     *pos = d;
174 }
175 #else
176 static void decode_ptr(const char **src, zint *pos)
177 {
178      memcpy(pos, *src, sizeof(*pos));
179      (*src) += sizeof(*pos);
180 }
181 #endif
182
183 ISAMB isamb_open(BFiles bfs, const char *name, int writeflag, ISAMC_M *method,
184                  int cache)
185 {
186     ISAMB isamb = xmalloc(sizeof(*isamb));
187     int i, b_size = ISAMB_MIN_SIZE;
188
189     isamb->bfs = bfs;
190     isamb->method = (ISAMC_M *) xmalloc(sizeof(*method));
191     memcpy(isamb->method, method, sizeof(*method));
192     isamb->no_cat = CAT_NO;
193     isamb->log_io = 0;
194     isamb->log_freelist = 0;
195     isamb->cache = cache;
196     isamb->skipped_numbers = 0;
197     isamb->returned_numbers = 0;
198     for (i = 0; i<ISAMB_MAX_LEVEL; i++)
199         isamb->skipped_nodes[i] = isamb->accessed_nodes[i] = 0;
200
201     assert(cache == 0);
202     isamb->file = xmalloc(sizeof(*isamb->file) * isamb->no_cat);
203     for (i = 0; i < isamb->no_cat; i++)
204     {
205         char fname[DST_BUF_SIZE];
206         char hbuf[DST_BUF_SIZE];
207         isamb->file[i].cache_entries = 0;
208         isamb->file[i].head_dirty = 0;
209         sprintf(fname, "%s%c", name, i+'A');
210         if (cache)
211             isamb->file[i].bf = bf_open(bfs, fname, ISAMB_CACHE_ENTRY_SIZE,
212                                          writeflag);
213         else
214             isamb->file[i].bf = bf_open(bfs, fname, b_size, writeflag);
215
216         /* fill-in default values (for empty isamb) */
217         isamb->file[i].head.first_block = ISAMB_CACHE_ENTRY_SIZE/b_size+1;
218         isamb->file[i].head.last_block = isamb->file[i].head.first_block;
219         isamb->file[i].head.block_size = b_size;
220 #if ISAMB_PTR_CODEC
221         if (i == isamb->no_cat-1 || b_size > 128)
222             isamb->file[i].head.block_offset = 8;
223         else
224             isamb->file[i].head.block_offset = 4;
225 #else
226         isamb->file[i].head.block_offset = 11;
227 #endif
228         isamb->file[i].head.block_max =
229             b_size - isamb->file[i].head.block_offset;
230         isamb->file[i].head.free_list = 0;
231         if (bf_read(isamb->file[i].bf, 0, 0, 0, hbuf))
232         {
233             /* got header assume "isamb"major minor len can fit in 16 bytes */
234             zint zint_tmp;
235             int major, minor, len, pos = 0;
236             int left;
237             const char *src = 0;
238             if (memcmp(hbuf, "isamb", 5))
239             {
240                 yaz_log(YLOG_WARN, "bad isamb header for file %s", fname);
241                 return 0;
242             }
243             if (sscanf(hbuf+5, "%d %d %d", &major, &minor, &len) != 3)
244             {
245                 yaz_log(YLOG_WARN, "bad isamb header for file %s", fname);
246                 return 0;
247             }
248             if (major != ISAMB_MAJOR_VERSION)
249             {
250                 yaz_log(YLOG_WARN, "bad major version for file %s %d, must be %d",
251                      fname, major, ISAMB_MAJOR_VERSION);
252                 return 0;
253             }
254             for (left = len - b_size; left > 0; left = left - b_size)
255             {
256                 pos++;
257                 if (!bf_read(isamb->file[i].bf, pos, 0, 0, hbuf + pos*b_size))
258                 {
259                     yaz_log(YLOG_WARN, "truncated isamb header for " 
260                          "file=%s len=%d pos=%d",
261                          fname, len, pos);
262                     return 0;
263                 }
264             }
265             src = hbuf + 16;
266             decode_ptr(&src, &isamb->file[i].head.first_block);
267             decode_ptr(&src, &isamb->file[i].head.last_block);
268             decode_ptr(&src, &zint_tmp);
269             isamb->file[i].head.block_size = (int) zint_tmp;
270             decode_ptr(&src, &zint_tmp);
271             isamb->file[i].head.block_max = (int) zint_tmp;
272             decode_ptr(&src, &isamb->file[i].head.free_list);
273         }
274         assert (isamb->file[i].head.block_size >= isamb->file[i].head.block_offset);
275         isamb->file[i].head_dirty = 0;
276         assert(isamb->file[i].head.block_size == b_size);
277         b_size = b_size * ISAMB_FAC_SIZE;
278     }
279 #if ISAMB_DEBUG
280     yaz_log(YLOG_WARN, "isamb debug enabled. Things will be slower than usual");
281 #endif
282     return isamb;
283 }
284
285 static void flush_blocks (ISAMB b, int cat)
286 {
287     while (b->file[cat].cache_entries)
288     {
289         struct ISAMB_cache_entry *ce_this = b->file[cat].cache_entries;
290         b->file[cat].cache_entries = ce_this->next;
291
292         if (ce_this->dirty)
293         {
294             yaz_log(b->log_io, "bf_write: flush_blocks");
295             bf_write(b->file[cat].bf, ce_this->pos, 0, 0, ce_this->buf);
296         }
297         xfree(ce_this->buf);
298         xfree(ce_this);
299     }
300 }
301
302 static int cache_block (ISAMB b, ISAM_P pos, char *userbuf, int wr)
303 {
304     int cat = (int) (pos&CAT_MASK);
305     int off = (int) (((pos/CAT_MAX) & 
306                (ISAMB_CACHE_ENTRY_SIZE / b->file[cat].head.block_size - 1))
307         * b->file[cat].head.block_size);
308     zint norm = pos / (CAT_MASK*ISAMB_CACHE_ENTRY_SIZE / b->file[cat].head.block_size);
309     int no = 0;
310     struct ISAMB_cache_entry **ce, *ce_this = 0, **ce_last = 0;
311
312     if (!b->cache)
313         return 0;
314
315     assert (ISAMB_CACHE_ENTRY_SIZE >= b->file[cat].head.block_size);
316     for (ce = &b->file[cat].cache_entries; *ce; ce = &(*ce)->next, no++)
317     {
318         ce_last = ce;
319         if ((*ce)->pos == norm)
320         {
321             ce_this = *ce;
322             *ce = (*ce)->next;   /* remove from list */
323             
324             ce_this->next = b->file[cat].cache_entries;  /* move to front */
325             b->file[cat].cache_entries = ce_this;
326             
327             if (wr)
328             {
329                 memcpy (ce_this->buf + off, userbuf, 
330                         b->file[cat].head.block_size);
331                 ce_this->dirty = 1;
332             }
333             else
334                 memcpy (userbuf, ce_this->buf + off,
335                         b->file[cat].head.block_size);
336             return 1;
337         }
338     }
339     if (no >= 40)
340     {
341         assert (no == 40);
342         assert (ce_last && *ce_last);
343         ce_this = *ce_last;
344         *ce_last = 0;  /* remove the last entry from list */
345         if (ce_this->dirty)
346         {
347             yaz_log(b->log_io, "bf_write: cache_block");
348             bf_write(b->file[cat].bf, ce_this->pos, 0, 0, ce_this->buf);
349         }
350         xfree(ce_this->buf);
351         xfree(ce_this);
352     }
353     ce_this = xmalloc(sizeof(*ce_this));
354     ce_this->next = b->file[cat].cache_entries;
355     b->file[cat].cache_entries = ce_this;
356     ce_this->buf = xmalloc(ISAMB_CACHE_ENTRY_SIZE);
357     ce_this->pos = norm;
358     yaz_log(b->log_io, "bf_read: cache_block");
359     if (!bf_read(b->file[cat].bf, norm, 0, 0, ce_this->buf))
360         memset (ce_this->buf, 0, ISAMB_CACHE_ENTRY_SIZE);
361     if (wr)
362     {
363         memcpy (ce_this->buf + off, userbuf, b->file[cat].head.block_size);
364         ce_this->dirty = 1;
365     }
366     else
367     {
368         ce_this->dirty = 0;
369         memcpy (userbuf, ce_this->buf + off, b->file[cat].head.block_size);
370     }
371     return 1;
372 }
373
374
375 void isamb_close (ISAMB isamb)
376 {
377     int i;
378     for (i = 0; isamb->accessed_nodes[i]; i++)
379         yaz_log(YLOG_DEBUG, "isamb_close  level leaf-%d: "ZINT_FORMAT" read, "
380                         ZINT_FORMAT" skipped",
381              i, isamb->accessed_nodes[i], isamb->skipped_nodes[i]);
382     yaz_log(YLOG_DEBUG, "isamb_close returned "ZINT_FORMAT" values, "
383                    "skipped "ZINT_FORMAT,
384          isamb->skipped_numbers, isamb->returned_numbers);
385     for (i = 0; i<isamb->no_cat; i++)
386     {
387         flush_blocks (isamb, i);
388         if (isamb->file[i].head_dirty)
389         {
390             char hbuf[DST_BUF_SIZE];
391             int major = ISAMB_MAJOR_VERSION;
392             int minor = ISAMB_MINOR_VERSION;
393             int len = 16;
394             char *dst = hbuf + 16;
395             int pos = 0, left;
396             int b_size = isamb->file[i].head.block_size;
397
398             encode_ptr(&dst, isamb->file[i].head.first_block);
399             encode_ptr(&dst, isamb->file[i].head.last_block);
400             encode_ptr(&dst, isamb->file[i].head.block_size);
401             encode_ptr(&dst, isamb->file[i].head.block_max);
402             encode_ptr(&dst, isamb->file[i].head.free_list);
403             memset(dst, '\0', b_size); /* ensure no random bytes are written */
404
405             len = dst - hbuf;
406
407             /* print exactly 16 bytes (including trailing 0) */
408             sprintf(hbuf, "isamb%02d %02d %02d\r\n", major, minor, len);
409
410             bf_write(isamb->file[i].bf, pos, 0, 0, hbuf);
411
412             for (left = len - b_size; left > 0; left = left - b_size)
413             {
414                 pos++;
415                 bf_write(isamb->file[i].bf, pos, 0, 0, hbuf + pos*b_size);
416             }
417         }
418         bf_close (isamb->file[i].bf);
419     }
420     xfree(isamb->file);
421     xfree(isamb->method);
422     xfree(isamb);
423 }
424
425 /* open_block: read one block at pos.
426    Decode leading sys bytes .. consisting of
427    Offset:Meaning
428    0: leader byte, != 0 leaf, == 0, non-leaf
429    1-2: used size of block
430    3-7*: number of items and all children
431    
432    * Reserve 5 bytes for large block sizes. 1 for small ones .. Number
433    of items. We can thus have at most 2^40 nodes. 
434 */
435 static struct ISAMB_block *open_block(ISAMB b, ISAM_P pos)
436 {
437     int cat = (int) (pos&CAT_MASK);
438     const char *src;
439     int offset = b->file[cat].head.block_offset;
440     struct ISAMB_block *p;
441     if (!pos)
442         return 0;
443     p = xmalloc(sizeof(*p));
444     p->pos = pos;
445     p->cat = (int) (pos & CAT_MASK);
446     p->buf = xmalloc(b->file[cat].head.block_size);
447     p->cbuf = 0;
448
449     if (!cache_block (b, pos, p->buf, 0))
450     {
451         yaz_log(b->log_io, "bf_read: open_block");
452         if (!bf_read(b->file[cat].bf, pos/CAT_MAX, 0, 0, p->buf))
453         {
454             yaz_log(YLOG_FATAL, "isamb: read fail for pos=%ld block=%ld",
455                      (long) pos, (long) pos/CAT_MAX);
456             abort();
457         }
458     }
459     p->bytes = p->buf + offset;
460     p->leaf = p->buf[0];
461     p->size = (p->buf[1] + 256 * p->buf[2]) - offset;
462     if (p->size < 0)
463     {
464         yaz_log(YLOG_FATAL, "Bad block size %d in pos=" ZINT_FORMAT "\n",
465                  p->size, pos);
466     }
467     assert (p->size >= 0);
468     src = p->buf + 3;
469     decode_ptr(&src, &p->no_items);
470
471     p->offset = 0;
472     p->dirty = 0;
473     p->deleted = 0;
474     p->decodeClientData = (*b->method->codec.start)();
475     return p;
476 }
477
478 struct ISAMB_block *new_block (ISAMB b, int leaf, int cat)
479 {
480     struct ISAMB_block *p;
481
482     p = xmalloc(sizeof(*p));
483     p->buf = xmalloc(b->file[cat].head.block_size);
484
485     if (!b->file[cat].head.free_list)
486     {
487         zint block_no;
488         block_no = b->file[cat].head.last_block++;
489         p->pos = block_no * CAT_MAX + cat;
490     }
491     else
492     {
493         p->pos = b->file[cat].head.free_list;
494         assert((p->pos & CAT_MASK) == cat);
495         if (!cache_block (b, p->pos, p->buf, 0))
496         {
497             yaz_log(b->log_io, "bf_read: new_block");
498             if (!bf_read(b->file[cat].bf, p->pos/CAT_MAX, 0, 0, p->buf))
499             {
500                 yaz_log(YLOG_FATAL, "isamb: read fail for pos=%ld block=%ld",
501                          (long) p->pos/CAT_MAX, (long) p->pos/CAT_MAX);
502                 abort ();
503             }
504         }
505         yaz_log(b->log_freelist, "got block " ZINT_FORMAT " from freelist %d:" ZINT_FORMAT, p->pos,
506                  cat, p->pos/CAT_MAX);
507         memcpy (&b->file[cat].head.free_list, p->buf, sizeof(zint));
508     }
509     p->cat = cat;
510     b->file[cat].head_dirty = 1;
511     memset (p->buf, 0, b->file[cat].head.block_size);
512     p->bytes = p->buf + b->file[cat].head.block_offset;
513     p->leaf = leaf;
514     p->size = 0;
515     p->dirty = 1;
516     p->deleted = 0;
517     p->offset = 0;
518     p->no_items = 0;
519     p->decodeClientData = (*b->method->codec.start)();
520     return p;
521 }
522
523 struct ISAMB_block *new_leaf (ISAMB b, int cat)
524 {
525     return new_block (b, 1, cat);
526 }
527
528
529 struct ISAMB_block *new_int (ISAMB b, int cat)
530 {
531     return new_block (b, 0, cat);
532 }
533
534 static void check_block (ISAMB b, struct ISAMB_block *p)
535 {
536     assert(b); /* mostly to make the compiler shut up about unused b */
537     if (p->leaf)
538     {
539         ;
540     }
541     else
542     {
543         /* sanity check */
544         char *startp = p->bytes;
545         const char *src = startp;
546         char *endp = p->bytes + p->size;
547         ISAM_P pos;
548         void *c1 = (*b->method->codec.start)();
549             
550         decode_ptr(&src, &pos);
551         assert ((pos&CAT_MASK) == p->cat);
552         while (src != endp)
553         {
554 #if INT_ENCODE
555             char file_item_buf[DST_ITEM_MAX];
556             char *file_item = file_item_buf;
557             (*b->method->codec.reset)(c1);
558             (*b->method->codec.decode)(c1, &file_item, &src);
559 #else
560             zint item_len;
561             decode_item_len(&src, &item_len);
562             assert (item_len > 0 && item_len < 80);
563             src += item_len;
564 #endif
565             decode_ptr(&src, &pos);
566             if ((pos&CAT_MASK) != p->cat)
567             {
568                 assert ((pos&CAT_MASK) == p->cat);
569             }
570         }
571         (*b->method->codec.stop)(c1);
572     }
573 }
574
575 void close_block(ISAMB b, struct ISAMB_block *p)
576 {
577     if (!p)
578         return;
579     if (p->deleted)
580     {
581         yaz_log(b->log_freelist, "release block " ZINT_FORMAT " from freelist %d:" ZINT_FORMAT,
582                  p->pos, p->cat, p->pos/CAT_MAX);
583         memcpy (p->buf, &b->file[p->cat].head.free_list, sizeof(zint));
584         b->file[p->cat].head.free_list = p->pos;
585         if (!cache_block (b, p->pos, p->buf, 1))
586         {
587             yaz_log(b->log_io, "bf_write: close_block (deleted)");
588             bf_write(b->file[p->cat].bf, p->pos/CAT_MAX, 0, 0, p->buf);
589         }
590     }
591     else if (p->dirty)
592     {
593         int offset = b->file[p->cat].head.block_offset;
594         int size = p->size + offset;
595         char *dst =  p->buf + 3;
596         assert (p->size >= 0);
597         
598         /* memset becuase encode_ptr usually does not write all bytes */
599         memset(p->buf, 0, b->file[p->cat].head.block_offset);
600         p->buf[0] = p->leaf;
601         p->buf[1] = size & 255;
602         p->buf[2] = size >> 8;
603         encode_ptr(&dst, p->no_items);
604         check_block(b, p);
605         if (!cache_block (b, p->pos, p->buf, 1))
606         {
607             yaz_log(b->log_io, "bf_write: close_block");
608             bf_write(b->file[p->cat].bf, p->pos/CAT_MAX, 0, 0, p->buf);
609         }
610     }
611     (*b->method->codec.stop)(p->decodeClientData);
612     xfree(p->buf);
613     xfree(p);
614 }
615
616 int insert_sub (ISAMB b, struct ISAMB_block **p,
617                 void *new_item, int *mode,
618                 ISAMC_I *stream,
619                 struct ISAMB_block **sp,
620                 void *sub_item, int *sub_size,
621                 const void *max_item);
622
623 int insert_int (ISAMB b, struct ISAMB_block *p, void *lookahead_item,
624                 int *mode,
625                 ISAMC_I *stream, struct ISAMB_block **sp,
626                 void *split_item, int *split_size, const void *last_max_item)
627 {
628     char *startp = p->bytes;
629     const char *src = startp;
630     char *endp = p->bytes + p->size;
631     ISAM_P pos;
632     struct ISAMB_block *sub_p1 = 0, *sub_p2 = 0;
633     char sub_item[DST_ITEM_MAX];
634     int sub_size;
635     int more = 0;
636     zint diff_terms = 0;
637     void *c1 = (*b->method->codec.start)();
638
639     *sp = 0;
640
641     assert(p->size >= 0);
642     decode_ptr(&src, &pos);
643     while (src != endp)
644     {
645         int d;
646         const char *src0 = src;
647 #if INT_ENCODE
648         char file_item_buf[DST_ITEM_MAX];
649         char *file_item = file_item_buf;
650         (*b->method->codec.reset)(c1);
651         (*b->method->codec.decode)(c1, &file_item, &src);
652         d = (*b->method->compare_item)(file_item_buf, lookahead_item);
653         if (d > 0)
654         {
655             sub_p1 = open_block(b, pos);
656             assert (sub_p1);
657             diff_terms -= sub_p1->no_items;
658             more = insert_sub (b, &sub_p1, lookahead_item, mode,
659                                stream, &sub_p2, 
660                                sub_item, &sub_size, file_item_buf);
661             diff_terms += sub_p1->no_items;
662             src = src0;
663             break;
664         }
665 #else
666         zint item_len;
667         decode_item_len(&src, &item_len);
668         d = (*b->method->compare_item)(src, lookahead_item);
669         if (d > 0)
670         {
671             sub_p1 = open_block(b, pos);
672             assert (sub_p1);
673             diff_terms -= sub_p1->no_items;
674             more = insert_sub (b, &sub_p1, lookahead_item, mode,
675                                stream, &sub_p2, 
676                                sub_item, &sub_size, src);
677             diff_terms += sub_p1->no_items;
678             src = src0;
679             break;
680         }
681         src += item_len;
682 #endif
683         decode_ptr(&src, &pos);
684     }
685     if (!sub_p1)
686     {
687         /* we reached the end. So lookahead > last item */
688         sub_p1 = open_block(b, pos);
689         assert (sub_p1);
690         diff_terms -= sub_p1->no_items;
691         more = insert_sub (b, &sub_p1, lookahead_item, mode, stream, &sub_p2, 
692                            sub_item, &sub_size, last_max_item);
693         diff_terms += sub_p1->no_items;
694     }
695     if (sub_p2)
696         diff_terms += sub_p2->no_items;
697     if (diff_terms)
698     {
699         p->dirty = 1;
700         p->no_items += diff_terms;
701     }
702     if (sub_p2)
703     {
704         /* there was a split - must insert pointer in this one */
705         char dst_buf[DST_BUF_SIZE];
706         char *dst = dst_buf;
707 #if INT_ENCODE
708         const char *sub_item_ptr = sub_item;
709 #endif
710         assert (sub_size < 80 && sub_size > 1);
711
712         memcpy (dst, startp, src - startp);
713                 
714         dst += src - startp;
715
716 #if INT_ENCODE
717         (*b->method->codec.reset)(c1);
718         (*b->method->codec.encode)(c1, &dst, &sub_item_ptr);
719 #else
720         encode_item_len (&dst, sub_size);      /* sub length and item */
721         memcpy (dst, sub_item, sub_size);
722         dst += sub_size;
723 #endif
724
725         encode_ptr(&dst, sub_p2->pos);   /* pos */
726
727         if (endp - src)                   /* remaining data */
728         {
729             memcpy (dst, src, endp - src);
730             dst += endp - src;
731         }
732         p->size = dst - dst_buf;
733         assert (p->size >= 0);
734
735
736         if (p->size <= b->file[p->cat].head.block_max)
737         {
738             /* it fits OK in this block */
739             memcpy (startp, dst_buf, dst - dst_buf);
740         }
741         else
742         {
743             /* must split _this_ block as well .. */
744             struct ISAMB_block *sub_p3;
745 #if INT_ENCODE
746             char file_item_buf[DST_ITEM_MAX];
747             char *file_item = file_item_buf;
748 #else
749             zint split_size_tmp;
750 #endif
751             zint no_items_first_half = 0;
752             int p_new_size;
753             const char *half;
754             src = dst_buf;
755             endp = dst;
756
757             half = src + b->file[p->cat].head.block_size/2;
758             decode_ptr(&src, &pos);
759
760             /* read sub block so we can get no_items for it */
761             sub_p3 = open_block(b, pos);
762             no_items_first_half += sub_p3->no_items;
763             close_block(b, sub_p3);
764
765             while (src <= half)
766             {
767 #if INT_ENCODE
768                 file_item = file_item_buf;
769                 (*b->method->codec.reset)(c1);
770                 (*b->method->codec.decode)(c1, &file_item, &src);
771 #else
772                 decode_item_len(&src, &split_size_tmp);
773                 *split_size = (int) split_size_tmp;
774                 src += *split_size;
775 #endif
776                 decode_ptr(&src, &pos);
777
778                 /* read sub block so we can get no_items for it */
779                 sub_p3 = open_block(b, pos);
780                 no_items_first_half += sub_p3->no_items;
781                 close_block(b, sub_p3);
782             }
783             /*  p is first half */
784             p_new_size = src - dst_buf;
785             memcpy (p->bytes, dst_buf, p_new_size);
786
787 #if INT_ENCODE
788             file_item = file_item_buf;
789             (*b->method->codec.reset)(c1);
790             (*b->method->codec.decode)(c1, &file_item, &src);
791             *split_size = file_item - file_item_buf;
792             memcpy(split_item, file_item_buf, *split_size);
793 #else
794             decode_item_len(&src, &split_size_tmp);
795             *split_size = (int) split_size_tmp;
796             memcpy (split_item, src, *split_size);
797             src += *split_size;
798 #endif
799             /*  *sp is second half */
800             *sp = new_int (b, p->cat);
801             (*sp)->size = endp - src;
802             memcpy ((*sp)->bytes, src, (*sp)->size);
803
804             p->size = p_new_size;
805
806             /*  adjust no_items in first&second half */
807             (*sp)->no_items = p->no_items - no_items_first_half;
808             p->no_items = no_items_first_half;
809         }
810         p->dirty = 1;
811         close_block(b, sub_p2);
812     }
813     close_block(b, sub_p1);
814     (*b->method->codec.stop)(c1);
815     return more;
816 }
817
818 int insert_leaf (ISAMB b, struct ISAMB_block **sp1, void *lookahead_item,
819                  int *lookahead_mode, ISAMC_I *stream,
820                  struct ISAMB_block **sp2,
821                  void *sub_item, int *sub_size,
822                  const void *max_item)
823 {
824     struct ISAMB_block *p = *sp1;
825     char *endp = 0;
826     const char *src = 0;
827     char dst_buf[DST_BUF_SIZE], *dst = dst_buf;
828     int new_size;
829     void *c1 = (*b->method->codec.start)();
830     void *c2 = (*b->method->codec.start)();
831     int more = 1;
832     int quater = b->file[b->no_cat-1].head.block_max / 4;
833     char *mid_cut = dst_buf + quater * 2;
834     char *tail_cut = dst_buf + quater * 3;
835     char *maxp = dst_buf + b->file[b->no_cat-1].head.block_max;
836     char *half1 = 0;
837     char *half2 = 0;
838     char cut_item_buf[DST_ITEM_MAX];
839     int cut_item_size = 0;
840     int no_items = 0;    /* number of items (total) */
841     int no_items_1 = 0;  /* number of items (first half) */
842     int inserted_dst_bytes = 0;
843
844     if (p && p->size)
845     {
846         char file_item_buf[DST_ITEM_MAX];
847         char *file_item = file_item_buf;
848             
849         src = p->bytes;
850         endp = p->bytes + p->size;
851         (*b->method->codec.decode)(c1, &file_item, &src);
852         while (1)
853         {
854             const char *dst_item = 0; /* resulting item to be inserted */
855             char *lookahead_next;
856             char *dst_0 = dst;
857             int d = -1;
858             
859             if (lookahead_item)
860                 d = (*b->method->compare_item)(file_item_buf, lookahead_item);
861             
862             /* d now holds comparison between existing file item and
863                lookahead item 
864                d = 0: equal
865                d > 0: lookahead before file
866                d < 0: lookahead after file
867             */
868             if (d > 0)
869             {
870                 /* lookahead must be inserted */
871                 dst_item = lookahead_item;
872                 /* if this is not an insertion, it's really bad .. */
873                 if (!*lookahead_mode)
874                 {
875                     yaz_log(YLOG_WARN, "isamb: Inconsistent register (1)");
876                     assert (*lookahead_mode);
877                 }
878             }
879             else
880                 dst_item = file_item_buf;
881
882             if (!*lookahead_mode && d == 0)
883             {
884                 /* it's a deletion and they match so there is nothing to be
885                    inserted anyway .. But mark the thing bad (file item
886                    was part of input.. The item will not be part of output */
887                 p->dirty = 1;
888             }
889             else if (!half1 && dst > mid_cut)
890             {
891                 /* we have reached the splitting point for the first time */
892                 const char *dst_item_0 = dst_item;
893                 half1 = dst; /* candidate for splitting */
894
895                 /* encode the resulting item */
896                 (*b->method->codec.encode)(c2, &dst, &dst_item);
897                 
898                 cut_item_size = dst_item - dst_item_0;
899                 assert(cut_item_size > 0);
900                 memcpy (cut_item_buf, dst_item_0, cut_item_size);
901                 
902                 half2 = dst;
903                 no_items_1 = no_items;
904                 no_items++;
905             }
906             else
907             {
908                 /* encode the resulting item */
909                 (*b->method->codec.encode)(c2, &dst, &dst_item);
910                 no_items++;
911             }
912
913             /* now move "pointers" .. result has been encoded .. */
914             if (d > 0)  
915             {
916                 /* we must move the lookahead pointer */
917
918                 inserted_dst_bytes += (dst - dst_0);
919                 if (inserted_dst_bytes >=  quater)
920                     /* no more room. Mark lookahead as "gone".. */
921                     lookahead_item = 0;
922                 else
923                 {
924                     /* move it really.. */
925                     lookahead_next = lookahead_item;
926                     if (!(*stream->read_item)(stream->clientData,
927                                               &lookahead_next,
928                                               lookahead_mode))
929                     {
930                         /* end of stream reached: no "more" and no lookahead */
931                         lookahead_item = 0;
932                         more = 0;
933                     }
934                     if (lookahead_item && max_item &&
935                         (*b->method->compare_item)(max_item, lookahead_item) <= 0)
936                     {
937                         /* the lookahead goes beyond what we allow in this
938                            leaf. Mark it as "gone" */
939                         lookahead_item = 0;
940                     }
941                     
942                     p->dirty = 1;
943                 }
944             }
945             else if (d == 0)
946             {
947                 /* exact match .. move both pointers */
948
949                 lookahead_next = lookahead_item;
950                 if (!(*stream->read_item)(stream->clientData,
951                                           &lookahead_next, lookahead_mode))
952                 {
953                     lookahead_item = 0;
954                     more = 0;
955                 }
956                 if (src == endp)
957                     break;  /* end of file stream reached .. */
958                 file_item = file_item_buf; /* move file pointer */
959                 (*b->method->codec.decode)(c1, &file_item, &src);
960             }
961             else
962             {
963                 /* file pointer must be moved */
964                 if (src == endp)
965                     break;
966                 file_item = file_item_buf;
967                 (*b->method->codec.decode)(c1, &file_item, &src);
968             }
969         }
970     }
971
972     /* this loop runs when we are "appending" to a leaf page. That is
973        either it's empty (new) or all file items have been read in
974        previous loop */
975
976     maxp = dst_buf + b->file[b->no_cat-1].head.block_max + quater;
977     while (lookahead_item)
978     {
979         char *dst_item;
980         const char *src = lookahead_item;
981         char *dst_0 = dst;
982         
983         /* if we have a lookahead item, we stop if we exceed the value of it */
984         if (max_item &&
985             (*b->method->compare_item)(max_item, lookahead_item) <= 0)
986         {
987             /* stop if we have reached the value of max item */
988             break;
989         }
990         if (!*lookahead_mode)
991         {
992             /* this is append. So a delete is bad */
993             yaz_log(YLOG_WARN, "isamb: Inconsistent register (2)");
994             abort();
995         }
996         else if (!half1 && dst > tail_cut)
997         {
998             const char *src_0 = src;
999             half1 = dst; /* candidate for splitting */
1000             
1001             (*b->method->codec.encode)(c2, &dst, &src);
1002             
1003             cut_item_size = src - src_0;
1004             assert(cut_item_size > 0);
1005             memcpy (cut_item_buf, src_0, cut_item_size);
1006             
1007             no_items_1 = no_items;
1008             half2 = dst;
1009         }
1010         else
1011             (*b->method->codec.encode)(c2, &dst, &src);
1012
1013         if (dst > maxp)
1014         {
1015             dst = dst_0;
1016             break;
1017         }
1018         no_items++;
1019         if (p)
1020             p->dirty = 1;
1021         dst_item = lookahead_item;
1022         if (!(*stream->read_item)(stream->clientData, &dst_item,
1023                                   lookahead_mode))
1024         {
1025             lookahead_item = 0;
1026             more = 0;
1027         }
1028     }
1029     new_size = dst - dst_buf;
1030     if (p && p->cat != b->no_cat-1 && 
1031         new_size > b->file[p->cat].head.block_max)
1032     {
1033         /* non-btree block will be removed */
1034         p->deleted = 1;
1035         close_block(b, p);
1036         /* delete it too!! */
1037         p = 0; /* make a new one anyway */
1038     }
1039     if (!p)
1040     {   /* must create a new one */
1041         int i;
1042         for (i = 0; i < b->no_cat; i++)
1043             if (new_size <= b->file[i].head.block_max)
1044                 break;
1045         if (i == b->no_cat)
1046             i = b->no_cat - 1;
1047         p = new_leaf (b, i);
1048     }
1049     if (new_size > b->file[p->cat].head.block_max)
1050     {
1051         char *first_dst;
1052         const char *cut_item = cut_item_buf;
1053
1054         assert (half1);
1055         assert (half2);
1056
1057         assert(cut_item_size > 0);
1058         
1059         /* first half */
1060         p->size = half1 - dst_buf;
1061         assert(p->size <=  b->file[p->cat].head.block_max);
1062         memcpy (p->bytes, dst_buf, half1 - dst_buf);
1063         p->no_items = no_items_1;
1064
1065         /* second half */
1066         *sp2 = new_leaf (b, p->cat);
1067
1068         (*b->method->codec.reset)(c2);
1069
1070         first_dst = (*sp2)->bytes;
1071
1072         (*b->method->codec.encode)(c2, &first_dst, &cut_item);
1073
1074         memcpy (first_dst, half2, dst - half2);
1075
1076         (*sp2)->size = (first_dst - (*sp2)->bytes) + (dst - half2);
1077         assert((*sp2)->size <=  b->file[p->cat].head.block_max);
1078         (*sp2)->no_items = no_items - no_items_1;
1079         (*sp2)->dirty = 1;
1080         p->dirty = 1;
1081         memcpy (sub_item, cut_item_buf, cut_item_size);
1082         *sub_size = cut_item_size;
1083     }
1084     else
1085     {
1086         memcpy (p->bytes, dst_buf, dst - dst_buf);
1087         p->size = new_size;
1088         p->no_items = no_items;
1089     }
1090     (*b->method->codec.stop)(c1);
1091     (*b->method->codec.stop)(c2);
1092     *sp1 = p;
1093     return more;
1094 }
1095
1096 int insert_sub (ISAMB b, struct ISAMB_block **p, void *new_item,
1097                 int *mode,
1098                 ISAMC_I *stream,
1099                 struct ISAMB_block **sp,
1100                 void *sub_item, int *sub_size,
1101                 const void *max_item)
1102 {
1103     if (!*p || (*p)->leaf)
1104         return insert_leaf (b, p, new_item, mode, stream, sp, sub_item, 
1105                             sub_size, max_item);
1106     else
1107         return insert_int (b, *p, new_item, mode, stream, sp, sub_item,
1108                            sub_size, max_item);
1109 }
1110
1111 int isamb_unlink (ISAMB b, ISAM_P pos)
1112 {
1113     struct ISAMB_block *p1;
1114
1115     if (!pos)
1116         return 0;
1117     p1 = open_block(b, pos);
1118     p1->deleted = 1;
1119     if (!p1->leaf)
1120     {
1121         zint sub_p;
1122         const char *src = p1->bytes + p1->offset;
1123 #if INT_ENCODE
1124         void *c1 = (*b->method->codec.start)();
1125 #endif
1126         decode_ptr(&src, &sub_p);
1127         isamb_unlink(b, sub_p);
1128         
1129         while (src != p1->bytes + p1->size)
1130         {
1131 #if INT_ENCODE
1132             char file_item_buf[DST_ITEM_MAX];
1133             char *file_item = file_item_buf;
1134             (*b->method->codec.reset)(c1);
1135             (*b->method->codec.decode)(c1, &file_item, &src);
1136 #else
1137             zint item_len;
1138             decode_item_len(&src, &item_len);
1139             src += item_len;
1140 #endif
1141             decode_ptr(&src, &sub_p);
1142             isamb_unlink(b, sub_p);
1143         }
1144 #if INT_ENCODE
1145         (*b->method->codec.stop)(c1);
1146 #endif
1147     }
1148     close_block(b, p1);
1149     return 0;
1150 }
1151
1152 void isamb_merge(ISAMB b, ISAM_P *pos, ISAMC_I *stream)
1153 {
1154     char item_buf[DST_ITEM_MAX];
1155     char *item_ptr;
1156     int i_mode;
1157     int more;
1158     int must_delete = 0;
1159
1160     if (b->cache < 0)
1161     {
1162         int more = 1;
1163         while (more)
1164         {
1165             item_ptr = item_buf;
1166             more =
1167                 (*stream->read_item)(stream->clientData, &item_ptr, &i_mode);
1168         }
1169         *pos = 1;
1170         return;
1171     }
1172     item_ptr = item_buf;
1173     more = (*stream->read_item)(stream->clientData, &item_ptr, &i_mode);
1174     while (more)
1175     {
1176         struct ISAMB_block *p = 0, *sp = 0;
1177         char sub_item[DST_ITEM_MAX];
1178         int sub_size;
1179         
1180         if (*pos)
1181             p = open_block(b, *pos);
1182         more = insert_sub (b, &p, item_buf, &i_mode, stream, &sp,
1183                            sub_item, &sub_size, 0);
1184         if (sp)
1185         {    /* increase level of tree by one */
1186             struct ISAMB_block *p2 = new_int (b, p->cat);
1187             char *dst = p2->bytes + p2->size;
1188 #if INT_ENCODE
1189             void *c1 = (*b->method->codec.start)();
1190             const char *sub_item_ptr = sub_item;
1191 #endif
1192
1193             encode_ptr(&dst, p->pos);
1194             assert (sub_size < 80 && sub_size > 1);
1195 #if INT_ENCODE
1196             (*b->method->codec.reset)(c1);
1197             (*b->method->codec.encode)(c1, &dst, &sub_item_ptr);
1198 #else
1199             encode_item_len (&dst, sub_size);
1200             memcpy (dst, sub_item, sub_size);
1201             dst += sub_size;
1202 #endif
1203             encode_ptr(&dst, sp->pos);
1204             
1205             p2->size = dst - p2->bytes;
1206             p2->no_items = p->no_items + sp->no_items;
1207             *pos = p2->pos;  /* return new super page */
1208             close_block(b, sp);
1209             close_block(b, p2);
1210 #if INT_ENCODE
1211             (*b->method->codec.stop)(c1);
1212 #endif
1213         }
1214         else
1215         {
1216             *pos = p->pos;   /* return current one (again) */
1217         }
1218         if (p->no_items == 0)
1219             must_delete = 1;
1220         else
1221             must_delete = 0;
1222         close_block(b, p);
1223     }
1224     if (must_delete)
1225     {
1226         isamb_unlink(b, *pos);
1227         *pos = 0;
1228     }
1229 }
1230
1231 ISAMB_PP isamb_pp_open_x(ISAMB isamb, ISAM_P pos, int *level, int scope)
1232 {
1233     ISAMB_PP pp = xmalloc(sizeof(*pp));
1234     int i;
1235
1236     assert(pos);
1237
1238     pp->isamb = isamb;
1239     pp->block = xmalloc(ISAMB_MAX_LEVEL * sizeof(*pp->block));
1240
1241     pp->pos = pos;
1242     pp->level = 0;
1243     pp->maxlevel = 0;
1244     pp->total_size = 0;
1245     pp->no_blocks = 0;
1246     pp->skipped_numbers = 0;
1247     pp->returned_numbers = 0;
1248     pp->scope = scope;
1249     for (i = 0; i<ISAMB_MAX_LEVEL; i++)
1250         pp->skipped_nodes[i] = pp->accessed_nodes[i] = 0;
1251     while (1)
1252     {
1253         struct ISAMB_block *p = open_block(isamb, pos);
1254         const char *src = p->bytes + p->offset;
1255         pp->block[pp->level] = p;
1256
1257         pp->total_size += p->size;
1258         pp->no_blocks++;
1259         if (p->leaf)
1260             break;
1261         decode_ptr(&src, &pos);
1262         p->offset = src - p->bytes;
1263         pp->level++;
1264         pp->accessed_nodes[pp->level]++; 
1265     }
1266     pp->block[pp->level+1] = 0;
1267     pp->maxlevel = pp->level;
1268     if (level)
1269         *level = pp->level;
1270     return pp;
1271 }
1272
1273 ISAMB_PP isamb_pp_open (ISAMB isamb, ISAM_P pos, int scope)
1274 {
1275     return isamb_pp_open_x(isamb, pos, 0, scope);
1276 }
1277
1278 void isamb_pp_close_x(ISAMB_PP pp, zint *size, zint *blocks)
1279 {
1280     int i;
1281     if (!pp)
1282         return;
1283     yaz_log(YLOG_DEBUG, "isamb_pp_close lev=%d returned "ZINT_FORMAT" values, " 
1284                     "skipped "ZINT_FORMAT,
1285         pp->maxlevel, pp->skipped_numbers, pp->returned_numbers);
1286     for (i = pp->maxlevel; i>=0; i--)
1287         if (pp->skipped_nodes[i] || pp->accessed_nodes[i])
1288             yaz_log(YLOG_DEBUG, "isamb_pp_close  level leaf-%d: "
1289                             ZINT_FORMAT" read, "ZINT_FORMAT" skipped", i,
1290                  pp->accessed_nodes[i], pp->skipped_nodes[i]);
1291     pp->isamb->skipped_numbers += pp->skipped_numbers;
1292     pp->isamb->returned_numbers += pp->returned_numbers;
1293     for (i = pp->maxlevel; i>=0; i--)
1294     {
1295         pp->isamb->accessed_nodes[i] += pp->accessed_nodes[i];
1296         pp->isamb->skipped_nodes[i] += pp->skipped_nodes[i];
1297     }
1298     if (size)
1299         *size = pp->total_size;
1300     if (blocks)
1301         *blocks = pp->no_blocks;
1302     for (i = 0; i <= pp->level; i++)
1303         close_block(pp->isamb, pp->block[i]);
1304     xfree(pp->block);
1305     xfree(pp);
1306 }
1307
1308 int isamb_block_info (ISAMB isamb, int cat)
1309 {
1310     if (cat >= 0 && cat < isamb->no_cat)
1311         return isamb->file[cat].head.block_size;
1312     return -1;
1313 }
1314
1315 void isamb_pp_close (ISAMB_PP pp)
1316 {
1317     isamb_pp_close_x(pp, 0, 0);
1318 }
1319
1320 /* simple recursive dumper .. */
1321 static void isamb_dump_r (ISAMB b, ISAM_P pos, void (*pr)(const char *str),
1322                           int level)
1323 {
1324     char buf[1024];
1325     char prefix_str[1024];
1326     if (pos)
1327     {
1328         struct ISAMB_block *p = open_block(b, pos);
1329         sprintf(prefix_str, "%*s " ZINT_FORMAT " cat=%d size=%d max=%d items="
1330                 ZINT_FORMAT, level*2, "",
1331                 pos, p->cat, p->size, b->file[p->cat].head.block_max,
1332                 p->no_items);
1333         (*pr)(prefix_str);
1334         sprintf(prefix_str, "%*s " ZINT_FORMAT, level*2, "", pos);
1335         if (p->leaf)
1336         {
1337             while (p->offset < p->size)
1338             {
1339                 const char *src = p->bytes + p->offset;
1340                 char *dst = buf;
1341                 (*b->method->codec.decode)(p->decodeClientData, &dst, &src);
1342                 (*b->method->log_item)(YLOG_DEBUG, buf, prefix_str);
1343                 p->offset = src - (char*) p->bytes;
1344             }
1345             assert(p->offset == p->size);
1346         }
1347         else
1348         {
1349             const char *src = p->bytes + p->offset;
1350             ISAM_P sub;
1351
1352             decode_ptr(&src, &sub);
1353             p->offset = src - (char*) p->bytes;
1354
1355             isamb_dump_r(b, sub, pr, level+1);
1356             
1357             while (p->offset < p->size)
1358             {
1359 #if INT_ENCODE
1360                 char file_item_buf[DST_ITEM_MAX];
1361                 char *file_item = file_item_buf;
1362                 void *c1 = (*b->method->codec.start)();
1363                 (*b->method->codec.decode)(c1, &file_item, &src);
1364                 (*b->method->codec.stop)(c1);
1365                 (*b->method->log_item)(YLOG_DEBUG, file_item_buf, prefix_str);
1366 #else
1367                 zint item_len;
1368                 decode_item_len(&src, &item_len);
1369                 (*b->method->log_item)(YLOG_DEBUG, src, prefix_str);
1370                 src += item_len;
1371 #endif
1372                 decode_ptr(&src, &sub);
1373                 
1374                 p->offset = src - (char*) p->bytes;
1375                 
1376                 isamb_dump_r(b, sub, pr, level+1);
1377             }           
1378         }
1379         close_block(b, p);
1380     }
1381 }
1382
1383 void isamb_dump(ISAMB b, ISAM_P pos, void (*pr)(const char *str))
1384 {
1385     isamb_dump_r(b, pos, pr, 0);
1386 }
1387
1388 int isamb_pp_read(ISAMB_PP pp, void *buf)
1389 {
1390     return isamb_pp_forward(pp, buf, 0);
1391 }
1392
1393
1394 static int isamb_pp_on_right_node(ISAMB_PP pp, int level, const void *untilbuf)
1395 { /* looks one node higher to see if we should be on this node at all */
1396   /* useful in backing off quickly, and in avoiding tail descends */
1397   /* call with pp->level to begin with */
1398     struct ISAMB_block *p;
1399     int cmp;
1400     const char *src;
1401     ISAMB b = pp->isamb;
1402
1403     assert(level >= 0);
1404     if (level == 0)
1405     {
1406 #if ISAMB_DEBUG
1407             yaz_log(YLOG_DEBUG, "isamb_pp_on_right returning true for root");
1408 #endif
1409         return 1; /* we can never skip the root node */
1410     }
1411     level--;
1412     p = pp->block[level];
1413     assert(p->offset <= p->size);
1414     if (p->offset < p->size)
1415     {
1416 #if INT_ENCODE
1417         char file_item_buf[DST_ITEM_MAX];
1418         char *file_item = file_item_buf;
1419         void *c1 = (*b->method->codec.start)();
1420         assert(p->offset > 0); 
1421         src = p->bytes + p->offset;
1422         (*b->method->codec.decode)(c1, &file_item, &src);
1423         (*b->method->codec.stop)(c1);
1424         cmp = (*b->method->compare_item)(untilbuf, file_item_buf);
1425 #else
1426         zint item_len;
1427         assert(p->offset > 0); 
1428         src = p->bytes + p->offset;
1429         decode_item_len(&src, &item_len);
1430 #if ISAMB_DEBUG
1431         (*b->method->codec.log_item)(YLOG_DEBUG, untilbuf, "on_leaf: until");
1432         (*b->method->codec.log_item)(YLOG_DEBUG, src, "on_leaf: value");
1433 #endif
1434         cmp = (*b->method->compare_item)(untilbuf, src);
1435 #endif
1436         if (cmp < pp->scope)
1437         {  /* cmp<2 */
1438 #if ISAMB_DEBUG
1439             yaz_log(YLOG_DEBUG, "isamb_pp_on_right returning true "
1440                             "cmp=%d lev=%d ofs=%d", cmp, level, p->offset);
1441 #endif
1442             return 1; 
1443         }
1444         else
1445         {
1446 #if ISAMB_DEBUG
1447             yaz_log(YLOG_DEBUG, "isamb_pp_on_right returning false "
1448                             "cmp=%d lev=%d ofs=%d", cmp, level, p->offset);
1449 #endif
1450             return 0; 
1451         }
1452     }
1453     else {
1454 #if ISAMB_DEBUG
1455         yaz_log(YLOG_DEBUG, "isamb_pp_on_right at tail, looking higher "
1456                         "lev=%d", level);
1457 #endif
1458         return isamb_pp_on_right_node(pp, level, untilbuf);
1459     }
1460 } /* isamb_pp_on_right_node */
1461
1462 static int isamb_pp_read_on_leaf(ISAMB_PP pp, void *buf)
1463
1464     /* reads the next item on the current leaf, returns 0 if end of leaf*/
1465     struct ISAMB_block *p = pp->block[pp->level];
1466     char *dst;
1467     const char *src;
1468     assert(pp);
1469     assert(buf);
1470     if (p->offset == p->size)
1471     {
1472 #if ISAMB_DEBUG
1473         yaz_log(YLOG_DEBUG, "isamb_pp_read_on_leaf returning 0 on "
1474                 "node %d", p->pos);
1475 #endif
1476         return 0; /* at end of leaf */
1477     }
1478     src = p->bytes + p->offset;
1479     dst = buf;
1480     (*pp->isamb->method->codec.decode)(p->decodeClientData, &dst, &src);
1481     p->offset = src - (char*) p->bytes;
1482 #if ISAMB_DEBUG
1483     (*pp->isamb->method->codec.log_item)(YLOG_DEBUG, buf,
1484                                          "read_on_leaf returning 1");
1485 #endif
1486     pp->returned_numbers++;
1487     return 1;
1488 } /* read_on_leaf */
1489
1490 static int isamb_pp_forward_on_leaf(ISAMB_PP pp, void *buf, const void *untilbuf)
1491 { /* forwards on the current leaf, returns 0 if not found */
1492     int cmp;
1493     int skips = 0;
1494     while (1)
1495     {
1496         if (!isamb_pp_read_on_leaf(pp, buf))
1497             return 0;
1498         /* FIXME - this is an extra function call, inline the read? */
1499         cmp=(*pp->isamb->method->compare_item)(untilbuf, buf);
1500         if (cmp <pp->scope)
1501         {  /* cmp<2  found a good one */
1502 #if ISAMB_DEBUG
1503             if (skips)
1504                 yaz_log(YLOG_DEBUG, "isam_pp_fwd_on_leaf skipped %d items", skips);
1505 #endif
1506             pp->returned_numbers++;
1507             return 1;
1508         }
1509         if (!skips)
1510             if (!isamb_pp_on_right_node(pp, pp->level, untilbuf))
1511                 return 0; /* never mind the rest of this leaf */
1512         pp->skipped_numbers++;
1513         skips++;
1514     }
1515 } /* forward_on_leaf */
1516
1517 static int isamb_pp_climb_level(ISAMB_PP pp, ISAM_P *pos)
1518 { /* climbs higher in the tree, until finds a level with data left */
1519   /* returns the node to (consider to) descend to in *pos) */
1520     struct ISAMB_block *p = pp->block[pp->level];
1521     const char *src;
1522 #if ISAMB_DEBUG
1523     yaz_log(YLOG_DEBUG, "isamb_pp_climb_level starting "
1524                    "at level %d node %d ofs=%d sz=%d",
1525                     pp->level, p->pos, p->offset, p->size);
1526 #endif
1527     assert(pp->level >= 0);
1528     assert(p->offset <= p->size);
1529     if (pp->level==0)
1530     {
1531 #if ISAMB_DEBUG
1532         yaz_log(YLOG_DEBUG, "isamb_pp_climb_level returning 0 at root");
1533 #endif
1534         return 0;
1535     }
1536     assert(pp->level>0); 
1537     close_block(pp->isamb, pp->block[pp->level]);
1538     pp->block[pp->level] = 0;
1539     (pp->level)--;
1540     p = pp->block[pp->level];
1541 #if ISAMB_DEBUG
1542     yaz_log(YLOG_DEBUG, "isamb_pp_climb_level climbed to level %d node %d ofs=%d",
1543                     pp->level, p->pos, p->offset);
1544 #endif
1545     assert(!p->leaf);
1546     assert(p->offset <= p->size);
1547     if (p->offset == p->size)
1548     {
1549         /* we came from the last pointer, climb on */
1550         if (!isamb_pp_climb_level(pp, pos))
1551             return 0;
1552         p = pp->block[pp->level];
1553     }
1554     else
1555     {
1556 #if INT_ENCODE
1557         char file_item_buf[DST_ITEM_MAX];
1558         char *file_item = file_item_buf;
1559         ISAMB b = pp->isamb;
1560         void *c1 = (*b->method->codec.start)();
1561 #else
1562         zint item_len;
1563 #endif
1564         /* skip the child we just came from */
1565 #if ISAMB_DEBUG
1566         yaz_log(YLOG_DEBUG, "isam_pp_climb_level: skipping lev=%d ofs=%d sz=%d", 
1567                         pp->level, p->offset, p->size);
1568 #endif
1569         assert (p->offset < p->size);
1570         src = p->bytes + p->offset;
1571 #if INT_ENCODE
1572         (*b->method->codec.decode)(c1, &file_item, &src);
1573         (*b->method->codec.stop)(c1);
1574 #else
1575         decode_item_len(&src, &item_len);
1576         src += item_len;
1577 #endif
1578         decode_ptr(&src, pos);
1579         p->offset = src - (char *)p->bytes;
1580             
1581     }
1582     return 1;
1583 } /* climb_level */
1584
1585
1586 static zint isamb_pp_forward_unode(ISAMB_PP pp, zint pos, const void *untilbuf)
1587 { /* scans a upper node until it finds a child <= untilbuf */
1588   /* pp points to the key value, as always. pos is the child read from */
1589   /* the buffer */
1590   /* if all values are too small, returns the last child in the node */
1591   /* FIXME - this can be detected, and avoided by looking at the */
1592   /* parent node, but that gets messy. Presumably the cost is */
1593   /* pretty low anyway */
1594     ISAMB b = pp->isamb;
1595     struct ISAMB_block *p = pp->block[pp->level];
1596     const char *src = p->bytes + p->offset;
1597     int cmp;
1598     zint nxtpos;
1599 #if ISAMB_DEBUG
1600     int skips = 0;
1601     yaz_log(YLOG_DEBUG, "isamb_pp_forward_unode starting "
1602                    "at level %d node %d ofs=%di sz=%d",
1603                     pp->level, p->pos, p->offset, p->size);
1604 #endif
1605     assert(!p->leaf);
1606     assert(p->offset <= p->size);
1607     if (p->offset == p->size)
1608     {
1609 #if ISAMB_DEBUG
1610             yaz_log(YLOG_DEBUG, "isamb_pp_forward_unode returning at end "
1611                    "at level %d node %d ofs=%di sz=%d",
1612                     pp->level, p->pos, p->offset, p->size);
1613 #endif
1614         return pos; /* already at the end of it */
1615     }
1616     while(p->offset < p->size)
1617     {
1618 #if INT_ENCODE
1619         char file_item_buf[DST_ITEM_MAX];
1620         char *file_item = file_item_buf;
1621         void *c1 = (*b->method->codec.start)();
1622         (*b->method->codec.decode)(c1, &file_item, &src);
1623         (*b->method->codec.stop)(c1);
1624         cmp = (*b->method->compare_item)(untilbuf, file_item_buf);
1625 #else
1626         zint item_len;
1627         decode_item_len(&src, &item_len);
1628         cmp = (*b->method->compare_item)(untilbuf, src);
1629         src += item_len;
1630 #endif
1631         decode_ptr(&src, &nxtpos);
1632         if (cmp<pp->scope) /* cmp<2 */
1633         {
1634 #if ISAMB_DEBUG
1635             yaz_log(YLOG_DEBUG, "isamb_pp_forward_unode returning a hit "
1636                    "at level %d node %d ofs=%d sz=%d",
1637                     pp->level, p->pos, p->offset, p->size);
1638 #endif
1639             return pos;
1640         } /* found one */
1641         pos = nxtpos;
1642         p->offset = src-(char*)p->bytes;
1643         (pp->skipped_nodes[pp->maxlevel - pp->level -1])++;
1644 #if ISAMB_DEBUG
1645         skips++;
1646 #endif
1647     }
1648 #if ISAMB_DEBUG
1649             yaz_log(YLOG_DEBUG, "isamb_pp_forward_unode returning at tail "
1650                    "at level %d node %d ofs=%d sz=%d skips=%d",
1651                     pp->level, p->pos, p->offset, p->size, skips);
1652 #endif
1653     return pos; /* that's the last one in the line */
1654     
1655 } /* forward_unode */
1656
1657 static void isamb_pp_descend_to_leaf(ISAMB_PP pp, ISAM_P pos,
1658                                      const void *untilbuf)
1659 { /* climbs down the tree, from pos, to the leftmost leaf */
1660     struct ISAMB_block *p = pp->block[pp->level];
1661     const char *src;
1662     assert(!p->leaf);
1663 #if ISAMB_DEBUG
1664     yaz_log(YLOG_DEBUG, "isamb_pp_descend_to_leaf "
1665                    "starting at lev %d node %d ofs=%d lf=%d u=%p", 
1666                    pp->level, p->pos, p->offset, p->leaf, untilbuf);
1667 #endif
1668     if (untilbuf)
1669         pos = isamb_pp_forward_unode(pp, pos, untilbuf);
1670     ++(pp->level);
1671     assert(pos);
1672     p = open_block(pp->isamb, pos);
1673     pp->block[pp->level] = p;
1674     ++(pp->accessed_nodes[pp->maxlevel-pp->level]);
1675     ++(pp->no_blocks);
1676 #if ISAMB_DEBUG
1677     yaz_log(YLOG_DEBUG, "isamb_pp_descend_to_leaf "
1678                    "got lev %d node %d lf=%d", 
1679                    pp->level, p->pos, p->leaf);
1680 #endif
1681     if (p->leaf)
1682         return;
1683     assert (p->offset==0);
1684     src = p->bytes + p->offset;
1685     decode_ptr(&src, &pos);
1686     p->offset = src-(char*)p->bytes;
1687     isamb_pp_descend_to_leaf(pp, pos, untilbuf);
1688 #if ISAMB_DEBUG
1689     yaz_log(YLOG_DEBUG, "isamb_pp_descend_to_leaf "
1690                    "returning at lev %d node %d ofs=%d lf=%d", 
1691                    pp->level, p->pos, p->offset, p->leaf);
1692 #endif
1693 } /* descend_to_leaf */
1694
1695 static int isamb_pp_find_next_leaf(ISAMB_PP pp)
1696 { /* finds the next leaf by climbing up and down */
1697     ISAM_P pos;
1698     if (!isamb_pp_climb_level(pp, &pos))
1699         return 0;
1700     isamb_pp_descend_to_leaf(pp, pos, 0);
1701     return 1;
1702 }
1703
1704 static int isamb_pp_climb_desc(ISAMB_PP pp,  const void *untilbuf)
1705 { /* climbs up and descends to a leaf where values >= *untilbuf are found */
1706     ISAM_P pos;
1707 #if ISAMB_DEBUG
1708     struct ISAMB_block *p = pp->block[pp->level];
1709     yaz_log(YLOG_DEBUG, "isamb_pp_climb_desc starting "
1710                    "at level %d node %d ofs=%d sz=%d",
1711                     pp->level, p->pos, p->offset, p->size);
1712 #endif
1713     if (!isamb_pp_climb_level(pp, &pos))
1714         return 0;
1715     /* see if it would pay to climb one higher */
1716     if (!isamb_pp_on_right_node(pp, pp->level, untilbuf))
1717         if (!isamb_pp_climb_level(pp, &pos))
1718             return 0;
1719     isamb_pp_descend_to_leaf(pp, pos, untilbuf);
1720 #if ISAMB_DEBUG
1721     p = pp->block[pp->level];
1722     yaz_log(YLOG_DEBUG, "isamb_pp_climb_desc done "
1723                    "at level %d node %d ofs=%d sz=%d",
1724                     pp->level, p->pos, p->offset, p->size);
1725 #endif
1726     return 1;
1727 } /* climb_desc */
1728
1729 int isamb_pp_forward (ISAMB_PP pp, void *buf, const void *untilbuf)
1730 {
1731 #if ISAMB_DEBUG
1732     struct ISAMB_block *p = pp->block[pp->level];
1733     assert(p->leaf);
1734     yaz_log(YLOG_DEBUG, "isamb_pp_forward starting "
1735                    "at level %d node %d ofs=%d sz=%d u=%p sc=%d",
1736                     pp->level, p->pos, p->offset, p->size, untilbuf, scope);
1737 #endif
1738     if (untilbuf)
1739     {
1740         if (isamb_pp_forward_on_leaf(pp, buf, untilbuf))
1741         {
1742 #if ISAMB_DEBUG
1743             yaz_log(YLOG_DEBUG, "isamb_pp_forward (f) returning (A) "
1744                    "at level %d node %d ofs=%d sz=%d",
1745                     pp->level, p->pos, p->offset, p->size);
1746 #endif
1747             return 1;
1748         }
1749         if (! isamb_pp_climb_desc(pp, untilbuf))
1750         {
1751 #if ISAMB_DEBUG
1752             yaz_log(YLOG_DEBUG, "isamb_pp_forward (f) returning notfound (B) "
1753                    "at level %d node %d ofs=%d sz=%d",
1754                     pp->level, p->pos, p->offset, p->size);
1755 #endif
1756             return 0; /* could not find a leaf */
1757         }
1758         do {
1759             if (isamb_pp_forward_on_leaf(pp, buf, untilbuf))
1760             {
1761 #if ISAMB_DEBUG
1762                 yaz_log(YLOG_DEBUG, "isamb_pp_forward (f) returning (c) "
1763                         "at level %d node %d ofs=%d sz=%d",
1764                         pp->level, p->pos, p->offset, p->size);
1765 #endif
1766                 return 1;
1767             }
1768         } while (isamb_pp_find_next_leaf(pp));
1769         return 0; /* could not find at all */
1770     }
1771     else { /* no untilbuf, a straight read */
1772         /* FIXME - this should be moved
1773          * directly into the pp_read */
1774         /* keeping here now, to keep same
1775          * interface as the old fwd */
1776         if (isamb_pp_read_on_leaf(pp, buf))
1777         {
1778 #if ISAMB_DEBUG
1779             yaz_log(YLOG_DEBUG, "isamb_pp_forward (read) returning (D) "
1780                    "at level %d node %d ofs=%d sz=%d",
1781                     pp->level, p->pos, p->offset, p->size);
1782 #endif
1783             return 1;
1784         }
1785         if (isamb_pp_find_next_leaf(pp))
1786         {
1787 #if ISAMB_DEBUG
1788             yaz_log(YLOG_DEBUG, "isamb_pp_forward (read) returning (E) "
1789                    "at level %d node %d ofs=%d sz=%d",
1790                     pp->level, p->pos, p->offset, p->size);
1791 #endif
1792             return isamb_pp_read_on_leaf(pp, buf);
1793         }
1794         else
1795             return 0;
1796     }
1797 } /* isam_pp_forward (new version) */
1798
1799 void isamb_pp_pos(ISAMB_PP pp, double *current, double *total)
1800 { /* return an estimate of the current position and of the total number of */
1801   /* occureences in the isam tree, based on the current leaf */
1802     struct ISAMB_block *p = pp->block[pp->level];
1803     assert(total);
1804     assert(current);
1805     assert(p->leaf);
1806
1807     *total = (double) (pp->block[0]->no_items);
1808     *current = (double) pp->returned_numbers;
1809 #if ISAMB_DEBUG
1810     yaz_log(YLOG_LOG, "isamb_pp_pos returning: cur= %0.1f tot=%0.1f rn="
1811          ZINT_FORMAT, *current, *total, pp->returned_numbers);
1812 #endif
1813 }
1814
1815 int isamb_pp_forward2(ISAMB_PP pp, void *buf, const void *untilb)
1816 {
1817     char *dst = buf;
1818     const char *src;
1819     struct ISAMB_block *p = pp->block[pp->level];
1820     ISAMB b = pp->isamb;
1821     if (!p)
1822         return 0;
1823 again:
1824     while (p->offset == p->size)
1825     {
1826         ISAM_P pos;
1827 #if INT_ENCODE
1828         const char *src_0;
1829         void *c1;
1830         char file_item_buf[DST_ITEM_MAX];
1831         char *file_item = file_item_buf;
1832 #else
1833         zint item_len;
1834 #endif
1835         while (p->offset == p->size)
1836         {
1837             if (pp->level == 0)
1838                 return 0;
1839             close_block (pp->isamb, pp->block[pp->level]);
1840             pp->block[pp->level] = 0;
1841             (pp->level)--;
1842             p = pp->block[pp->level];
1843             assert (!p->leaf);  
1844         }
1845
1846         assert(!p->leaf);
1847         src = p->bytes + p->offset;
1848        
1849 #if INT_ENCODE
1850         c1 = (*b->method->codec.start)();
1851         (*b->method->codec.decode)(c1, &file_item, &src);
1852 #else
1853         decode_ptr (&src, &item_len);
1854         src += item_len;
1855 #endif        
1856         decode_ptr (&src, &pos);
1857         p->offset = src - (char*) p->bytes;
1858
1859         src = p->bytes + p->offset;
1860
1861         while(1)
1862         {
1863             if (!untilb || p->offset == p->size)
1864                 break;
1865             assert(p->offset < p->size);
1866 #if INT_ENCODE
1867             src_0 = src;
1868             file_item = file_item_buf;
1869             (*b->method->codec.reset)(c1);
1870             (*b->method->codec.decode)(c1, &file_item, &src);
1871             if ((*b->method->compare_item)(untilb, file_item_buf) <= 1)
1872             {
1873                 src = src_0;
1874                 break;
1875             }
1876 #else
1877             decode_item_len(&src, &item_len);
1878             if ((*b->method->compare_item)(untilb, src) <= 1)
1879                 break;
1880             src += item_len;
1881 #endif
1882             decode_ptr (&src, &pos);
1883             p->offset = src - (char*) p->bytes;
1884         }
1885
1886         pp->level++;
1887
1888         while (1)
1889         {
1890             pp->block[pp->level] = p = open_block (pp->isamb, pos);
1891
1892             pp->total_size += p->size;
1893             pp->no_blocks++;
1894             
1895             if (p->leaf) 
1896             {
1897                 break;
1898             }
1899             
1900             src = p->bytes + p->offset;
1901             while(1)
1902             {
1903                 decode_ptr (&src, &pos);
1904                 p->offset = src - (char*) p->bytes;
1905                 
1906                 if (!untilb || p->offset == p->size)
1907                     break;
1908                 assert(p->offset < p->size);
1909 #if INT_ENCODE
1910                 src_0 = src;
1911                 file_item = file_item_buf;
1912                 (*b->method->codec.reset)(c1);
1913                 (*b->method->codec.decode)(c1, &file_item, &src);
1914                 if ((*b->method->compare_item)(untilb, file_item_buf) <= 1)
1915                 {
1916                     src = src_0;
1917                     break;
1918                 }
1919 #else
1920                 decode_ptr (&src, &item_len);
1921                 if ((*b->method->compare_item)(untilb, src) <= 1)
1922                     break;
1923                 src += item_len;
1924 #endif
1925             }
1926             pp->level++;
1927         }
1928 #if INT_ENCODE
1929         (*b->method->codec.stop)(c1);
1930 #endif
1931     }
1932     assert (p->offset < p->size);
1933     assert (p->leaf);
1934     while(1)
1935     {
1936         char *dst0 = dst;
1937         src = p->bytes + p->offset;
1938         (*pp->isamb->method->codec.decode)(p->decodeClientData, &dst, &src);
1939         p->offset = src - (char*) p->bytes;
1940         if (!untilb || (*pp->isamb->method->compare_item)(untilb, dst0) <= 1)
1941             break;
1942         dst = dst0;
1943         if (p->offset == p->size) goto again;
1944     }
1945     return 1;
1946 }