3e85ee1e0f97097afb488f4a7e4a17efc3204592
[idzebra-moved-to-github.git] / isamb / isamb.c
1 /* $Id: isamb.c,v 1.88 2006-12-12 13:46:41 adam Exp $
2    Copyright (C) 1995-2006
3    Index Data ApS
4
5 This file is part of the Zebra server.
6
7 Zebra is free software; you can redistribute it and/or modify it under
8 the terms of the GNU General Public License as published by the Free
9 Software Foundation; either version 2, or (at your option) any later
10 version.
11
12 Zebra is distributed in the hope that it will be useful, but WITHOUT ANY
13 WARRANTY; without even the implied warranty of MERCHANTABILITY or
14 FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
15 for more details.
16
17 You should have received a copy of the GNU General Public License
18 along with this program; if not, write to the Free Software
19 Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA  02110-1301  USA
20
21 */
22
23 #include <stdlib.h>
24 #include <string.h>
25 #include <yaz/log.h>
26 #include <yaz/xmalloc.h>
27 #include <idzebra/isamb.h>
28 #include <assert.h>
29
30 #ifndef ISAMB_DEBUG
31 #define ISAMB_DEBUG 0
32 #endif
33
34
35 #define ISAMB_MAJOR_VERSION 3
36 #define ISAMB_MINOR_VERSION 0
37
38 struct ISAMB_head {
39     zint first_block;
40     zint last_block;
41     zint free_list;
42     zint no_items;
43     int block_size;
44     int block_max;
45     int block_offset;
46 };
47
48 /* if 1, upper nodes items are encoded; 0 if not encoded */
49 #define INT_ENCODE 1
50
51 /* maximum size of encoded buffer */
52 #define DST_ITEM_MAX 256
53
54 #define ISAMB_MAX_LEVEL 10
55 /* approx 2*max page + max size of item */
56 #define DST_BUF_SIZE (2*4096+300)
57
58 /* should be maximum block size of multiple thereof */
59 #define ISAMB_CACHE_ENTRY_SIZE 4096
60
61 /* CAT_MAX: _must_ be power of 2 */
62 #define CAT_MAX 4
63 #define CAT_MASK (CAT_MAX-1)
64 /* CAT_NO: <= CAT_MAX */
65 #define CAT_NO 4
66
67 /* Smallest block size */
68 #define ISAMB_MIN_SIZE 32
69 /* Size factor */
70 #define ISAMB_FAC_SIZE 4
71
72 /* ISAMB_PTR_CODEC = 1 var, =0 fixed */
73 #define ISAMB_PTR_CODEC 1
74
75 struct ISAMB_cache_entry {
76     ISAM_P pos;
77     unsigned char *buf;
78     int dirty;
79     int hits;
80     struct ISAMB_cache_entry *next;
81 };
82
83 struct ISAMB_file {
84     BFile bf;
85     int head_dirty;
86     struct ISAMB_head head;
87     struct ISAMB_cache_entry *cache_entries;
88 };
89
90 struct ISAMB_s {
91     BFiles bfs;
92     ISAMC_M *method;
93
94     struct ISAMB_file *file;
95     int no_cat;
96     int cache; /* 0 = no cache, 1 = use cache, -1 = dummy isam (for testing only) */
97     int log_io;        /* log level for bf_read/bf_write calls */
98     int log_freelist;  /* log level for freelist handling */
99     zint skipped_numbers; /* on a leaf node */
100     zint returned_numbers; 
101     zint skipped_nodes[ISAMB_MAX_LEVEL]; /* [0]=skipped leaves, 1 = higher etc */
102     zint accessed_nodes[ISAMB_MAX_LEVEL]; /* nodes we did not skip */
103     zint number_of_int_splits;
104     zint number_of_leaf_splits;
105     int enable_int_count; /* whether we count nodes (or not) */
106     int cache_size; /* size of blocks to cache (if cache=1) */
107 };
108
109 struct ISAMB_block {
110     ISAM_P pos;
111     int cat;
112     int size;
113     int leaf;
114     int dirty;
115     int deleted;
116     int offset;
117     zint no_items;  /* number of nodes in this + children */
118     char *bytes;
119     char *cbuf;
120     unsigned char *buf;
121     void *decodeClientData;
122     int log_rw;
123 };
124
125 struct ISAMB_PP_s {
126     ISAMB isamb;
127     ISAM_P pos;
128     int level;
129     int maxlevel; /* total depth */
130     zint total_size;
131     zint no_blocks;
132     zint skipped_numbers; /* on a leaf node */
133     zint returned_numbers; 
134     zint skipped_nodes[ISAMB_MAX_LEVEL]; /* [0]=skipped leaves, 1 = higher etc */
135     zint accessed_nodes[ISAMB_MAX_LEVEL]; /* nodes we did not skip */
136     struct ISAMB_block **block;
137     int scope;  /* on what level we forward */
138 };
139
140
141 #define encode_item_len encode_ptr
142 #if ISAMB_PTR_CODEC
143 static void encode_ptr(char **dst, zint pos)
144 {
145     unsigned char *bp = (unsigned char*) *dst;
146
147     while (pos > 127)
148     {
149          *bp++ = (unsigned char) (128 | (pos & 127));
150          pos = pos >> 7;
151     }
152     *bp++ = (unsigned char) pos;
153     *dst = (char *) bp;
154 }
155 #else
156 static void encode_ptr(char **dst, zint pos)
157 {
158     memcpy(*dst, &pos, sizeof(pos));
159     (*dst) += sizeof(pos);
160 }
161 #endif
162
163 #define decode_item_len decode_ptr
164 #if ISAMB_PTR_CODEC
165 static void decode_ptr(const char **src, zint *pos)
166 {
167     zint d = 0;
168     unsigned char c;
169     unsigned r = 0;
170
171     while (((c = *(const unsigned char *)((*src)++)) & 128))
172     {
173         d += ((zint) (c & 127) << r);
174         r += 7;
175     }
176     d += ((zint) c << r);
177     *pos = d;
178 }
179 #else
180 static void decode_ptr(const char **src, zint *pos)
181 {
182      memcpy(pos, *src, sizeof(*pos));
183      (*src) += sizeof(*pos);
184 }
185 #endif
186
187
188 void isamb_set_int_count(ISAMB b, int v)
189 {
190     b->enable_int_count = v;
191 }
192
193 void isamb_set_cache_size(ISAMB b, int v)
194 {
195     b->cache_size = v;
196 }
197
198 ISAMB isamb_open(BFiles bfs, const char *name, int writeflag, ISAMC_M *method,
199                  int cache)
200 {
201     ISAMB isamb = xmalloc(sizeof(*isamb));
202     int i, b_size = ISAMB_MIN_SIZE;
203
204     isamb->bfs = bfs;
205     isamb->method = (ISAMC_M *) xmalloc(sizeof(*method));
206     memcpy(isamb->method, method, sizeof(*method));
207     isamb->no_cat = CAT_NO;
208     isamb->log_io = 0;
209     isamb->log_freelist = 0;
210     isamb->cache = cache;
211     isamb->skipped_numbers = 0;
212     isamb->returned_numbers = 0;
213     isamb->number_of_int_splits = 0;
214     isamb->number_of_leaf_splits = 0;
215     isamb->enable_int_count = 1;
216     isamb->cache_size = 40;
217
218     for (i = 0; i<ISAMB_MAX_LEVEL; i++)
219         isamb->skipped_nodes[i] = isamb->accessed_nodes[i] = 0;
220
221     if (cache == -1)
222     {
223         yaz_log(YLOG_WARN, "isamb_open %s. Degraded TEST mode", name);
224     }
225     else
226     {
227         assert(cache == 0 || cache == 1);
228     }
229     isamb->file = xmalloc(sizeof(*isamb->file) * isamb->no_cat);
230
231     for (i = 0; i < isamb->no_cat; i++)
232     {
233         isamb->file[i].bf = 0;
234         isamb->file[i].head_dirty = 0;
235         isamb->file[i].cache_entries = 0;
236     }
237
238     for (i = 0; i < isamb->no_cat; i++)
239     {
240         char fname[DST_BUF_SIZE];
241         char hbuf[DST_BUF_SIZE];
242
243         sprintf(fname, "%s%c", name, i+'A');
244         if (cache)
245             isamb->file[i].bf = bf_open(bfs, fname, ISAMB_CACHE_ENTRY_SIZE,
246                                          writeflag);
247         else
248             isamb->file[i].bf = bf_open(bfs, fname, b_size, writeflag);
249
250         if (!isamb->file[i].bf)
251         {
252             isamb_close(isamb);
253             return 0;
254         }
255
256         /* fill-in default values (for empty isamb) */
257         isamb->file[i].head.first_block = ISAMB_CACHE_ENTRY_SIZE/b_size+1;
258         isamb->file[i].head.last_block = isamb->file[i].head.first_block;
259         isamb->file[i].head.block_size = b_size;
260         assert(b_size <= ISAMB_CACHE_ENTRY_SIZE);
261 #if ISAMB_PTR_CODEC
262         if (i == isamb->no_cat-1 || b_size > 128)
263             isamb->file[i].head.block_offset = 8;
264         else
265             isamb->file[i].head.block_offset = 4;
266 #else
267         isamb->file[i].head.block_offset = 11;
268 #endif
269         isamb->file[i].head.block_max =
270             b_size - isamb->file[i].head.block_offset;
271         isamb->file[i].head.free_list = 0;
272         if (bf_read(isamb->file[i].bf, 0, 0, 0, hbuf))
273         {
274             /* got header assume "isamb"major minor len can fit in 16 bytes */
275             zint zint_tmp;
276             int major, minor, len, pos = 0;
277             int left;
278             const char *src = 0;
279             if (memcmp(hbuf, "isamb", 5))
280             {
281                 yaz_log(YLOG_WARN, "bad isamb header for file %s", fname);
282                 return 0;
283             }
284             if (sscanf(hbuf+5, "%d %d %d", &major, &minor, &len) != 3)
285             {
286                 yaz_log(YLOG_WARN, "bad isamb header for file %s", fname);
287                 return 0;
288             }
289             if (major != ISAMB_MAJOR_VERSION)
290             {
291                 yaz_log(YLOG_WARN, "bad major version for file %s %d, must be %d",
292                      fname, major, ISAMB_MAJOR_VERSION);
293                 return 0;
294             }
295             for (left = len - b_size; left > 0; left = left - b_size)
296             {
297                 pos++;
298                 if (!bf_read(isamb->file[i].bf, pos, 0, 0, hbuf + pos*b_size))
299                 {
300                     yaz_log(YLOG_WARN, "truncated isamb header for " 
301                          "file=%s len=%d pos=%d",
302                          fname, len, pos);
303                     return 0;
304                 }
305             }
306             src = hbuf + 16;
307             decode_ptr(&src, &isamb->file[i].head.first_block);
308             decode_ptr(&src, &isamb->file[i].head.last_block);
309             decode_ptr(&src, &zint_tmp);
310             isamb->file[i].head.block_size = (int) zint_tmp;
311             decode_ptr(&src, &zint_tmp);
312             isamb->file[i].head.block_max = (int) zint_tmp;
313             decode_ptr(&src, &isamb->file[i].head.free_list);
314         }
315         assert (isamb->file[i].head.block_size >= isamb->file[i].head.block_offset);
316         isamb->file[i].head_dirty = 0;
317         assert(isamb->file[i].head.block_size == b_size);
318         b_size = b_size * ISAMB_FAC_SIZE;
319     }
320 #if ISAMB_DEBUG
321     yaz_log(YLOG_WARN, "isamb debug enabled. Things will be slower than usual");
322 #endif
323     return isamb;
324 }
325
326 static void flush_blocks (ISAMB b, int cat)
327 {
328     while (b->file[cat].cache_entries)
329     {
330         struct ISAMB_cache_entry *ce_this = b->file[cat].cache_entries;
331         b->file[cat].cache_entries = ce_this->next;
332
333         if (ce_this->dirty)
334         {
335             yaz_log(b->log_io, "bf_write: flush_blocks");
336             bf_write(b->file[cat].bf, ce_this->pos, 0, 0, ce_this->buf);
337         }
338         xfree(ce_this->buf);
339         xfree(ce_this);
340     }
341 }
342
343 static int cache_block (ISAMB b, ISAM_P pos, unsigned char *userbuf, int wr)
344 {
345     int cat = (int) (pos&CAT_MASK);
346     int off = (int) (((pos/CAT_MAX) & 
347                (ISAMB_CACHE_ENTRY_SIZE / b->file[cat].head.block_size - 1))
348         * b->file[cat].head.block_size);
349     zint norm = pos / (CAT_MASK*ISAMB_CACHE_ENTRY_SIZE / b->file[cat].head.block_size);
350     int no = 0;
351     struct ISAMB_cache_entry **ce, *ce_this = 0, **ce_last = 0;
352
353     if (!b->cache)
354         return 0;
355
356     assert (ISAMB_CACHE_ENTRY_SIZE >= b->file[cat].head.block_size);
357     for (ce = &b->file[cat].cache_entries; *ce; ce = &(*ce)->next, no++)
358     {
359         ce_last = ce;
360         if ((*ce)->pos == norm)
361         {
362             ce_this = *ce;
363             *ce = (*ce)->next;   /* remove from list */
364             
365             ce_this->next = b->file[cat].cache_entries;  /* move to front */
366             b->file[cat].cache_entries = ce_this;
367             
368             if (wr)
369             {
370                 memcpy (ce_this->buf + off, userbuf, 
371                         b->file[cat].head.block_size);
372                 ce_this->dirty = 1;
373             }
374             else
375                 memcpy (userbuf, ce_this->buf + off,
376                         b->file[cat].head.block_size);
377             return 1;
378         }
379     }
380     if (no >= b->cache_size)
381     {
382         assert (ce_last && *ce_last);
383         ce_this = *ce_last;
384         *ce_last = 0;  /* remove the last entry from list */
385         if (ce_this->dirty)
386         {
387             yaz_log(b->log_io, "bf_write: cache_block");
388             bf_write(b->file[cat].bf, ce_this->pos, 0, 0, ce_this->buf);
389         }
390         xfree(ce_this->buf);
391         xfree(ce_this);
392     }
393     ce_this = xmalloc(sizeof(*ce_this));
394     ce_this->next = b->file[cat].cache_entries;
395     b->file[cat].cache_entries = ce_this;
396     ce_this->buf = xmalloc(ISAMB_CACHE_ENTRY_SIZE);
397     ce_this->pos = norm;
398     yaz_log(b->log_io, "bf_read: cache_block");
399     if (!bf_read(b->file[cat].bf, norm, 0, 0, ce_this->buf))
400         memset (ce_this->buf, 0, ISAMB_CACHE_ENTRY_SIZE);
401     if (wr)
402     {
403         memcpy (ce_this->buf + off, userbuf, b->file[cat].head.block_size);
404         ce_this->dirty = 1;
405     }
406     else
407     {
408         ce_this->dirty = 0;
409         memcpy (userbuf, ce_this->buf + off, b->file[cat].head.block_size);
410     }
411     return 1;
412 }
413
414
415 void isamb_close (ISAMB isamb)
416 {
417     int i;
418     for (i = 0; isamb->accessed_nodes[i]; i++)
419         yaz_log(YLOG_DEBUG, "isamb_close  level leaf-%d: "ZINT_FORMAT" read, "
420                         ZINT_FORMAT" skipped",
421              i, isamb->accessed_nodes[i], isamb->skipped_nodes[i]);
422     yaz_log(YLOG_DEBUG, "isamb_close returned "ZINT_FORMAT" values, "
423                    "skipped "ZINT_FORMAT,
424          isamb->skipped_numbers, isamb->returned_numbers);
425     for (i = 0; i<isamb->no_cat; i++)
426     {
427         flush_blocks (isamb, i);
428         if (isamb->file[i].head_dirty)
429         {
430             char hbuf[DST_BUF_SIZE];
431             int major = ISAMB_MAJOR_VERSION;
432             int minor = ISAMB_MINOR_VERSION;
433             int len = 16;
434             char *dst = hbuf + 16;
435             int pos = 0, left;
436             int b_size = isamb->file[i].head.block_size;
437
438             encode_ptr(&dst, isamb->file[i].head.first_block);
439             encode_ptr(&dst, isamb->file[i].head.last_block);
440             encode_ptr(&dst, isamb->file[i].head.block_size);
441             encode_ptr(&dst, isamb->file[i].head.block_max);
442             encode_ptr(&dst, isamb->file[i].head.free_list);
443             memset(dst, '\0', b_size); /* ensure no random bytes are written */
444
445             len = dst - hbuf;
446
447             /* print exactly 16 bytes (including trailing 0) */
448             sprintf(hbuf, "isamb%02d %02d %02d\r\n", major, minor, len);
449
450             bf_write(isamb->file[i].bf, pos, 0, 0, hbuf);
451
452             for (left = len - b_size; left > 0; left = left - b_size)
453             {
454                 pos++;
455                 bf_write(isamb->file[i].bf, pos, 0, 0, hbuf + pos*b_size);
456             }
457         }
458         if (isamb->file[i].bf)
459             bf_close (isamb->file[i].bf);
460     }
461     xfree(isamb->file);
462     xfree(isamb->method);
463     xfree(isamb);
464 }
465
466 /* open_block: read one block at pos.
467    Decode leading sys bytes .. consisting of
468    Offset:Meaning
469    0: leader byte, != 0 leaf, == 0, non-leaf
470    1-2: used size of block
471    3-7*: number of items and all children
472    
473    * Reserve 5 bytes for large block sizes. 1 for small ones .. Number
474    of items. We can thus have at most 2^40 nodes. 
475 */
476 static struct ISAMB_block *open_block(ISAMB b, ISAM_P pos)
477 {
478     int cat = (int) (pos&CAT_MASK);
479     const char *src;
480     int offset = b->file[cat].head.block_offset;
481     struct ISAMB_block *p;
482     if (!pos)
483         return 0;
484     p = xmalloc(sizeof(*p));
485     p->pos = pos;
486     p->cat = (int) (pos & CAT_MASK);
487     p->buf = xmalloc(b->file[cat].head.block_size);
488     p->cbuf = 0;
489
490     if (!cache_block (b, pos, p->buf, 0))
491     {
492         yaz_log(b->log_io, "bf_read: open_block");
493         if (bf_read(b->file[cat].bf, pos/CAT_MAX, 0, 0, p->buf) != 1)
494         {
495             yaz_log(YLOG_FATAL, "isamb: read fail for pos=%ld block=%ld",
496                      (long) pos, (long) pos/CAT_MAX);
497             zebra_exit("isamb:open_block");
498         }
499     }
500     p->bytes = (char *)p->buf + offset;
501     p->leaf = p->buf[0];
502     p->size = (p->buf[1] + 256 * p->buf[2]) - offset;
503     if (p->size < 0)
504     {
505         yaz_log(YLOG_FATAL, "Bad block size %d in pos=" ZINT_FORMAT "\n",
506                  p->size, pos);
507     }
508     assert (p->size >= 0);
509     src = (char*) p->buf + 3;
510     decode_ptr(&src, &p->no_items);
511
512     p->offset = 0;
513     p->dirty = 0;
514     p->deleted = 0;
515     p->decodeClientData = (*b->method->codec.start)();
516     return p;
517 }
518
519 struct ISAMB_block *new_block (ISAMB b, int leaf, int cat)
520 {
521     struct ISAMB_block *p;
522
523     p = xmalloc(sizeof(*p));
524     p->buf = xmalloc(b->file[cat].head.block_size);
525
526     if (!b->file[cat].head.free_list)
527     {
528         zint block_no;
529         block_no = b->file[cat].head.last_block++;
530         p->pos = block_no * CAT_MAX + cat;
531     }
532     else
533     {
534         p->pos = b->file[cat].head.free_list;
535         assert((p->pos & CAT_MASK) == cat);
536         if (!cache_block (b, p->pos, p->buf, 0))
537         {
538             yaz_log(b->log_io, "bf_read: new_block");
539             if (!bf_read(b->file[cat].bf, p->pos/CAT_MAX, 0, 0, p->buf))
540             {
541                 yaz_log(YLOG_FATAL, "isamb: read fail for pos=%ld block=%ld",
542                          (long) p->pos/CAT_MAX, (long) p->pos/CAT_MAX);
543                 zebra_exit("isamb:new_block");
544             }
545         }
546         yaz_log(b->log_freelist, "got block " ZINT_FORMAT " from freelist %d:" ZINT_FORMAT, p->pos,
547                  cat, p->pos/CAT_MAX);
548         memcpy (&b->file[cat].head.free_list, p->buf, sizeof(zint));
549     }
550     p->cat = cat;
551     b->file[cat].head_dirty = 1;
552     memset (p->buf, 0, b->file[cat].head.block_size);
553     p->bytes = (char*)p->buf + b->file[cat].head.block_offset;
554     p->leaf = leaf;
555     p->size = 0;
556     p->dirty = 1;
557     p->deleted = 0;
558     p->offset = 0;
559     p->no_items = 0;
560     p->decodeClientData = (*b->method->codec.start)();
561     return p;
562 }
563
564 struct ISAMB_block *new_leaf (ISAMB b, int cat)
565 {
566     return new_block (b, 1, cat);
567 }
568
569
570 struct ISAMB_block *new_int (ISAMB b, int cat)
571 {
572     return new_block (b, 0, cat);
573 }
574
575 static void check_block (ISAMB b, struct ISAMB_block *p)
576 {
577     assert(b); /* mostly to make the compiler shut up about unused b */
578     if (p->leaf)
579     {
580         ;
581     }
582     else
583     {
584         /* sanity check */
585         char *startp = p->bytes;
586         const char *src = startp;
587         char *endp = p->bytes + p->size;
588         ISAM_P pos;
589         void *c1 = (*b->method->codec.start)();
590             
591         decode_ptr(&src, &pos);
592         assert ((pos&CAT_MASK) == p->cat);
593         while (src != endp)
594         {
595 #if INT_ENCODE
596             char file_item_buf[DST_ITEM_MAX];
597             char *file_item = file_item_buf;
598             (*b->method->codec.reset)(c1);
599             (*b->method->codec.decode)(c1, &file_item, &src);
600 #else
601             zint item_len;
602             decode_item_len(&src, &item_len);
603             assert (item_len > 0 && item_len < 80);
604             src += item_len;
605 #endif
606             decode_ptr(&src, &pos);
607             if ((pos&CAT_MASK) != p->cat)
608             {
609                 assert ((pos&CAT_MASK) == p->cat);
610             }
611         }
612         (*b->method->codec.stop)(c1);
613     }
614 }
615
616 void close_block(ISAMB b, struct ISAMB_block *p)
617 {
618     if (!p)
619         return;
620     if (p->deleted)
621     {
622         yaz_log(b->log_freelist, "release block " ZINT_FORMAT " from freelist %d:" ZINT_FORMAT,
623                  p->pos, p->cat, p->pos/CAT_MAX);
624         memcpy (p->buf, &b->file[p->cat].head.free_list, sizeof(zint));
625         b->file[p->cat].head.free_list = p->pos;
626         if (!cache_block (b, p->pos, p->buf, 1))
627         {
628             yaz_log(b->log_io, "bf_write: close_block (deleted)");
629             bf_write(b->file[p->cat].bf, p->pos/CAT_MAX, 0, 0, p->buf);
630         }
631     }
632     else if (p->dirty)
633     {
634         int offset = b->file[p->cat].head.block_offset;
635         int size = p->size + offset;
636         char *dst =  (char*)p->buf + 3;
637         assert (p->size >= 0);
638         
639         /* memset becuase encode_ptr usually does not write all bytes */
640         memset(p->buf, 0, b->file[p->cat].head.block_offset);
641         p->buf[0] = p->leaf;
642         p->buf[1] = size & 255;
643         p->buf[2] = size >> 8;
644         encode_ptr(&dst, p->no_items);
645         check_block(b, p);
646         if (!cache_block (b, p->pos, p->buf, 1))
647         {
648             yaz_log(b->log_io, "bf_write: close_block");
649             bf_write(b->file[p->cat].bf, p->pos/CAT_MAX, 0, 0, p->buf);
650         }
651     }
652     (*b->method->codec.stop)(p->decodeClientData);
653     xfree(p->buf);
654     xfree(p);
655 }
656
657 int insert_sub (ISAMB b, struct ISAMB_block **p,
658                 void *new_item, int *mode,
659                 ISAMC_I *stream,
660                 struct ISAMB_block **sp,
661                 void *sub_item, int *sub_size,
662                 const void *max_item);
663
664 int insert_int (ISAMB b, struct ISAMB_block *p, void *lookahead_item,
665                 int *mode,
666                 ISAMC_I *stream, struct ISAMB_block **sp,
667                 void *split_item, int *split_size, const void *last_max_item)
668 {
669     char *startp = p->bytes;
670     const char *src = startp;
671     char *endp = p->bytes + p->size;
672     ISAM_P pos;
673     struct ISAMB_block *sub_p1 = 0, *sub_p2 = 0;
674     char sub_item[DST_ITEM_MAX];
675     int sub_size;
676     int more = 0;
677     zint diff_terms = 0;
678     void *c1 = (*b->method->codec.start)();
679
680     *sp = 0;
681
682     assert(p->size >= 0);
683     decode_ptr(&src, &pos);
684     while (src != endp)
685     {
686         int d;
687         const char *src0 = src;
688 #if INT_ENCODE
689         char file_item_buf[DST_ITEM_MAX];
690         char *file_item = file_item_buf;
691         (*b->method->codec.reset)(c1);
692         (*b->method->codec.decode)(c1, &file_item, &src);
693         d = (*b->method->compare_item)(file_item_buf, lookahead_item);
694         if (d > 0)
695         {
696             sub_p1 = open_block(b, pos);
697             assert (sub_p1);
698             diff_terms -= sub_p1->no_items;
699             more = insert_sub (b, &sub_p1, lookahead_item, mode,
700                                stream, &sub_p2, 
701                                sub_item, &sub_size, file_item_buf);
702             diff_terms += sub_p1->no_items;
703             src = src0;
704             break;
705         }
706 #else
707         zint item_len;
708         decode_item_len(&src, &item_len);
709         d = (*b->method->compare_item)(src, lookahead_item);
710         if (d > 0)
711         {
712             sub_p1 = open_block(b, pos);
713             assert (sub_p1);
714             diff_terms -= sub_p1->no_items;
715             more = insert_sub (b, &sub_p1, lookahead_item, mode,
716                                stream, &sub_p2, 
717                                sub_item, &sub_size, src);
718             diff_terms += sub_p1->no_items;
719             src = src0;
720             break;
721         }
722         src += item_len;
723 #endif
724         decode_ptr(&src, &pos);
725     }
726     if (!sub_p1)
727     {
728         /* we reached the end. So lookahead > last item */
729         sub_p1 = open_block(b, pos);
730         assert (sub_p1);
731         diff_terms -= sub_p1->no_items;
732         more = insert_sub (b, &sub_p1, lookahead_item, mode, stream, &sub_p2, 
733                            sub_item, &sub_size, last_max_item);
734         diff_terms += sub_p1->no_items;
735     }
736     if (sub_p2)
737         diff_terms += sub_p2->no_items;
738     if (diff_terms)
739     {
740         p->dirty = 1;
741         p->no_items += diff_terms;
742     }
743     if (sub_p2)
744     {
745         /* there was a split - must insert pointer in this one */
746         char dst_buf[DST_BUF_SIZE];
747         char *dst = dst_buf;
748 #if INT_ENCODE
749         const char *sub_item_ptr = sub_item;
750 #endif
751         assert (sub_size < 80 && sub_size > 1);
752
753         memcpy (dst, startp, src - startp);
754                 
755         dst += src - startp;
756
757 #if INT_ENCODE
758         (*b->method->codec.reset)(c1);
759         (*b->method->codec.encode)(c1, &dst, &sub_item_ptr);
760 #else
761         encode_item_len (&dst, sub_size);      /* sub length and item */
762         memcpy (dst, sub_item, sub_size);
763         dst += sub_size;
764 #endif
765
766         encode_ptr(&dst, sub_p2->pos);   /* pos */
767
768         if (endp - src)                   /* remaining data */
769         {
770             memcpy (dst, src, endp - src);
771             dst += endp - src;
772         }
773         p->size = dst - dst_buf;
774         assert (p->size >= 0);
775
776         if (p->size <= b->file[p->cat].head.block_max)
777         {
778             /* it fits OK in this block */
779             memcpy (startp, dst_buf, dst - dst_buf);
780
781             close_block(b, sub_p2);
782         }
783         else
784         {
785             /* must split _this_ block as well .. */
786             struct ISAMB_block *sub_p3;
787 #if INT_ENCODE
788             char file_item_buf[DST_ITEM_MAX];
789             char *file_item = file_item_buf;
790 #else
791             zint split_size_tmp;
792 #endif
793             zint no_items_first_half = 0;
794             int p_new_size;
795             const char *half;
796             src = dst_buf;
797             endp = dst;
798             
799             b->number_of_int_splits++;
800
801             p->dirty = 1;
802             close_block(b, sub_p2);
803
804             half = src + b->file[p->cat].head.block_size/2;
805             decode_ptr(&src, &pos);
806
807             if (b->enable_int_count)
808             {
809                 /* read sub block so we can get no_items for it */
810                 sub_p3 = open_block(b, pos);
811                 no_items_first_half += sub_p3->no_items;
812                 close_block(b, sub_p3);
813             }
814
815             while (src <= half)
816             {
817 #if INT_ENCODE
818                 file_item = file_item_buf;
819                 (*b->method->codec.reset)(c1);
820                 (*b->method->codec.decode)(c1, &file_item, &src);
821 #else
822                 decode_item_len(&src, &split_size_tmp);
823                 *split_size = (int) split_size_tmp;
824                 src += *split_size;
825 #endif
826                 decode_ptr(&src, &pos);
827
828                 if (b->enable_int_count)
829                 {
830                     /* read sub block so we can get no_items for it */
831                     sub_p3 = open_block(b, pos);
832                     no_items_first_half += sub_p3->no_items;
833                     close_block(b, sub_p3);
834                 }
835             }
836             /*  p is first half */
837             p_new_size = src - dst_buf;
838             memcpy (p->bytes, dst_buf, p_new_size);
839
840 #if INT_ENCODE
841             file_item = file_item_buf;
842             (*b->method->codec.reset)(c1);
843             (*b->method->codec.decode)(c1, &file_item, &src);
844             *split_size = file_item - file_item_buf;
845             memcpy(split_item, file_item_buf, *split_size);
846 #else
847             decode_item_len(&src, &split_size_tmp);
848             *split_size = (int) split_size_tmp;
849             memcpy (split_item, src, *split_size);
850             src += *split_size;
851 #endif
852             /*  *sp is second half */
853             *sp = new_int (b, p->cat);
854             (*sp)->size = endp - src;
855             memcpy ((*sp)->bytes, src, (*sp)->size);
856
857             p->size = p_new_size;
858
859             /*  adjust no_items in first&second half */
860             (*sp)->no_items = p->no_items - no_items_first_half;
861             p->no_items = no_items_first_half;
862         }
863         p->dirty = 1;
864     }
865     close_block(b, sub_p1);
866     (*b->method->codec.stop)(c1);
867     return more;
868 }
869
870 int insert_leaf (ISAMB b, struct ISAMB_block **sp1, void *lookahead_item,
871                  int *lookahead_mode, ISAMC_I *stream,
872                  struct ISAMB_block **sp2,
873                  void *sub_item, int *sub_size,
874                  const void *max_item)
875 {
876     struct ISAMB_block *p = *sp1;
877     char *endp = 0;
878     const char *src = 0;
879     char dst_buf[DST_BUF_SIZE], *dst = dst_buf;
880     int new_size;
881     void *c1 = (*b->method->codec.start)();
882     void *c2 = (*b->method->codec.start)();
883     int more = 1;
884     int quater = b->file[b->no_cat-1].head.block_max / 4;
885     char *mid_cut = dst_buf + quater * 2;
886     char *tail_cut = dst_buf + quater * 3;
887     char *maxp = dst_buf + b->file[b->no_cat-1].head.block_max;
888     char *half1 = 0;
889     char *half2 = 0;
890     char cut_item_buf[DST_ITEM_MAX];
891     int cut_item_size = 0;
892     int no_items = 0;    /* number of items (total) */
893     int no_items_1 = 0;  /* number of items (first half) */
894     int inserted_dst_bytes = 0;
895
896     if (p && p->size)
897     {
898         char file_item_buf[DST_ITEM_MAX];
899         char *file_item = file_item_buf;
900             
901         src = p->bytes;
902         endp = p->bytes + p->size;
903         (*b->method->codec.decode)(c1, &file_item, &src);
904         while (1)
905         {
906             const char *dst_item = 0; /* resulting item to be inserted */
907             char *lookahead_next;
908             char *dst_0 = dst;
909             int d = -1;
910             
911             if (lookahead_item)
912                 d = (*b->method->compare_item)(file_item_buf, lookahead_item);
913             
914             /* d now holds comparison between existing file item and
915                lookahead item 
916                d = 0: equal
917                d > 0: lookahead before file
918                d < 0: lookahead after file
919             */
920             if (d > 0)
921             {
922                 /* lookahead must be inserted */
923                 dst_item = lookahead_item;
924                 /* if this is not an insertion, it's really bad .. */
925                 if (!*lookahead_mode)
926                 {
927                     yaz_log(YLOG_WARN, "isamb: Inconsistent register (1)");
928                     assert(*lookahead_mode);
929                 }
930             }
931             else
932                 dst_item = file_item_buf;
933
934             if (!*lookahead_mode && d == 0)
935             {
936                 /* it's a deletion and they match so there is nothing to be
937                    inserted anyway .. But mark the thing bad (file item
938                    was part of input.. The item will not be part of output */
939                 p->dirty = 1;
940             }
941             else if (!half1 && dst > mid_cut)
942             {
943                 /* we have reached the splitting point for the first time */
944                 const char *dst_item_0 = dst_item;
945                 half1 = dst; /* candidate for splitting */
946
947                 /* encode the resulting item */
948                 (*b->method->codec.encode)(c2, &dst, &dst_item);
949                 
950                 cut_item_size = dst_item - dst_item_0;
951                 assert(cut_item_size > 0);
952                 memcpy (cut_item_buf, dst_item_0, cut_item_size);
953                 
954                 half2 = dst;
955                 no_items_1 = no_items;
956                 no_items++;
957             }
958             else
959             {
960                 /* encode the resulting item */
961                 (*b->method->codec.encode)(c2, &dst, &dst_item);
962                 no_items++;
963             }
964
965             /* now move "pointers" .. result has been encoded .. */
966             if (d > 0)  
967             {
968                 /* we must move the lookahead pointer */
969
970                 inserted_dst_bytes += (dst - dst_0);
971                 if (inserted_dst_bytes >=  quater)
972                     /* no more room. Mark lookahead as "gone".. */
973                     lookahead_item = 0;
974                 else
975                 {
976                     /* move it really.. */
977                     lookahead_next = lookahead_item;
978                     if (!(*stream->read_item)(stream->clientData,
979                                               &lookahead_next,
980                                               lookahead_mode))
981                     {
982                         /* end of stream reached: no "more" and no lookahead */
983                         lookahead_item = 0;
984                         more = 0;
985                     }
986                     if (lookahead_item && max_item &&
987                         (*b->method->compare_item)(max_item, lookahead_item) <= 0)
988                     {
989                         /* the lookahead goes beyond what we allow in this
990                            leaf. Mark it as "gone" */
991                         lookahead_item = 0;
992                     }
993                     
994                     p->dirty = 1;
995                 }
996             }
997             else if (d == 0)
998             {
999                 /* exact match .. move both pointers */
1000
1001                 lookahead_next = lookahead_item;
1002                 if (!(*stream->read_item)(stream->clientData,
1003                                           &lookahead_next, lookahead_mode))
1004                 {
1005                     lookahead_item = 0;
1006                     more = 0;
1007                 }
1008                 if (src == endp)
1009                     break;  /* end of file stream reached .. */
1010                 file_item = file_item_buf; /* move file pointer */
1011                 (*b->method->codec.decode)(c1, &file_item, &src);
1012             }
1013             else
1014             {
1015                 /* file pointer must be moved */
1016                 if (src == endp)
1017                     break;
1018                 file_item = file_item_buf;
1019                 (*b->method->codec.decode)(c1, &file_item, &src);
1020             }
1021         }
1022     }
1023
1024     /* this loop runs when we are "appending" to a leaf page. That is
1025        either it's empty (new) or all file items have been read in
1026        previous loop */
1027
1028     maxp = dst_buf + b->file[b->no_cat-1].head.block_max + quater;
1029     while (lookahead_item)
1030     {
1031         char *dst_item;
1032         const char *src = lookahead_item;
1033         char *dst_0 = dst;
1034         
1035         /* if we have a lookahead item, we stop if we exceed the value of it */
1036         if (max_item &&
1037             (*b->method->compare_item)(max_item, lookahead_item) <= 0)
1038         {
1039             /* stop if we have reached the value of max item */
1040             break;
1041         }
1042         if (!*lookahead_mode)
1043         {
1044             /* this is append. So a delete is bad */
1045             yaz_log(YLOG_WARN, "isamb: Inconsistent register (2)");
1046             assert(*lookahead_mode);
1047         }
1048         else if (!half1 && dst > tail_cut)
1049         {
1050             const char *src_0 = src;
1051             half1 = dst; /* candidate for splitting */
1052             
1053             (*b->method->codec.encode)(c2, &dst, &src);
1054             
1055             cut_item_size = src - src_0;
1056             assert(cut_item_size > 0);
1057             memcpy (cut_item_buf, src_0, cut_item_size);
1058             
1059             no_items_1 = no_items;
1060             half2 = dst;
1061         }
1062         else
1063             (*b->method->codec.encode)(c2, &dst, &src);
1064
1065         if (dst > maxp)
1066         {
1067             dst = dst_0;
1068             break;
1069         }
1070         no_items++;
1071         if (p)
1072             p->dirty = 1;
1073         dst_item = lookahead_item;
1074         if (!(*stream->read_item)(stream->clientData, &dst_item,
1075                                   lookahead_mode))
1076         {
1077             lookahead_item = 0;
1078             more = 0;
1079         }
1080     }
1081     new_size = dst - dst_buf;
1082     if (p && p->cat != b->no_cat-1 && 
1083         new_size > b->file[p->cat].head.block_max)
1084     {
1085         /* non-btree block will be removed */
1086         p->deleted = 1;
1087         close_block(b, p);
1088         /* delete it too!! */
1089         p = 0; /* make a new one anyway */
1090     }
1091     if (!p)
1092     {   /* must create a new one */
1093         int i;
1094         for (i = 0; i < b->no_cat; i++)
1095             if (new_size <= b->file[i].head.block_max)
1096                 break;
1097         if (i == b->no_cat)
1098             i = b->no_cat - 1;
1099         p = new_leaf (b, i);
1100     }
1101     if (new_size > b->file[p->cat].head.block_max)
1102     {
1103         char *first_dst;
1104         const char *cut_item = cut_item_buf;
1105
1106         assert (half1);
1107         assert (half2);
1108
1109         assert(cut_item_size > 0);
1110         
1111         /* first half */
1112         p->size = half1 - dst_buf;
1113         assert(p->size <=  b->file[p->cat].head.block_max);
1114         memcpy (p->bytes, dst_buf, half1 - dst_buf);
1115         p->no_items = no_items_1;
1116
1117         /* second half */
1118         *sp2 = new_leaf (b, p->cat);
1119
1120         (*b->method->codec.reset)(c2);
1121
1122         b->number_of_leaf_splits++;
1123
1124         first_dst = (*sp2)->bytes;
1125
1126         (*b->method->codec.encode)(c2, &first_dst, &cut_item);
1127
1128         memcpy (first_dst, half2, dst - half2);
1129
1130         (*sp2)->size = (first_dst - (*sp2)->bytes) + (dst - half2);
1131         assert((*sp2)->size <=  b->file[p->cat].head.block_max);
1132         (*sp2)->no_items = no_items - no_items_1;
1133         (*sp2)->dirty = 1;
1134         p->dirty = 1;
1135         memcpy (sub_item, cut_item_buf, cut_item_size);
1136         *sub_size = cut_item_size;
1137     }
1138     else
1139     {
1140         memcpy (p->bytes, dst_buf, dst - dst_buf);
1141         p->size = new_size;
1142         p->no_items = no_items;
1143     }
1144     (*b->method->codec.stop)(c1);
1145     (*b->method->codec.stop)(c2);
1146     *sp1 = p;
1147     return more;
1148 }
1149
1150 int insert_sub (ISAMB b, struct ISAMB_block **p, void *new_item,
1151                 int *mode,
1152                 ISAMC_I *stream,
1153                 struct ISAMB_block **sp,
1154                 void *sub_item, int *sub_size,
1155                 const void *max_item)
1156 {
1157     if (!*p || (*p)->leaf)
1158         return insert_leaf (b, p, new_item, mode, stream, sp, sub_item, 
1159                             sub_size, max_item);
1160     else
1161         return insert_int (b, *p, new_item, mode, stream, sp, sub_item,
1162                            sub_size, max_item);
1163 }
1164
1165 int isamb_unlink (ISAMB b, ISAM_P pos)
1166 {
1167     struct ISAMB_block *p1;
1168
1169     if (!pos)
1170         return 0;
1171     p1 = open_block(b, pos);
1172     p1->deleted = 1;
1173     if (!p1->leaf)
1174     {
1175         zint sub_p;
1176         const char *src = p1->bytes + p1->offset;
1177 #if INT_ENCODE
1178         void *c1 = (*b->method->codec.start)();
1179 #endif
1180         decode_ptr(&src, &sub_p);
1181         isamb_unlink(b, sub_p);
1182         
1183         while (src != p1->bytes + p1->size)
1184         {
1185 #if INT_ENCODE
1186             char file_item_buf[DST_ITEM_MAX];
1187             char *file_item = file_item_buf;
1188             (*b->method->codec.reset)(c1);
1189             (*b->method->codec.decode)(c1, &file_item, &src);
1190 #else
1191             zint item_len;
1192             decode_item_len(&src, &item_len);
1193             src += item_len;
1194 #endif
1195             decode_ptr(&src, &sub_p);
1196             isamb_unlink(b, sub_p);
1197         }
1198 #if INT_ENCODE
1199         (*b->method->codec.stop)(c1);
1200 #endif
1201     }
1202     close_block(b, p1);
1203     return 0;
1204 }
1205
1206 void isamb_merge(ISAMB b, ISAM_P *pos, ISAMC_I *stream)
1207 {
1208     char item_buf[DST_ITEM_MAX];
1209     char *item_ptr;
1210     int i_mode;
1211     int more;
1212     int must_delete = 0;
1213
1214     if (b->cache < 0)
1215     {
1216         int more = 1;
1217         while (more)
1218         {
1219             item_ptr = item_buf;
1220             more =
1221                 (*stream->read_item)(stream->clientData, &item_ptr, &i_mode);
1222         }
1223         *pos = 1;
1224         return;
1225     }
1226     item_ptr = item_buf;
1227     more = (*stream->read_item)(stream->clientData, &item_ptr, &i_mode);
1228     while (more)
1229     {
1230         struct ISAMB_block *p = 0, *sp = 0;
1231         char sub_item[DST_ITEM_MAX];
1232         int sub_size;
1233         
1234         if (*pos)
1235             p = open_block(b, *pos);
1236         more = insert_sub (b, &p, item_buf, &i_mode, stream, &sp,
1237                            sub_item, &sub_size, 0);
1238         if (sp)
1239         {    /* increase level of tree by one */
1240             struct ISAMB_block *p2 = new_int (b, p->cat);
1241             char *dst = p2->bytes + p2->size;
1242 #if INT_ENCODE
1243             void *c1 = (*b->method->codec.start)();
1244             const char *sub_item_ptr = sub_item;
1245 #endif
1246
1247             encode_ptr(&dst, p->pos);
1248             assert (sub_size < 80 && sub_size > 1);
1249 #if INT_ENCODE
1250             (*b->method->codec.reset)(c1);
1251             (*b->method->codec.encode)(c1, &dst, &sub_item_ptr);
1252 #else
1253             encode_item_len (&dst, sub_size);
1254             memcpy (dst, sub_item, sub_size);
1255             dst += sub_size;
1256 #endif
1257             encode_ptr(&dst, sp->pos);
1258             
1259             p2->size = dst - p2->bytes;
1260             p2->no_items = p->no_items + sp->no_items;
1261             *pos = p2->pos;  /* return new super page */
1262             close_block(b, sp);
1263             close_block(b, p2);
1264 #if INT_ENCODE
1265             (*b->method->codec.stop)(c1);
1266 #endif
1267         }
1268         else
1269         {
1270             *pos = p->pos;   /* return current one (again) */
1271         }
1272         if (p->no_items == 0)
1273             must_delete = 1;
1274         else
1275             must_delete = 0;
1276         close_block(b, p);
1277     }
1278     if (must_delete)
1279     {
1280         isamb_unlink(b, *pos);
1281         *pos = 0;
1282     }
1283 }
1284
1285 ISAMB_PP isamb_pp_open_x(ISAMB isamb, ISAM_P pos, int *level, int scope)
1286 {
1287     ISAMB_PP pp = xmalloc(sizeof(*pp));
1288     int i;
1289
1290     assert(pos);
1291
1292     pp->isamb = isamb;
1293     pp->block = xmalloc(ISAMB_MAX_LEVEL * sizeof(*pp->block));
1294
1295     pp->pos = pos;
1296     pp->level = 0;
1297     pp->maxlevel = 0;
1298     pp->total_size = 0;
1299     pp->no_blocks = 0;
1300     pp->skipped_numbers = 0;
1301     pp->returned_numbers = 0;
1302     pp->scope = scope;
1303     for (i = 0; i<ISAMB_MAX_LEVEL; i++)
1304         pp->skipped_nodes[i] = pp->accessed_nodes[i] = 0;
1305     while (1)
1306     {
1307         struct ISAMB_block *p = open_block(isamb, pos);
1308         const char *src = p->bytes + p->offset;
1309         pp->block[pp->level] = p;
1310
1311         pp->total_size += p->size;
1312         pp->no_blocks++;
1313         if (p->leaf)
1314             break;
1315         decode_ptr(&src, &pos);
1316         p->offset = src - p->bytes;
1317         pp->level++;
1318         pp->accessed_nodes[pp->level]++; 
1319     }
1320     pp->block[pp->level+1] = 0;
1321     pp->maxlevel = pp->level;
1322     if (level)
1323         *level = pp->level;
1324     return pp;
1325 }
1326
1327 ISAMB_PP isamb_pp_open (ISAMB isamb, ISAM_P pos, int scope)
1328 {
1329     return isamb_pp_open_x(isamb, pos, 0, scope);
1330 }
1331
1332 void isamb_pp_close_x(ISAMB_PP pp, zint *size, zint *blocks)
1333 {
1334     int i;
1335     if (!pp)
1336         return;
1337     yaz_log(YLOG_DEBUG, "isamb_pp_close lev=%d returned "ZINT_FORMAT" values, " 
1338                     "skipped "ZINT_FORMAT,
1339         pp->maxlevel, pp->skipped_numbers, pp->returned_numbers);
1340     for (i = pp->maxlevel; i>=0; i--)
1341         if (pp->skipped_nodes[i] || pp->accessed_nodes[i])
1342             yaz_log(YLOG_DEBUG, "isamb_pp_close  level leaf-%d: "
1343                             ZINT_FORMAT" read, "ZINT_FORMAT" skipped", i,
1344                  pp->accessed_nodes[i], pp->skipped_nodes[i]);
1345     pp->isamb->skipped_numbers += pp->skipped_numbers;
1346     pp->isamb->returned_numbers += pp->returned_numbers;
1347     for (i = pp->maxlevel; i>=0; i--)
1348     {
1349         pp->isamb->accessed_nodes[i] += pp->accessed_nodes[i];
1350         pp->isamb->skipped_nodes[i] += pp->skipped_nodes[i];
1351     }
1352     if (size)
1353         *size = pp->total_size;
1354     if (blocks)
1355         *blocks = pp->no_blocks;
1356     for (i = 0; i <= pp->level; i++)
1357         close_block(pp->isamb, pp->block[i]);
1358     xfree(pp->block);
1359     xfree(pp);
1360 }
1361
1362 int isamb_block_info (ISAMB isamb, int cat)
1363 {
1364     if (cat >= 0 && cat < isamb->no_cat)
1365         return isamb->file[cat].head.block_size;
1366     return -1;
1367 }
1368
1369 void isamb_pp_close (ISAMB_PP pp)
1370 {
1371     isamb_pp_close_x(pp, 0, 0);
1372 }
1373
1374 /* simple recursive dumper .. */
1375 static void isamb_dump_r (ISAMB b, ISAM_P pos, void (*pr)(const char *str),
1376                           int level)
1377 {
1378     char buf[1024];
1379     char prefix_str[1024];
1380     if (pos)
1381     {
1382         struct ISAMB_block *p = open_block(b, pos);
1383         sprintf(prefix_str, "%*s " ZINT_FORMAT " cat=%d size=%d max=%d items="
1384                 ZINT_FORMAT, level*2, "",
1385                 pos, p->cat, p->size, b->file[p->cat].head.block_max,
1386                 p->no_items);
1387         (*pr)(prefix_str);
1388         sprintf(prefix_str, "%*s " ZINT_FORMAT, level*2, "", pos);
1389         if (p->leaf)
1390         {
1391             while (p->offset < p->size)
1392             {
1393                 const char *src = p->bytes + p->offset;
1394                 char *dst = buf;
1395                 (*b->method->codec.decode)(p->decodeClientData, &dst, &src);
1396                 (*b->method->log_item)(YLOG_DEBUG, buf, prefix_str);
1397                 p->offset = src - (char*) p->bytes;
1398             }
1399             assert(p->offset == p->size);
1400         }
1401         else
1402         {
1403             const char *src = p->bytes + p->offset;
1404             ISAM_P sub;
1405
1406             decode_ptr(&src, &sub);
1407             p->offset = src - (char*) p->bytes;
1408
1409             isamb_dump_r(b, sub, pr, level+1);
1410             
1411             while (p->offset < p->size)
1412             {
1413 #if INT_ENCODE
1414                 char file_item_buf[DST_ITEM_MAX];
1415                 char *file_item = file_item_buf;
1416                 void *c1 = (*b->method->codec.start)();
1417                 (*b->method->codec.decode)(c1, &file_item, &src);
1418                 (*b->method->codec.stop)(c1);
1419                 (*b->method->log_item)(YLOG_DEBUG, file_item_buf, prefix_str);
1420 #else
1421                 zint item_len;
1422                 decode_item_len(&src, &item_len);
1423                 (*b->method->log_item)(YLOG_DEBUG, src, prefix_str);
1424                 src += item_len;
1425 #endif
1426                 decode_ptr(&src, &sub);
1427                 
1428                 p->offset = src - (char*) p->bytes;
1429                 
1430                 isamb_dump_r(b, sub, pr, level+1);
1431             }           
1432         }
1433         close_block(b, p);
1434     }
1435 }
1436
1437 void isamb_dump(ISAMB b, ISAM_P pos, void (*pr)(const char *str))
1438 {
1439     isamb_dump_r(b, pos, pr, 0);
1440 }
1441
1442 int isamb_pp_read(ISAMB_PP pp, void *buf)
1443 {
1444     return isamb_pp_forward(pp, buf, 0);
1445 }
1446
1447
1448 static int isamb_pp_on_right_node(ISAMB_PP pp, int level, const void *untilbuf)
1449 { /* looks one node higher to see if we should be on this node at all */
1450   /* useful in backing off quickly, and in avoiding tail descends */
1451   /* call with pp->level to begin with */
1452     struct ISAMB_block *p;
1453     int cmp;
1454     const char *src;
1455     ISAMB b = pp->isamb;
1456
1457     assert(level >= 0);
1458     if (level == 0)
1459     {
1460 #if ISAMB_DEBUG
1461             yaz_log(YLOG_DEBUG, "isamb_pp_on_right returning true for root");
1462 #endif
1463         return 1; /* we can never skip the root node */
1464     }
1465     level--;
1466     p = pp->block[level];
1467     assert(p->offset <= p->size);
1468     if (p->offset < p->size)
1469     {
1470 #if INT_ENCODE
1471         char file_item_buf[DST_ITEM_MAX];
1472         char *file_item = file_item_buf;
1473         void *c1 = (*b->method->codec.start)();
1474         assert(p->offset > 0); 
1475         src = p->bytes + p->offset;
1476         (*b->method->codec.decode)(c1, &file_item, &src);
1477         (*b->method->codec.stop)(c1);
1478         cmp = (*b->method->compare_item)(untilbuf, file_item_buf);
1479 #else
1480         zint item_len;
1481         assert(p->offset > 0); 
1482         src = p->bytes + p->offset;
1483         decode_item_len(&src, &item_len);
1484 #if ISAMB_DEBUG
1485         (*b->method->codec.log_item)(YLOG_DEBUG, untilbuf, "on_leaf: until");
1486         (*b->method->codec.log_item)(YLOG_DEBUG, src, "on_leaf: value");
1487 #endif
1488         cmp = (*b->method->compare_item)(untilbuf, src);
1489 #endif
1490         if (cmp < pp->scope)
1491         {  /* cmp<2 */
1492 #if ISAMB_DEBUG
1493             yaz_log(YLOG_DEBUG, "isamb_pp_on_right returning true "
1494                             "cmp=%d lev=%d ofs=%d", cmp, level, p->offset);
1495 #endif
1496             return 1; 
1497         }
1498         else
1499         {
1500 #if ISAMB_DEBUG
1501             yaz_log(YLOG_DEBUG, "isamb_pp_on_right returning false "
1502                             "cmp=%d lev=%d ofs=%d", cmp, level, p->offset);
1503 #endif
1504             return 0; 
1505         }
1506     }
1507     else {
1508 #if ISAMB_DEBUG
1509         yaz_log(YLOG_DEBUG, "isamb_pp_on_right at tail, looking higher "
1510                         "lev=%d", level);
1511 #endif
1512         return isamb_pp_on_right_node(pp, level, untilbuf);
1513     }
1514 } /* isamb_pp_on_right_node */
1515
1516 static int isamb_pp_read_on_leaf(ISAMB_PP pp, void *buf)
1517
1518     /* reads the next item on the current leaf, returns 0 if end of leaf*/
1519     struct ISAMB_block *p = pp->block[pp->level];
1520     char *dst;
1521     const char *src;
1522     assert(pp);
1523     assert(buf);
1524     if (p->offset == p->size)
1525     {
1526 #if ISAMB_DEBUG
1527         yaz_log(YLOG_DEBUG, "isamb_pp_read_on_leaf returning 0 on "
1528                 "node %d", p->pos);
1529 #endif
1530         return 0; /* at end of leaf */
1531     }
1532     src = p->bytes + p->offset;
1533     dst = buf;
1534     (*pp->isamb->method->codec.decode)(p->decodeClientData, &dst, &src);
1535     p->offset = src - (char*) p->bytes;
1536 #if ISAMB_DEBUG
1537     (*pp->isamb->method->codec.log_item)(YLOG_DEBUG, buf,
1538                                          "read_on_leaf returning 1");
1539 #endif
1540     pp->returned_numbers++;
1541     return 1;
1542 } /* read_on_leaf */
1543
1544 static int isamb_pp_forward_on_leaf(ISAMB_PP pp, void *buf, const void *untilbuf)
1545 { /* forwards on the current leaf, returns 0 if not found */
1546     int cmp;
1547     int skips = 0;
1548     while (1)
1549     {
1550         if (!isamb_pp_read_on_leaf(pp, buf))
1551             return 0;
1552         /* FIXME - this is an extra function call, inline the read? */
1553         cmp=(*pp->isamb->method->compare_item)(untilbuf, buf);
1554         if (cmp <pp->scope)
1555         {  /* cmp<2  found a good one */
1556 #if ISAMB_DEBUG
1557             if (skips)
1558                 yaz_log(YLOG_DEBUG, "isam_pp_fwd_on_leaf skipped %d items", skips);
1559 #endif
1560             pp->returned_numbers++;
1561             return 1;
1562         }
1563         if (!skips)
1564             if (!isamb_pp_on_right_node(pp, pp->level, untilbuf))
1565                 return 0; /* never mind the rest of this leaf */
1566         pp->skipped_numbers++;
1567         skips++;
1568     }
1569 } /* forward_on_leaf */
1570
1571 static int isamb_pp_climb_level(ISAMB_PP pp, ISAM_P *pos)
1572 { /* climbs higher in the tree, until finds a level with data left */
1573   /* returns the node to (consider to) descend to in *pos) */
1574     struct ISAMB_block *p = pp->block[pp->level];
1575     const char *src;
1576 #if ISAMB_DEBUG
1577     yaz_log(YLOG_DEBUG, "isamb_pp_climb_level starting "
1578                    "at level %d node %d ofs=%d sz=%d",
1579                     pp->level, p->pos, p->offset, p->size);
1580 #endif
1581     assert(pp->level >= 0);
1582     assert(p->offset <= p->size);
1583     if (pp->level==0)
1584     {
1585 #if ISAMB_DEBUG
1586         yaz_log(YLOG_DEBUG, "isamb_pp_climb_level returning 0 at root");
1587 #endif
1588         return 0;
1589     }
1590     assert(pp->level>0); 
1591     close_block(pp->isamb, pp->block[pp->level]);
1592     pp->block[pp->level] = 0;
1593     (pp->level)--;
1594     p = pp->block[pp->level];
1595 #if ISAMB_DEBUG
1596     yaz_log(YLOG_DEBUG, "isamb_pp_climb_level climbed to level %d node %d ofs=%d",
1597                     pp->level, p->pos, p->offset);
1598 #endif
1599     assert(!p->leaf);
1600     assert(p->offset <= p->size);
1601     if (p->offset == p->size)
1602     {
1603         /* we came from the last pointer, climb on */
1604         if (!isamb_pp_climb_level(pp, pos))
1605             return 0;
1606         p = pp->block[pp->level];
1607     }
1608     else
1609     {
1610 #if INT_ENCODE
1611         char file_item_buf[DST_ITEM_MAX];
1612         char *file_item = file_item_buf;
1613         ISAMB b = pp->isamb;
1614         void *c1 = (*b->method->codec.start)();
1615 #else
1616         zint item_len;
1617 #endif
1618         /* skip the child we just came from */
1619 #if ISAMB_DEBUG
1620         yaz_log(YLOG_DEBUG, "isam_pp_climb_level: skipping lev=%d ofs=%d sz=%d", 
1621                         pp->level, p->offset, p->size);
1622 #endif
1623         assert (p->offset < p->size);
1624         src = p->bytes + p->offset;
1625 #if INT_ENCODE
1626         (*b->method->codec.decode)(c1, &file_item, &src);
1627         (*b->method->codec.stop)(c1);
1628 #else
1629         decode_item_len(&src, &item_len);
1630         src += item_len;
1631 #endif
1632         decode_ptr(&src, pos);
1633         p->offset = src - (char *)p->bytes;
1634             
1635     }
1636     return 1;
1637 } /* climb_level */
1638
1639
1640 static zint isamb_pp_forward_unode(ISAMB_PP pp, zint pos, const void *untilbuf)
1641 { /* scans a upper node until it finds a child <= untilbuf */
1642   /* pp points to the key value, as always. pos is the child read from */
1643   /* the buffer */
1644   /* if all values are too small, returns the last child in the node */
1645   /* FIXME - this can be detected, and avoided by looking at the */
1646   /* parent node, but that gets messy. Presumably the cost is */
1647   /* pretty low anyway */
1648     ISAMB b = pp->isamb;
1649     struct ISAMB_block *p = pp->block[pp->level];
1650     const char *src = p->bytes + p->offset;
1651     int cmp;
1652     zint nxtpos;
1653 #if ISAMB_DEBUG
1654     int skips = 0;
1655     yaz_log(YLOG_DEBUG, "isamb_pp_forward_unode starting "
1656                    "at level %d node %d ofs=%di sz=%d",
1657                     pp->level, p->pos, p->offset, p->size);
1658 #endif
1659     assert(!p->leaf);
1660     assert(p->offset <= p->size);
1661     if (p->offset == p->size)
1662     {
1663 #if ISAMB_DEBUG
1664             yaz_log(YLOG_DEBUG, "isamb_pp_forward_unode returning at end "
1665                    "at level %d node %d ofs=%di sz=%d",
1666                     pp->level, p->pos, p->offset, p->size);
1667 #endif
1668         return pos; /* already at the end of it */
1669     }
1670     while(p->offset < p->size)
1671     {
1672 #if INT_ENCODE
1673         char file_item_buf[DST_ITEM_MAX];
1674         char *file_item = file_item_buf;
1675         void *c1 = (*b->method->codec.start)();
1676         (*b->method->codec.decode)(c1, &file_item, &src);
1677         (*b->method->codec.stop)(c1);
1678         cmp = (*b->method->compare_item)(untilbuf, file_item_buf);
1679 #else
1680         zint item_len;
1681         decode_item_len(&src, &item_len);
1682         cmp = (*b->method->compare_item)(untilbuf, src);
1683         src += item_len;
1684 #endif
1685         decode_ptr(&src, &nxtpos);
1686         if (cmp<pp->scope) /* cmp<2 */
1687         {
1688 #if ISAMB_DEBUG
1689             yaz_log(YLOG_DEBUG, "isamb_pp_forward_unode returning a hit "
1690                    "at level %d node %d ofs=%d sz=%d",
1691                     pp->level, p->pos, p->offset, p->size);
1692 #endif
1693             return pos;
1694         } /* found one */
1695         pos = nxtpos;
1696         p->offset = src-(char*)p->bytes;
1697         (pp->skipped_nodes[pp->maxlevel - pp->level -1])++;
1698 #if ISAMB_DEBUG
1699         skips++;
1700 #endif
1701     }
1702 #if ISAMB_DEBUG
1703             yaz_log(YLOG_DEBUG, "isamb_pp_forward_unode returning at tail "
1704                    "at level %d node %d ofs=%d sz=%d skips=%d",
1705                     pp->level, p->pos, p->offset, p->size, skips);
1706 #endif
1707     return pos; /* that's the last one in the line */
1708     
1709 } /* forward_unode */
1710
1711 static void isamb_pp_descend_to_leaf(ISAMB_PP pp, ISAM_P pos,
1712                                      const void *untilbuf)
1713 { /* climbs down the tree, from pos, to the leftmost leaf */
1714     struct ISAMB_block *p = pp->block[pp->level];
1715     const char *src;
1716     assert(!p->leaf);
1717 #if ISAMB_DEBUG
1718     yaz_log(YLOG_DEBUG, "isamb_pp_descend_to_leaf "
1719                    "starting at lev %d node %d ofs=%d lf=%d u=%p", 
1720                    pp->level, p->pos, p->offset, p->leaf, untilbuf);
1721 #endif
1722     if (untilbuf)
1723         pos = isamb_pp_forward_unode(pp, pos, untilbuf);
1724     ++(pp->level);
1725     assert(pos);
1726     p = open_block(pp->isamb, pos);
1727     pp->block[pp->level] = p;
1728     ++(pp->accessed_nodes[pp->maxlevel-pp->level]);
1729     ++(pp->no_blocks);
1730 #if ISAMB_DEBUG
1731     yaz_log(YLOG_DEBUG, "isamb_pp_descend_to_leaf "
1732                    "got lev %d node %d lf=%d", 
1733                    pp->level, p->pos, p->leaf);
1734 #endif
1735     if (p->leaf)
1736         return;
1737     assert (p->offset==0);
1738     src = p->bytes + p->offset;
1739     decode_ptr(&src, &pos);
1740     p->offset = src-(char*)p->bytes;
1741     isamb_pp_descend_to_leaf(pp, pos, untilbuf);
1742 #if ISAMB_DEBUG
1743     yaz_log(YLOG_DEBUG, "isamb_pp_descend_to_leaf "
1744                    "returning at lev %d node %d ofs=%d lf=%d", 
1745                    pp->level, p->pos, p->offset, p->leaf);
1746 #endif
1747 } /* descend_to_leaf */
1748
1749 static int isamb_pp_find_next_leaf(ISAMB_PP pp)
1750 { /* finds the next leaf by climbing up and down */
1751     ISAM_P pos;
1752     if (!isamb_pp_climb_level(pp, &pos))
1753         return 0;
1754     isamb_pp_descend_to_leaf(pp, pos, 0);
1755     return 1;
1756 }
1757
1758 static int isamb_pp_climb_desc(ISAMB_PP pp,  const void *untilbuf)
1759 { /* climbs up and descends to a leaf where values >= *untilbuf are found */
1760     ISAM_P pos;
1761 #if ISAMB_DEBUG
1762     struct ISAMB_block *p = pp->block[pp->level];
1763     yaz_log(YLOG_DEBUG, "isamb_pp_climb_desc starting "
1764                    "at level %d node %d ofs=%d sz=%d",
1765                     pp->level, p->pos, p->offset, p->size);
1766 #endif
1767     if (!isamb_pp_climb_level(pp, &pos))
1768         return 0;
1769     /* see if it would pay to climb one higher */
1770     if (!isamb_pp_on_right_node(pp, pp->level, untilbuf))
1771         if (!isamb_pp_climb_level(pp, &pos))
1772             return 0;
1773     isamb_pp_descend_to_leaf(pp, pos, untilbuf);
1774 #if ISAMB_DEBUG
1775     p = pp->block[pp->level];
1776     yaz_log(YLOG_DEBUG, "isamb_pp_climb_desc done "
1777                    "at level %d node %d ofs=%d sz=%d",
1778                     pp->level, p->pos, p->offset, p->size);
1779 #endif
1780     return 1;
1781 } /* climb_desc */
1782
1783 int isamb_pp_forward (ISAMB_PP pp, void *buf, const void *untilbuf)
1784 {
1785 #if ISAMB_DEBUG
1786     struct ISAMB_block *p = pp->block[pp->level];
1787     assert(p->leaf);
1788     yaz_log(YLOG_DEBUG, "isamb_pp_forward starting "
1789                    "at level %d node %d ofs=%d sz=%d u=%p sc=%d",
1790                     pp->level, p->pos, p->offset, p->size, untilbuf, scope);
1791 #endif
1792     if (untilbuf)
1793     {
1794         if (isamb_pp_forward_on_leaf(pp, buf, untilbuf))
1795         {
1796 #if ISAMB_DEBUG
1797             yaz_log(YLOG_DEBUG, "isamb_pp_forward (f) returning (A) "
1798                    "at level %d node %d ofs=%d sz=%d",
1799                     pp->level, p->pos, p->offset, p->size);
1800 #endif
1801             return 1;
1802         }
1803         if (! isamb_pp_climb_desc(pp, untilbuf))
1804         {
1805 #if ISAMB_DEBUG
1806             yaz_log(YLOG_DEBUG, "isamb_pp_forward (f) returning notfound (B) "
1807                    "at level %d node %d ofs=%d sz=%d",
1808                     pp->level, p->pos, p->offset, p->size);
1809 #endif
1810             return 0; /* could not find a leaf */
1811         }
1812         do {
1813             if (isamb_pp_forward_on_leaf(pp, buf, untilbuf))
1814             {
1815 #if ISAMB_DEBUG
1816                 yaz_log(YLOG_DEBUG, "isamb_pp_forward (f) returning (c) "
1817                         "at level %d node %d ofs=%d sz=%d",
1818                         pp->level, p->pos, p->offset, p->size);
1819 #endif
1820                 return 1;
1821             }
1822         } while (isamb_pp_find_next_leaf(pp));
1823         return 0; /* could not find at all */
1824     }
1825     else { /* no untilbuf, a straight read */
1826         /* FIXME - this should be moved
1827          * directly into the pp_read */
1828         /* keeping here now, to keep same
1829          * interface as the old fwd */
1830         if (isamb_pp_read_on_leaf(pp, buf))
1831         {
1832 #if ISAMB_DEBUG
1833             yaz_log(YLOG_DEBUG, "isamb_pp_forward (read) returning (D) "
1834                    "at level %d node %d ofs=%d sz=%d",
1835                     pp->level, p->pos, p->offset, p->size);
1836 #endif
1837             return 1;
1838         }
1839         if (isamb_pp_find_next_leaf(pp))
1840         {
1841 #if ISAMB_DEBUG
1842             yaz_log(YLOG_DEBUG, "isamb_pp_forward (read) returning (E) "
1843                    "at level %d node %d ofs=%d sz=%d",
1844                     pp->level, p->pos, p->offset, p->size);
1845 #endif
1846             return isamb_pp_read_on_leaf(pp, buf);
1847         }
1848         else
1849             return 0;
1850     }
1851 } /* isam_pp_forward (new version) */
1852
1853 void isamb_pp_pos(ISAMB_PP pp, double *current, double *total)
1854 { /* return an estimate of the current position and of the total number of */
1855   /* occureences in the isam tree, based on the current leaf */
1856     assert(total);
1857     assert(current);
1858     
1859     /* if end-of-stream PP may not be leaf */
1860
1861     *total = (double) (pp->block[0]->no_items);
1862     *current = (double) pp->returned_numbers;
1863 #if ISAMB_DEBUG
1864     yaz_log(YLOG_LOG, "isamb_pp_pos returning: cur= %0.1f tot=%0.1f rn="
1865          ZINT_FORMAT, *current, *total, pp->returned_numbers);
1866 #endif
1867 }
1868
1869 int isamb_pp_forward2(ISAMB_PP pp, void *buf, const void *untilb)
1870 {
1871     char *dst = buf;
1872     const char *src;
1873     struct ISAMB_block *p = pp->block[pp->level];
1874     ISAMB b = pp->isamb;
1875     if (!p)
1876         return 0;
1877 again:
1878     while (p->offset == p->size)
1879     {
1880         ISAM_P pos;
1881 #if INT_ENCODE
1882         const char *src_0;
1883         void *c1;
1884         char file_item_buf[DST_ITEM_MAX];
1885         char *file_item = file_item_buf;
1886 #else
1887         zint item_len;
1888 #endif
1889         while (p->offset == p->size)
1890         {
1891             if (pp->level == 0)
1892                 return 0;
1893             close_block (pp->isamb, pp->block[pp->level]);
1894             pp->block[pp->level] = 0;
1895             (pp->level)--;
1896             p = pp->block[pp->level];
1897             assert (!p->leaf);  
1898         }
1899
1900         assert(!p->leaf);
1901         src = p->bytes + p->offset;
1902        
1903 #if INT_ENCODE
1904         c1 = (*b->method->codec.start)();
1905         (*b->method->codec.decode)(c1, &file_item, &src);
1906 #else
1907         decode_ptr (&src, &item_len);
1908         src += item_len;
1909 #endif        
1910         decode_ptr (&src, &pos);
1911         p->offset = src - (char*) p->bytes;
1912
1913         src = p->bytes + p->offset;
1914
1915         while(1)
1916         {
1917             if (!untilb || p->offset == p->size)
1918                 break;
1919             assert(p->offset < p->size);
1920 #if INT_ENCODE
1921             src_0 = src;
1922             file_item = file_item_buf;
1923             (*b->method->codec.reset)(c1);
1924             (*b->method->codec.decode)(c1, &file_item, &src);
1925             if ((*b->method->compare_item)(untilb, file_item_buf) <= 1)
1926             {
1927                 src = src_0;
1928                 break;
1929             }
1930 #else
1931             decode_item_len(&src, &item_len);
1932             if ((*b->method->compare_item)(untilb, src) <= 1)
1933                 break;
1934             src += item_len;
1935 #endif
1936             decode_ptr (&src, &pos);
1937             p->offset = src - (char*) p->bytes;
1938         }
1939
1940         pp->level++;
1941
1942         while (1)
1943         {
1944             pp->block[pp->level] = p = open_block (pp->isamb, pos);
1945
1946             pp->total_size += p->size;
1947             pp->no_blocks++;
1948             
1949             if (p->leaf) 
1950             {
1951                 break;
1952             }
1953             
1954             src = p->bytes + p->offset;
1955             while(1)
1956             {
1957                 decode_ptr (&src, &pos);
1958                 p->offset = src - (char*) p->bytes;
1959                 
1960                 if (!untilb || p->offset == p->size)
1961                     break;
1962                 assert(p->offset < p->size);
1963 #if INT_ENCODE
1964                 src_0 = src;
1965                 file_item = file_item_buf;
1966                 (*b->method->codec.reset)(c1);
1967                 (*b->method->codec.decode)(c1, &file_item, &src);
1968                 if ((*b->method->compare_item)(untilb, file_item_buf) <= 1)
1969                 {
1970                     src = src_0;
1971                     break;
1972                 }
1973 #else
1974                 decode_ptr (&src, &item_len);
1975                 if ((*b->method->compare_item)(untilb, src) <= 1)
1976                     break;
1977                 src += item_len;
1978 #endif
1979             }
1980             pp->level++;
1981         }
1982 #if INT_ENCODE
1983         (*b->method->codec.stop)(c1);
1984 #endif
1985     }
1986     assert (p->offset < p->size);
1987     assert (p->leaf);
1988     while(1)
1989     {
1990         char *dst0 = dst;
1991         src = p->bytes + p->offset;
1992         (*pp->isamb->method->codec.decode)(p->decodeClientData, &dst, &src);
1993         p->offset = src - (char*) p->bytes;
1994         if (!untilb || (*pp->isamb->method->compare_item)(untilb, dst0) <= 1)
1995             break;
1996         dst = dst0;
1997         if (p->offset == p->size) goto again;
1998     }
1999     return 1;
2000 }
2001
2002 zint isamb_get_int_splits(ISAMB b)
2003 {
2004     return b->number_of_int_splits;
2005 }
2006
2007 zint isamb_get_leaf_splits(ISAMB b)
2008 {
2009     return b->number_of_leaf_splits;
2010 }
2011
2012 /*
2013  * Local variables:
2014  * c-basic-offset: 4
2015  * indent-tabs-mode: nil
2016  * End:
2017  * vim: shiftwidth=4 tabstop=8 expandtab
2018  */
2019