dd2f84aecbaa257c6a761b2a6916b9674e31b032
[idzebra-moved-to-github.git] / isamb / isamb.c
1 /* $Id: isamb.c,v 1.84 2006-09-26 12:56:33 adam Exp $
2    Copyright (C) 1995-2006
3    Index Data ApS
4
5 This file is part of the Zebra server.
6
7 Zebra is free software; you can redistribute it and/or modify it under
8 the terms of the GNU General Public License as published by the Free
9 Software Foundation; either version 2, or (at your option) any later
10 version.
11
12 Zebra is distributed in the hope that it will be useful, but WITHOUT ANY
13 WARRANTY; without even the implied warranty of MERCHANTABILITY or
14 FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
15 for more details.
16
17 You should have received a copy of the GNU General Public License
18 along with this program; if not, write to the Free Software
19 Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA  02110-1301  USA
20
21 */
22
23 #include <stdlib.h>
24 #include <string.h>
25 #include <yaz/log.h>
26 #include <yaz/xmalloc.h>
27 #include <idzebra/isamb.h>
28 #include <assert.h>
29
30 #ifndef ISAMB_DEBUG
31 #define ISAMB_DEBUG 0
32 #endif
33
34
35 #define ISAMB_MAJOR_VERSION 3
36 #define ISAMB_MINOR_VERSION 0
37
38 struct ISAMB_head {
39     zint first_block;
40     zint last_block;
41     zint free_list;
42     zint no_items;
43     int block_size;
44     int block_max;
45     int block_offset;
46 };
47
48 /* if 1, upper nodes items are encoded; 0 if not encoded */
49 #define INT_ENCODE 1
50
51 /* maximum size of encoded buffer */
52 #define DST_ITEM_MAX 256
53
54 #define ISAMB_MAX_LEVEL 10
55 /* approx 2*max page + max size of item */
56 #define DST_BUF_SIZE 16840
57
58 #define ISAMB_CACHE_ENTRY_SIZE 4096
59
60 /* CAT_MAX: _must_ be power of 2 */
61 #define CAT_MAX 4
62 #define CAT_MASK (CAT_MAX-1)
63 /* CAT_NO: <= CAT_MAX */
64 #define CAT_NO 4
65
66 /* Smallest block size */
67 #define ISAMB_MIN_SIZE 32
68 /* Size factor */
69 #define ISAMB_FAC_SIZE 4
70
71 /* ISAMB_PTR_CODEC = 1 var, =0 fixed */
72 #define ISAMB_PTR_CODEC 1
73
74 struct ISAMB_cache_entry {
75     ISAM_P pos;
76     unsigned char *buf;
77     int dirty;
78     int hits;
79     struct ISAMB_cache_entry *next;
80 };
81
82 struct ISAMB_file {
83     BFile bf;
84     int head_dirty;
85     struct ISAMB_head head;
86     struct ISAMB_cache_entry *cache_entries;
87 };
88
89 struct ISAMB_s {
90     BFiles bfs;
91     ISAMC_M *method;
92
93     struct ISAMB_file *file;
94     int no_cat;
95     int cache; /* 0 = no cache, 1 = use cache, -1 = dummy isam (for testing only) */
96     int log_io;        /* log level for bf_read/bf_write calls */
97     int log_freelist;  /* log level for freelist handling */
98     zint skipped_numbers; /* on a leaf node */
99     zint returned_numbers; 
100     zint skipped_nodes[ISAMB_MAX_LEVEL]; /* [0]=skipped leaves, 1 = higher etc */
101     zint accessed_nodes[ISAMB_MAX_LEVEL]; /* nodes we did not skip */
102 };
103
104 struct ISAMB_block {
105     ISAM_P pos;
106     int cat;
107     int size;
108     int leaf;
109     int dirty;
110     int deleted;
111     int offset;
112     zint no_items;  /* number of nodes in this + children */
113     char *bytes;
114     char *cbuf;
115     unsigned char *buf;
116     void *decodeClientData;
117     int log_rw;
118 };
119
120 struct ISAMB_PP_s {
121     ISAMB isamb;
122     ISAM_P pos;
123     int level;
124     int maxlevel; /* total depth */
125     zint total_size;
126     zint no_blocks;
127     zint skipped_numbers; /* on a leaf node */
128     zint returned_numbers; 
129     zint skipped_nodes[ISAMB_MAX_LEVEL]; /* [0]=skipped leaves, 1 = higher etc */
130     zint accessed_nodes[ISAMB_MAX_LEVEL]; /* nodes we did not skip */
131     struct ISAMB_block **block;
132     int scope;  /* on what level we forward */
133 };
134
135
136 #define encode_item_len encode_ptr
137 #if ISAMB_PTR_CODEC
138 static void encode_ptr(char **dst, zint pos)
139 {
140     unsigned char *bp = (unsigned char*) *dst;
141
142     while (pos > 127)
143     {
144          *bp++ = (unsigned char) (128 | (pos & 127));
145          pos = pos >> 7;
146     }
147     *bp++ = (unsigned char) pos;
148     *dst = (char *) bp;
149 }
150 #else
151 static void encode_ptr(char **dst, zint pos)
152 {
153     memcpy(*dst, &pos, sizeof(pos));
154     (*dst) += sizeof(pos);
155 }
156 #endif
157
158 #define decode_item_len decode_ptr
159 #if ISAMB_PTR_CODEC
160 static void decode_ptr(const char **src, zint *pos)
161 {
162     zint d = 0;
163     unsigned char c;
164     unsigned r = 0;
165
166     while (((c = *(const unsigned char *)((*src)++)) & 128))
167     {
168         d += ((zint) (c & 127) << r);
169         r += 7;
170     }
171     d += ((zint) c << r);
172     *pos = d;
173 }
174 #else
175 static void decode_ptr(const char **src, zint *pos)
176 {
177      memcpy(pos, *src, sizeof(*pos));
178      (*src) += sizeof(*pos);
179 }
180 #endif
181
182 ISAMB isamb_open(BFiles bfs, const char *name, int writeflag, ISAMC_M *method,
183                  int cache)
184 {
185     ISAMB isamb = xmalloc(sizeof(*isamb));
186     int i, b_size = ISAMB_MIN_SIZE;
187
188     isamb->bfs = bfs;
189     isamb->method = (ISAMC_M *) xmalloc(sizeof(*method));
190     memcpy(isamb->method, method, sizeof(*method));
191     isamb->no_cat = CAT_NO;
192     isamb->log_io = 0;
193     isamb->log_freelist = 0;
194     isamb->cache = cache;
195     isamb->skipped_numbers = 0;
196     isamb->returned_numbers = 0;
197     for (i = 0; i<ISAMB_MAX_LEVEL; i++)
198         isamb->skipped_nodes[i] = isamb->accessed_nodes[i] = 0;
199
200     assert(cache == 0);
201     isamb->file = xmalloc(sizeof(*isamb->file) * isamb->no_cat);
202     for (i = 0; i < isamb->no_cat; i++)
203     {
204         char fname[DST_BUF_SIZE];
205         char hbuf[DST_BUF_SIZE];
206         isamb->file[i].cache_entries = 0;
207         isamb->file[i].head_dirty = 0;
208         sprintf(fname, "%s%c", name, i+'A');
209         if (cache)
210             isamb->file[i].bf = bf_open(bfs, fname, ISAMB_CACHE_ENTRY_SIZE,
211                                          writeflag);
212         else
213             isamb->file[i].bf = bf_open(bfs, fname, b_size, writeflag);
214
215         /* fill-in default values (for empty isamb) */
216         isamb->file[i].head.first_block = ISAMB_CACHE_ENTRY_SIZE/b_size+1;
217         isamb->file[i].head.last_block = isamb->file[i].head.first_block;
218         isamb->file[i].head.block_size = b_size;
219 #if ISAMB_PTR_CODEC
220         if (i == isamb->no_cat-1 || b_size > 128)
221             isamb->file[i].head.block_offset = 8;
222         else
223             isamb->file[i].head.block_offset = 4;
224 #else
225         isamb->file[i].head.block_offset = 11;
226 #endif
227         isamb->file[i].head.block_max =
228             b_size - isamb->file[i].head.block_offset;
229         isamb->file[i].head.free_list = 0;
230         if (bf_read(isamb->file[i].bf, 0, 0, 0, hbuf))
231         {
232             /* got header assume "isamb"major minor len can fit in 16 bytes */
233             zint zint_tmp;
234             int major, minor, len, pos = 0;
235             int left;
236             const char *src = 0;
237             if (memcmp(hbuf, "isamb", 5))
238             {
239                 yaz_log(YLOG_WARN, "bad isamb header for file %s", fname);
240                 return 0;
241             }
242             if (sscanf(hbuf+5, "%d %d %d", &major, &minor, &len) != 3)
243             {
244                 yaz_log(YLOG_WARN, "bad isamb header for file %s", fname);
245                 return 0;
246             }
247             if (major != ISAMB_MAJOR_VERSION)
248             {
249                 yaz_log(YLOG_WARN, "bad major version for file %s %d, must be %d",
250                      fname, major, ISAMB_MAJOR_VERSION);
251                 return 0;
252             }
253             for (left = len - b_size; left > 0; left = left - b_size)
254             {
255                 pos++;
256                 if (!bf_read(isamb->file[i].bf, pos, 0, 0, hbuf + pos*b_size))
257                 {
258                     yaz_log(YLOG_WARN, "truncated isamb header for " 
259                          "file=%s len=%d pos=%d",
260                          fname, len, pos);
261                     return 0;
262                 }
263             }
264             src = hbuf + 16;
265             decode_ptr(&src, &isamb->file[i].head.first_block);
266             decode_ptr(&src, &isamb->file[i].head.last_block);
267             decode_ptr(&src, &zint_tmp);
268             isamb->file[i].head.block_size = (int) zint_tmp;
269             decode_ptr(&src, &zint_tmp);
270             isamb->file[i].head.block_max = (int) zint_tmp;
271             decode_ptr(&src, &isamb->file[i].head.free_list);
272         }
273         assert (isamb->file[i].head.block_size >= isamb->file[i].head.block_offset);
274         isamb->file[i].head_dirty = 0;
275         assert(isamb->file[i].head.block_size == b_size);
276         b_size = b_size * ISAMB_FAC_SIZE;
277     }
278 #if ISAMB_DEBUG
279     yaz_log(YLOG_WARN, "isamb debug enabled. Things will be slower than usual");
280 #endif
281     return isamb;
282 }
283
284 static void flush_blocks (ISAMB b, int cat)
285 {
286     while (b->file[cat].cache_entries)
287     {
288         struct ISAMB_cache_entry *ce_this = b->file[cat].cache_entries;
289         b->file[cat].cache_entries = ce_this->next;
290
291         if (ce_this->dirty)
292         {
293             yaz_log(b->log_io, "bf_write: flush_blocks");
294             bf_write(b->file[cat].bf, ce_this->pos, 0, 0, ce_this->buf);
295         }
296         xfree(ce_this->buf);
297         xfree(ce_this);
298     }
299 }
300
301 static int cache_block (ISAMB b, ISAM_P pos, unsigned char *userbuf, int wr)
302 {
303     int cat = (int) (pos&CAT_MASK);
304     int off = (int) (((pos/CAT_MAX) & 
305                (ISAMB_CACHE_ENTRY_SIZE / b->file[cat].head.block_size - 1))
306         * b->file[cat].head.block_size);
307     zint norm = pos / (CAT_MASK*ISAMB_CACHE_ENTRY_SIZE / b->file[cat].head.block_size);
308     int no = 0;
309     struct ISAMB_cache_entry **ce, *ce_this = 0, **ce_last = 0;
310
311     if (!b->cache)
312         return 0;
313
314     assert (ISAMB_CACHE_ENTRY_SIZE >= b->file[cat].head.block_size);
315     for (ce = &b->file[cat].cache_entries; *ce; ce = &(*ce)->next, no++)
316     {
317         ce_last = ce;
318         if ((*ce)->pos == norm)
319         {
320             ce_this = *ce;
321             *ce = (*ce)->next;   /* remove from list */
322             
323             ce_this->next = b->file[cat].cache_entries;  /* move to front */
324             b->file[cat].cache_entries = ce_this;
325             
326             if (wr)
327             {
328                 memcpy (ce_this->buf + off, userbuf, 
329                         b->file[cat].head.block_size);
330                 ce_this->dirty = 1;
331             }
332             else
333                 memcpy (userbuf, ce_this->buf + off,
334                         b->file[cat].head.block_size);
335             return 1;
336         }
337     }
338     if (no >= 40)
339     {
340         assert (no == 40);
341         assert (ce_last && *ce_last);
342         ce_this = *ce_last;
343         *ce_last = 0;  /* remove the last entry from list */
344         if (ce_this->dirty)
345         {
346             yaz_log(b->log_io, "bf_write: cache_block");
347             bf_write(b->file[cat].bf, ce_this->pos, 0, 0, ce_this->buf);
348         }
349         xfree(ce_this->buf);
350         xfree(ce_this);
351     }
352     ce_this = xmalloc(sizeof(*ce_this));
353     ce_this->next = b->file[cat].cache_entries;
354     b->file[cat].cache_entries = ce_this;
355     ce_this->buf = xmalloc(ISAMB_CACHE_ENTRY_SIZE);
356     ce_this->pos = norm;
357     yaz_log(b->log_io, "bf_read: cache_block");
358     if (!bf_read(b->file[cat].bf, norm, 0, 0, ce_this->buf))
359         memset (ce_this->buf, 0, ISAMB_CACHE_ENTRY_SIZE);
360     if (wr)
361     {
362         memcpy (ce_this->buf + off, userbuf, b->file[cat].head.block_size);
363         ce_this->dirty = 1;
364     }
365     else
366     {
367         ce_this->dirty = 0;
368         memcpy (userbuf, ce_this->buf + off, b->file[cat].head.block_size);
369     }
370     return 1;
371 }
372
373
374 void isamb_close (ISAMB isamb)
375 {
376     int i;
377     for (i = 0; isamb->accessed_nodes[i]; i++)
378         yaz_log(YLOG_DEBUG, "isamb_close  level leaf-%d: "ZINT_FORMAT" read, "
379                         ZINT_FORMAT" skipped",
380              i, isamb->accessed_nodes[i], isamb->skipped_nodes[i]);
381     yaz_log(YLOG_DEBUG, "isamb_close returned "ZINT_FORMAT" values, "
382                    "skipped "ZINT_FORMAT,
383          isamb->skipped_numbers, isamb->returned_numbers);
384     for (i = 0; i<isamb->no_cat; i++)
385     {
386         flush_blocks (isamb, i);
387         if (isamb->file[i].head_dirty)
388         {
389             char hbuf[DST_BUF_SIZE];
390             int major = ISAMB_MAJOR_VERSION;
391             int minor = ISAMB_MINOR_VERSION;
392             int len = 16;
393             char *dst = hbuf + 16;
394             int pos = 0, left;
395             int b_size = isamb->file[i].head.block_size;
396
397             encode_ptr(&dst, isamb->file[i].head.first_block);
398             encode_ptr(&dst, isamb->file[i].head.last_block);
399             encode_ptr(&dst, isamb->file[i].head.block_size);
400             encode_ptr(&dst, isamb->file[i].head.block_max);
401             encode_ptr(&dst, isamb->file[i].head.free_list);
402             memset(dst, '\0', b_size); /* ensure no random bytes are written */
403
404             len = dst - hbuf;
405
406             /* print exactly 16 bytes (including trailing 0) */
407             sprintf(hbuf, "isamb%02d %02d %02d\r\n", major, minor, len);
408
409             bf_write(isamb->file[i].bf, pos, 0, 0, hbuf);
410
411             for (left = len - b_size; left > 0; left = left - b_size)
412             {
413                 pos++;
414                 bf_write(isamb->file[i].bf, pos, 0, 0, hbuf + pos*b_size);
415             }
416         }
417         bf_close (isamb->file[i].bf);
418     }
419     xfree(isamb->file);
420     xfree(isamb->method);
421     xfree(isamb);
422 }
423
424 /* open_block: read one block at pos.
425    Decode leading sys bytes .. consisting of
426    Offset:Meaning
427    0: leader byte, != 0 leaf, == 0, non-leaf
428    1-2: used size of block
429    3-7*: number of items and all children
430    
431    * Reserve 5 bytes for large block sizes. 1 for small ones .. Number
432    of items. We can thus have at most 2^40 nodes. 
433 */
434 static struct ISAMB_block *open_block(ISAMB b, ISAM_P pos)
435 {
436     int cat = (int) (pos&CAT_MASK);
437     const char *src;
438     int offset = b->file[cat].head.block_offset;
439     struct ISAMB_block *p;
440     if (!pos)
441         return 0;
442     p = xmalloc(sizeof(*p));
443     p->pos = pos;
444     p->cat = (int) (pos & CAT_MASK);
445     p->buf = xmalloc(b->file[cat].head.block_size);
446     p->cbuf = 0;
447
448     if (!cache_block (b, pos, p->buf, 0))
449     {
450         yaz_log(b->log_io, "bf_read: open_block");
451         if (!bf_read(b->file[cat].bf, pos/CAT_MAX, 0, 0, p->buf))
452         {
453             yaz_log(YLOG_FATAL, "isamb: read fail for pos=%ld block=%ld",
454                      (long) pos, (long) pos/CAT_MAX);
455             abort();
456         }
457     }
458     p->bytes = (char *)p->buf + offset;
459     p->leaf = p->buf[0];
460     p->size = (p->buf[1] + 256 * p->buf[2]) - offset;
461     if (p->size < 0)
462     {
463         yaz_log(YLOG_FATAL, "Bad block size %d in pos=" ZINT_FORMAT "\n",
464                  p->size, pos);
465     }
466     assert (p->size >= 0);
467     src = (char*) p->buf + 3;
468     decode_ptr(&src, &p->no_items);
469
470     p->offset = 0;
471     p->dirty = 0;
472     p->deleted = 0;
473     p->decodeClientData = (*b->method->codec.start)();
474     return p;
475 }
476
477 struct ISAMB_block *new_block (ISAMB b, int leaf, int cat)
478 {
479     struct ISAMB_block *p;
480
481     p = xmalloc(sizeof(*p));
482     p->buf = xmalloc(b->file[cat].head.block_size);
483
484     if (!b->file[cat].head.free_list)
485     {
486         zint block_no;
487         block_no = b->file[cat].head.last_block++;
488         p->pos = block_no * CAT_MAX + cat;
489     }
490     else
491     {
492         p->pos = b->file[cat].head.free_list;
493         assert((p->pos & CAT_MASK) == cat);
494         if (!cache_block (b, p->pos, p->buf, 0))
495         {
496             yaz_log(b->log_io, "bf_read: new_block");
497             if (!bf_read(b->file[cat].bf, p->pos/CAT_MAX, 0, 0, p->buf))
498             {
499                 yaz_log(YLOG_FATAL, "isamb: read fail for pos=%ld block=%ld",
500                          (long) p->pos/CAT_MAX, (long) p->pos/CAT_MAX);
501                 abort ();
502             }
503         }
504         yaz_log(b->log_freelist, "got block " ZINT_FORMAT " from freelist %d:" ZINT_FORMAT, p->pos,
505                  cat, p->pos/CAT_MAX);
506         memcpy (&b->file[cat].head.free_list, p->buf, sizeof(zint));
507     }
508     p->cat = cat;
509     b->file[cat].head_dirty = 1;
510     memset (p->buf, 0, b->file[cat].head.block_size);
511     p->bytes = (char*)p->buf + b->file[cat].head.block_offset;
512     p->leaf = leaf;
513     p->size = 0;
514     p->dirty = 1;
515     p->deleted = 0;
516     p->offset = 0;
517     p->no_items = 0;
518     p->decodeClientData = (*b->method->codec.start)();
519     return p;
520 }
521
522 struct ISAMB_block *new_leaf (ISAMB b, int cat)
523 {
524     return new_block (b, 1, cat);
525 }
526
527
528 struct ISAMB_block *new_int (ISAMB b, int cat)
529 {
530     return new_block (b, 0, cat);
531 }
532
533 static void check_block (ISAMB b, struct ISAMB_block *p)
534 {
535     assert(b); /* mostly to make the compiler shut up about unused b */
536     if (p->leaf)
537     {
538         ;
539     }
540     else
541     {
542         /* sanity check */
543         char *startp = p->bytes;
544         const char *src = startp;
545         char *endp = p->bytes + p->size;
546         ISAM_P pos;
547         void *c1 = (*b->method->codec.start)();
548             
549         decode_ptr(&src, &pos);
550         assert ((pos&CAT_MASK) == p->cat);
551         while (src != endp)
552         {
553 #if INT_ENCODE
554             char file_item_buf[DST_ITEM_MAX];
555             char *file_item = file_item_buf;
556             (*b->method->codec.reset)(c1);
557             (*b->method->codec.decode)(c1, &file_item, &src);
558 #else
559             zint item_len;
560             decode_item_len(&src, &item_len);
561             assert (item_len > 0 && item_len < 80);
562             src += item_len;
563 #endif
564             decode_ptr(&src, &pos);
565             if ((pos&CAT_MASK) != p->cat)
566             {
567                 assert ((pos&CAT_MASK) == p->cat);
568             }
569         }
570         (*b->method->codec.stop)(c1);
571     }
572 }
573
574 void close_block(ISAMB b, struct ISAMB_block *p)
575 {
576     if (!p)
577         return;
578     if (p->deleted)
579     {
580         yaz_log(b->log_freelist, "release block " ZINT_FORMAT " from freelist %d:" ZINT_FORMAT,
581                  p->pos, p->cat, p->pos/CAT_MAX);
582         memcpy (p->buf, &b->file[p->cat].head.free_list, sizeof(zint));
583         b->file[p->cat].head.free_list = p->pos;
584         if (!cache_block (b, p->pos, p->buf, 1))
585         {
586             yaz_log(b->log_io, "bf_write: close_block (deleted)");
587             bf_write(b->file[p->cat].bf, p->pos/CAT_MAX, 0, 0, p->buf);
588         }
589     }
590     else if (p->dirty)
591     {
592         int offset = b->file[p->cat].head.block_offset;
593         int size = p->size + offset;
594         char *dst =  (char*)p->buf + 3;
595         assert (p->size >= 0);
596         
597         /* memset becuase encode_ptr usually does not write all bytes */
598         memset(p->buf, 0, b->file[p->cat].head.block_offset);
599         p->buf[0] = p->leaf;
600         p->buf[1] = size & 255;
601         p->buf[2] = size >> 8;
602         encode_ptr(&dst, p->no_items);
603         check_block(b, p);
604         if (!cache_block (b, p->pos, p->buf, 1))
605         {
606             yaz_log(b->log_io, "bf_write: close_block");
607             bf_write(b->file[p->cat].bf, p->pos/CAT_MAX, 0, 0, p->buf);
608         }
609     }
610     (*b->method->codec.stop)(p->decodeClientData);
611     xfree(p->buf);
612     xfree(p);
613 }
614
615 int insert_sub (ISAMB b, struct ISAMB_block **p,
616                 void *new_item, int *mode,
617                 ISAMC_I *stream,
618                 struct ISAMB_block **sp,
619                 void *sub_item, int *sub_size,
620                 const void *max_item);
621
622 int insert_int (ISAMB b, struct ISAMB_block *p, void *lookahead_item,
623                 int *mode,
624                 ISAMC_I *stream, struct ISAMB_block **sp,
625                 void *split_item, int *split_size, const void *last_max_item)
626 {
627     char *startp = p->bytes;
628     const char *src = startp;
629     char *endp = p->bytes + p->size;
630     ISAM_P pos;
631     struct ISAMB_block *sub_p1 = 0, *sub_p2 = 0;
632     char sub_item[DST_ITEM_MAX];
633     int sub_size;
634     int more = 0;
635     zint diff_terms = 0;
636     void *c1 = (*b->method->codec.start)();
637
638     *sp = 0;
639
640     assert(p->size >= 0);
641     decode_ptr(&src, &pos);
642     while (src != endp)
643     {
644         int d;
645         const char *src0 = src;
646 #if INT_ENCODE
647         char file_item_buf[DST_ITEM_MAX];
648         char *file_item = file_item_buf;
649         (*b->method->codec.reset)(c1);
650         (*b->method->codec.decode)(c1, &file_item, &src);
651         d = (*b->method->compare_item)(file_item_buf, lookahead_item);
652         if (d > 0)
653         {
654             sub_p1 = open_block(b, pos);
655             assert (sub_p1);
656             diff_terms -= sub_p1->no_items;
657             more = insert_sub (b, &sub_p1, lookahead_item, mode,
658                                stream, &sub_p2, 
659                                sub_item, &sub_size, file_item_buf);
660             diff_terms += sub_p1->no_items;
661             src = src0;
662             break;
663         }
664 #else
665         zint item_len;
666         decode_item_len(&src, &item_len);
667         d = (*b->method->compare_item)(src, lookahead_item);
668         if (d > 0)
669         {
670             sub_p1 = open_block(b, pos);
671             assert (sub_p1);
672             diff_terms -= sub_p1->no_items;
673             more = insert_sub (b, &sub_p1, lookahead_item, mode,
674                                stream, &sub_p2, 
675                                sub_item, &sub_size, src);
676             diff_terms += sub_p1->no_items;
677             src = src0;
678             break;
679         }
680         src += item_len;
681 #endif
682         decode_ptr(&src, &pos);
683     }
684     if (!sub_p1)
685     {
686         /* we reached the end. So lookahead > last item */
687         sub_p1 = open_block(b, pos);
688         assert (sub_p1);
689         diff_terms -= sub_p1->no_items;
690         more = insert_sub (b, &sub_p1, lookahead_item, mode, stream, &sub_p2, 
691                            sub_item, &sub_size, last_max_item);
692         diff_terms += sub_p1->no_items;
693     }
694     if (sub_p2)
695         diff_terms += sub_p2->no_items;
696     if (diff_terms)
697     {
698         p->dirty = 1;
699         p->no_items += diff_terms;
700     }
701     if (sub_p2)
702     {
703         /* there was a split - must insert pointer in this one */
704         char dst_buf[DST_BUF_SIZE];
705         char *dst = dst_buf;
706 #if INT_ENCODE
707         const char *sub_item_ptr = sub_item;
708 #endif
709         assert (sub_size < 80 && sub_size > 1);
710
711         memcpy (dst, startp, src - startp);
712                 
713         dst += src - startp;
714
715 #if INT_ENCODE
716         (*b->method->codec.reset)(c1);
717         (*b->method->codec.encode)(c1, &dst, &sub_item_ptr);
718 #else
719         encode_item_len (&dst, sub_size);      /* sub length and item */
720         memcpy (dst, sub_item, sub_size);
721         dst += sub_size;
722 #endif
723
724         encode_ptr(&dst, sub_p2->pos);   /* pos */
725
726         if (endp - src)                   /* remaining data */
727         {
728             memcpy (dst, src, endp - src);
729             dst += endp - src;
730         }
731         p->size = dst - dst_buf;
732         assert (p->size >= 0);
733
734         if (p->size <= b->file[p->cat].head.block_max)
735         {
736             /* it fits OK in this block */
737             memcpy (startp, dst_buf, dst - dst_buf);
738
739             close_block(b, sub_p2);
740         }
741         else
742         {
743             /* must split _this_ block as well .. */
744             struct ISAMB_block *sub_p3;
745 #if INT_ENCODE
746             char file_item_buf[DST_ITEM_MAX];
747             char *file_item = file_item_buf;
748 #else
749             zint split_size_tmp;
750 #endif
751             zint no_items_first_half = 0;
752             int p_new_size;
753             const char *half;
754             src = dst_buf;
755             endp = dst;
756
757             p->dirty = 1;
758             close_block(b, sub_p2);
759
760             half = src + b->file[p->cat].head.block_size/2;
761             decode_ptr(&src, &pos);
762
763             /* read sub block so we can get no_items for it */
764             sub_p3 = open_block(b, pos);
765             no_items_first_half += sub_p3->no_items;
766             close_block(b, sub_p3);
767
768             while (src <= half)
769             {
770 #if INT_ENCODE
771                 file_item = file_item_buf;
772                 (*b->method->codec.reset)(c1);
773                 (*b->method->codec.decode)(c1, &file_item, &src);
774 #else
775                 decode_item_len(&src, &split_size_tmp);
776                 *split_size = (int) split_size_tmp;
777                 src += *split_size;
778 #endif
779                 decode_ptr(&src, &pos);
780
781                 /* read sub block so we can get no_items for it */
782                 sub_p3 = open_block(b, pos);
783                 no_items_first_half += sub_p3->no_items;
784                 close_block(b, sub_p3);
785             }
786             /*  p is first half */
787             p_new_size = src - dst_buf;
788             memcpy (p->bytes, dst_buf, p_new_size);
789
790 #if INT_ENCODE
791             file_item = file_item_buf;
792             (*b->method->codec.reset)(c1);
793             (*b->method->codec.decode)(c1, &file_item, &src);
794             *split_size = file_item - file_item_buf;
795             memcpy(split_item, file_item_buf, *split_size);
796 #else
797             decode_item_len(&src, &split_size_tmp);
798             *split_size = (int) split_size_tmp;
799             memcpy (split_item, src, *split_size);
800             src += *split_size;
801 #endif
802             /*  *sp is second half */
803             *sp = new_int (b, p->cat);
804             (*sp)->size = endp - src;
805             memcpy ((*sp)->bytes, src, (*sp)->size);
806
807             p->size = p_new_size;
808
809             /*  adjust no_items in first&second half */
810             (*sp)->no_items = p->no_items - no_items_first_half;
811             p->no_items = no_items_first_half;
812         }
813         p->dirty = 1;
814     }
815     close_block(b, sub_p1);
816     (*b->method->codec.stop)(c1);
817     return more;
818 }
819
820 int insert_leaf (ISAMB b, struct ISAMB_block **sp1, void *lookahead_item,
821                  int *lookahead_mode, ISAMC_I *stream,
822                  struct ISAMB_block **sp2,
823                  void *sub_item, int *sub_size,
824                  const void *max_item)
825 {
826     struct ISAMB_block *p = *sp1;
827     char *endp = 0;
828     const char *src = 0;
829     char dst_buf[DST_BUF_SIZE], *dst = dst_buf;
830     int new_size;
831     void *c1 = (*b->method->codec.start)();
832     void *c2 = (*b->method->codec.start)();
833     int more = 1;
834     int quater = b->file[b->no_cat-1].head.block_max / 4;
835     char *mid_cut = dst_buf + quater * 2;
836     char *tail_cut = dst_buf + quater * 3;
837     char *maxp = dst_buf + b->file[b->no_cat-1].head.block_max;
838     char *half1 = 0;
839     char *half2 = 0;
840     char cut_item_buf[DST_ITEM_MAX];
841     int cut_item_size = 0;
842     int no_items = 0;    /* number of items (total) */
843     int no_items_1 = 0;  /* number of items (first half) */
844     int inserted_dst_bytes = 0;
845
846     if (p && p->size)
847     {
848         char file_item_buf[DST_ITEM_MAX];
849         char *file_item = file_item_buf;
850             
851         src = p->bytes;
852         endp = p->bytes + p->size;
853         (*b->method->codec.decode)(c1, &file_item, &src);
854         while (1)
855         {
856             const char *dst_item = 0; /* resulting item to be inserted */
857             char *lookahead_next;
858             char *dst_0 = dst;
859             int d = -1;
860             
861             if (lookahead_item)
862                 d = (*b->method->compare_item)(file_item_buf, lookahead_item);
863             
864             /* d now holds comparison between existing file item and
865                lookahead item 
866                d = 0: equal
867                d > 0: lookahead before file
868                d < 0: lookahead after file
869             */
870             if (d > 0)
871             {
872                 /* lookahead must be inserted */
873                 dst_item = lookahead_item;
874                 /* if this is not an insertion, it's really bad .. */
875                 if (!*lookahead_mode)
876                 {
877                     yaz_log(YLOG_WARN, "isamb: Inconsistent register (1)");
878                     assert (*lookahead_mode);
879                 }
880             }
881             else
882                 dst_item = file_item_buf;
883
884             if (!*lookahead_mode && d == 0)
885             {
886                 /* it's a deletion and they match so there is nothing to be
887                    inserted anyway .. But mark the thing bad (file item
888                    was part of input.. The item will not be part of output */
889                 p->dirty = 1;
890             }
891             else if (!half1 && dst > mid_cut)
892             {
893                 /* we have reached the splitting point for the first time */
894                 const char *dst_item_0 = dst_item;
895                 half1 = dst; /* candidate for splitting */
896
897                 /* encode the resulting item */
898                 (*b->method->codec.encode)(c2, &dst, &dst_item);
899                 
900                 cut_item_size = dst_item - dst_item_0;
901                 assert(cut_item_size > 0);
902                 memcpy (cut_item_buf, dst_item_0, cut_item_size);
903                 
904                 half2 = dst;
905                 no_items_1 = no_items;
906                 no_items++;
907             }
908             else
909             {
910                 /* encode the resulting item */
911                 (*b->method->codec.encode)(c2, &dst, &dst_item);
912                 no_items++;
913             }
914
915             /* now move "pointers" .. result has been encoded .. */
916             if (d > 0)  
917             {
918                 /* we must move the lookahead pointer */
919
920                 inserted_dst_bytes += (dst - dst_0);
921                 if (inserted_dst_bytes >=  quater)
922                     /* no more room. Mark lookahead as "gone".. */
923                     lookahead_item = 0;
924                 else
925                 {
926                     /* move it really.. */
927                     lookahead_next = lookahead_item;
928                     if (!(*stream->read_item)(stream->clientData,
929                                               &lookahead_next,
930                                               lookahead_mode))
931                     {
932                         /* end of stream reached: no "more" and no lookahead */
933                         lookahead_item = 0;
934                         more = 0;
935                     }
936                     if (lookahead_item && max_item &&
937                         (*b->method->compare_item)(max_item, lookahead_item) <= 0)
938                     {
939                         /* the lookahead goes beyond what we allow in this
940                            leaf. Mark it as "gone" */
941                         lookahead_item = 0;
942                     }
943                     
944                     p->dirty = 1;
945                 }
946             }
947             else if (d == 0)
948             {
949                 /* exact match .. move both pointers */
950
951                 lookahead_next = lookahead_item;
952                 if (!(*stream->read_item)(stream->clientData,
953                                           &lookahead_next, lookahead_mode))
954                 {
955                     lookahead_item = 0;
956                     more = 0;
957                 }
958                 if (src == endp)
959                     break;  /* end of file stream reached .. */
960                 file_item = file_item_buf; /* move file pointer */
961                 (*b->method->codec.decode)(c1, &file_item, &src);
962             }
963             else
964             {
965                 /* file pointer must be moved */
966                 if (src == endp)
967                     break;
968                 file_item = file_item_buf;
969                 (*b->method->codec.decode)(c1, &file_item, &src);
970             }
971         }
972     }
973
974     /* this loop runs when we are "appending" to a leaf page. That is
975        either it's empty (new) or all file items have been read in
976        previous loop */
977
978     maxp = dst_buf + b->file[b->no_cat-1].head.block_max + quater;
979     while (lookahead_item)
980     {
981         char *dst_item;
982         const char *src = lookahead_item;
983         char *dst_0 = dst;
984         
985         /* if we have a lookahead item, we stop if we exceed the value of it */
986         if (max_item &&
987             (*b->method->compare_item)(max_item, lookahead_item) <= 0)
988         {
989             /* stop if we have reached the value of max item */
990             break;
991         }
992         if (!*lookahead_mode)
993         {
994             /* this is append. So a delete is bad */
995             yaz_log(YLOG_WARN, "isamb: Inconsistent register (2)");
996             abort();
997         }
998         else if (!half1 && dst > tail_cut)
999         {
1000             const char *src_0 = src;
1001             half1 = dst; /* candidate for splitting */
1002             
1003             (*b->method->codec.encode)(c2, &dst, &src);
1004             
1005             cut_item_size = src - src_0;
1006             assert(cut_item_size > 0);
1007             memcpy (cut_item_buf, src_0, cut_item_size);
1008             
1009             no_items_1 = no_items;
1010             half2 = dst;
1011         }
1012         else
1013             (*b->method->codec.encode)(c2, &dst, &src);
1014
1015         if (dst > maxp)
1016         {
1017             dst = dst_0;
1018             break;
1019         }
1020         no_items++;
1021         if (p)
1022             p->dirty = 1;
1023         dst_item = lookahead_item;
1024         if (!(*stream->read_item)(stream->clientData, &dst_item,
1025                                   lookahead_mode))
1026         {
1027             lookahead_item = 0;
1028             more = 0;
1029         }
1030     }
1031     new_size = dst - dst_buf;
1032     if (p && p->cat != b->no_cat-1 && 
1033         new_size > b->file[p->cat].head.block_max)
1034     {
1035         /* non-btree block will be removed */
1036         p->deleted = 1;
1037         close_block(b, p);
1038         /* delete it too!! */
1039         p = 0; /* make a new one anyway */
1040     }
1041     if (!p)
1042     {   /* must create a new one */
1043         int i;
1044         for (i = 0; i < b->no_cat; i++)
1045             if (new_size <= b->file[i].head.block_max)
1046                 break;
1047         if (i == b->no_cat)
1048             i = b->no_cat - 1;
1049         p = new_leaf (b, i);
1050     }
1051     if (new_size > b->file[p->cat].head.block_max)
1052     {
1053         char *first_dst;
1054         const char *cut_item = cut_item_buf;
1055
1056         assert (half1);
1057         assert (half2);
1058
1059         assert(cut_item_size > 0);
1060         
1061         /* first half */
1062         p->size = half1 - dst_buf;
1063         assert(p->size <=  b->file[p->cat].head.block_max);
1064         memcpy (p->bytes, dst_buf, half1 - dst_buf);
1065         p->no_items = no_items_1;
1066
1067         /* second half */
1068         *sp2 = new_leaf (b, p->cat);
1069
1070         (*b->method->codec.reset)(c2);
1071
1072         first_dst = (*sp2)->bytes;
1073
1074         (*b->method->codec.encode)(c2, &first_dst, &cut_item);
1075
1076         memcpy (first_dst, half2, dst - half2);
1077
1078         (*sp2)->size = (first_dst - (*sp2)->bytes) + (dst - half2);
1079         assert((*sp2)->size <=  b->file[p->cat].head.block_max);
1080         (*sp2)->no_items = no_items - no_items_1;
1081         (*sp2)->dirty = 1;
1082         p->dirty = 1;
1083         memcpy (sub_item, cut_item_buf, cut_item_size);
1084         *sub_size = cut_item_size;
1085     }
1086     else
1087     {
1088         memcpy (p->bytes, dst_buf, dst - dst_buf);
1089         p->size = new_size;
1090         p->no_items = no_items;
1091     }
1092     (*b->method->codec.stop)(c1);
1093     (*b->method->codec.stop)(c2);
1094     *sp1 = p;
1095     return more;
1096 }
1097
1098 int insert_sub (ISAMB b, struct ISAMB_block **p, void *new_item,
1099                 int *mode,
1100                 ISAMC_I *stream,
1101                 struct ISAMB_block **sp,
1102                 void *sub_item, int *sub_size,
1103                 const void *max_item)
1104 {
1105     if (!*p || (*p)->leaf)
1106         return insert_leaf (b, p, new_item, mode, stream, sp, sub_item, 
1107                             sub_size, max_item);
1108     else
1109         return insert_int (b, *p, new_item, mode, stream, sp, sub_item,
1110                            sub_size, max_item);
1111 }
1112
1113 int isamb_unlink (ISAMB b, ISAM_P pos)
1114 {
1115     struct ISAMB_block *p1;
1116
1117     if (!pos)
1118         return 0;
1119     p1 = open_block(b, pos);
1120     p1->deleted = 1;
1121     if (!p1->leaf)
1122     {
1123         zint sub_p;
1124         const char *src = p1->bytes + p1->offset;
1125 #if INT_ENCODE
1126         void *c1 = (*b->method->codec.start)();
1127 #endif
1128         decode_ptr(&src, &sub_p);
1129         isamb_unlink(b, sub_p);
1130         
1131         while (src != p1->bytes + p1->size)
1132         {
1133 #if INT_ENCODE
1134             char file_item_buf[DST_ITEM_MAX];
1135             char *file_item = file_item_buf;
1136             (*b->method->codec.reset)(c1);
1137             (*b->method->codec.decode)(c1, &file_item, &src);
1138 #else
1139             zint item_len;
1140             decode_item_len(&src, &item_len);
1141             src += item_len;
1142 #endif
1143             decode_ptr(&src, &sub_p);
1144             isamb_unlink(b, sub_p);
1145         }
1146 #if INT_ENCODE
1147         (*b->method->codec.stop)(c1);
1148 #endif
1149     }
1150     close_block(b, p1);
1151     return 0;
1152 }
1153
1154 void isamb_merge(ISAMB b, ISAM_P *pos, ISAMC_I *stream)
1155 {
1156     char item_buf[DST_ITEM_MAX];
1157     char *item_ptr;
1158     int i_mode;
1159     int more;
1160     int must_delete = 0;
1161
1162     if (b->cache < 0)
1163     {
1164         int more = 1;
1165         while (more)
1166         {
1167             item_ptr = item_buf;
1168             more =
1169                 (*stream->read_item)(stream->clientData, &item_ptr, &i_mode);
1170         }
1171         *pos = 1;
1172         return;
1173     }
1174     item_ptr = item_buf;
1175     more = (*stream->read_item)(stream->clientData, &item_ptr, &i_mode);
1176     while (more)
1177     {
1178         struct ISAMB_block *p = 0, *sp = 0;
1179         char sub_item[DST_ITEM_MAX];
1180         int sub_size;
1181         
1182         if (*pos)
1183             p = open_block(b, *pos);
1184         more = insert_sub (b, &p, item_buf, &i_mode, stream, &sp,
1185                            sub_item, &sub_size, 0);
1186         if (sp)
1187         {    /* increase level of tree by one */
1188             struct ISAMB_block *p2 = new_int (b, p->cat);
1189             char *dst = p2->bytes + p2->size;
1190 #if INT_ENCODE
1191             void *c1 = (*b->method->codec.start)();
1192             const char *sub_item_ptr = sub_item;
1193 #endif
1194
1195             encode_ptr(&dst, p->pos);
1196             assert (sub_size < 80 && sub_size > 1);
1197 #if INT_ENCODE
1198             (*b->method->codec.reset)(c1);
1199             (*b->method->codec.encode)(c1, &dst, &sub_item_ptr);
1200 #else
1201             encode_item_len (&dst, sub_size);
1202             memcpy (dst, sub_item, sub_size);
1203             dst += sub_size;
1204 #endif
1205             encode_ptr(&dst, sp->pos);
1206             
1207             p2->size = dst - p2->bytes;
1208             p2->no_items = p->no_items + sp->no_items;
1209             *pos = p2->pos;  /* return new super page */
1210             close_block(b, sp);
1211             close_block(b, p2);
1212 #if INT_ENCODE
1213             (*b->method->codec.stop)(c1);
1214 #endif
1215         }
1216         else
1217         {
1218             *pos = p->pos;   /* return current one (again) */
1219         }
1220         if (p->no_items == 0)
1221             must_delete = 1;
1222         else
1223             must_delete = 0;
1224         close_block(b, p);
1225     }
1226     if (must_delete)
1227     {
1228         isamb_unlink(b, *pos);
1229         *pos = 0;
1230     }
1231 }
1232
1233 ISAMB_PP isamb_pp_open_x(ISAMB isamb, ISAM_P pos, int *level, int scope)
1234 {
1235     ISAMB_PP pp = xmalloc(sizeof(*pp));
1236     int i;
1237
1238     assert(pos);
1239
1240     pp->isamb = isamb;
1241     pp->block = xmalloc(ISAMB_MAX_LEVEL * sizeof(*pp->block));
1242
1243     pp->pos = pos;
1244     pp->level = 0;
1245     pp->maxlevel = 0;
1246     pp->total_size = 0;
1247     pp->no_blocks = 0;
1248     pp->skipped_numbers = 0;
1249     pp->returned_numbers = 0;
1250     pp->scope = scope;
1251     for (i = 0; i<ISAMB_MAX_LEVEL; i++)
1252         pp->skipped_nodes[i] = pp->accessed_nodes[i] = 0;
1253     while (1)
1254     {
1255         struct ISAMB_block *p = open_block(isamb, pos);
1256         const char *src = p->bytes + p->offset;
1257         pp->block[pp->level] = p;
1258
1259         pp->total_size += p->size;
1260         pp->no_blocks++;
1261         if (p->leaf)
1262             break;
1263         decode_ptr(&src, &pos);
1264         p->offset = src - p->bytes;
1265         pp->level++;
1266         pp->accessed_nodes[pp->level]++; 
1267     }
1268     pp->block[pp->level+1] = 0;
1269     pp->maxlevel = pp->level;
1270     if (level)
1271         *level = pp->level;
1272     return pp;
1273 }
1274
1275 ISAMB_PP isamb_pp_open (ISAMB isamb, ISAM_P pos, int scope)
1276 {
1277     return isamb_pp_open_x(isamb, pos, 0, scope);
1278 }
1279
1280 void isamb_pp_close_x(ISAMB_PP pp, zint *size, zint *blocks)
1281 {
1282     int i;
1283     if (!pp)
1284         return;
1285     yaz_log(YLOG_DEBUG, "isamb_pp_close lev=%d returned "ZINT_FORMAT" values, " 
1286                     "skipped "ZINT_FORMAT,
1287         pp->maxlevel, pp->skipped_numbers, pp->returned_numbers);
1288     for (i = pp->maxlevel; i>=0; i--)
1289         if (pp->skipped_nodes[i] || pp->accessed_nodes[i])
1290             yaz_log(YLOG_DEBUG, "isamb_pp_close  level leaf-%d: "
1291                             ZINT_FORMAT" read, "ZINT_FORMAT" skipped", i,
1292                  pp->accessed_nodes[i], pp->skipped_nodes[i]);
1293     pp->isamb->skipped_numbers += pp->skipped_numbers;
1294     pp->isamb->returned_numbers += pp->returned_numbers;
1295     for (i = pp->maxlevel; i>=0; i--)
1296     {
1297         pp->isamb->accessed_nodes[i] += pp->accessed_nodes[i];
1298         pp->isamb->skipped_nodes[i] += pp->skipped_nodes[i];
1299     }
1300     if (size)
1301         *size = pp->total_size;
1302     if (blocks)
1303         *blocks = pp->no_blocks;
1304     for (i = 0; i <= pp->level; i++)
1305         close_block(pp->isamb, pp->block[i]);
1306     xfree(pp->block);
1307     xfree(pp);
1308 }
1309
1310 int isamb_block_info (ISAMB isamb, int cat)
1311 {
1312     if (cat >= 0 && cat < isamb->no_cat)
1313         return isamb->file[cat].head.block_size;
1314     return -1;
1315 }
1316
1317 void isamb_pp_close (ISAMB_PP pp)
1318 {
1319     isamb_pp_close_x(pp, 0, 0);
1320 }
1321
1322 /* simple recursive dumper .. */
1323 static void isamb_dump_r (ISAMB b, ISAM_P pos, void (*pr)(const char *str),
1324                           int level)
1325 {
1326     char buf[1024];
1327     char prefix_str[1024];
1328     if (pos)
1329     {
1330         struct ISAMB_block *p = open_block(b, pos);
1331         sprintf(prefix_str, "%*s " ZINT_FORMAT " cat=%d size=%d max=%d items="
1332                 ZINT_FORMAT, level*2, "",
1333                 pos, p->cat, p->size, b->file[p->cat].head.block_max,
1334                 p->no_items);
1335         (*pr)(prefix_str);
1336         sprintf(prefix_str, "%*s " ZINT_FORMAT, level*2, "", pos);
1337         if (p->leaf)
1338         {
1339             while (p->offset < p->size)
1340             {
1341                 const char *src = p->bytes + p->offset;
1342                 char *dst = buf;
1343                 (*b->method->codec.decode)(p->decodeClientData, &dst, &src);
1344                 (*b->method->log_item)(YLOG_DEBUG, buf, prefix_str);
1345                 p->offset = src - (char*) p->bytes;
1346             }
1347             assert(p->offset == p->size);
1348         }
1349         else
1350         {
1351             const char *src = p->bytes + p->offset;
1352             ISAM_P sub;
1353
1354             decode_ptr(&src, &sub);
1355             p->offset = src - (char*) p->bytes;
1356
1357             isamb_dump_r(b, sub, pr, level+1);
1358             
1359             while (p->offset < p->size)
1360             {
1361 #if INT_ENCODE
1362                 char file_item_buf[DST_ITEM_MAX];
1363                 char *file_item = file_item_buf;
1364                 void *c1 = (*b->method->codec.start)();
1365                 (*b->method->codec.decode)(c1, &file_item, &src);
1366                 (*b->method->codec.stop)(c1);
1367                 (*b->method->log_item)(YLOG_DEBUG, file_item_buf, prefix_str);
1368 #else
1369                 zint item_len;
1370                 decode_item_len(&src, &item_len);
1371                 (*b->method->log_item)(YLOG_DEBUG, src, prefix_str);
1372                 src += item_len;
1373 #endif
1374                 decode_ptr(&src, &sub);
1375                 
1376                 p->offset = src - (char*) p->bytes;
1377                 
1378                 isamb_dump_r(b, sub, pr, level+1);
1379             }           
1380         }
1381         close_block(b, p);
1382     }
1383 }
1384
1385 void isamb_dump(ISAMB b, ISAM_P pos, void (*pr)(const char *str))
1386 {
1387     isamb_dump_r(b, pos, pr, 0);
1388 }
1389
1390 int isamb_pp_read(ISAMB_PP pp, void *buf)
1391 {
1392     return isamb_pp_forward(pp, buf, 0);
1393 }
1394
1395
1396 static int isamb_pp_on_right_node(ISAMB_PP pp, int level, const void *untilbuf)
1397 { /* looks one node higher to see if we should be on this node at all */
1398   /* useful in backing off quickly, and in avoiding tail descends */
1399   /* call with pp->level to begin with */
1400     struct ISAMB_block *p;
1401     int cmp;
1402     const char *src;
1403     ISAMB b = pp->isamb;
1404
1405     assert(level >= 0);
1406     if (level == 0)
1407     {
1408 #if ISAMB_DEBUG
1409             yaz_log(YLOG_DEBUG, "isamb_pp_on_right returning true for root");
1410 #endif
1411         return 1; /* we can never skip the root node */
1412     }
1413     level--;
1414     p = pp->block[level];
1415     assert(p->offset <= p->size);
1416     if (p->offset < p->size)
1417     {
1418 #if INT_ENCODE
1419         char file_item_buf[DST_ITEM_MAX];
1420         char *file_item = file_item_buf;
1421         void *c1 = (*b->method->codec.start)();
1422         assert(p->offset > 0); 
1423         src = p->bytes + p->offset;
1424         (*b->method->codec.decode)(c1, &file_item, &src);
1425         (*b->method->codec.stop)(c1);
1426         cmp = (*b->method->compare_item)(untilbuf, file_item_buf);
1427 #else
1428         zint item_len;
1429         assert(p->offset > 0); 
1430         src = p->bytes + p->offset;
1431         decode_item_len(&src, &item_len);
1432 #if ISAMB_DEBUG
1433         (*b->method->codec.log_item)(YLOG_DEBUG, untilbuf, "on_leaf: until");
1434         (*b->method->codec.log_item)(YLOG_DEBUG, src, "on_leaf: value");
1435 #endif
1436         cmp = (*b->method->compare_item)(untilbuf, src);
1437 #endif
1438         if (cmp < pp->scope)
1439         {  /* cmp<2 */
1440 #if ISAMB_DEBUG
1441             yaz_log(YLOG_DEBUG, "isamb_pp_on_right returning true "
1442                             "cmp=%d lev=%d ofs=%d", cmp, level, p->offset);
1443 #endif
1444             return 1; 
1445         }
1446         else
1447         {
1448 #if ISAMB_DEBUG
1449             yaz_log(YLOG_DEBUG, "isamb_pp_on_right returning false "
1450                             "cmp=%d lev=%d ofs=%d", cmp, level, p->offset);
1451 #endif
1452             return 0; 
1453         }
1454     }
1455     else {
1456 #if ISAMB_DEBUG
1457         yaz_log(YLOG_DEBUG, "isamb_pp_on_right at tail, looking higher "
1458                         "lev=%d", level);
1459 #endif
1460         return isamb_pp_on_right_node(pp, level, untilbuf);
1461     }
1462 } /* isamb_pp_on_right_node */
1463
1464 static int isamb_pp_read_on_leaf(ISAMB_PP pp, void *buf)
1465
1466     /* reads the next item on the current leaf, returns 0 if end of leaf*/
1467     struct ISAMB_block *p = pp->block[pp->level];
1468     char *dst;
1469     const char *src;
1470     assert(pp);
1471     assert(buf);
1472     if (p->offset == p->size)
1473     {
1474 #if ISAMB_DEBUG
1475         yaz_log(YLOG_DEBUG, "isamb_pp_read_on_leaf returning 0 on "
1476                 "node %d", p->pos);
1477 #endif
1478         return 0; /* at end of leaf */
1479     }
1480     src = p->bytes + p->offset;
1481     dst = buf;
1482     (*pp->isamb->method->codec.decode)(p->decodeClientData, &dst, &src);
1483     p->offset = src - (char*) p->bytes;
1484 #if ISAMB_DEBUG
1485     (*pp->isamb->method->codec.log_item)(YLOG_DEBUG, buf,
1486                                          "read_on_leaf returning 1");
1487 #endif
1488     pp->returned_numbers++;
1489     return 1;
1490 } /* read_on_leaf */
1491
1492 static int isamb_pp_forward_on_leaf(ISAMB_PP pp, void *buf, const void *untilbuf)
1493 { /* forwards on the current leaf, returns 0 if not found */
1494     int cmp;
1495     int skips = 0;
1496     while (1)
1497     {
1498         if (!isamb_pp_read_on_leaf(pp, buf))
1499             return 0;
1500         /* FIXME - this is an extra function call, inline the read? */
1501         cmp=(*pp->isamb->method->compare_item)(untilbuf, buf);
1502         if (cmp <pp->scope)
1503         {  /* cmp<2  found a good one */
1504 #if ISAMB_DEBUG
1505             if (skips)
1506                 yaz_log(YLOG_DEBUG, "isam_pp_fwd_on_leaf skipped %d items", skips);
1507 #endif
1508             pp->returned_numbers++;
1509             return 1;
1510         }
1511         if (!skips)
1512             if (!isamb_pp_on_right_node(pp, pp->level, untilbuf))
1513                 return 0; /* never mind the rest of this leaf */
1514         pp->skipped_numbers++;
1515         skips++;
1516     }
1517 } /* forward_on_leaf */
1518
1519 static int isamb_pp_climb_level(ISAMB_PP pp, ISAM_P *pos)
1520 { /* climbs higher in the tree, until finds a level with data left */
1521   /* returns the node to (consider to) descend to in *pos) */
1522     struct ISAMB_block *p = pp->block[pp->level];
1523     const char *src;
1524 #if ISAMB_DEBUG
1525     yaz_log(YLOG_DEBUG, "isamb_pp_climb_level starting "
1526                    "at level %d node %d ofs=%d sz=%d",
1527                     pp->level, p->pos, p->offset, p->size);
1528 #endif
1529     assert(pp->level >= 0);
1530     assert(p->offset <= p->size);
1531     if (pp->level==0)
1532     {
1533 #if ISAMB_DEBUG
1534         yaz_log(YLOG_DEBUG, "isamb_pp_climb_level returning 0 at root");
1535 #endif
1536         return 0;
1537     }
1538     assert(pp->level>0); 
1539     close_block(pp->isamb, pp->block[pp->level]);
1540     pp->block[pp->level] = 0;
1541     (pp->level)--;
1542     p = pp->block[pp->level];
1543 #if ISAMB_DEBUG
1544     yaz_log(YLOG_DEBUG, "isamb_pp_climb_level climbed to level %d node %d ofs=%d",
1545                     pp->level, p->pos, p->offset);
1546 #endif
1547     assert(!p->leaf);
1548     assert(p->offset <= p->size);
1549     if (p->offset == p->size)
1550     {
1551         /* we came from the last pointer, climb on */
1552         if (!isamb_pp_climb_level(pp, pos))
1553             return 0;
1554         p = pp->block[pp->level];
1555     }
1556     else
1557     {
1558 #if INT_ENCODE
1559         char file_item_buf[DST_ITEM_MAX];
1560         char *file_item = file_item_buf;
1561         ISAMB b = pp->isamb;
1562         void *c1 = (*b->method->codec.start)();
1563 #else
1564         zint item_len;
1565 #endif
1566         /* skip the child we just came from */
1567 #if ISAMB_DEBUG
1568         yaz_log(YLOG_DEBUG, "isam_pp_climb_level: skipping lev=%d ofs=%d sz=%d", 
1569                         pp->level, p->offset, p->size);
1570 #endif
1571         assert (p->offset < p->size);
1572         src = p->bytes + p->offset;
1573 #if INT_ENCODE
1574         (*b->method->codec.decode)(c1, &file_item, &src);
1575         (*b->method->codec.stop)(c1);
1576 #else
1577         decode_item_len(&src, &item_len);
1578         src += item_len;
1579 #endif
1580         decode_ptr(&src, pos);
1581         p->offset = src - (char *)p->bytes;
1582             
1583     }
1584     return 1;
1585 } /* climb_level */
1586
1587
1588 static zint isamb_pp_forward_unode(ISAMB_PP pp, zint pos, const void *untilbuf)
1589 { /* scans a upper node until it finds a child <= untilbuf */
1590   /* pp points to the key value, as always. pos is the child read from */
1591   /* the buffer */
1592   /* if all values are too small, returns the last child in the node */
1593   /* FIXME - this can be detected, and avoided by looking at the */
1594   /* parent node, but that gets messy. Presumably the cost is */
1595   /* pretty low anyway */
1596     ISAMB b = pp->isamb;
1597     struct ISAMB_block *p = pp->block[pp->level];
1598     const char *src = p->bytes + p->offset;
1599     int cmp;
1600     zint nxtpos;
1601 #if ISAMB_DEBUG
1602     int skips = 0;
1603     yaz_log(YLOG_DEBUG, "isamb_pp_forward_unode starting "
1604                    "at level %d node %d ofs=%di sz=%d",
1605                     pp->level, p->pos, p->offset, p->size);
1606 #endif
1607     assert(!p->leaf);
1608     assert(p->offset <= p->size);
1609     if (p->offset == p->size)
1610     {
1611 #if ISAMB_DEBUG
1612             yaz_log(YLOG_DEBUG, "isamb_pp_forward_unode returning at end "
1613                    "at level %d node %d ofs=%di sz=%d",
1614                     pp->level, p->pos, p->offset, p->size);
1615 #endif
1616         return pos; /* already at the end of it */
1617     }
1618     while(p->offset < p->size)
1619     {
1620 #if INT_ENCODE
1621         char file_item_buf[DST_ITEM_MAX];
1622         char *file_item = file_item_buf;
1623         void *c1 = (*b->method->codec.start)();
1624         (*b->method->codec.decode)(c1, &file_item, &src);
1625         (*b->method->codec.stop)(c1);
1626         cmp = (*b->method->compare_item)(untilbuf, file_item_buf);
1627 #else
1628         zint item_len;
1629         decode_item_len(&src, &item_len);
1630         cmp = (*b->method->compare_item)(untilbuf, src);
1631         src += item_len;
1632 #endif
1633         decode_ptr(&src, &nxtpos);
1634         if (cmp<pp->scope) /* cmp<2 */
1635         {
1636 #if ISAMB_DEBUG
1637             yaz_log(YLOG_DEBUG, "isamb_pp_forward_unode returning a hit "
1638                    "at level %d node %d ofs=%d sz=%d",
1639                     pp->level, p->pos, p->offset, p->size);
1640 #endif
1641             return pos;
1642         } /* found one */
1643         pos = nxtpos;
1644         p->offset = src-(char*)p->bytes;
1645         (pp->skipped_nodes[pp->maxlevel - pp->level -1])++;
1646 #if ISAMB_DEBUG
1647         skips++;
1648 #endif
1649     }
1650 #if ISAMB_DEBUG
1651             yaz_log(YLOG_DEBUG, "isamb_pp_forward_unode returning at tail "
1652                    "at level %d node %d ofs=%d sz=%d skips=%d",
1653                     pp->level, p->pos, p->offset, p->size, skips);
1654 #endif
1655     return pos; /* that's the last one in the line */
1656     
1657 } /* forward_unode */
1658
1659 static void isamb_pp_descend_to_leaf(ISAMB_PP pp, ISAM_P pos,
1660                                      const void *untilbuf)
1661 { /* climbs down the tree, from pos, to the leftmost leaf */
1662     struct ISAMB_block *p = pp->block[pp->level];
1663     const char *src;
1664     assert(!p->leaf);
1665 #if ISAMB_DEBUG
1666     yaz_log(YLOG_DEBUG, "isamb_pp_descend_to_leaf "
1667                    "starting at lev %d node %d ofs=%d lf=%d u=%p", 
1668                    pp->level, p->pos, p->offset, p->leaf, untilbuf);
1669 #endif
1670     if (untilbuf)
1671         pos = isamb_pp_forward_unode(pp, pos, untilbuf);
1672     ++(pp->level);
1673     assert(pos);
1674     p = open_block(pp->isamb, pos);
1675     pp->block[pp->level] = p;
1676     ++(pp->accessed_nodes[pp->maxlevel-pp->level]);
1677     ++(pp->no_blocks);
1678 #if ISAMB_DEBUG
1679     yaz_log(YLOG_DEBUG, "isamb_pp_descend_to_leaf "
1680                    "got lev %d node %d lf=%d", 
1681                    pp->level, p->pos, p->leaf);
1682 #endif
1683     if (p->leaf)
1684         return;
1685     assert (p->offset==0);
1686     src = p->bytes + p->offset;
1687     decode_ptr(&src, &pos);
1688     p->offset = src-(char*)p->bytes;
1689     isamb_pp_descend_to_leaf(pp, pos, untilbuf);
1690 #if ISAMB_DEBUG
1691     yaz_log(YLOG_DEBUG, "isamb_pp_descend_to_leaf "
1692                    "returning at lev %d node %d ofs=%d lf=%d", 
1693                    pp->level, p->pos, p->offset, p->leaf);
1694 #endif
1695 } /* descend_to_leaf */
1696
1697 static int isamb_pp_find_next_leaf(ISAMB_PP pp)
1698 { /* finds the next leaf by climbing up and down */
1699     ISAM_P pos;
1700     if (!isamb_pp_climb_level(pp, &pos))
1701         return 0;
1702     isamb_pp_descend_to_leaf(pp, pos, 0);
1703     return 1;
1704 }
1705
1706 static int isamb_pp_climb_desc(ISAMB_PP pp,  const void *untilbuf)
1707 { /* climbs up and descends to a leaf where values >= *untilbuf are found */
1708     ISAM_P pos;
1709 #if ISAMB_DEBUG
1710     struct ISAMB_block *p = pp->block[pp->level];
1711     yaz_log(YLOG_DEBUG, "isamb_pp_climb_desc starting "
1712                    "at level %d node %d ofs=%d sz=%d",
1713                     pp->level, p->pos, p->offset, p->size);
1714 #endif
1715     if (!isamb_pp_climb_level(pp, &pos))
1716         return 0;
1717     /* see if it would pay to climb one higher */
1718     if (!isamb_pp_on_right_node(pp, pp->level, untilbuf))
1719         if (!isamb_pp_climb_level(pp, &pos))
1720             return 0;
1721     isamb_pp_descend_to_leaf(pp, pos, untilbuf);
1722 #if ISAMB_DEBUG
1723     p = pp->block[pp->level];
1724     yaz_log(YLOG_DEBUG, "isamb_pp_climb_desc done "
1725                    "at level %d node %d ofs=%d sz=%d",
1726                     pp->level, p->pos, p->offset, p->size);
1727 #endif
1728     return 1;
1729 } /* climb_desc */
1730
1731 int isamb_pp_forward (ISAMB_PP pp, void *buf, const void *untilbuf)
1732 {
1733 #if ISAMB_DEBUG
1734     struct ISAMB_block *p = pp->block[pp->level];
1735     assert(p->leaf);
1736     yaz_log(YLOG_DEBUG, "isamb_pp_forward starting "
1737                    "at level %d node %d ofs=%d sz=%d u=%p sc=%d",
1738                     pp->level, p->pos, p->offset, p->size, untilbuf, scope);
1739 #endif
1740     if (untilbuf)
1741     {
1742         if (isamb_pp_forward_on_leaf(pp, buf, untilbuf))
1743         {
1744 #if ISAMB_DEBUG
1745             yaz_log(YLOG_DEBUG, "isamb_pp_forward (f) returning (A) "
1746                    "at level %d node %d ofs=%d sz=%d",
1747                     pp->level, p->pos, p->offset, p->size);
1748 #endif
1749             return 1;
1750         }
1751         if (! isamb_pp_climb_desc(pp, untilbuf))
1752         {
1753 #if ISAMB_DEBUG
1754             yaz_log(YLOG_DEBUG, "isamb_pp_forward (f) returning notfound (B) "
1755                    "at level %d node %d ofs=%d sz=%d",
1756                     pp->level, p->pos, p->offset, p->size);
1757 #endif
1758             return 0; /* could not find a leaf */
1759         }
1760         do {
1761             if (isamb_pp_forward_on_leaf(pp, buf, untilbuf))
1762             {
1763 #if ISAMB_DEBUG
1764                 yaz_log(YLOG_DEBUG, "isamb_pp_forward (f) returning (c) "
1765                         "at level %d node %d ofs=%d sz=%d",
1766                         pp->level, p->pos, p->offset, p->size);
1767 #endif
1768                 return 1;
1769             }
1770         } while (isamb_pp_find_next_leaf(pp));
1771         return 0; /* could not find at all */
1772     }
1773     else { /* no untilbuf, a straight read */
1774         /* FIXME - this should be moved
1775          * directly into the pp_read */
1776         /* keeping here now, to keep same
1777          * interface as the old fwd */
1778         if (isamb_pp_read_on_leaf(pp, buf))
1779         {
1780 #if ISAMB_DEBUG
1781             yaz_log(YLOG_DEBUG, "isamb_pp_forward (read) returning (D) "
1782                    "at level %d node %d ofs=%d sz=%d",
1783                     pp->level, p->pos, p->offset, p->size);
1784 #endif
1785             return 1;
1786         }
1787         if (isamb_pp_find_next_leaf(pp))
1788         {
1789 #if ISAMB_DEBUG
1790             yaz_log(YLOG_DEBUG, "isamb_pp_forward (read) returning (E) "
1791                    "at level %d node %d ofs=%d sz=%d",
1792                     pp->level, p->pos, p->offset, p->size);
1793 #endif
1794             return isamb_pp_read_on_leaf(pp, buf);
1795         }
1796         else
1797             return 0;
1798     }
1799 } /* isam_pp_forward (new version) */
1800
1801 void isamb_pp_pos(ISAMB_PP pp, double *current, double *total)
1802 { /* return an estimate of the current position and of the total number of */
1803   /* occureences in the isam tree, based on the current leaf */
1804     assert(total);
1805     assert(current);
1806     
1807     /* if end-of-stream PP may not be leaf */
1808
1809     *total = (double) (pp->block[0]->no_items);
1810     *current = (double) pp->returned_numbers;
1811 #if ISAMB_DEBUG
1812     yaz_log(YLOG_LOG, "isamb_pp_pos returning: cur= %0.1f tot=%0.1f rn="
1813          ZINT_FORMAT, *current, *total, pp->returned_numbers);
1814 #endif
1815 }
1816
1817 int isamb_pp_forward2(ISAMB_PP pp, void *buf, const void *untilb)
1818 {
1819     char *dst = buf;
1820     const char *src;
1821     struct ISAMB_block *p = pp->block[pp->level];
1822     ISAMB b = pp->isamb;
1823     if (!p)
1824         return 0;
1825 again:
1826     while (p->offset == p->size)
1827     {
1828         ISAM_P pos;
1829 #if INT_ENCODE
1830         const char *src_0;
1831         void *c1;
1832         char file_item_buf[DST_ITEM_MAX];
1833         char *file_item = file_item_buf;
1834 #else
1835         zint item_len;
1836 #endif
1837         while (p->offset == p->size)
1838         {
1839             if (pp->level == 0)
1840                 return 0;
1841             close_block (pp->isamb, pp->block[pp->level]);
1842             pp->block[pp->level] = 0;
1843             (pp->level)--;
1844             p = pp->block[pp->level];
1845             assert (!p->leaf);  
1846         }
1847
1848         assert(!p->leaf);
1849         src = p->bytes + p->offset;
1850        
1851 #if INT_ENCODE
1852         c1 = (*b->method->codec.start)();
1853         (*b->method->codec.decode)(c1, &file_item, &src);
1854 #else
1855         decode_ptr (&src, &item_len);
1856         src += item_len;
1857 #endif        
1858         decode_ptr (&src, &pos);
1859         p->offset = src - (char*) p->bytes;
1860
1861         src = p->bytes + p->offset;
1862
1863         while(1)
1864         {
1865             if (!untilb || p->offset == p->size)
1866                 break;
1867             assert(p->offset < p->size);
1868 #if INT_ENCODE
1869             src_0 = src;
1870             file_item = file_item_buf;
1871             (*b->method->codec.reset)(c1);
1872             (*b->method->codec.decode)(c1, &file_item, &src);
1873             if ((*b->method->compare_item)(untilb, file_item_buf) <= 1)
1874             {
1875                 src = src_0;
1876                 break;
1877             }
1878 #else
1879             decode_item_len(&src, &item_len);
1880             if ((*b->method->compare_item)(untilb, src) <= 1)
1881                 break;
1882             src += item_len;
1883 #endif
1884             decode_ptr (&src, &pos);
1885             p->offset = src - (char*) p->bytes;
1886         }
1887
1888         pp->level++;
1889
1890         while (1)
1891         {
1892             pp->block[pp->level] = p = open_block (pp->isamb, pos);
1893
1894             pp->total_size += p->size;
1895             pp->no_blocks++;
1896             
1897             if (p->leaf) 
1898             {
1899                 break;
1900             }
1901             
1902             src = p->bytes + p->offset;
1903             while(1)
1904             {
1905                 decode_ptr (&src, &pos);
1906                 p->offset = src - (char*) p->bytes;
1907                 
1908                 if (!untilb || p->offset == p->size)
1909                     break;
1910                 assert(p->offset < p->size);
1911 #if INT_ENCODE
1912                 src_0 = src;
1913                 file_item = file_item_buf;
1914                 (*b->method->codec.reset)(c1);
1915                 (*b->method->codec.decode)(c1, &file_item, &src);
1916                 if ((*b->method->compare_item)(untilb, file_item_buf) <= 1)
1917                 {
1918                     src = src_0;
1919                     break;
1920                 }
1921 #else
1922                 decode_ptr (&src, &item_len);
1923                 if ((*b->method->compare_item)(untilb, src) <= 1)
1924                     break;
1925                 src += item_len;
1926 #endif
1927             }
1928             pp->level++;
1929         }
1930 #if INT_ENCODE
1931         (*b->method->codec.stop)(c1);
1932 #endif
1933     }
1934     assert (p->offset < p->size);
1935     assert (p->leaf);
1936     while(1)
1937     {
1938         char *dst0 = dst;
1939         src = p->bytes + p->offset;
1940         (*pp->isamb->method->codec.decode)(p->decodeClientData, &dst, &src);
1941         p->offset = src - (char*) p->bytes;
1942         if (!untilb || (*pp->isamb->method->compare_item)(untilb, dst0) <= 1)
1943             break;
1944         dst = dst0;
1945         if (p->offset == p->size) goto again;
1946     }
1947     return 1;
1948 }
1949 /*
1950  * Local variables:
1951  * c-basic-offset: 4
1952  * indent-tabs-mode: nil
1953  * End:
1954  * vim: shiftwidth=4 tabstop=8 expandtab
1955  */
1956