8e517e0424e26c5bcfadb4a776ad8def23807657
[idzebra-moved-to-github.git] / isamb / isamb.c
1 /* $Id: isamb.c,v 1.29 2004-06-01 12:32:19 heikki Exp $
2    Copyright (C) 1995,1996,1997,1998,1999,2000,2001,2002,2003,2004
3    Index Data Aps
4
5 This file is part of the Zebra server.
6
7 Zebra is free software; you can redistribute it and/or modify it under
8 the terms of the GNU General Public License as published by the Free
9 Software Foundation; either version 2, or (at your option) any later
10 version.
11
12 Zebra is distributed in the hope that it will be useful, but WITHOUT ANY
13 WARRANTY; without even the implied warranty of MERCHANTABILITY or
14 FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
15 for more details.
16
17 You should have received a copy of the GNU General Public License
18 along with Zebra; see the file LICENSE.zebra.  If not, write to the
19 Free Software Foundation, 59 Temple Place - Suite 330, Boston, MA
20 02111-1307, USA.
21 */
22
23 #include <string.h>
24 #include <yaz/xmalloc.h>
25 #include <yaz/log.h>
26 #include <isamb.h>
27 #include <assert.h>
28 #include <../index/index.h> /* for log_keydump. Debugging only */
29
30 struct ISAMB_head {
31     int first_block;
32     int last_block;
33     int block_size;
34     int block_max;
35     int free_list;
36 };
37
38 #define ISAMB_DATA_OFFSET 3
39
40 /* maximum size of encoded buffer */
41 #define DST_ITEM_MAX 256
42
43 #define ISAMB_MAX_LEVEL 10
44 /* approx 2*max page + max size of item */
45 #define DST_BUF_SIZE 16840
46
47 #define ISAMB_CACHE_ENTRY_SIZE 4096
48
49 /* CAT_MAX: _must_ be power of 2 */
50 #define CAT_MAX 4
51 #define CAT_MASK (CAT_MAX-1)
52 /* CAT_NO: <= CAT_MAX */
53 #define CAT_NO 4
54
55 /* ISAMB_PTR_CODEC=1 var, =0 fixed */
56 #define ISAMB_PTR_CODEC  1
57
58 struct ISAMB_cache_entry {
59     ISAMB_P pos;
60     unsigned char *buf;
61     int dirty;
62     int hits;
63     struct ISAMB_cache_entry *next;
64 };
65
66 struct ISAMB_file {
67     BFile bf;
68     int head_dirty;
69     struct ISAMB_head head;
70     struct ISAMB_cache_entry *cache_entries;
71 };
72
73 struct ISAMB_s {
74     BFiles bfs;
75     ISAMC_M *method;
76
77     struct ISAMB_file *file;
78     int no_cat;
79     int cache; /* 0=no cache, 1=use cache, -1=dummy isam (for testing only) */
80     int log_io;        /* log level for bf_read/bf_write calls */
81     int log_freelist;  /* log level for freelist handling */
82     int skipped_numbers; /* on a leaf node */
83     int returned_numbers; 
84     int skipped_nodes[ISAMB_MAX_LEVEL]; /* [0]=skipped leaves, 1=higher etc */
85     int accessed_nodes[ISAMB_MAX_LEVEL]; /* nodes we did not skip */
86 };
87
88 struct ISAMB_block {
89     ISAMB_P pos;
90     int cat;
91     int size;
92     int leaf;
93     int dirty;
94     int deleted;
95     int offset;
96     char *bytes;
97     unsigned char *buf;
98     void *decodeClientData;
99     int log_rw;
100 };
101
102 struct ISAMB_PP_s {
103     ISAMB isamb;
104     ISAMB_P pos;
105     int level;
106     int maxlevel; /* total depth */
107     int total_size;
108     int no_blocks;
109     int skipped_numbers; /* on a leaf node */
110     int returned_numbers; 
111     int skipped_nodes[ISAMB_MAX_LEVEL]; /* [0]=skipped leaves, 1=higher etc */
112     int accessed_nodes[ISAMB_MAX_LEVEL]; /* nodes we did not skip */
113     struct ISAMB_block **block;
114 };
115
116 #if ISAMB_PTR_CODEC
117 static void encode_ptr (char **dst, unsigned pos)
118 {
119     unsigned char *bp = (unsigned char*) *dst;
120
121     while (pos > 127)
122     {
123          *bp++ = 128 | (pos & 127);
124          pos = pos >> 7;
125     }
126     *bp++ = pos;
127     *dst = (char *) bp;
128 }
129 #else
130 static void encode_ptr (char **dst, unsigned pos)
131 {
132     memcpy(*dst, &pos, sizeof(pos));
133     (*dst) += sizeof(pos);
134 }
135 #endif
136
137 #if ISAMB_PTR_CODEC
138 static void decode_ptr (char **src1, int *pos)
139 {
140     unsigned char **src = (unsigned char **) src1;
141     unsigned d = 0;
142     unsigned char c;
143     unsigned r = 0;
144
145     while (((c = *(*src)++) & 128))
146     {
147         d += ((c & 127) << r);
148         r += 7;
149     }
150     d += (c << r);
151     *pos = d;
152 }
153 #else
154 static void decode_ptr (char **src, int *pos)
155 {
156      memcpy (pos, *src, sizeof(*pos));
157      (*src) += sizeof(*pos);
158 }
159 #endif
160
161 ISAMB isamb_open (BFiles bfs, const char *name, int writeflag, ISAMC_M *method,
162                   int cache)
163 {
164     ISAMB isamb = xmalloc (sizeof(*isamb));
165     int i, b_size = 32;
166
167     isamb->bfs = bfs;
168     isamb->method = (ISAMC_M *) xmalloc (sizeof(*method));
169     memcpy (isamb->method, method, sizeof(*method));
170     isamb->no_cat = CAT_NO;
171     isamb->log_io = 0;
172     isamb->log_freelist = 0;
173     isamb->cache = cache;
174     isamb->skipped_numbers=0;
175     isamb->returned_numbers=0;
176     for (i=0;i<ISAMB_MAX_LEVEL;i++)
177       isamb->skipped_nodes[i]= isamb->accessed_nodes[i]=0;
178
179     assert (cache == 0);
180     isamb->file = xmalloc (sizeof(*isamb->file) * isamb->no_cat);
181     for (i = 0; i<isamb->no_cat; i++)
182     {
183         char fname[DST_BUF_SIZE];
184         isamb->file[i].cache_entries = 0;
185         isamb->file[i].head_dirty = 0;
186         sprintf (fname, "%s%c", name, i+'A');
187         if (cache)
188             isamb->file[i].bf = bf_open (bfs, fname, ISAMB_CACHE_ENTRY_SIZE,
189                                          writeflag);
190         else
191             isamb->file[i].bf = bf_open (bfs, fname, b_size, writeflag);
192
193         
194         if (!bf_read (isamb->file[i].bf, 0, 0, sizeof(struct ISAMB_head),
195                       &isamb->file[i].head))
196         {
197             isamb->file[i].head.first_block = ISAMB_CACHE_ENTRY_SIZE/b_size+1;
198             isamb->file[i].head.last_block = isamb->file[i].head.first_block;
199             isamb->file[i].head.block_size = b_size;
200             isamb->file[i].head.block_max = b_size - ISAMB_DATA_OFFSET;
201             isamb->file[i].head.free_list = 0;
202         }
203         assert (isamb->file[i].head.block_size >= ISAMB_DATA_OFFSET);
204         isamb->file[i].head_dirty = 0;
205         assert(isamb->file[i].head.block_size == b_size);
206         b_size = b_size * 4;
207     }
208     return isamb;
209 }
210
211 static void flush_blocks (ISAMB b, int cat)
212 {
213     while (b->file[cat].cache_entries)
214     {
215         struct ISAMB_cache_entry *ce_this = b->file[cat].cache_entries;
216         b->file[cat].cache_entries = ce_this->next;
217
218         if (ce_this->dirty)
219         {
220             yaz_log (b->log_io, "bf_write: flush_blocks");
221             bf_write (b->file[cat].bf, ce_this->pos, 0, 0, ce_this->buf);
222         }
223         xfree (ce_this->buf);
224         xfree (ce_this);
225     }
226 }
227
228 static int get_block (ISAMB b, ISAMC_P pos, char *userbuf, int wr)
229 {
230     int cat = pos&CAT_MASK;
231     int off = ((pos/CAT_MAX) & 
232                (ISAMB_CACHE_ENTRY_SIZE / b->file[cat].head.block_size - 1))
233         * b->file[cat].head.block_size;
234     int norm = pos / (CAT_MASK*ISAMB_CACHE_ENTRY_SIZE / b->file[cat].head.block_size);
235     int no = 0;
236     struct ISAMB_cache_entry **ce, *ce_this = 0, **ce_last = 0;
237
238     if (!b->cache)
239         return 0;
240
241     assert (ISAMB_CACHE_ENTRY_SIZE >= b->file[cat].head.block_size);
242     for (ce = &b->file[cat].cache_entries; *ce; ce = &(*ce)->next, no++)
243     {
244         ce_last = ce;
245         if ((*ce)->pos == norm)
246         {
247             ce_this = *ce;
248             *ce = (*ce)->next;   /* remove from list */
249             
250             ce_this->next = b->file[cat].cache_entries;  /* move to front */
251             b->file[cat].cache_entries = ce_this;
252             
253             if (wr)
254             {
255                 memcpy (ce_this->buf + off, userbuf, 
256                         b->file[cat].head.block_size);
257                 ce_this->dirty = 1;
258             }
259             else
260                 memcpy (userbuf, ce_this->buf + off,
261                         b->file[cat].head.block_size);
262             return 1;
263         }
264     }
265     if (no >= 40)
266     {
267         assert (no == 40);
268         assert (ce_last && *ce_last);
269         ce_this = *ce_last;
270         *ce_last = 0;  /* remove the last entry from list */
271         if (ce_this->dirty)
272         {
273             yaz_log (b->log_io, "bf_write: get_block");
274             bf_write (b->file[cat].bf, ce_this->pos, 0, 0, ce_this->buf);
275         }
276         xfree (ce_this->buf);
277         xfree (ce_this);
278     }
279     ce_this = xmalloc (sizeof(*ce_this));
280     ce_this->next = b->file[cat].cache_entries;
281     b->file[cat].cache_entries = ce_this;
282     ce_this->buf = xmalloc (ISAMB_CACHE_ENTRY_SIZE);
283     ce_this->pos = norm;
284     yaz_log (b->log_io, "bf_read: get_block");
285     if (!bf_read (b->file[cat].bf, norm, 0, 0, ce_this->buf))
286         memset (ce_this->buf, 0, ISAMB_CACHE_ENTRY_SIZE);
287     if (wr)
288     {
289         memcpy (ce_this->buf + off, userbuf, b->file[cat].head.block_size);
290         ce_this->dirty = 1;
291     }
292     else
293     {
294         ce_this->dirty = 0;
295         memcpy (userbuf, ce_this->buf + off, b->file[cat].head.block_size);
296     }
297     return 1;
298 }
299
300
301 void isamb_close (ISAMB isamb)
302 {
303     int i;
304     for (i=0;isamb->accessed_nodes[i];i++)
305         logf(LOG_DEBUG,"isamb_close  level leaf-%d: %d read, %d skipped",
306              i, isamb->accessed_nodes[i], isamb->skipped_nodes[i]);
307     logf(LOG_DEBUG,"isamb_close returned %d values, skipped %d",
308          isamb->skipped_numbers, isamb->returned_numbers);
309     for (i = 0; i<isamb->no_cat; i++)
310     {
311         flush_blocks (isamb, i);
312         if (isamb->file[i].head_dirty)
313             bf_write (isamb->file[i].bf, 0, 0,
314                       sizeof(struct ISAMB_head), &isamb->file[i].head);
315         
316         bf_close (isamb->file[i].bf);
317     }
318     xfree (isamb->file);
319     xfree (isamb->method);
320     xfree (isamb);
321 }
322
323
324 struct ISAMB_block *open_block (ISAMB b, ISAMC_P pos)
325 {
326     int cat = pos&CAT_MASK;
327     struct ISAMB_block *p;
328     if (!pos)
329         return 0;
330     p = xmalloc (sizeof(*p));
331     p->pos = pos;
332     p->cat = pos & CAT_MASK;
333     p->buf = xmalloc (b->file[cat].head.block_size);
334
335     if (!get_block (b, pos, p->buf, 0))
336     {
337         yaz_log (b->log_io, "bf_read: open_block");
338         if (!bf_read (b->file[cat].bf, pos/CAT_MAX, 0, 0, p->buf))
339         {
340             yaz_log (LOG_FATAL, "isamb: read fail for pos=%ld block=%ld",
341                      (long) pos, (long) pos/CAT_MAX);
342             abort();
343         }
344     }
345     p->bytes = p->buf + ISAMB_DATA_OFFSET;
346     p->leaf = p->buf[0];
347     p->size = (p->buf[1] + 256 * p->buf[2]) - ISAMB_DATA_OFFSET;
348     if (p->size < 0)
349     {
350         yaz_log (LOG_FATAL, "Bad block size %d in pos=%d\n", p->size, pos);
351     }
352     assert (p->size >= 0);
353     p->offset = 0;
354     p->dirty = 0;
355     p->deleted = 0;
356     p->decodeClientData = (*b->method->code_start)(ISAMC_DECODE);
357     yaz_log (LOG_DEBUG, "isamb_open_block: Opened block %d ofs=%d",pos, p->offset);
358     return p;
359 }
360
361 struct ISAMB_block *new_block (ISAMB b, int leaf, int cat)
362 {
363     struct ISAMB_block *p;
364
365     p = xmalloc (sizeof(*p));
366     p->buf = xmalloc (b->file[cat].head.block_size);
367
368     if (!b->file[cat].head.free_list)
369     {
370         int block_no;
371         block_no = b->file[cat].head.last_block++;
372         p->pos = block_no * CAT_MAX + cat;
373     }
374     else
375     {
376         p->pos = b->file[cat].head.free_list;
377         assert((p->pos & CAT_MASK) == cat);
378         if (!get_block (b, p->pos, p->buf, 0))
379         {
380             yaz_log (b->log_io, "bf_read: new_block");
381             if (!bf_read (b->file[cat].bf, p->pos/CAT_MAX, 0, 0, p->buf))
382             {
383                 yaz_log (LOG_FATAL, "isamb: read fail for pos=%ld block=%ld",
384                          (long) p->pos/CAT_MAX, (long) p->pos/CAT_MAX);
385                 abort ();
386             }
387         }
388         yaz_log (b->log_freelist, "got block %d from freelist %d:%d", p->pos,
389                  cat, p->pos/CAT_MAX);
390         memcpy (&b->file[cat].head.free_list, p->buf, sizeof(int));
391     }
392     p->cat = cat;
393     b->file[cat].head_dirty = 1;
394     memset (p->buf, 0, b->file[cat].head.block_size);
395     p->bytes = p->buf + ISAMB_DATA_OFFSET;
396     p->leaf = leaf;
397     p->size = 0;
398     p->dirty = 1;
399     p->deleted = 0;
400     p->offset = 0;
401     p->decodeClientData = (*b->method->code_start)(ISAMC_DECODE);
402     return p;
403 }
404
405 struct ISAMB_block *new_leaf (ISAMB b, int cat)
406 {
407     return new_block (b, 1, cat);
408 }
409
410
411 struct ISAMB_block *new_int (ISAMB b, int cat)
412 {
413     return new_block (b, 0, cat);
414 }
415
416 static void check_block (ISAMB b, struct ISAMB_block *p)
417 {
418     if (p->leaf)
419     {
420         ;
421     }
422     else
423     {
424         /* sanity check */
425         char *startp = p->bytes;
426         char *src = startp;
427         char *endp = p->bytes + p->size;
428         int pos;
429             
430         decode_ptr (&src, &pos);
431         assert ((pos&CAT_MASK) == p->cat);
432         while (src != endp)
433         {
434             int item_len;
435             decode_ptr (&src, &item_len);
436             assert (item_len > 0 && item_len < 30);
437             src += item_len;
438             decode_ptr (&src, &pos);
439             assert ((pos&CAT_MASK) == p->cat);
440         }
441     }
442 }
443
444 void close_block (ISAMB b, struct ISAMB_block *p)
445 {
446     if (!p)
447         return;
448     if (p->deleted)
449     {
450         yaz_log (b->log_freelist, "release block %d from freelist %d:%d",
451                  p->pos, p->cat, p->pos/CAT_MAX);
452         memcpy (p->buf, &b->file[p->cat].head.free_list, sizeof(int));
453         b->file[p->cat].head.free_list = p->pos;
454         if (!get_block (b, p->pos, p->buf, 1))
455         {
456             yaz_log (b->log_io, "bf_write: close_block (deleted)");
457             bf_write (b->file[p->cat].bf, p->pos/CAT_MAX, 0, 0, p->buf);
458         }
459     }
460     else if (p->dirty)
461     {
462         int size = p->size + ISAMB_DATA_OFFSET;
463         assert (p->size >= 0);
464         p->buf[0] = p->leaf;
465         p->buf[1] = size & 255;
466         p->buf[2] = size >> 8;
467         check_block(b, p);
468         if (!get_block (b, p->pos, p->buf, 1))
469         {
470             yaz_log (b->log_io, "bf_write: close_block");
471             bf_write (b->file[p->cat].bf, p->pos/CAT_MAX, 0, 0, p->buf);
472         }
473     }
474     (*b->method->code_stop)(ISAMC_DECODE, p->decodeClientData);
475     xfree (p->buf);
476     xfree (p);
477 }
478
479 int insert_sub (ISAMB b, struct ISAMB_block **p,
480                 void *new_item, int *mode,
481                 ISAMC_I *stream,
482                 struct ISAMB_block **sp,
483                 void *sub_item, int *sub_size,
484                 void *max_item);
485
486 int insert_int (ISAMB b, struct ISAMB_block *p, void *lookahead_item,
487                 int *mode,
488                 ISAMC_I *stream, struct ISAMB_block **sp,
489                 void *split_item, int *split_size, void *last_max_item)
490 {
491     char *startp = p->bytes;
492     char *src = startp;
493     char *endp = p->bytes + p->size;
494     int pos;
495     struct ISAMB_block *sub_p1 = 0, *sub_p2 = 0;
496     char sub_item[DST_ITEM_MAX];
497     int sub_size;
498     int more;
499
500     *sp = 0;
501
502     assert(p->size >= 0);
503     decode_ptr (&src, &pos);
504     while (src != endp)
505     {
506         int item_len;
507         int d;
508         char *src0 = src;
509         decode_ptr (&src, &item_len);
510         d = (*b->method->compare_item)(src, lookahead_item);
511         if (d > 0)
512         {
513             sub_p1 = open_block (b, pos);
514             assert (sub_p1);
515             more = insert_sub (b, &sub_p1, lookahead_item, mode,
516                                stream, &sub_p2, 
517                                sub_item, &sub_size, src);
518             src = src0;
519             break;
520         }
521         src += item_len;
522         decode_ptr (&src, &pos);
523     }
524     if (!sub_p1)
525     {
526         sub_p1 = open_block (b, pos);
527         assert (sub_p1);
528         more = insert_sub (b, &sub_p1, lookahead_item, mode, stream, &sub_p2, 
529                            sub_item, &sub_size, last_max_item);
530     }
531     if (sub_p2)
532     {
533         /* there was a split - must insert pointer in this one */
534         char dst_buf[DST_BUF_SIZE];
535         char *dst = dst_buf;
536
537         assert (sub_size < 30 && sub_size > 1);
538
539         memcpy (dst, startp, src - startp);
540                 
541         dst += src - startp;
542
543         encode_ptr (&dst, sub_size);      /* sub length and item */
544         memcpy (dst, sub_item, sub_size);
545         dst += sub_size;
546
547         encode_ptr (&dst, sub_p2->pos);   /* pos */
548
549         if (endp - src)                   /* remaining data */
550         {
551             memcpy (dst, src, endp - src);
552             dst += endp - src;
553         }
554         p->size = dst - dst_buf;
555         assert (p->size >= 0);
556         if (p->size <= b->file[p->cat].head.block_max)
557         {
558             memcpy (startp, dst_buf, dst - dst_buf);
559         }
560         else
561         {
562             int p_new_size;
563             char *half;
564             src = dst_buf;
565             endp = dst;
566
567             half = src + b->file[p->cat].head.block_size/2;
568             decode_ptr (&src, &pos);
569             while (src <= half)
570             {
571                 decode_ptr (&src, split_size);
572                 src += *split_size;
573                 decode_ptr (&src, &pos);
574             }
575             p_new_size = src - dst_buf;
576             memcpy (p->bytes, dst_buf, p_new_size);
577
578             decode_ptr (&src, split_size);
579             memcpy (split_item, src, *split_size);
580             src += *split_size;
581
582             *sp = new_int (b, p->cat);
583             (*sp)->size = endp - src;
584             memcpy ((*sp)->bytes, src, (*sp)->size);
585
586             p->size = p_new_size;
587         }
588         p->dirty = 1;
589         close_block (b, sub_p2);
590     }
591     close_block (b, sub_p1);
592     return more;
593 }
594
595
596 int insert_leaf (ISAMB b, struct ISAMB_block **sp1, void *lookahead_item,
597                  int *lookahead_mode, ISAMC_I *stream,
598                  struct ISAMB_block **sp2,
599                  void *sub_item, int *sub_size,
600                  void *max_item)
601 {
602     struct ISAMB_block *p = *sp1;
603     char *src = 0, *endp = 0;
604     char dst_buf[DST_BUF_SIZE], *dst = dst_buf;
605     int new_size;
606     void *c1 = (*b->method->code_start)(ISAMC_DECODE);
607     void *c2 = (*b->method->code_start)(ISAMC_ENCODE);
608     int more = 1;
609     int quater = b->file[b->no_cat-1].head.block_max / CAT_MAX;
610     char *cut = dst_buf + quater * 2;
611     char *maxp = dst_buf + b->file[b->no_cat-1].head.block_max;
612     char *half1 = 0;
613     char *half2 = 0;
614     char cut_item_buf[DST_ITEM_MAX];
615     int cut_item_size = 0;
616
617     if (p && p->size)
618     {
619         char file_item_buf[DST_ITEM_MAX];
620         char *file_item = file_item_buf;
621             
622         src = p->bytes;
623         endp = p->bytes + p->size;
624         (*b->method->code_item)(ISAMC_DECODE, c1, &file_item, &src);
625         while (1)
626         {
627             char *dst_item = 0;
628             char *dst_0 = dst;
629             char *lookahead_next;
630             int d = -1;
631             
632             if (lookahead_item)
633                 d = (*b->method->compare_item)(file_item_buf, lookahead_item);
634             
635             if (d > 0)
636             {
637                 dst_item = lookahead_item;
638                 if (!*lookahead_mode)
639                 {
640                     yaz_log (LOG_WARN, "isamb: Inconsistent register (1)");
641                     assert (*lookahead_mode);
642                 }
643             }
644             else
645                 dst_item = file_item_buf;
646             if (!*lookahead_mode && d == 0)
647             {
648                 p->dirty = 1;
649             }
650             else if (!half1 && dst > cut)
651             {
652                 char *dst_item_0 = dst_item;
653                 half1 = dst; /* candidate for splitting */
654                 
655                 (*b->method->code_item)(ISAMC_ENCODE, c2, &dst, &dst_item);
656                 
657                 cut_item_size = dst_item - dst_item_0;
658                 memcpy (cut_item_buf, dst_item_0, cut_item_size);
659                 
660                 half2 = dst;
661             }
662             else
663                 (*b->method->code_item)(ISAMC_ENCODE, c2, &dst, &dst_item);
664             if (d > 0)  
665             {
666                 if (dst > maxp)
667                 {
668                     dst = dst_0;
669                     lookahead_item = 0;
670                 }
671                 else
672                 {
673                     lookahead_next = lookahead_item;
674                     if (!(*stream->read_item)(stream->clientData,
675                                               &lookahead_next,
676                                               lookahead_mode))
677                     {
678                         lookahead_item = 0;
679                         more = 0;
680                     }
681                     if (lookahead_item && max_item &&
682                         (*b->method->compare_item)(max_item, lookahead_item) <= 0)
683                     {
684                         /* max_item 1 */
685                         lookahead_item = 0;
686                     }
687                     
688                     p->dirty = 1;
689                 }
690             }
691             else if (d == 0)
692             {
693                 lookahead_next = lookahead_item;
694                 if (!(*stream->read_item)(stream->clientData,
695                                           &lookahead_next, lookahead_mode))
696                 {
697                     lookahead_item = 0;
698                     more = 0;
699                 }
700                 if (src == endp)
701                     break;
702                 file_item = file_item_buf;
703                 (*b->method->code_item)(ISAMC_DECODE, c1, &file_item, &src);
704             }
705             else
706             {
707                 if (src == endp)
708                     break;
709                 file_item = file_item_buf;
710                 (*b->method->code_item)(ISAMC_DECODE, c1, &file_item, &src);
711             }
712         }
713     }
714     maxp = dst_buf + b->file[b->no_cat-1].head.block_max + quater;
715     while (lookahead_item)
716     {
717         char *dst_item = lookahead_item;
718         char *dst_0 = dst;
719         
720         if (max_item &&
721             (*b->method->compare_item)(max_item, lookahead_item) <= 0)
722         {
723             /* max_item 2 */
724             break;
725         }
726         if (!*lookahead_mode)
727         {
728             yaz_log (LOG_WARN, "isamb: Inconsistent register (2)");
729             abort();
730         }
731         else if (!half1 && dst > cut)   
732         {
733             char *dst_item_0 = dst_item;
734             half1 = dst; /* candidate for splitting */
735             
736             (*b->method->code_item)(ISAMC_ENCODE, c2, &dst, &dst_item);
737             
738             cut_item_size = dst_item - dst_item_0;
739             memcpy (cut_item_buf, dst_item_0, cut_item_size);
740             
741             half2 = dst;
742         }
743         else
744             (*b->method->code_item)(ISAMC_ENCODE, c2, &dst, &dst_item);
745
746         if (dst > maxp)
747         {
748             dst = dst_0;
749             break;
750         }
751         if (p)
752             p->dirty = 1;
753         dst_item = lookahead_item;
754         if (!(*stream->read_item)(stream->clientData, &dst_item,
755                                   lookahead_mode))
756         {
757             lookahead_item = 0;
758             more = 0;
759         }
760     }
761     new_size = dst - dst_buf;
762     if (p && p->cat != b->no_cat-1 && 
763         new_size > b->file[p->cat].head.block_max)
764     {
765         /* non-btree block will be removed */
766         p->deleted = 1;
767         close_block (b, p);
768         /* delete it too!! */
769         p = 0; /* make a new one anyway */
770     }
771     if (!p)
772     {   /* must create a new one */
773         int i;
774         for (i = 0; i < b->no_cat; i++)
775             if (new_size <= b->file[i].head.block_max)
776                 break;
777         if (i == b->no_cat)
778             i = b->no_cat - 1;
779         p = new_leaf (b, i);
780     }
781     if (new_size > b->file[p->cat].head.block_max)
782     {
783         char *first_dst;
784         char *cut_item = cut_item_buf;
785
786         assert (half1);
787         assert (half2);
788
789        /* first half */
790         p->size = half1 - dst_buf;
791         memcpy (p->bytes, dst_buf, half1 - dst_buf);
792
793         /* second half */
794         *sp2 = new_leaf (b, p->cat);
795
796         (*b->method->code_reset)(c2);
797
798         first_dst = (*sp2)->bytes;
799
800         (*b->method->code_item)(ISAMC_ENCODE, c2, &first_dst, &cut_item);
801
802         memcpy (first_dst, half2, dst - half2);
803
804         (*sp2)->size = (first_dst - (*sp2)->bytes) + (dst - half2);
805         (*sp2)->dirty = 1;
806         p->dirty = 1;
807         memcpy (sub_item, cut_item_buf, cut_item_size);
808         *sub_size = cut_item_size;
809     }
810     else
811     {
812         memcpy (p->bytes, dst_buf, dst - dst_buf);
813         p->size = new_size;
814     }
815     (*b->method->code_stop)(ISAMC_DECODE, c1);
816     (*b->method->code_stop)(ISAMC_ENCODE, c2);
817     *sp1 = p;
818     return more;
819 }
820
821 int insert_sub (ISAMB b, struct ISAMB_block **p, void *new_item,
822                 int *mode,
823                 ISAMC_I *stream,
824                 struct ISAMB_block **sp,
825                 void *sub_item, int *sub_size,
826                 void *max_item)
827 {
828     if (!*p || (*p)->leaf)
829         return insert_leaf (b, p, new_item, mode, stream, sp, sub_item, 
830                             sub_size, max_item);
831     else
832         return insert_int (b, *p, new_item, mode, stream, sp, sub_item,
833                            sub_size, max_item);
834 }
835
836 int isamb_unlink (ISAMB b, ISAMC_P pos)
837 {
838     struct ISAMB_block *p1;
839
840     if (!pos)
841         return 0;
842     p1 = open_block(b, pos);
843     p1->deleted = 1;
844     if (!p1->leaf)
845     {
846         int sub_p;
847         int item_len;
848         char *src = p1->bytes + p1->offset;
849
850         decode_ptr(&src, &sub_p);
851         isamb_unlink(b, sub_p);
852         
853         while (src != p1->bytes + p1->size)
854         {
855             decode_ptr(&src, &item_len);
856             src += item_len;
857             decode_ptr(&src, &sub_p);
858             isamb_unlink(b, sub_p);
859         }
860     }
861     close_block(b, p1);
862     return 0;
863 }
864
865 int isamb_merge (ISAMB b, ISAMC_P pos, ISAMC_I *stream)
866 {
867     char item_buf[DST_ITEM_MAX];
868     char *item_ptr;
869     int i_mode;
870     int more;
871
872     if (b->cache < 0)
873     {
874         int more = 1;
875         while (more)
876         {
877             item_ptr = item_buf;
878             more =
879                 (*stream->read_item)(stream->clientData, &item_ptr, &i_mode);
880         }
881         return 1;
882     }
883     item_ptr = item_buf;
884     more = (*stream->read_item)(stream->clientData, &item_ptr, &i_mode);
885     while (more)
886     {
887         struct ISAMB_block *p = 0, *sp = 0;
888         char sub_item[DST_ITEM_MAX];
889         int sub_size;
890         
891         if (pos)
892             p = open_block (b, pos);
893         more = insert_sub (b, &p, item_buf, &i_mode, stream, &sp,
894                             sub_item, &sub_size, 0);
895         if (sp)
896         {    /* increase level of tree by one */
897             struct ISAMB_block *p2 = new_int (b, p->cat);
898             char *dst = p2->bytes + p2->size;
899             
900             encode_ptr (&dst, p->pos);
901             assert (sub_size < 20);
902             encode_ptr (&dst, sub_size);
903             memcpy (dst, sub_item, sub_size);
904             dst += sub_size;
905             encode_ptr (&dst, sp->pos);
906             
907             p2->size = dst - p2->bytes;
908             pos = p2->pos;  /* return new super page */
909             close_block (b, sp);
910             close_block (b, p2);
911         }
912         else
913             pos = p->pos;   /* return current one (again) */
914         close_block (b, p);
915     }
916     return pos;
917 }
918
919 ISAMB_PP isamb_pp_open_x (ISAMB isamb, ISAMB_P pos, int *level)
920 {
921     ISAMB_PP pp = xmalloc (sizeof(*pp));
922     int i;
923
924     pp->isamb = isamb;
925     pp->block = xmalloc (ISAMB_MAX_LEVEL * sizeof(*pp->block));
926
927     pp->pos = pos;
928     pp->level = 0;
929     pp->maxlevel=0;
930     pp->total_size = 0;
931     pp->no_blocks = 0;
932     pp->skipped_numbers=0;
933     pp->returned_numbers=0;
934     for (i=0;i<ISAMB_MAX_LEVEL;i++)
935         pp->skipped_nodes[i] = pp->accessed_nodes[i]=0;
936     while (1)
937     {
938         struct ISAMB_block *p = open_block (isamb, pos);
939         char *src = p->bytes + p->offset;
940         pp->block[pp->level] = p;
941
942         pp->total_size += p->size;
943         pp->no_blocks++;
944         if (p->leaf)
945             break;
946
947                                         
948         decode_ptr (&src, &pos);
949         p->offset = src - p->bytes;
950         pp->level++;
951         pp->accessed_nodes[pp->level]++; 
952     }
953     pp->block[pp->level+1] = 0;
954     pp->maxlevel=pp->level;
955     if (level)
956         *level = pp->level;
957     return pp;
958 }
959
960 ISAMB_PP isamb_pp_open (ISAMB isamb, ISAMB_P pos)
961 {
962     return isamb_pp_open_x (isamb, pos, 0);
963 }
964
965 void isamb_pp_close_x (ISAMB_PP pp, int *size, int *blocks)
966 {
967     int i;
968     if (!pp)
969         return;
970     logf(LOG_DEBUG,"isamb_pp_close lev=%d returned %d values, skipped %d",
971         pp->maxlevel, pp->skipped_numbers, pp->returned_numbers);
972     for (i=pp->maxlevel;i>=0;i--)
973         if ( pp->skipped_nodes[i] || pp->accessed_nodes[i])
974             logf(LOG_DEBUG,"isamb_pp_close  level leaf-%d: %d read, %d skipped", i,
975                  pp->accessed_nodes[i], pp->skipped_nodes[i]);
976     pp->isamb->skipped_numbers += pp->skipped_numbers;
977     pp->isamb->returned_numbers += pp->returned_numbers;
978     for (i=pp->maxlevel;i>=0;i--)
979     {
980         pp->isamb->accessed_nodes[i] += pp->accessed_nodes[i];
981         pp->isamb->skipped_nodes[i] += pp->skipped_nodes[i];
982     }
983     if (size)
984         *size = pp->total_size;
985     if (blocks)
986         *blocks = pp->no_blocks;
987     for (i = 0; i <= pp->level; i++)
988         close_block (pp->isamb, pp->block[i]);
989     xfree (pp->block);
990     xfree (pp);
991 }
992
993 int isamb_block_info (ISAMB isamb, int cat)
994 {
995     if (cat >= 0 && cat < isamb->no_cat)
996         return isamb->file[cat].head.block_size;
997     return -1;
998 }
999
1000 void isamb_pp_close (ISAMB_PP pp)
1001 {
1002     isamb_pp_close_x (pp, 0, 0);
1003 }
1004
1005
1006 #if 0
1007 /* Old isamb_pp_read that Adam wrote, kept as a reference in case we need to
1008    debug the more complex pp_read that also forwards. May be deleted near end
1009    of 2004, if it has not shown to be useful */
1010
1011 int isamb_pp_read (ISAMB_PP pp, void *buf)
1012 {
1013     char *dst = buf;
1014     char *src;
1015     struct ISAMB_block *p = pp->block[pp->level];
1016     if (!p)
1017         return 0;
1018
1019     while (p->offset == p->size)
1020     {
1021         int pos, item_len;
1022         while (p->offset == p->size)
1023         {
1024             if (pp->level == 0)
1025                 return 0;
1026             close_block (pp->isamb, pp->block[pp->level]);
1027             pp->block[pp->level] = 0;
1028             (pp->level)--;
1029             p = pp->block[pp->level];
1030             assert (!p->leaf);  
1031         }
1032         src = p->bytes + p->offset;
1033         
1034         decode_ptr (&src, &item_len);
1035         src += item_len;
1036         decode_ptr (&src, &pos);
1037         
1038         p->offset = src - (char*) p->bytes;
1039
1040         ++(pp->level);
1041         
1042         while (1)
1043         {
1044             pp->block[pp->level] = p = open_block (pp->isamb, pos);
1045
1046             pp->total_size += p->size;
1047             pp->no_blocks++;
1048             
1049             if (p->leaf) 
1050             {
1051                 break;
1052             }
1053             src = p->bytes + p->offset;
1054             decode_ptr (&src, &pos);
1055             p->offset = src - (char*) p->bytes;
1056             pp->level++;
1057         }
1058     }
1059     assert (p->offset < p->size);
1060     assert (p->leaf);
1061     src = p->bytes + p->offset;
1062     (*pp->isamb->method->code_item)(ISAMC_DECODE, p->decodeClientData,
1063                                     &dst, &src);
1064     p->offset = src - (char*) p->bytes;
1065     key_logdump_txt(LOG_DEBUG,buf, "isamb_pp_read returning 1");
1066     return 1;
1067 }
1068
1069 #else
1070 int isamb_pp_read (ISAMB_PP pp, void *buf)
1071 {
1072     return isamb_pp_forward(pp,buf,0);
1073 }
1074 #endif
1075
1076 int isamb_pp_forward (ISAMB_PP pp, void *buf, const void *untilbuf)
1077 {
1078     /* pseudocode:
1079      *   while 1
1080      *     while at end of node
1081      *       climb higher. If out, return 0
1082      *     while not on a leaf (and not at its end)
1083      *       decode next
1084      *       if cmp
1085      *         descend to node
1086      *     decode next
1087      *     if cmp
1088      *       return 1
1089      */
1090         /* 
1091          * The upper nodes consist of a sequence of nodenumbers and keys
1092          * When opening a block,  the first node number is read in, and
1093          * offset points to the first key, which is the upper limit of keys
1094          * in the node just read.
1095          */
1096     char *dst = buf;
1097     char *src;
1098     struct ISAMB_block *p = pp->block[pp->level];
1099     int cmp;
1100     int item_len;
1101     int pos;
1102     int nxtpos;
1103     if (!p)
1104         return 0;
1105     logf(LOG_DEBUG,"isamb_pp_forward starting [%p] p=%d",pp,p->pos);
1106     key_logdump_txt(LOG_DEBUG, untilbuf," until");
1107     key_logdump_txt(LOG_DEBUG, buf, " buf");
1108     while (1)
1109     {
1110         while ( p->offset == p->size) 
1111         {  /* end of this block - climb higher */
1112             logf(LOG_DEBUG,"isamb_pp_forward climbing from l=%d",
1113                             pp->level);
1114             if (pp->level == 0)
1115             {
1116                 logf(LOG_DEBUG,"isamb_pp_forward returning 0 at root");
1117                 return 0; /* at end of the root, nothing left */
1118             }
1119             close_block(pp->isamb, pp->block[pp->level]);
1120             pp->block[pp->level]=0;
1121             (pp->level)--;
1122             p=pp->block[pp->level];
1123             logf(LOG_DEBUG,"isamb_pp_forward climbed to node %d off=%d",
1124                             p->pos, p->offset);
1125             assert(!p->leaf);
1126             /* skip the child we have handled */
1127             if (p->offset != p->size)
1128             { 
1129                 src = p->bytes + p->offset;
1130                 decode_ptr(&src, &item_len);
1131                 key_logdump_txt(LOG_DEBUG, src, " isamb_pp_forward climb skipping old key");
1132                 src += item_len;
1133                 decode_ptr(&src,&pos);
1134                 p->offset = src - (char*) p->bytes;
1135                 break; /* even if this puts us at the end of the block, we need to */
1136                        /* descend to the last pos. UGLY coding, clean up some */
1137                        /* day */
1138             }
1139         }
1140         if (!p->leaf)
1141         { 
1142             src = p->bytes + p->offset;
1143             if (p->offset == p->size)
1144                 cmp=-2 ; /* descend to the last node, as we have no value to cmp */
1145             else
1146             {
1147                 decode_ptr(&src, &item_len);
1148                 logf(LOG_DEBUG,"isamb_pp_forward (B) on a high node. ofs=%d sz=%d nxtpos=%d ",
1149                         p->offset,p->size,pos);
1150                 key_logdump(LOG_DEBUG, src);
1151                 if (untilbuf)
1152                     cmp=(*pp->isamb->method->compare_item)(untilbuf,src);
1153                 else
1154                     cmp=-2;
1155                 src += item_len;
1156                 decode_ptr(&src,&nxtpos);
1157             }
1158             if (cmp<2)
1159             { 
1160                 logf(LOG_DEBUG,"isambb_pp_forward descending l=%d p=%d ",
1161                             pp->level, pos);
1162                 ++(pp->level);
1163                 p = open_block(pp->isamb,pos);
1164                 pp->block[pp->level] = p ;
1165                 pp->total_size += p->size;
1166                 (pp->accessed_nodes[pp->maxlevel - pp->level])++;
1167                 pp->no_blocks++;
1168                 if ( !p->leaf)
1169                 { /* block starts with a pos */
1170                     src = p->bytes + p->offset;
1171                     decode_ptr(&src,&pos);
1172                     p->offset=src-(char*) p->bytes;
1173                     logf(LOG_DEBUG,"isamb_pp_forward: block %d starts with %d",
1174                                     p->pos, pos);
1175                 } 
1176             } /* descend to the node */
1177             else
1178             { /* skip the node */
1179                 p->offset = src - (char*) p->bytes;
1180                 pos=nxtpos;
1181                 (pp->skipped_nodes[pp->maxlevel - pp->level -1])++;
1182                 logf(LOG_DEBUG,
1183                     "isamb_pp_forward: skipping block on level %d, noting on %d (%d)",
1184                     pp->level, pp->maxlevel - pp->level-1 , 
1185                     pp->skipped_nodes[pp->maxlevel - pp->level-1 ]);
1186                 /* 0 is always leafs, 1 is one level above leafs etc, no
1187                  * matter how high tree */
1188             }
1189         } /* not on a leaf */
1190         else
1191         { /* on a leaf */
1192             src = p->bytes + p->offset;
1193             dst=buf;
1194             (*pp->isamb->method->code_item)(ISAMC_DECODE, p->decodeClientData,
1195                                             &dst, &src);
1196             p->offset = src - (char*) p->bytes;
1197             if (untilbuf)
1198                 cmp=(*pp->isamb->method->compare_item)(untilbuf,buf);
1199             else
1200                 cmp=-2;
1201             logf(LOG_DEBUG,"isamb_pp_forward on a leaf. cmp=%d", 
1202                               cmp);
1203             key_logdump(LOG_DEBUG, buf);
1204             if (cmp <2)
1205             {
1206                 if (untilbuf)
1207                     key_logdump_txt(LOG_DEBUG, buf,
1208                        "isamb_pp_forward returning 1");
1209                 else
1210                     key_logdump_txt(LOG_DEBUG, buf,
1211                         "isamb_pp_read returning 1 (fwd)");
1212                 pp->returned_numbers++;
1213                 return 1;
1214             }
1215             else
1216                 pp->skipped_numbers++;
1217         } /* leaf */
1218     } /* main loop */
1219 }
1220
1221
1222 int isamb_pp_num (ISAMB_PP pp)
1223 {
1224     return 1;
1225 }