Allow different values for cache parameter for isamb_open: 0=normal,
[idzebra-moved-to-github.git] / isamb / isamb.c
1 /* $Id: isamb.c,v 1.87 2006-12-07 19:23:56 adam Exp $
2    Copyright (C) 1995-2006
3    Index Data ApS
4
5 This file is part of the Zebra server.
6
7 Zebra is free software; you can redistribute it and/or modify it under
8 the terms of the GNU General Public License as published by the Free
9 Software Foundation; either version 2, or (at your option) any later
10 version.
11
12 Zebra is distributed in the hope that it will be useful, but WITHOUT ANY
13 WARRANTY; without even the implied warranty of MERCHANTABILITY or
14 FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
15 for more details.
16
17 You should have received a copy of the GNU General Public License
18 along with this program; if not, write to the Free Software
19 Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA  02110-1301  USA
20
21 */
22
23 #include <stdlib.h>
24 #include <string.h>
25 #include <yaz/log.h>
26 #include <yaz/xmalloc.h>
27 #include <idzebra/isamb.h>
28 #include <assert.h>
29
30 #ifndef ISAMB_DEBUG
31 #define ISAMB_DEBUG 0
32 #endif
33
34
35 #define ISAMB_MAJOR_VERSION 3
36 #define ISAMB_MINOR_VERSION 0
37
38 struct ISAMB_head {
39     zint first_block;
40     zint last_block;
41     zint free_list;
42     zint no_items;
43     int block_size;
44     int block_max;
45     int block_offset;
46 };
47
48 /* if 1, upper nodes items are encoded; 0 if not encoded */
49 #define INT_ENCODE 1
50
51 /* maximum size of encoded buffer */
52 #define DST_ITEM_MAX 256
53
54 #define ISAMB_MAX_LEVEL 10
55 /* approx 2*max page + max size of item */
56 #define DST_BUF_SIZE (2*4096+300)
57
58 /* should be maximum block size of multiple thereof */
59 #define ISAMB_CACHE_ENTRY_SIZE 4096
60
61 /* CAT_MAX: _must_ be power of 2 */
62 #define CAT_MAX 4
63 #define CAT_MASK (CAT_MAX-1)
64 /* CAT_NO: <= CAT_MAX */
65 #define CAT_NO 4
66
67 /* Smallest block size */
68 #define ISAMB_MIN_SIZE 32
69 /* Size factor */
70 #define ISAMB_FAC_SIZE 4
71
72 /* ISAMB_PTR_CODEC = 1 var, =0 fixed */
73 #define ISAMB_PTR_CODEC 1
74
75 struct ISAMB_cache_entry {
76     ISAM_P pos;
77     unsigned char *buf;
78     int dirty;
79     int hits;
80     struct ISAMB_cache_entry *next;
81 };
82
83 struct ISAMB_file {
84     BFile bf;
85     int head_dirty;
86     struct ISAMB_head head;
87     struct ISAMB_cache_entry *cache_entries;
88 };
89
90 struct ISAMB_s {
91     BFiles bfs;
92     ISAMC_M *method;
93
94     struct ISAMB_file *file;
95     int no_cat;
96     int cache; /* 0 = no cache, 1 = use cache, -1 = dummy isam (for testing only) */
97     int log_io;        /* log level for bf_read/bf_write calls */
98     int log_freelist;  /* log level for freelist handling */
99     zint skipped_numbers; /* on a leaf node */
100     zint returned_numbers; 
101     zint skipped_nodes[ISAMB_MAX_LEVEL]; /* [0]=skipped leaves, 1 = higher etc */
102     zint accessed_nodes[ISAMB_MAX_LEVEL]; /* nodes we did not skip */
103 };
104
105 struct ISAMB_block {
106     ISAM_P pos;
107     int cat;
108     int size;
109     int leaf;
110     int dirty;
111     int deleted;
112     int offset;
113     zint no_items;  /* number of nodes in this + children */
114     char *bytes;
115     char *cbuf;
116     unsigned char *buf;
117     void *decodeClientData;
118     int log_rw;
119 };
120
121 struct ISAMB_PP_s {
122     ISAMB isamb;
123     ISAM_P pos;
124     int level;
125     int maxlevel; /* total depth */
126     zint total_size;
127     zint no_blocks;
128     zint skipped_numbers; /* on a leaf node */
129     zint returned_numbers; 
130     zint skipped_nodes[ISAMB_MAX_LEVEL]; /* [0]=skipped leaves, 1 = higher etc */
131     zint accessed_nodes[ISAMB_MAX_LEVEL]; /* nodes we did not skip */
132     struct ISAMB_block **block;
133     int scope;  /* on what level we forward */
134 };
135
136
137 #define encode_item_len encode_ptr
138 #if ISAMB_PTR_CODEC
139 static void encode_ptr(char **dst, zint pos)
140 {
141     unsigned char *bp = (unsigned char*) *dst;
142
143     while (pos > 127)
144     {
145          *bp++ = (unsigned char) (128 | (pos & 127));
146          pos = pos >> 7;
147     }
148     *bp++ = (unsigned char) pos;
149     *dst = (char *) bp;
150 }
151 #else
152 static void encode_ptr(char **dst, zint pos)
153 {
154     memcpy(*dst, &pos, sizeof(pos));
155     (*dst) += sizeof(pos);
156 }
157 #endif
158
159 #define decode_item_len decode_ptr
160 #if ISAMB_PTR_CODEC
161 static void decode_ptr(const char **src, zint *pos)
162 {
163     zint d = 0;
164     unsigned char c;
165     unsigned r = 0;
166
167     while (((c = *(const unsigned char *)((*src)++)) & 128))
168     {
169         d += ((zint) (c & 127) << r);
170         r += 7;
171     }
172     d += ((zint) c << r);
173     *pos = d;
174 }
175 #else
176 static void decode_ptr(const char **src, zint *pos)
177 {
178      memcpy(pos, *src, sizeof(*pos));
179      (*src) += sizeof(*pos);
180 }
181 #endif
182
183 ISAMB isamb_open(BFiles bfs, const char *name, int writeflag, ISAMC_M *method,
184                  int cache)
185 {
186     ISAMB isamb = xmalloc(sizeof(*isamb));
187     int i, b_size = ISAMB_MIN_SIZE;
188
189     isamb->bfs = bfs;
190     isamb->method = (ISAMC_M *) xmalloc(sizeof(*method));
191     memcpy(isamb->method, method, sizeof(*method));
192     isamb->no_cat = CAT_NO;
193     isamb->log_io = 0;
194     isamb->log_freelist = 0;
195     isamb->cache = cache;
196     isamb->skipped_numbers = 0;
197     isamb->returned_numbers = 0;
198     for (i = 0; i<ISAMB_MAX_LEVEL; i++)
199         isamb->skipped_nodes[i] = isamb->accessed_nodes[i] = 0;
200
201     if (cache == -1)
202     {
203         yaz_log(YLOG_WARN, "isamb_open %s. Degraded TEST mode", name);
204     }
205     else
206     {
207         assert(cache == 0 || cache == 1);
208     }
209     isamb->file = xmalloc(sizeof(*isamb->file) * isamb->no_cat);
210
211     for (i = 0; i < isamb->no_cat; i++)
212     {
213         isamb->file[i].bf = 0;
214         isamb->file[i].head_dirty = 0;
215         isamb->file[i].cache_entries = 0;
216     }
217
218     for (i = 0; i < isamb->no_cat; i++)
219     {
220         char fname[DST_BUF_SIZE];
221         char hbuf[DST_BUF_SIZE];
222
223         sprintf(fname, "%s%c", name, i+'A');
224         if (cache)
225             isamb->file[i].bf = bf_open(bfs, fname, ISAMB_CACHE_ENTRY_SIZE,
226                                          writeflag);
227         else
228             isamb->file[i].bf = bf_open(bfs, fname, b_size, writeflag);
229
230         if (!isamb->file[i].bf)
231         {
232             isamb_close(isamb);
233             return 0;
234         }
235
236         /* fill-in default values (for empty isamb) */
237         isamb->file[i].head.first_block = ISAMB_CACHE_ENTRY_SIZE/b_size+1;
238         isamb->file[i].head.last_block = isamb->file[i].head.first_block;
239         isamb->file[i].head.block_size = b_size;
240         assert(b_size <= ISAMB_CACHE_ENTRY_SIZE);
241 #if ISAMB_PTR_CODEC
242         if (i == isamb->no_cat-1 || b_size > 128)
243             isamb->file[i].head.block_offset = 8;
244         else
245             isamb->file[i].head.block_offset = 4;
246 #else
247         isamb->file[i].head.block_offset = 11;
248 #endif
249         isamb->file[i].head.block_max =
250             b_size - isamb->file[i].head.block_offset;
251         isamb->file[i].head.free_list = 0;
252         if (bf_read(isamb->file[i].bf, 0, 0, 0, hbuf))
253         {
254             /* got header assume "isamb"major minor len can fit in 16 bytes */
255             zint zint_tmp;
256             int major, minor, len, pos = 0;
257             int left;
258             const char *src = 0;
259             if (memcmp(hbuf, "isamb", 5))
260             {
261                 yaz_log(YLOG_WARN, "bad isamb header for file %s", fname);
262                 return 0;
263             }
264             if (sscanf(hbuf+5, "%d %d %d", &major, &minor, &len) != 3)
265             {
266                 yaz_log(YLOG_WARN, "bad isamb header for file %s", fname);
267                 return 0;
268             }
269             if (major != ISAMB_MAJOR_VERSION)
270             {
271                 yaz_log(YLOG_WARN, "bad major version for file %s %d, must be %d",
272                      fname, major, ISAMB_MAJOR_VERSION);
273                 return 0;
274             }
275             for (left = len - b_size; left > 0; left = left - b_size)
276             {
277                 pos++;
278                 if (!bf_read(isamb->file[i].bf, pos, 0, 0, hbuf + pos*b_size))
279                 {
280                     yaz_log(YLOG_WARN, "truncated isamb header for " 
281                          "file=%s len=%d pos=%d",
282                          fname, len, pos);
283                     return 0;
284                 }
285             }
286             src = hbuf + 16;
287             decode_ptr(&src, &isamb->file[i].head.first_block);
288             decode_ptr(&src, &isamb->file[i].head.last_block);
289             decode_ptr(&src, &zint_tmp);
290             isamb->file[i].head.block_size = (int) zint_tmp;
291             decode_ptr(&src, &zint_tmp);
292             isamb->file[i].head.block_max = (int) zint_tmp;
293             decode_ptr(&src, &isamb->file[i].head.free_list);
294         }
295         assert (isamb->file[i].head.block_size >= isamb->file[i].head.block_offset);
296         isamb->file[i].head_dirty = 0;
297         assert(isamb->file[i].head.block_size == b_size);
298         b_size = b_size * ISAMB_FAC_SIZE;
299     }
300 #if ISAMB_DEBUG
301     yaz_log(YLOG_WARN, "isamb debug enabled. Things will be slower than usual");
302 #endif
303     return isamb;
304 }
305
306 static void flush_blocks (ISAMB b, int cat)
307 {
308     while (b->file[cat].cache_entries)
309     {
310         struct ISAMB_cache_entry *ce_this = b->file[cat].cache_entries;
311         b->file[cat].cache_entries = ce_this->next;
312
313         if (ce_this->dirty)
314         {
315             yaz_log(b->log_io, "bf_write: flush_blocks");
316             bf_write(b->file[cat].bf, ce_this->pos, 0, 0, ce_this->buf);
317         }
318         xfree(ce_this->buf);
319         xfree(ce_this);
320     }
321 }
322
323 static int cache_block (ISAMB b, ISAM_P pos, unsigned char *userbuf, int wr)
324 {
325     int cat = (int) (pos&CAT_MASK);
326     int off = (int) (((pos/CAT_MAX) & 
327                (ISAMB_CACHE_ENTRY_SIZE / b->file[cat].head.block_size - 1))
328         * b->file[cat].head.block_size);
329     zint norm = pos / (CAT_MASK*ISAMB_CACHE_ENTRY_SIZE / b->file[cat].head.block_size);
330     int no = 0;
331     struct ISAMB_cache_entry **ce, *ce_this = 0, **ce_last = 0;
332
333     if (!b->cache)
334         return 0;
335
336     assert (ISAMB_CACHE_ENTRY_SIZE >= b->file[cat].head.block_size);
337     for (ce = &b->file[cat].cache_entries; *ce; ce = &(*ce)->next, no++)
338     {
339         ce_last = ce;
340         if ((*ce)->pos == norm)
341         {
342             ce_this = *ce;
343             *ce = (*ce)->next;   /* remove from list */
344             
345             ce_this->next = b->file[cat].cache_entries;  /* move to front */
346             b->file[cat].cache_entries = ce_this;
347             
348             if (wr)
349             {
350                 memcpy (ce_this->buf + off, userbuf, 
351                         b->file[cat].head.block_size);
352                 ce_this->dirty = 1;
353             }
354             else
355                 memcpy (userbuf, ce_this->buf + off,
356                         b->file[cat].head.block_size);
357             return 1;
358         }
359     }
360     if (no >= 40)
361     {
362         assert (no == 40);
363         assert (ce_last && *ce_last);
364         ce_this = *ce_last;
365         *ce_last = 0;  /* remove the last entry from list */
366         if (ce_this->dirty)
367         {
368             yaz_log(b->log_io, "bf_write: cache_block");
369             bf_write(b->file[cat].bf, ce_this->pos, 0, 0, ce_this->buf);
370         }
371         xfree(ce_this->buf);
372         xfree(ce_this);
373     }
374     ce_this = xmalloc(sizeof(*ce_this));
375     ce_this->next = b->file[cat].cache_entries;
376     b->file[cat].cache_entries = ce_this;
377     ce_this->buf = xmalloc(ISAMB_CACHE_ENTRY_SIZE);
378     ce_this->pos = norm;
379     yaz_log(b->log_io, "bf_read: cache_block");
380     if (!bf_read(b->file[cat].bf, norm, 0, 0, ce_this->buf))
381         memset (ce_this->buf, 0, ISAMB_CACHE_ENTRY_SIZE);
382     if (wr)
383     {
384         memcpy (ce_this->buf + off, userbuf, b->file[cat].head.block_size);
385         ce_this->dirty = 1;
386     }
387     else
388     {
389         ce_this->dirty = 0;
390         memcpy (userbuf, ce_this->buf + off, b->file[cat].head.block_size);
391     }
392     return 1;
393 }
394
395
396 void isamb_close (ISAMB isamb)
397 {
398     int i;
399     for (i = 0; isamb->accessed_nodes[i]; i++)
400         yaz_log(YLOG_DEBUG, "isamb_close  level leaf-%d: "ZINT_FORMAT" read, "
401                         ZINT_FORMAT" skipped",
402              i, isamb->accessed_nodes[i], isamb->skipped_nodes[i]);
403     yaz_log(YLOG_DEBUG, "isamb_close returned "ZINT_FORMAT" values, "
404                    "skipped "ZINT_FORMAT,
405          isamb->skipped_numbers, isamb->returned_numbers);
406     for (i = 0; i<isamb->no_cat; i++)
407     {
408         flush_blocks (isamb, i);
409         if (isamb->file[i].head_dirty)
410         {
411             char hbuf[DST_BUF_SIZE];
412             int major = ISAMB_MAJOR_VERSION;
413             int minor = ISAMB_MINOR_VERSION;
414             int len = 16;
415             char *dst = hbuf + 16;
416             int pos = 0, left;
417             int b_size = isamb->file[i].head.block_size;
418
419             encode_ptr(&dst, isamb->file[i].head.first_block);
420             encode_ptr(&dst, isamb->file[i].head.last_block);
421             encode_ptr(&dst, isamb->file[i].head.block_size);
422             encode_ptr(&dst, isamb->file[i].head.block_max);
423             encode_ptr(&dst, isamb->file[i].head.free_list);
424             memset(dst, '\0', b_size); /* ensure no random bytes are written */
425
426             len = dst - hbuf;
427
428             /* print exactly 16 bytes (including trailing 0) */
429             sprintf(hbuf, "isamb%02d %02d %02d\r\n", major, minor, len);
430
431             bf_write(isamb->file[i].bf, pos, 0, 0, hbuf);
432
433             for (left = len - b_size; left > 0; left = left - b_size)
434             {
435                 pos++;
436                 bf_write(isamb->file[i].bf, pos, 0, 0, hbuf + pos*b_size);
437             }
438         }
439         if (isamb->file[i].bf)
440             bf_close (isamb->file[i].bf);
441     }
442     xfree(isamb->file);
443     xfree(isamb->method);
444     xfree(isamb);
445 }
446
447 /* open_block: read one block at pos.
448    Decode leading sys bytes .. consisting of
449    Offset:Meaning
450    0: leader byte, != 0 leaf, == 0, non-leaf
451    1-2: used size of block
452    3-7*: number of items and all children
453    
454    * Reserve 5 bytes for large block sizes. 1 for small ones .. Number
455    of items. We can thus have at most 2^40 nodes. 
456 */
457 static struct ISAMB_block *open_block(ISAMB b, ISAM_P pos)
458 {
459     int cat = (int) (pos&CAT_MASK);
460     const char *src;
461     int offset = b->file[cat].head.block_offset;
462     struct ISAMB_block *p;
463     if (!pos)
464         return 0;
465     p = xmalloc(sizeof(*p));
466     p->pos = pos;
467     p->cat = (int) (pos & CAT_MASK);
468     p->buf = xmalloc(b->file[cat].head.block_size);
469     p->cbuf = 0;
470
471     if (!cache_block (b, pos, p->buf, 0))
472     {
473         yaz_log(b->log_io, "bf_read: open_block");
474         if (bf_read(b->file[cat].bf, pos/CAT_MAX, 0, 0, p->buf) != 1)
475         {
476             yaz_log(YLOG_FATAL, "isamb: read fail for pos=%ld block=%ld",
477                      (long) pos, (long) pos/CAT_MAX);
478             zebra_exit("isamb:open_block");
479         }
480     }
481     p->bytes = (char *)p->buf + offset;
482     p->leaf = p->buf[0];
483     p->size = (p->buf[1] + 256 * p->buf[2]) - offset;
484     if (p->size < 0)
485     {
486         yaz_log(YLOG_FATAL, "Bad block size %d in pos=" ZINT_FORMAT "\n",
487                  p->size, pos);
488     }
489     assert (p->size >= 0);
490     src = (char*) p->buf + 3;
491     decode_ptr(&src, &p->no_items);
492
493     p->offset = 0;
494     p->dirty = 0;
495     p->deleted = 0;
496     p->decodeClientData = (*b->method->codec.start)();
497     return p;
498 }
499
500 struct ISAMB_block *new_block (ISAMB b, int leaf, int cat)
501 {
502     struct ISAMB_block *p;
503
504     p = xmalloc(sizeof(*p));
505     p->buf = xmalloc(b->file[cat].head.block_size);
506
507     if (!b->file[cat].head.free_list)
508     {
509         zint block_no;
510         block_no = b->file[cat].head.last_block++;
511         p->pos = block_no * CAT_MAX + cat;
512     }
513     else
514     {
515         p->pos = b->file[cat].head.free_list;
516         assert((p->pos & CAT_MASK) == cat);
517         if (!cache_block (b, p->pos, p->buf, 0))
518         {
519             yaz_log(b->log_io, "bf_read: new_block");
520             if (!bf_read(b->file[cat].bf, p->pos/CAT_MAX, 0, 0, p->buf))
521             {
522                 yaz_log(YLOG_FATAL, "isamb: read fail for pos=%ld block=%ld",
523                          (long) p->pos/CAT_MAX, (long) p->pos/CAT_MAX);
524                 zebra_exit("isamb:new_block");
525             }
526         }
527         yaz_log(b->log_freelist, "got block " ZINT_FORMAT " from freelist %d:" ZINT_FORMAT, p->pos,
528                  cat, p->pos/CAT_MAX);
529         memcpy (&b->file[cat].head.free_list, p->buf, sizeof(zint));
530     }
531     p->cat = cat;
532     b->file[cat].head_dirty = 1;
533     memset (p->buf, 0, b->file[cat].head.block_size);
534     p->bytes = (char*)p->buf + b->file[cat].head.block_offset;
535     p->leaf = leaf;
536     p->size = 0;
537     p->dirty = 1;
538     p->deleted = 0;
539     p->offset = 0;
540     p->no_items = 0;
541     p->decodeClientData = (*b->method->codec.start)();
542     return p;
543 }
544
545 struct ISAMB_block *new_leaf (ISAMB b, int cat)
546 {
547     return new_block (b, 1, cat);
548 }
549
550
551 struct ISAMB_block *new_int (ISAMB b, int cat)
552 {
553     return new_block (b, 0, cat);
554 }
555
556 static void check_block (ISAMB b, struct ISAMB_block *p)
557 {
558     assert(b); /* mostly to make the compiler shut up about unused b */
559     if (p->leaf)
560     {
561         ;
562     }
563     else
564     {
565         /* sanity check */
566         char *startp = p->bytes;
567         const char *src = startp;
568         char *endp = p->bytes + p->size;
569         ISAM_P pos;
570         void *c1 = (*b->method->codec.start)();
571             
572         decode_ptr(&src, &pos);
573         assert ((pos&CAT_MASK) == p->cat);
574         while (src != endp)
575         {
576 #if INT_ENCODE
577             char file_item_buf[DST_ITEM_MAX];
578             char *file_item = file_item_buf;
579             (*b->method->codec.reset)(c1);
580             (*b->method->codec.decode)(c1, &file_item, &src);
581 #else
582             zint item_len;
583             decode_item_len(&src, &item_len);
584             assert (item_len > 0 && item_len < 80);
585             src += item_len;
586 #endif
587             decode_ptr(&src, &pos);
588             if ((pos&CAT_MASK) != p->cat)
589             {
590                 assert ((pos&CAT_MASK) == p->cat);
591             }
592         }
593         (*b->method->codec.stop)(c1);
594     }
595 }
596
597 void close_block(ISAMB b, struct ISAMB_block *p)
598 {
599     if (!p)
600         return;
601     if (p->deleted)
602     {
603         yaz_log(b->log_freelist, "release block " ZINT_FORMAT " from freelist %d:" ZINT_FORMAT,
604                  p->pos, p->cat, p->pos/CAT_MAX);
605         memcpy (p->buf, &b->file[p->cat].head.free_list, sizeof(zint));
606         b->file[p->cat].head.free_list = p->pos;
607         if (!cache_block (b, p->pos, p->buf, 1))
608         {
609             yaz_log(b->log_io, "bf_write: close_block (deleted)");
610             bf_write(b->file[p->cat].bf, p->pos/CAT_MAX, 0, 0, p->buf);
611         }
612     }
613     else if (p->dirty)
614     {
615         int offset = b->file[p->cat].head.block_offset;
616         int size = p->size + offset;
617         char *dst =  (char*)p->buf + 3;
618         assert (p->size >= 0);
619         
620         /* memset becuase encode_ptr usually does not write all bytes */
621         memset(p->buf, 0, b->file[p->cat].head.block_offset);
622         p->buf[0] = p->leaf;
623         p->buf[1] = size & 255;
624         p->buf[2] = size >> 8;
625         encode_ptr(&dst, p->no_items);
626         check_block(b, p);
627         if (!cache_block (b, p->pos, p->buf, 1))
628         {
629             yaz_log(b->log_io, "bf_write: close_block");
630             bf_write(b->file[p->cat].bf, p->pos/CAT_MAX, 0, 0, p->buf);
631         }
632     }
633     (*b->method->codec.stop)(p->decodeClientData);
634     xfree(p->buf);
635     xfree(p);
636 }
637
638 int insert_sub (ISAMB b, struct ISAMB_block **p,
639                 void *new_item, int *mode,
640                 ISAMC_I *stream,
641                 struct ISAMB_block **sp,
642                 void *sub_item, int *sub_size,
643                 const void *max_item);
644
645 int insert_int (ISAMB b, struct ISAMB_block *p, void *lookahead_item,
646                 int *mode,
647                 ISAMC_I *stream, struct ISAMB_block **sp,
648                 void *split_item, int *split_size, const void *last_max_item)
649 {
650     char *startp = p->bytes;
651     const char *src = startp;
652     char *endp = p->bytes + p->size;
653     ISAM_P pos;
654     struct ISAMB_block *sub_p1 = 0, *sub_p2 = 0;
655     char sub_item[DST_ITEM_MAX];
656     int sub_size;
657     int more = 0;
658     zint diff_terms = 0;
659     void *c1 = (*b->method->codec.start)();
660
661     *sp = 0;
662
663     assert(p->size >= 0);
664     decode_ptr(&src, &pos);
665     while (src != endp)
666     {
667         int d;
668         const char *src0 = src;
669 #if INT_ENCODE
670         char file_item_buf[DST_ITEM_MAX];
671         char *file_item = file_item_buf;
672         (*b->method->codec.reset)(c1);
673         (*b->method->codec.decode)(c1, &file_item, &src);
674         d = (*b->method->compare_item)(file_item_buf, lookahead_item);
675         if (d > 0)
676         {
677             sub_p1 = open_block(b, pos);
678             assert (sub_p1);
679             diff_terms -= sub_p1->no_items;
680             more = insert_sub (b, &sub_p1, lookahead_item, mode,
681                                stream, &sub_p2, 
682                                sub_item, &sub_size, file_item_buf);
683             diff_terms += sub_p1->no_items;
684             src = src0;
685             break;
686         }
687 #else
688         zint item_len;
689         decode_item_len(&src, &item_len);
690         d = (*b->method->compare_item)(src, lookahead_item);
691         if (d > 0)
692         {
693             sub_p1 = open_block(b, pos);
694             assert (sub_p1);
695             diff_terms -= sub_p1->no_items;
696             more = insert_sub (b, &sub_p1, lookahead_item, mode,
697                                stream, &sub_p2, 
698                                sub_item, &sub_size, src);
699             diff_terms += sub_p1->no_items;
700             src = src0;
701             break;
702         }
703         src += item_len;
704 #endif
705         decode_ptr(&src, &pos);
706     }
707     if (!sub_p1)
708     {
709         /* we reached the end. So lookahead > last item */
710         sub_p1 = open_block(b, pos);
711         assert (sub_p1);
712         diff_terms -= sub_p1->no_items;
713         more = insert_sub (b, &sub_p1, lookahead_item, mode, stream, &sub_p2, 
714                            sub_item, &sub_size, last_max_item);
715         diff_terms += sub_p1->no_items;
716     }
717     if (sub_p2)
718         diff_terms += sub_p2->no_items;
719     if (diff_terms)
720     {
721         p->dirty = 1;
722         p->no_items += diff_terms;
723     }
724     if (sub_p2)
725     {
726         /* there was a split - must insert pointer in this one */
727         char dst_buf[DST_BUF_SIZE];
728         char *dst = dst_buf;
729 #if INT_ENCODE
730         const char *sub_item_ptr = sub_item;
731 #endif
732         assert (sub_size < 80 && sub_size > 1);
733
734         memcpy (dst, startp, src - startp);
735                 
736         dst += src - startp;
737
738 #if INT_ENCODE
739         (*b->method->codec.reset)(c1);
740         (*b->method->codec.encode)(c1, &dst, &sub_item_ptr);
741 #else
742         encode_item_len (&dst, sub_size);      /* sub length and item */
743         memcpy (dst, sub_item, sub_size);
744         dst += sub_size;
745 #endif
746
747         encode_ptr(&dst, sub_p2->pos);   /* pos */
748
749         if (endp - src)                   /* remaining data */
750         {
751             memcpy (dst, src, endp - src);
752             dst += endp - src;
753         }
754         p->size = dst - dst_buf;
755         assert (p->size >= 0);
756
757         if (p->size <= b->file[p->cat].head.block_max)
758         {
759             /* it fits OK in this block */
760             memcpy (startp, dst_buf, dst - dst_buf);
761
762             close_block(b, sub_p2);
763         }
764         else
765         {
766             /* must split _this_ block as well .. */
767             struct ISAMB_block *sub_p3;
768 #if INT_ENCODE
769             char file_item_buf[DST_ITEM_MAX];
770             char *file_item = file_item_buf;
771 #else
772             zint split_size_tmp;
773 #endif
774             zint no_items_first_half = 0;
775             int p_new_size;
776             const char *half;
777             src = dst_buf;
778             endp = dst;
779
780             p->dirty = 1;
781             close_block(b, sub_p2);
782
783             half = src + b->file[p->cat].head.block_size/2;
784             decode_ptr(&src, &pos);
785
786             /* read sub block so we can get no_items for it */
787             sub_p3 = open_block(b, pos);
788             no_items_first_half += sub_p3->no_items;
789             close_block(b, sub_p3);
790
791             while (src <= half)
792             {
793 #if INT_ENCODE
794                 file_item = file_item_buf;
795                 (*b->method->codec.reset)(c1);
796                 (*b->method->codec.decode)(c1, &file_item, &src);
797 #else
798                 decode_item_len(&src, &split_size_tmp);
799                 *split_size = (int) split_size_tmp;
800                 src += *split_size;
801 #endif
802                 decode_ptr(&src, &pos);
803
804                 /* read sub block so we can get no_items for it */
805                 sub_p3 = open_block(b, pos);
806                 no_items_first_half += sub_p3->no_items;
807                 close_block(b, sub_p3);
808             }
809             /*  p is first half */
810             p_new_size = src - dst_buf;
811             memcpy (p->bytes, dst_buf, p_new_size);
812
813 #if INT_ENCODE
814             file_item = file_item_buf;
815             (*b->method->codec.reset)(c1);
816             (*b->method->codec.decode)(c1, &file_item, &src);
817             *split_size = file_item - file_item_buf;
818             memcpy(split_item, file_item_buf, *split_size);
819 #else
820             decode_item_len(&src, &split_size_tmp);
821             *split_size = (int) split_size_tmp;
822             memcpy (split_item, src, *split_size);
823             src += *split_size;
824 #endif
825             /*  *sp is second half */
826             *sp = new_int (b, p->cat);
827             (*sp)->size = endp - src;
828             memcpy ((*sp)->bytes, src, (*sp)->size);
829
830             p->size = p_new_size;
831
832             /*  adjust no_items in first&second half */
833             (*sp)->no_items = p->no_items - no_items_first_half;
834             p->no_items = no_items_first_half;
835         }
836         p->dirty = 1;
837     }
838     close_block(b, sub_p1);
839     (*b->method->codec.stop)(c1);
840     return more;
841 }
842
843 int insert_leaf (ISAMB b, struct ISAMB_block **sp1, void *lookahead_item,
844                  int *lookahead_mode, ISAMC_I *stream,
845                  struct ISAMB_block **sp2,
846                  void *sub_item, int *sub_size,
847                  const void *max_item)
848 {
849     struct ISAMB_block *p = *sp1;
850     char *endp = 0;
851     const char *src = 0;
852     char dst_buf[DST_BUF_SIZE], *dst = dst_buf;
853     int new_size;
854     void *c1 = (*b->method->codec.start)();
855     void *c2 = (*b->method->codec.start)();
856     int more = 1;
857     int quater = b->file[b->no_cat-1].head.block_max / 4;
858     char *mid_cut = dst_buf + quater * 2;
859     char *tail_cut = dst_buf + quater * 3;
860     char *maxp = dst_buf + b->file[b->no_cat-1].head.block_max;
861     char *half1 = 0;
862     char *half2 = 0;
863     char cut_item_buf[DST_ITEM_MAX];
864     int cut_item_size = 0;
865     int no_items = 0;    /* number of items (total) */
866     int no_items_1 = 0;  /* number of items (first half) */
867     int inserted_dst_bytes = 0;
868
869     if (p && p->size)
870     {
871         char file_item_buf[DST_ITEM_MAX];
872         char *file_item = file_item_buf;
873             
874         src = p->bytes;
875         endp = p->bytes + p->size;
876         (*b->method->codec.decode)(c1, &file_item, &src);
877         while (1)
878         {
879             const char *dst_item = 0; /* resulting item to be inserted */
880             char *lookahead_next;
881             char *dst_0 = dst;
882             int d = -1;
883             
884             if (lookahead_item)
885                 d = (*b->method->compare_item)(file_item_buf, lookahead_item);
886             
887             /* d now holds comparison between existing file item and
888                lookahead item 
889                d = 0: equal
890                d > 0: lookahead before file
891                d < 0: lookahead after file
892             */
893             if (d > 0)
894             {
895                 /* lookahead must be inserted */
896                 dst_item = lookahead_item;
897                 /* if this is not an insertion, it's really bad .. */
898                 if (!*lookahead_mode)
899                 {
900                     yaz_log(YLOG_WARN, "isamb: Inconsistent register (1)");
901                     assert(*lookahead_mode);
902                 }
903             }
904             else
905                 dst_item = file_item_buf;
906
907             if (!*lookahead_mode && d == 0)
908             {
909                 /* it's a deletion and they match so there is nothing to be
910                    inserted anyway .. But mark the thing bad (file item
911                    was part of input.. The item will not be part of output */
912                 p->dirty = 1;
913             }
914             else if (!half1 && dst > mid_cut)
915             {
916                 /* we have reached the splitting point for the first time */
917                 const char *dst_item_0 = dst_item;
918                 half1 = dst; /* candidate for splitting */
919
920                 /* encode the resulting item */
921                 (*b->method->codec.encode)(c2, &dst, &dst_item);
922                 
923                 cut_item_size = dst_item - dst_item_0;
924                 assert(cut_item_size > 0);
925                 memcpy (cut_item_buf, dst_item_0, cut_item_size);
926                 
927                 half2 = dst;
928                 no_items_1 = no_items;
929                 no_items++;
930             }
931             else
932             {
933                 /* encode the resulting item */
934                 (*b->method->codec.encode)(c2, &dst, &dst_item);
935                 no_items++;
936             }
937
938             /* now move "pointers" .. result has been encoded .. */
939             if (d > 0)  
940             {
941                 /* we must move the lookahead pointer */
942
943                 inserted_dst_bytes += (dst - dst_0);
944                 if (inserted_dst_bytes >=  quater)
945                     /* no more room. Mark lookahead as "gone".. */
946                     lookahead_item = 0;
947                 else
948                 {
949                     /* move it really.. */
950                     lookahead_next = lookahead_item;
951                     if (!(*stream->read_item)(stream->clientData,
952                                               &lookahead_next,
953                                               lookahead_mode))
954                     {
955                         /* end of stream reached: no "more" and no lookahead */
956                         lookahead_item = 0;
957                         more = 0;
958                     }
959                     if (lookahead_item && max_item &&
960                         (*b->method->compare_item)(max_item, lookahead_item) <= 0)
961                     {
962                         /* the lookahead goes beyond what we allow in this
963                            leaf. Mark it as "gone" */
964                         lookahead_item = 0;
965                     }
966                     
967                     p->dirty = 1;
968                 }
969             }
970             else if (d == 0)
971             {
972                 /* exact match .. move both pointers */
973
974                 lookahead_next = lookahead_item;
975                 if (!(*stream->read_item)(stream->clientData,
976                                           &lookahead_next, lookahead_mode))
977                 {
978                     lookahead_item = 0;
979                     more = 0;
980                 }
981                 if (src == endp)
982                     break;  /* end of file stream reached .. */
983                 file_item = file_item_buf; /* move file pointer */
984                 (*b->method->codec.decode)(c1, &file_item, &src);
985             }
986             else
987             {
988                 /* file pointer must be moved */
989                 if (src == endp)
990                     break;
991                 file_item = file_item_buf;
992                 (*b->method->codec.decode)(c1, &file_item, &src);
993             }
994         }
995     }
996
997     /* this loop runs when we are "appending" to a leaf page. That is
998        either it's empty (new) or all file items have been read in
999        previous loop */
1000
1001     maxp = dst_buf + b->file[b->no_cat-1].head.block_max + quater;
1002     while (lookahead_item)
1003     {
1004         char *dst_item;
1005         const char *src = lookahead_item;
1006         char *dst_0 = dst;
1007         
1008         /* if we have a lookahead item, we stop if we exceed the value of it */
1009         if (max_item &&
1010             (*b->method->compare_item)(max_item, lookahead_item) <= 0)
1011         {
1012             /* stop if we have reached the value of max item */
1013             break;
1014         }
1015         if (!*lookahead_mode)
1016         {
1017             /* this is append. So a delete is bad */
1018             yaz_log(YLOG_WARN, "isamb: Inconsistent register (2)");
1019             assert(*lookahead_mode);
1020         }
1021         else if (!half1 && dst > tail_cut)
1022         {
1023             const char *src_0 = src;
1024             half1 = dst; /* candidate for splitting */
1025             
1026             (*b->method->codec.encode)(c2, &dst, &src);
1027             
1028             cut_item_size = src - src_0;
1029             assert(cut_item_size > 0);
1030             memcpy (cut_item_buf, src_0, cut_item_size);
1031             
1032             no_items_1 = no_items;
1033             half2 = dst;
1034         }
1035         else
1036             (*b->method->codec.encode)(c2, &dst, &src);
1037
1038         if (dst > maxp)
1039         {
1040             dst = dst_0;
1041             break;
1042         }
1043         no_items++;
1044         if (p)
1045             p->dirty = 1;
1046         dst_item = lookahead_item;
1047         if (!(*stream->read_item)(stream->clientData, &dst_item,
1048                                   lookahead_mode))
1049         {
1050             lookahead_item = 0;
1051             more = 0;
1052         }
1053     }
1054     new_size = dst - dst_buf;
1055     if (p && p->cat != b->no_cat-1 && 
1056         new_size > b->file[p->cat].head.block_max)
1057     {
1058         /* non-btree block will be removed */
1059         p->deleted = 1;
1060         close_block(b, p);
1061         /* delete it too!! */
1062         p = 0; /* make a new one anyway */
1063     }
1064     if (!p)
1065     {   /* must create a new one */
1066         int i;
1067         for (i = 0; i < b->no_cat; i++)
1068             if (new_size <= b->file[i].head.block_max)
1069                 break;
1070         if (i == b->no_cat)
1071             i = b->no_cat - 1;
1072         p = new_leaf (b, i);
1073     }
1074     if (new_size > b->file[p->cat].head.block_max)
1075     {
1076         char *first_dst;
1077         const char *cut_item = cut_item_buf;
1078
1079         assert (half1);
1080         assert (half2);
1081
1082         assert(cut_item_size > 0);
1083         
1084         /* first half */
1085         p->size = half1 - dst_buf;
1086         assert(p->size <=  b->file[p->cat].head.block_max);
1087         memcpy (p->bytes, dst_buf, half1 - dst_buf);
1088         p->no_items = no_items_1;
1089
1090         /* second half */
1091         *sp2 = new_leaf (b, p->cat);
1092
1093         (*b->method->codec.reset)(c2);
1094
1095         first_dst = (*sp2)->bytes;
1096
1097         (*b->method->codec.encode)(c2, &first_dst, &cut_item);
1098
1099         memcpy (first_dst, half2, dst - half2);
1100
1101         (*sp2)->size = (first_dst - (*sp2)->bytes) + (dst - half2);
1102         assert((*sp2)->size <=  b->file[p->cat].head.block_max);
1103         (*sp2)->no_items = no_items - no_items_1;
1104         (*sp2)->dirty = 1;
1105         p->dirty = 1;
1106         memcpy (sub_item, cut_item_buf, cut_item_size);
1107         *sub_size = cut_item_size;
1108     }
1109     else
1110     {
1111         memcpy (p->bytes, dst_buf, dst - dst_buf);
1112         p->size = new_size;
1113         p->no_items = no_items;
1114     }
1115     (*b->method->codec.stop)(c1);
1116     (*b->method->codec.stop)(c2);
1117     *sp1 = p;
1118     return more;
1119 }
1120
1121 int insert_sub (ISAMB b, struct ISAMB_block **p, void *new_item,
1122                 int *mode,
1123                 ISAMC_I *stream,
1124                 struct ISAMB_block **sp,
1125                 void *sub_item, int *sub_size,
1126                 const void *max_item)
1127 {
1128     if (!*p || (*p)->leaf)
1129         return insert_leaf (b, p, new_item, mode, stream, sp, sub_item, 
1130                             sub_size, max_item);
1131     else
1132         return insert_int (b, *p, new_item, mode, stream, sp, sub_item,
1133                            sub_size, max_item);
1134 }
1135
1136 int isamb_unlink (ISAMB b, ISAM_P pos)
1137 {
1138     struct ISAMB_block *p1;
1139
1140     if (!pos)
1141         return 0;
1142     p1 = open_block(b, pos);
1143     p1->deleted = 1;
1144     if (!p1->leaf)
1145     {
1146         zint sub_p;
1147         const char *src = p1->bytes + p1->offset;
1148 #if INT_ENCODE
1149         void *c1 = (*b->method->codec.start)();
1150 #endif
1151         decode_ptr(&src, &sub_p);
1152         isamb_unlink(b, sub_p);
1153         
1154         while (src != p1->bytes + p1->size)
1155         {
1156 #if INT_ENCODE
1157             char file_item_buf[DST_ITEM_MAX];
1158             char *file_item = file_item_buf;
1159             (*b->method->codec.reset)(c1);
1160             (*b->method->codec.decode)(c1, &file_item, &src);
1161 #else
1162             zint item_len;
1163             decode_item_len(&src, &item_len);
1164             src += item_len;
1165 #endif
1166             decode_ptr(&src, &sub_p);
1167             isamb_unlink(b, sub_p);
1168         }
1169 #if INT_ENCODE
1170         (*b->method->codec.stop)(c1);
1171 #endif
1172     }
1173     close_block(b, p1);
1174     return 0;
1175 }
1176
1177 void isamb_merge(ISAMB b, ISAM_P *pos, ISAMC_I *stream)
1178 {
1179     char item_buf[DST_ITEM_MAX];
1180     char *item_ptr;
1181     int i_mode;
1182     int more;
1183     int must_delete = 0;
1184
1185     if (b->cache < 0)
1186     {
1187         int more = 1;
1188         while (more)
1189         {
1190             item_ptr = item_buf;
1191             more =
1192                 (*stream->read_item)(stream->clientData, &item_ptr, &i_mode);
1193         }
1194         *pos = 1;
1195         return;
1196     }
1197     item_ptr = item_buf;
1198     more = (*stream->read_item)(stream->clientData, &item_ptr, &i_mode);
1199     while (more)
1200     {
1201         struct ISAMB_block *p = 0, *sp = 0;
1202         char sub_item[DST_ITEM_MAX];
1203         int sub_size;
1204         
1205         if (*pos)
1206             p = open_block(b, *pos);
1207         more = insert_sub (b, &p, item_buf, &i_mode, stream, &sp,
1208                            sub_item, &sub_size, 0);
1209         if (sp)
1210         {    /* increase level of tree by one */
1211             struct ISAMB_block *p2 = new_int (b, p->cat);
1212             char *dst = p2->bytes + p2->size;
1213 #if INT_ENCODE
1214             void *c1 = (*b->method->codec.start)();
1215             const char *sub_item_ptr = sub_item;
1216 #endif
1217
1218             encode_ptr(&dst, p->pos);
1219             assert (sub_size < 80 && sub_size > 1);
1220 #if INT_ENCODE
1221             (*b->method->codec.reset)(c1);
1222             (*b->method->codec.encode)(c1, &dst, &sub_item_ptr);
1223 #else
1224             encode_item_len (&dst, sub_size);
1225             memcpy (dst, sub_item, sub_size);
1226             dst += sub_size;
1227 #endif
1228             encode_ptr(&dst, sp->pos);
1229             
1230             p2->size = dst - p2->bytes;
1231             p2->no_items = p->no_items + sp->no_items;
1232             *pos = p2->pos;  /* return new super page */
1233             close_block(b, sp);
1234             close_block(b, p2);
1235 #if INT_ENCODE
1236             (*b->method->codec.stop)(c1);
1237 #endif
1238         }
1239         else
1240         {
1241             *pos = p->pos;   /* return current one (again) */
1242         }
1243         if (p->no_items == 0)
1244             must_delete = 1;
1245         else
1246             must_delete = 0;
1247         close_block(b, p);
1248     }
1249     if (must_delete)
1250     {
1251         isamb_unlink(b, *pos);
1252         *pos = 0;
1253     }
1254 }
1255
1256 ISAMB_PP isamb_pp_open_x(ISAMB isamb, ISAM_P pos, int *level, int scope)
1257 {
1258     ISAMB_PP pp = xmalloc(sizeof(*pp));
1259     int i;
1260
1261     assert(pos);
1262
1263     pp->isamb = isamb;
1264     pp->block = xmalloc(ISAMB_MAX_LEVEL * sizeof(*pp->block));
1265
1266     pp->pos = pos;
1267     pp->level = 0;
1268     pp->maxlevel = 0;
1269     pp->total_size = 0;
1270     pp->no_blocks = 0;
1271     pp->skipped_numbers = 0;
1272     pp->returned_numbers = 0;
1273     pp->scope = scope;
1274     for (i = 0; i<ISAMB_MAX_LEVEL; i++)
1275         pp->skipped_nodes[i] = pp->accessed_nodes[i] = 0;
1276     while (1)
1277     {
1278         struct ISAMB_block *p = open_block(isamb, pos);
1279         const char *src = p->bytes + p->offset;
1280         pp->block[pp->level] = p;
1281
1282         pp->total_size += p->size;
1283         pp->no_blocks++;
1284         if (p->leaf)
1285             break;
1286         decode_ptr(&src, &pos);
1287         p->offset = src - p->bytes;
1288         pp->level++;
1289         pp->accessed_nodes[pp->level]++; 
1290     }
1291     pp->block[pp->level+1] = 0;
1292     pp->maxlevel = pp->level;
1293     if (level)
1294         *level = pp->level;
1295     return pp;
1296 }
1297
1298 ISAMB_PP isamb_pp_open (ISAMB isamb, ISAM_P pos, int scope)
1299 {
1300     return isamb_pp_open_x(isamb, pos, 0, scope);
1301 }
1302
1303 void isamb_pp_close_x(ISAMB_PP pp, zint *size, zint *blocks)
1304 {
1305     int i;
1306     if (!pp)
1307         return;
1308     yaz_log(YLOG_DEBUG, "isamb_pp_close lev=%d returned "ZINT_FORMAT" values, " 
1309                     "skipped "ZINT_FORMAT,
1310         pp->maxlevel, pp->skipped_numbers, pp->returned_numbers);
1311     for (i = pp->maxlevel; i>=0; i--)
1312         if (pp->skipped_nodes[i] || pp->accessed_nodes[i])
1313             yaz_log(YLOG_DEBUG, "isamb_pp_close  level leaf-%d: "
1314                             ZINT_FORMAT" read, "ZINT_FORMAT" skipped", i,
1315                  pp->accessed_nodes[i], pp->skipped_nodes[i]);
1316     pp->isamb->skipped_numbers += pp->skipped_numbers;
1317     pp->isamb->returned_numbers += pp->returned_numbers;
1318     for (i = pp->maxlevel; i>=0; i--)
1319     {
1320         pp->isamb->accessed_nodes[i] += pp->accessed_nodes[i];
1321         pp->isamb->skipped_nodes[i] += pp->skipped_nodes[i];
1322     }
1323     if (size)
1324         *size = pp->total_size;
1325     if (blocks)
1326         *blocks = pp->no_blocks;
1327     for (i = 0; i <= pp->level; i++)
1328         close_block(pp->isamb, pp->block[i]);
1329     xfree(pp->block);
1330     xfree(pp);
1331 }
1332
1333 int isamb_block_info (ISAMB isamb, int cat)
1334 {
1335     if (cat >= 0 && cat < isamb->no_cat)
1336         return isamb->file[cat].head.block_size;
1337     return -1;
1338 }
1339
1340 void isamb_pp_close (ISAMB_PP pp)
1341 {
1342     isamb_pp_close_x(pp, 0, 0);
1343 }
1344
1345 /* simple recursive dumper .. */
1346 static void isamb_dump_r (ISAMB b, ISAM_P pos, void (*pr)(const char *str),
1347                           int level)
1348 {
1349     char buf[1024];
1350     char prefix_str[1024];
1351     if (pos)
1352     {
1353         struct ISAMB_block *p = open_block(b, pos);
1354         sprintf(prefix_str, "%*s " ZINT_FORMAT " cat=%d size=%d max=%d items="
1355                 ZINT_FORMAT, level*2, "",
1356                 pos, p->cat, p->size, b->file[p->cat].head.block_max,
1357                 p->no_items);
1358         (*pr)(prefix_str);
1359         sprintf(prefix_str, "%*s " ZINT_FORMAT, level*2, "", pos);
1360         if (p->leaf)
1361         {
1362             while (p->offset < p->size)
1363             {
1364                 const char *src = p->bytes + p->offset;
1365                 char *dst = buf;
1366                 (*b->method->codec.decode)(p->decodeClientData, &dst, &src);
1367                 (*b->method->log_item)(YLOG_DEBUG, buf, prefix_str);
1368                 p->offset = src - (char*) p->bytes;
1369             }
1370             assert(p->offset == p->size);
1371         }
1372         else
1373         {
1374             const char *src = p->bytes + p->offset;
1375             ISAM_P sub;
1376
1377             decode_ptr(&src, &sub);
1378             p->offset = src - (char*) p->bytes;
1379
1380             isamb_dump_r(b, sub, pr, level+1);
1381             
1382             while (p->offset < p->size)
1383             {
1384 #if INT_ENCODE
1385                 char file_item_buf[DST_ITEM_MAX];
1386                 char *file_item = file_item_buf;
1387                 void *c1 = (*b->method->codec.start)();
1388                 (*b->method->codec.decode)(c1, &file_item, &src);
1389                 (*b->method->codec.stop)(c1);
1390                 (*b->method->log_item)(YLOG_DEBUG, file_item_buf, prefix_str);
1391 #else
1392                 zint item_len;
1393                 decode_item_len(&src, &item_len);
1394                 (*b->method->log_item)(YLOG_DEBUG, src, prefix_str);
1395                 src += item_len;
1396 #endif
1397                 decode_ptr(&src, &sub);
1398                 
1399                 p->offset = src - (char*) p->bytes;
1400                 
1401                 isamb_dump_r(b, sub, pr, level+1);
1402             }           
1403         }
1404         close_block(b, p);
1405     }
1406 }
1407
1408 void isamb_dump(ISAMB b, ISAM_P pos, void (*pr)(const char *str))
1409 {
1410     isamb_dump_r(b, pos, pr, 0);
1411 }
1412
1413 int isamb_pp_read(ISAMB_PP pp, void *buf)
1414 {
1415     return isamb_pp_forward(pp, buf, 0);
1416 }
1417
1418
1419 static int isamb_pp_on_right_node(ISAMB_PP pp, int level, const void *untilbuf)
1420 { /* looks one node higher to see if we should be on this node at all */
1421   /* useful in backing off quickly, and in avoiding tail descends */
1422   /* call with pp->level to begin with */
1423     struct ISAMB_block *p;
1424     int cmp;
1425     const char *src;
1426     ISAMB b = pp->isamb;
1427
1428     assert(level >= 0);
1429     if (level == 0)
1430     {
1431 #if ISAMB_DEBUG
1432             yaz_log(YLOG_DEBUG, "isamb_pp_on_right returning true for root");
1433 #endif
1434         return 1; /* we can never skip the root node */
1435     }
1436     level--;
1437     p = pp->block[level];
1438     assert(p->offset <= p->size);
1439     if (p->offset < p->size)
1440     {
1441 #if INT_ENCODE
1442         char file_item_buf[DST_ITEM_MAX];
1443         char *file_item = file_item_buf;
1444         void *c1 = (*b->method->codec.start)();
1445         assert(p->offset > 0); 
1446         src = p->bytes + p->offset;
1447         (*b->method->codec.decode)(c1, &file_item, &src);
1448         (*b->method->codec.stop)(c1);
1449         cmp = (*b->method->compare_item)(untilbuf, file_item_buf);
1450 #else
1451         zint item_len;
1452         assert(p->offset > 0); 
1453         src = p->bytes + p->offset;
1454         decode_item_len(&src, &item_len);
1455 #if ISAMB_DEBUG
1456         (*b->method->codec.log_item)(YLOG_DEBUG, untilbuf, "on_leaf: until");
1457         (*b->method->codec.log_item)(YLOG_DEBUG, src, "on_leaf: value");
1458 #endif
1459         cmp = (*b->method->compare_item)(untilbuf, src);
1460 #endif
1461         if (cmp < pp->scope)
1462         {  /* cmp<2 */
1463 #if ISAMB_DEBUG
1464             yaz_log(YLOG_DEBUG, "isamb_pp_on_right returning true "
1465                             "cmp=%d lev=%d ofs=%d", cmp, level, p->offset);
1466 #endif
1467             return 1; 
1468         }
1469         else
1470         {
1471 #if ISAMB_DEBUG
1472             yaz_log(YLOG_DEBUG, "isamb_pp_on_right returning false "
1473                             "cmp=%d lev=%d ofs=%d", cmp, level, p->offset);
1474 #endif
1475             return 0; 
1476         }
1477     }
1478     else {
1479 #if ISAMB_DEBUG
1480         yaz_log(YLOG_DEBUG, "isamb_pp_on_right at tail, looking higher "
1481                         "lev=%d", level);
1482 #endif
1483         return isamb_pp_on_right_node(pp, level, untilbuf);
1484     }
1485 } /* isamb_pp_on_right_node */
1486
1487 static int isamb_pp_read_on_leaf(ISAMB_PP pp, void *buf)
1488
1489     /* reads the next item on the current leaf, returns 0 if end of leaf*/
1490     struct ISAMB_block *p = pp->block[pp->level];
1491     char *dst;
1492     const char *src;
1493     assert(pp);
1494     assert(buf);
1495     if (p->offset == p->size)
1496     {
1497 #if ISAMB_DEBUG
1498         yaz_log(YLOG_DEBUG, "isamb_pp_read_on_leaf returning 0 on "
1499                 "node %d", p->pos);
1500 #endif
1501         return 0; /* at end of leaf */
1502     }
1503     src = p->bytes + p->offset;
1504     dst = buf;
1505     (*pp->isamb->method->codec.decode)(p->decodeClientData, &dst, &src);
1506     p->offset = src - (char*) p->bytes;
1507 #if ISAMB_DEBUG
1508     (*pp->isamb->method->codec.log_item)(YLOG_DEBUG, buf,
1509                                          "read_on_leaf returning 1");
1510 #endif
1511     pp->returned_numbers++;
1512     return 1;
1513 } /* read_on_leaf */
1514
1515 static int isamb_pp_forward_on_leaf(ISAMB_PP pp, void *buf, const void *untilbuf)
1516 { /* forwards on the current leaf, returns 0 if not found */
1517     int cmp;
1518     int skips = 0;
1519     while (1)
1520     {
1521         if (!isamb_pp_read_on_leaf(pp, buf))
1522             return 0;
1523         /* FIXME - this is an extra function call, inline the read? */
1524         cmp=(*pp->isamb->method->compare_item)(untilbuf, buf);
1525         if (cmp <pp->scope)
1526         {  /* cmp<2  found a good one */
1527 #if ISAMB_DEBUG
1528             if (skips)
1529                 yaz_log(YLOG_DEBUG, "isam_pp_fwd_on_leaf skipped %d items", skips);
1530 #endif
1531             pp->returned_numbers++;
1532             return 1;
1533         }
1534         if (!skips)
1535             if (!isamb_pp_on_right_node(pp, pp->level, untilbuf))
1536                 return 0; /* never mind the rest of this leaf */
1537         pp->skipped_numbers++;
1538         skips++;
1539     }
1540 } /* forward_on_leaf */
1541
1542 static int isamb_pp_climb_level(ISAMB_PP pp, ISAM_P *pos)
1543 { /* climbs higher in the tree, until finds a level with data left */
1544   /* returns the node to (consider to) descend to in *pos) */
1545     struct ISAMB_block *p = pp->block[pp->level];
1546     const char *src;
1547 #if ISAMB_DEBUG
1548     yaz_log(YLOG_DEBUG, "isamb_pp_climb_level starting "
1549                    "at level %d node %d ofs=%d sz=%d",
1550                     pp->level, p->pos, p->offset, p->size);
1551 #endif
1552     assert(pp->level >= 0);
1553     assert(p->offset <= p->size);
1554     if (pp->level==0)
1555     {
1556 #if ISAMB_DEBUG
1557         yaz_log(YLOG_DEBUG, "isamb_pp_climb_level returning 0 at root");
1558 #endif
1559         return 0;
1560     }
1561     assert(pp->level>0); 
1562     close_block(pp->isamb, pp->block[pp->level]);
1563     pp->block[pp->level] = 0;
1564     (pp->level)--;
1565     p = pp->block[pp->level];
1566 #if ISAMB_DEBUG
1567     yaz_log(YLOG_DEBUG, "isamb_pp_climb_level climbed to level %d node %d ofs=%d",
1568                     pp->level, p->pos, p->offset);
1569 #endif
1570     assert(!p->leaf);
1571     assert(p->offset <= p->size);
1572     if (p->offset == p->size)
1573     {
1574         /* we came from the last pointer, climb on */
1575         if (!isamb_pp_climb_level(pp, pos))
1576             return 0;
1577         p = pp->block[pp->level];
1578     }
1579     else
1580     {
1581 #if INT_ENCODE
1582         char file_item_buf[DST_ITEM_MAX];
1583         char *file_item = file_item_buf;
1584         ISAMB b = pp->isamb;
1585         void *c1 = (*b->method->codec.start)();
1586 #else
1587         zint item_len;
1588 #endif
1589         /* skip the child we just came from */
1590 #if ISAMB_DEBUG
1591         yaz_log(YLOG_DEBUG, "isam_pp_climb_level: skipping lev=%d ofs=%d sz=%d", 
1592                         pp->level, p->offset, p->size);
1593 #endif
1594         assert (p->offset < p->size);
1595         src = p->bytes + p->offset;
1596 #if INT_ENCODE
1597         (*b->method->codec.decode)(c1, &file_item, &src);
1598         (*b->method->codec.stop)(c1);
1599 #else
1600         decode_item_len(&src, &item_len);
1601         src += item_len;
1602 #endif
1603         decode_ptr(&src, pos);
1604         p->offset = src - (char *)p->bytes;
1605             
1606     }
1607     return 1;
1608 } /* climb_level */
1609
1610
1611 static zint isamb_pp_forward_unode(ISAMB_PP pp, zint pos, const void *untilbuf)
1612 { /* scans a upper node until it finds a child <= untilbuf */
1613   /* pp points to the key value, as always. pos is the child read from */
1614   /* the buffer */
1615   /* if all values are too small, returns the last child in the node */
1616   /* FIXME - this can be detected, and avoided by looking at the */
1617   /* parent node, but that gets messy. Presumably the cost is */
1618   /* pretty low anyway */
1619     ISAMB b = pp->isamb;
1620     struct ISAMB_block *p = pp->block[pp->level];
1621     const char *src = p->bytes + p->offset;
1622     int cmp;
1623     zint nxtpos;
1624 #if ISAMB_DEBUG
1625     int skips = 0;
1626     yaz_log(YLOG_DEBUG, "isamb_pp_forward_unode starting "
1627                    "at level %d node %d ofs=%di sz=%d",
1628                     pp->level, p->pos, p->offset, p->size);
1629 #endif
1630     assert(!p->leaf);
1631     assert(p->offset <= p->size);
1632     if (p->offset == p->size)
1633     {
1634 #if ISAMB_DEBUG
1635             yaz_log(YLOG_DEBUG, "isamb_pp_forward_unode returning at end "
1636                    "at level %d node %d ofs=%di sz=%d",
1637                     pp->level, p->pos, p->offset, p->size);
1638 #endif
1639         return pos; /* already at the end of it */
1640     }
1641     while(p->offset < p->size)
1642     {
1643 #if INT_ENCODE
1644         char file_item_buf[DST_ITEM_MAX];
1645         char *file_item = file_item_buf;
1646         void *c1 = (*b->method->codec.start)();
1647         (*b->method->codec.decode)(c1, &file_item, &src);
1648         (*b->method->codec.stop)(c1);
1649         cmp = (*b->method->compare_item)(untilbuf, file_item_buf);
1650 #else
1651         zint item_len;
1652         decode_item_len(&src, &item_len);
1653         cmp = (*b->method->compare_item)(untilbuf, src);
1654         src += item_len;
1655 #endif
1656         decode_ptr(&src, &nxtpos);
1657         if (cmp<pp->scope) /* cmp<2 */
1658         {
1659 #if ISAMB_DEBUG
1660             yaz_log(YLOG_DEBUG, "isamb_pp_forward_unode returning a hit "
1661                    "at level %d node %d ofs=%d sz=%d",
1662                     pp->level, p->pos, p->offset, p->size);
1663 #endif
1664             return pos;
1665         } /* found one */
1666         pos = nxtpos;
1667         p->offset = src-(char*)p->bytes;
1668         (pp->skipped_nodes[pp->maxlevel - pp->level -1])++;
1669 #if ISAMB_DEBUG
1670         skips++;
1671 #endif
1672     }
1673 #if ISAMB_DEBUG
1674             yaz_log(YLOG_DEBUG, "isamb_pp_forward_unode returning at tail "
1675                    "at level %d node %d ofs=%d sz=%d skips=%d",
1676                     pp->level, p->pos, p->offset, p->size, skips);
1677 #endif
1678     return pos; /* that's the last one in the line */
1679     
1680 } /* forward_unode */
1681
1682 static void isamb_pp_descend_to_leaf(ISAMB_PP pp, ISAM_P pos,
1683                                      const void *untilbuf)
1684 { /* climbs down the tree, from pos, to the leftmost leaf */
1685     struct ISAMB_block *p = pp->block[pp->level];
1686     const char *src;
1687     assert(!p->leaf);
1688 #if ISAMB_DEBUG
1689     yaz_log(YLOG_DEBUG, "isamb_pp_descend_to_leaf "
1690                    "starting at lev %d node %d ofs=%d lf=%d u=%p", 
1691                    pp->level, p->pos, p->offset, p->leaf, untilbuf);
1692 #endif
1693     if (untilbuf)
1694         pos = isamb_pp_forward_unode(pp, pos, untilbuf);
1695     ++(pp->level);
1696     assert(pos);
1697     p = open_block(pp->isamb, pos);
1698     pp->block[pp->level] = p;
1699     ++(pp->accessed_nodes[pp->maxlevel-pp->level]);
1700     ++(pp->no_blocks);
1701 #if ISAMB_DEBUG
1702     yaz_log(YLOG_DEBUG, "isamb_pp_descend_to_leaf "
1703                    "got lev %d node %d lf=%d", 
1704                    pp->level, p->pos, p->leaf);
1705 #endif
1706     if (p->leaf)
1707         return;
1708     assert (p->offset==0);
1709     src = p->bytes + p->offset;
1710     decode_ptr(&src, &pos);
1711     p->offset = src-(char*)p->bytes;
1712     isamb_pp_descend_to_leaf(pp, pos, untilbuf);
1713 #if ISAMB_DEBUG
1714     yaz_log(YLOG_DEBUG, "isamb_pp_descend_to_leaf "
1715                    "returning at lev %d node %d ofs=%d lf=%d", 
1716                    pp->level, p->pos, p->offset, p->leaf);
1717 #endif
1718 } /* descend_to_leaf */
1719
1720 static int isamb_pp_find_next_leaf(ISAMB_PP pp)
1721 { /* finds the next leaf by climbing up and down */
1722     ISAM_P pos;
1723     if (!isamb_pp_climb_level(pp, &pos))
1724         return 0;
1725     isamb_pp_descend_to_leaf(pp, pos, 0);
1726     return 1;
1727 }
1728
1729 static int isamb_pp_climb_desc(ISAMB_PP pp,  const void *untilbuf)
1730 { /* climbs up and descends to a leaf where values >= *untilbuf are found */
1731     ISAM_P pos;
1732 #if ISAMB_DEBUG
1733     struct ISAMB_block *p = pp->block[pp->level];
1734     yaz_log(YLOG_DEBUG, "isamb_pp_climb_desc starting "
1735                    "at level %d node %d ofs=%d sz=%d",
1736                     pp->level, p->pos, p->offset, p->size);
1737 #endif
1738     if (!isamb_pp_climb_level(pp, &pos))
1739         return 0;
1740     /* see if it would pay to climb one higher */
1741     if (!isamb_pp_on_right_node(pp, pp->level, untilbuf))
1742         if (!isamb_pp_climb_level(pp, &pos))
1743             return 0;
1744     isamb_pp_descend_to_leaf(pp, pos, untilbuf);
1745 #if ISAMB_DEBUG
1746     p = pp->block[pp->level];
1747     yaz_log(YLOG_DEBUG, "isamb_pp_climb_desc done "
1748                    "at level %d node %d ofs=%d sz=%d",
1749                     pp->level, p->pos, p->offset, p->size);
1750 #endif
1751     return 1;
1752 } /* climb_desc */
1753
1754 int isamb_pp_forward (ISAMB_PP pp, void *buf, const void *untilbuf)
1755 {
1756 #if ISAMB_DEBUG
1757     struct ISAMB_block *p = pp->block[pp->level];
1758     assert(p->leaf);
1759     yaz_log(YLOG_DEBUG, "isamb_pp_forward starting "
1760                    "at level %d node %d ofs=%d sz=%d u=%p sc=%d",
1761                     pp->level, p->pos, p->offset, p->size, untilbuf, scope);
1762 #endif
1763     if (untilbuf)
1764     {
1765         if (isamb_pp_forward_on_leaf(pp, buf, untilbuf))
1766         {
1767 #if ISAMB_DEBUG
1768             yaz_log(YLOG_DEBUG, "isamb_pp_forward (f) returning (A) "
1769                    "at level %d node %d ofs=%d sz=%d",
1770                     pp->level, p->pos, p->offset, p->size);
1771 #endif
1772             return 1;
1773         }
1774         if (! isamb_pp_climb_desc(pp, untilbuf))
1775         {
1776 #if ISAMB_DEBUG
1777             yaz_log(YLOG_DEBUG, "isamb_pp_forward (f) returning notfound (B) "
1778                    "at level %d node %d ofs=%d sz=%d",
1779                     pp->level, p->pos, p->offset, p->size);
1780 #endif
1781             return 0; /* could not find a leaf */
1782         }
1783         do {
1784             if (isamb_pp_forward_on_leaf(pp, buf, untilbuf))
1785             {
1786 #if ISAMB_DEBUG
1787                 yaz_log(YLOG_DEBUG, "isamb_pp_forward (f) returning (c) "
1788                         "at level %d node %d ofs=%d sz=%d",
1789                         pp->level, p->pos, p->offset, p->size);
1790 #endif
1791                 return 1;
1792             }
1793         } while (isamb_pp_find_next_leaf(pp));
1794         return 0; /* could not find at all */
1795     }
1796     else { /* no untilbuf, a straight read */
1797         /* FIXME - this should be moved
1798          * directly into the pp_read */
1799         /* keeping here now, to keep same
1800          * interface as the old fwd */
1801         if (isamb_pp_read_on_leaf(pp, buf))
1802         {
1803 #if ISAMB_DEBUG
1804             yaz_log(YLOG_DEBUG, "isamb_pp_forward (read) returning (D) "
1805                    "at level %d node %d ofs=%d sz=%d",
1806                     pp->level, p->pos, p->offset, p->size);
1807 #endif
1808             return 1;
1809         }
1810         if (isamb_pp_find_next_leaf(pp))
1811         {
1812 #if ISAMB_DEBUG
1813             yaz_log(YLOG_DEBUG, "isamb_pp_forward (read) returning (E) "
1814                    "at level %d node %d ofs=%d sz=%d",
1815                     pp->level, p->pos, p->offset, p->size);
1816 #endif
1817             return isamb_pp_read_on_leaf(pp, buf);
1818         }
1819         else
1820             return 0;
1821     }
1822 } /* isam_pp_forward (new version) */
1823
1824 void isamb_pp_pos(ISAMB_PP pp, double *current, double *total)
1825 { /* return an estimate of the current position and of the total number of */
1826   /* occureences in the isam tree, based on the current leaf */
1827     assert(total);
1828     assert(current);
1829     
1830     /* if end-of-stream PP may not be leaf */
1831
1832     *total = (double) (pp->block[0]->no_items);
1833     *current = (double) pp->returned_numbers;
1834 #if ISAMB_DEBUG
1835     yaz_log(YLOG_LOG, "isamb_pp_pos returning: cur= %0.1f tot=%0.1f rn="
1836          ZINT_FORMAT, *current, *total, pp->returned_numbers);
1837 #endif
1838 }
1839
1840 int isamb_pp_forward2(ISAMB_PP pp, void *buf, const void *untilb)
1841 {
1842     char *dst = buf;
1843     const char *src;
1844     struct ISAMB_block *p = pp->block[pp->level];
1845     ISAMB b = pp->isamb;
1846     if (!p)
1847         return 0;
1848 again:
1849     while (p->offset == p->size)
1850     {
1851         ISAM_P pos;
1852 #if INT_ENCODE
1853         const char *src_0;
1854         void *c1;
1855         char file_item_buf[DST_ITEM_MAX];
1856         char *file_item = file_item_buf;
1857 #else
1858         zint item_len;
1859 #endif
1860         while (p->offset == p->size)
1861         {
1862             if (pp->level == 0)
1863                 return 0;
1864             close_block (pp->isamb, pp->block[pp->level]);
1865             pp->block[pp->level] = 0;
1866             (pp->level)--;
1867             p = pp->block[pp->level];
1868             assert (!p->leaf);  
1869         }
1870
1871         assert(!p->leaf);
1872         src = p->bytes + p->offset;
1873        
1874 #if INT_ENCODE
1875         c1 = (*b->method->codec.start)();
1876         (*b->method->codec.decode)(c1, &file_item, &src);
1877 #else
1878         decode_ptr (&src, &item_len);
1879         src += item_len;
1880 #endif        
1881         decode_ptr (&src, &pos);
1882         p->offset = src - (char*) p->bytes;
1883
1884         src = p->bytes + p->offset;
1885
1886         while(1)
1887         {
1888             if (!untilb || p->offset == p->size)
1889                 break;
1890             assert(p->offset < p->size);
1891 #if INT_ENCODE
1892             src_0 = src;
1893             file_item = file_item_buf;
1894             (*b->method->codec.reset)(c1);
1895             (*b->method->codec.decode)(c1, &file_item, &src);
1896             if ((*b->method->compare_item)(untilb, file_item_buf) <= 1)
1897             {
1898                 src = src_0;
1899                 break;
1900             }
1901 #else
1902             decode_item_len(&src, &item_len);
1903             if ((*b->method->compare_item)(untilb, src) <= 1)
1904                 break;
1905             src += item_len;
1906 #endif
1907             decode_ptr (&src, &pos);
1908             p->offset = src - (char*) p->bytes;
1909         }
1910
1911         pp->level++;
1912
1913         while (1)
1914         {
1915             pp->block[pp->level] = p = open_block (pp->isamb, pos);
1916
1917             pp->total_size += p->size;
1918             pp->no_blocks++;
1919             
1920             if (p->leaf) 
1921             {
1922                 break;
1923             }
1924             
1925             src = p->bytes + p->offset;
1926             while(1)
1927             {
1928                 decode_ptr (&src, &pos);
1929                 p->offset = src - (char*) p->bytes;
1930                 
1931                 if (!untilb || p->offset == p->size)
1932                     break;
1933                 assert(p->offset < p->size);
1934 #if INT_ENCODE
1935                 src_0 = src;
1936                 file_item = file_item_buf;
1937                 (*b->method->codec.reset)(c1);
1938                 (*b->method->codec.decode)(c1, &file_item, &src);
1939                 if ((*b->method->compare_item)(untilb, file_item_buf) <= 1)
1940                 {
1941                     src = src_0;
1942                     break;
1943                 }
1944 #else
1945                 decode_ptr (&src, &item_len);
1946                 if ((*b->method->compare_item)(untilb, src) <= 1)
1947                     break;
1948                 src += item_len;
1949 #endif
1950             }
1951             pp->level++;
1952         }
1953 #if INT_ENCODE
1954         (*b->method->codec.stop)(c1);
1955 #endif
1956     }
1957     assert (p->offset < p->size);
1958     assert (p->leaf);
1959     while(1)
1960     {
1961         char *dst0 = dst;
1962         src = p->bytes + p->offset;
1963         (*pp->isamb->method->codec.decode)(p->decodeClientData, &dst, &src);
1964         p->offset = src - (char*) p->bytes;
1965         if (!untilb || (*pp->isamb->method->compare_item)(untilb, dst0) <= 1)
1966             break;
1967         dst = dst0;
1968         if (p->offset == p->size) goto again;
1969     }
1970     return 1;
1971 }
1972 /*
1973  * Local variables:
1974  * c-basic-offset: 4
1975  * indent-tabs-mode: nil
1976  * End:
1977  * vim: shiftwidth=4 tabstop=8 expandtab
1978  */
1979