Initialize log_freelist
[idzebra-moved-to-github.git] / isamb / isamb.c
1 /* $Id: isamb.c,v 1.25 2003-03-20 14:37:35 adam Exp $
2    Copyright (C) 1995,1996,1997,1998,1999,2000,2001,2002,2003
3    Index Data Aps
4
5 This file is part of the Zebra server.
6
7 Zebra is free software; you can redistribute it and/or modify it under
8 the terms of the GNU General Public License as published by the Free
9 Software Foundation; either version 2, or (at your option) any later
10 version.
11
12 Zebra is distributed in the hope that it will be useful, but WITHOUT ANY
13 WARRANTY; without even the implied warranty of MERCHANTABILITY or
14 FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
15 for more details.
16
17 You should have received a copy of the GNU General Public License
18 along with Zebra; see the file LICENSE.zebra.  If not, write to the
19 Free Software Foundation, 59 Temple Place - Suite 330, Boston, MA
20 02111-1307, USA.
21 */
22
23 #include <string.h>
24 #include <yaz/xmalloc.h>
25 #include <yaz/log.h>
26 #include <isamb.h>
27 #include <assert.h>
28
29 struct ISAMB_head {
30     int first_block;
31     int last_block;
32     int block_size;
33     int block_max;
34     int free_list;
35 };
36
37 #define ISAMB_DATA_OFFSET 3
38
39 #define DST_ITEM_MAX 256
40
41 /* approx 2*4 K + max size of item */
42 #define DST_BUF_SIZE 8448
43
44 #define ISAMB_CACHE_ENTRY_SIZE 4096
45
46 struct ISAMB_cache_entry {
47     ISAMB_P pos;
48     unsigned char *buf;
49     int dirty;
50     int hits;
51     struct ISAMB_cache_entry *next;
52 };
53
54
55 struct ISAMB_file {
56     BFile bf;
57     int head_dirty;
58     struct ISAMB_head head;
59     struct ISAMB_cache_entry *cache_entries;
60 };
61
62 struct ISAMB_s {
63     BFiles bfs;
64     ISAMC_M method;
65
66     struct ISAMB_file *file;
67     int no_cat;
68     int cache; /* 0=no cache, 1=use cache, -1=dummy isam (for testing only) */
69     int log_io;        /* log level for bf_read/bf_write calls */
70     int log_freelist;  /* log level for freelist handling */
71 };
72
73 struct ISAMB_block {
74     ISAMB_P pos;
75     int cat;
76     int size;
77     int leaf;
78     int dirty;
79     int deleted;
80     int offset;
81     char *bytes;
82     unsigned char *buf;
83     void *decodeClientData;
84     int log_rw;
85 };
86
87 struct ISAMB_PP_s {
88     ISAMB isamb;
89     ISAMB_P pos;
90     int level;
91     int total_size;
92     int no_blocks;
93     struct ISAMB_block **block;
94 };
95
96 void encode_ptr (char **dst, int pos)
97 {
98     memcpy (*dst, &pos, sizeof(pos));
99     (*dst) += sizeof(pos);
100 }
101
102 void decode_ptr (char **src, int *pos)
103 {
104     memcpy (pos, *src, sizeof(*pos));
105     (*src) += sizeof(*pos);
106 }
107
108 ISAMB isamb_open (BFiles bfs, const char *name, int writeflag, ISAMC_M method,
109                   int cache)
110 {
111     ISAMB isamb = xmalloc (sizeof(*isamb));
112     int i, b_size = 32;
113
114     isamb->bfs = bfs;
115     isamb->method = (ISAMC_M) xmalloc (sizeof(*method));
116     memcpy (isamb->method, method, sizeof(*method));
117     isamb->no_cat = 4;
118     isamb->log_io = 0;
119     isamb->log_freelist = 0;
120     isamb->cache = cache;
121
122     assert (cache == 0);
123     isamb->file = xmalloc (sizeof(*isamb->file) * isamb->no_cat);
124     for (i = 0; i<isamb->no_cat; i++)
125     {
126         char fname[DST_BUF_SIZE];
127         isamb->file[i].cache_entries = 0;
128         isamb->file[i].head_dirty = 0;
129         sprintf (fname, "%s%c", name, i+'A');
130         if (cache)
131             isamb->file[i].bf = bf_open (bfs, fname, ISAMB_CACHE_ENTRY_SIZE,
132                                          writeflag);
133         else
134             isamb->file[i].bf = bf_open (bfs, fname, b_size, writeflag);
135
136         
137         if (!bf_read (isamb->file[i].bf, 0, 0, sizeof(struct ISAMB_head),
138                       &isamb->file[i].head))
139         {
140             isamb->file[i].head.first_block = ISAMB_CACHE_ENTRY_SIZE/b_size+1;
141             isamb->file[i].head.last_block = isamb->file[i].head.first_block;
142             isamb->file[i].head.block_size = b_size;
143             isamb->file[i].head.block_max = b_size - ISAMB_DATA_OFFSET;
144             isamb->file[i].head.free_list = 0;
145         }
146         assert (isamb->file[i].head.block_size >= ISAMB_DATA_OFFSET);
147         isamb->file[i].head_dirty = 0;
148         assert(isamb->file[i].head.block_size == b_size);
149         b_size = b_size * 4;
150     }
151     return isamb;
152 }
153
154 static void flush_blocks (ISAMB b, int cat)
155 {
156     while (b->file[cat].cache_entries)
157     {
158         struct ISAMB_cache_entry *ce_this = b->file[cat].cache_entries;
159         b->file[cat].cache_entries = ce_this->next;
160
161         if (ce_this->dirty)
162         {
163             yaz_log (b->log_io, "bf_write: flush_blocks");
164             bf_write (b->file[cat].bf, ce_this->pos, 0, 0, ce_this->buf);
165         }
166         xfree (ce_this->buf);
167         xfree (ce_this);
168     }
169 }
170
171 static int get_block (ISAMB b, ISAMC_P pos, char *userbuf, int wr)
172 {
173     int cat = pos&3;
174     int off = ((pos/4) & 
175                (ISAMB_CACHE_ENTRY_SIZE / b->file[cat].head.block_size - 1))
176         * b->file[cat].head.block_size;
177     int norm = pos / (4*ISAMB_CACHE_ENTRY_SIZE / b->file[cat].head.block_size);
178     int no = 0;
179     struct ISAMB_cache_entry **ce, *ce_this = 0, **ce_last = 0;
180
181     if (!b->cache)
182         return 0;
183
184     assert (ISAMB_CACHE_ENTRY_SIZE >= b->file[cat].head.block_size);
185     for (ce = &b->file[cat].cache_entries; *ce; ce = &(*ce)->next, no++)
186     {
187         ce_last = ce;
188         if ((*ce)->pos == norm)
189         {
190             ce_this = *ce;
191             *ce = (*ce)->next;   /* remove from list */
192             
193             ce_this->next = b->file[cat].cache_entries;  /* move to front */
194             b->file[cat].cache_entries = ce_this;
195             
196             if (wr)
197             {
198                 memcpy (ce_this->buf + off, userbuf, 
199                         b->file[cat].head.block_size);
200                 ce_this->dirty = 1;
201             }
202             else
203                 memcpy (userbuf, ce_this->buf + off,
204                         b->file[cat].head.block_size);
205             return 1;
206         }
207     }
208     if (no >= 40)
209     {
210         assert (no == 40);
211         assert (ce_last && *ce_last);
212         ce_this = *ce_last;
213         *ce_last = 0;  /* remove the last entry from list */
214         if (ce_this->dirty)
215         {
216             yaz_log (b->log_io, "bf_write: get_block");
217             bf_write (b->file[cat].bf, ce_this->pos, 0, 0, ce_this->buf);
218         }
219         xfree (ce_this->buf);
220         xfree (ce_this);
221     }
222     ce_this = xmalloc (sizeof(*ce_this));
223     ce_this->next = b->file[cat].cache_entries;
224     b->file[cat].cache_entries = ce_this;
225     ce_this->buf = xmalloc (ISAMB_CACHE_ENTRY_SIZE);
226     ce_this->pos = norm;
227     yaz_log (b->log_io, "bf_read: get_block");
228     if (!bf_read (b->file[cat].bf, norm, 0, 0, ce_this->buf))
229         memset (ce_this->buf, 0, ISAMB_CACHE_ENTRY_SIZE);
230     if (wr)
231     {
232         memcpy (ce_this->buf + off, userbuf, b->file[cat].head.block_size);
233         ce_this->dirty = 1;
234     }
235     else
236     {
237         ce_this->dirty = 0;
238         memcpy (userbuf, ce_this->buf + off, b->file[cat].head.block_size);
239     }
240     return 1;
241 }
242
243
244 void isamb_close (ISAMB isamb)
245 {
246     int i;
247     for (i = 0; i<isamb->no_cat; i++)
248     {
249         flush_blocks (isamb, i);
250         if (isamb->file[i].head_dirty)
251             bf_write (isamb->file[i].bf, 0, 0,
252                       sizeof(struct ISAMB_head), &isamb->file[i].head);
253         
254         bf_close (isamb->file[i].bf);
255     }
256     xfree (isamb->file);
257     xfree (isamb->method);
258     xfree (isamb);
259 }
260
261
262 struct ISAMB_block *open_block (ISAMB b, ISAMC_P pos)
263 {
264     int cat = pos&3;
265     struct ISAMB_block *p;
266     if (!pos)
267         return 0;
268     p = xmalloc (sizeof(*p));
269     p->pos = pos;
270     p->cat = pos & 3;
271     p->buf = xmalloc (b->file[cat].head.block_size);
272
273     if (!get_block (b, pos, p->buf, 0))
274     {
275         yaz_log (b->log_io, "bf_read: open_block");
276         if (!bf_read (b->file[cat].bf, pos/4, 0, 0, p->buf))
277         {
278             yaz_log (LOG_FATAL, "read failure for pos=%ld block=%ld",
279                      (long) pos, (long) pos/4);
280             abort();
281         }
282     }
283     p->bytes = p->buf + ISAMB_DATA_OFFSET;
284     p->leaf = p->buf[0];
285     p->size = (p->buf[1] + 256 * p->buf[2]) - ISAMB_DATA_OFFSET;
286     if (p->size < 0)
287     {
288         fprintf (stderr, "pos=%d\n", pos);
289     }
290     assert (p->size >= 0);
291     p->offset = 0;
292     p->dirty = 0;
293     p->deleted = 0;
294     p->decodeClientData = (*b->method->code_start)(ISAMC_DECODE);
295     return p;
296 }
297
298 struct ISAMB_block *new_block (ISAMB b, int leaf, int cat)
299 {
300     struct ISAMB_block *p;
301
302     p = xmalloc (sizeof(*p));
303     p->buf = xmalloc (b->file[cat].head.block_size);
304
305     if (!b->file[cat].head.free_list)
306     {
307         int block_no;
308         block_no = b->file[cat].head.last_block++;
309         p->pos = block_no * 4 + cat;
310     }
311     else
312     {
313         p->pos = b->file[cat].head.free_list;
314         assert((p->pos & 3) == cat);
315         if (!get_block (b, p->pos, p->buf, 0))
316         {
317             yaz_log (b->log_io, "bf_read: new_block");
318             if (!bf_read (b->file[cat].bf, p->pos/4, 0, 0, p->buf))
319             {
320                 yaz_log (LOG_FATAL, "read failure for pos=%ld block=%ld",
321                          (long) p->pos/4, (long) p->pos/4);
322                 abort ();
323             }
324         }
325         yaz_log (b->log_freelist, "got block %d from freelist %d:%d", p->pos,
326                  cat, p->pos/4);
327         memcpy (&b->file[cat].head.free_list, p->buf, sizeof(int));
328     }
329     p->cat = cat;
330     b->file[cat].head_dirty = 1;
331     memset (p->buf, 0, b->file[cat].head.block_size);
332     p->bytes = p->buf + ISAMB_DATA_OFFSET;
333     p->leaf = leaf;
334     p->size = 0;
335     p->dirty = 1;
336     p->deleted = 0;
337     p->offset = 0;
338     p->decodeClientData = (*b->method->code_start)(ISAMC_DECODE);
339     return p;
340 }
341
342 struct ISAMB_block *new_leaf (ISAMB b, int cat)
343 {
344     return new_block (b, 1, cat);
345 }
346
347
348 struct ISAMB_block *new_int (ISAMB b, int cat)
349 {
350     return new_block (b, 0, cat);
351 }
352
353 static void check_block (ISAMB b, struct ISAMB_block *p)
354 {
355     if (p->leaf)
356     {
357         ;
358     }
359     else
360     {
361         /* sanity check */
362         char *startp = p->bytes;
363         char *src = startp;
364         char *endp = p->bytes + p->size;
365         int pos;
366             
367         decode_ptr (&src, &pos);
368         assert ((pos&3) == p->cat);
369         while (src != endp)
370         {
371             int item_len;
372             decode_ptr (&src, &item_len);
373             assert (item_len > 0 && item_len < 30);
374             src += item_len;
375             decode_ptr (&src, &pos);
376             assert ((pos&3) == p->cat);
377         }
378     }
379 }
380
381 void close_block (ISAMB b, struct ISAMB_block *p)
382 {
383     if (!p)
384         return;
385     if (p->deleted)
386     {
387         yaz_log (b->log_freelist, "release block %d from freelist %d:%d",
388                  p->pos, p->cat, p->pos/4);
389         memcpy (p->buf, &b->file[p->cat].head.free_list, sizeof(int));
390         b->file[p->cat].head.free_list = p->pos;
391         if (!get_block (b, p->pos, p->buf, 1))
392         {
393             yaz_log (b->log_io, "bf_write: close_block (deleted)");
394             bf_write (b->file[p->cat].bf, p->pos/4, 0, 0, p->buf);
395         }
396     }
397     else if (p->dirty)
398     {
399         int size = p->size + ISAMB_DATA_OFFSET;
400         assert (p->size >= 0);
401         p->buf[0] = p->leaf;
402         p->buf[1] = size & 255;
403         p->buf[2] = size >> 8;
404         check_block(b, p);
405         if (!get_block (b, p->pos, p->buf, 1))
406         {
407             yaz_log (b->log_io, "bf_write: close_block");
408             bf_write (b->file[p->cat].bf, p->pos/4, 0, 0, p->buf);
409         }
410     }
411     (*b->method->code_stop)(ISAMC_DECODE, p->decodeClientData);
412     xfree (p->buf);
413     xfree (p);
414 }
415
416 int insert_sub (ISAMB b, struct ISAMB_block **p,
417                 void *new_item, int *mode,
418                 ISAMC_I stream,
419                 struct ISAMB_block **sp,
420                 void *sub_item, int *sub_size,
421                 void *max_item);
422
423 int insert_int (ISAMB b, struct ISAMB_block *p, void *lookahead_item,
424                 int *mode,
425                 ISAMC_I stream, struct ISAMB_block **sp,
426                 void *split_item, int *split_size, void *last_max_item)
427 {
428     char *startp = p->bytes;
429     char *src = startp;
430     char *endp = p->bytes + p->size;
431     int pos;
432     struct ISAMB_block *sub_p1 = 0, *sub_p2 = 0;
433     char sub_item[DST_ITEM_MAX];
434     int sub_size;
435     int more;
436
437     *sp = 0;
438
439     assert(p->size >= 0);
440     decode_ptr (&src, &pos);
441     while (src != endp)
442     {
443         int item_len;
444         int d;
445         char *src0 = src;
446         decode_ptr (&src, &item_len);
447         d = (*b->method->compare_item)(src, lookahead_item);
448         if (d > 0)
449         {
450             sub_p1 = open_block (b, pos);
451             assert (sub_p1);
452             more = insert_sub (b, &sub_p1, lookahead_item, mode,
453                                stream, &sub_p2, 
454                                sub_item, &sub_size, src);
455             src = src0;
456             break;
457         }
458         src += item_len;
459         decode_ptr (&src, &pos);
460     }
461     if (!sub_p1)
462     {
463         sub_p1 = open_block (b, pos);
464         assert (sub_p1);
465         more = insert_sub (b, &sub_p1, lookahead_item, mode, stream, &sub_p2, 
466                            sub_item, &sub_size, last_max_item);
467     }
468     if (sub_p2)
469     {
470         /* there was a split - must insert pointer in this one */
471         char dst_buf[DST_BUF_SIZE];
472         char *dst = dst_buf;
473
474         assert (sub_size < 30 && sub_size > 1);
475
476         memcpy (dst, startp, src - startp);
477                 
478         dst += src - startp;
479
480         encode_ptr (&dst, sub_size);      /* sub length and item */
481         memcpy (dst, sub_item, sub_size);
482         dst += sub_size;
483
484         encode_ptr (&dst, sub_p2->pos);   /* pos */
485
486         if (endp - src)                   /* remaining data */
487         {
488             memcpy (dst, src, endp - src);
489             dst += endp - src;
490         }
491         p->size = dst - dst_buf;
492         assert (p->size >= 0);
493         if (p->size <= b->file[p->cat].head.block_max)
494         {
495             memcpy (startp, dst_buf, dst - dst_buf);
496         }
497         else
498         {
499             int p_new_size;
500             char *half;
501             src = dst_buf;
502             endp = dst;
503
504             half = src + b->file[p->cat].head.block_size/2;
505             decode_ptr (&src, &pos);
506             while (src <= half)
507             {
508                 decode_ptr (&src, split_size);
509                 src += *split_size;
510                 decode_ptr (&src, &pos);
511             }
512             p_new_size = src - dst_buf;
513             memcpy (p->bytes, dst_buf, p_new_size);
514
515             decode_ptr (&src, split_size);
516             memcpy (split_item, src, *split_size);
517             src += *split_size;
518
519             *sp = new_int (b, p->cat);
520             (*sp)->size = endp - src;
521             memcpy ((*sp)->bytes, src, (*sp)->size);
522
523             p->size = p_new_size;
524         }
525         p->dirty = 1;
526         close_block (b, sub_p2);
527     }
528     close_block (b, sub_p1);
529     return more;
530 }
531
532
533 int insert_leaf (ISAMB b, struct ISAMB_block **sp1, void *lookahead_item,
534                  int *lookahead_mode, ISAMC_I stream, struct ISAMB_block **sp2,
535                  void *sub_item, int *sub_size,
536                  void *max_item)
537 {
538     struct ISAMB_block *p = *sp1;
539     char *src = 0, *endp = 0;
540     char dst_buf[DST_BUF_SIZE], *dst = dst_buf;
541     int new_size;
542     void *c1 = (*b->method->code_start)(ISAMC_DECODE);
543     void *c2 = (*b->method->code_start)(ISAMC_ENCODE);
544     int more = 1;
545     int quater = b->file[b->no_cat-1].head.block_max / 4;
546     char *cut = dst_buf + quater * 2;
547     char *maxp = dst_buf + b->file[b->no_cat-1].head.block_max;
548     char *half1 = 0;
549     char *half2 = 0;
550     char cut_item_buf[DST_ITEM_MAX];
551     int cut_item_size = 0;
552
553     if (p && p->size)
554     {
555         char file_item_buf[DST_ITEM_MAX];
556         char *file_item = file_item_buf;
557             
558         src = p->bytes;
559         endp = p->bytes + p->size;
560         (*b->method->code_item)(ISAMC_DECODE, c1, &file_item, &src);
561         while (1)
562         {
563             char *dst_item = 0;
564             char *dst_0 = dst;
565             char *lookahead_next;
566             int d = -1;
567             
568             if (lookahead_item)
569                 d = (*b->method->compare_item)(file_item_buf, lookahead_item);
570             
571             if (d > 0)
572             {
573                 dst_item = lookahead_item;
574                 assert (*lookahead_mode);
575             }
576             else
577                 dst_item = file_item_buf;
578             if (!*lookahead_mode && d == 0)
579             {
580                 p->dirty = 1;
581             }
582             else if (!half1 && dst > cut)
583             {
584                 char *dst_item_0 = dst_item;
585                 half1 = dst; /* candidate for splitting */
586                 
587                 (*b->method->code_item)(ISAMC_ENCODE, c2, &dst, &dst_item);
588                 
589                 cut_item_size = dst_item - dst_item_0;
590                 memcpy (cut_item_buf, dst_item_0, cut_item_size);
591                 
592                 half2 = dst;
593             }
594             else
595                 (*b->method->code_item)(ISAMC_ENCODE, c2, &dst, &dst_item);
596             if (d > 0)  
597             {
598                 if (dst > maxp)
599                 {
600                     dst = dst_0;
601                     lookahead_item = 0;
602                 }
603                 else
604                 {
605                     lookahead_next = lookahead_item;
606                     if (!(*stream->read_item)(stream->clientData,
607                                               &lookahead_next,
608                                               lookahead_mode))
609                     {
610                         lookahead_item = 0;
611                         more = 0;
612                     }
613                     if (lookahead_item && max_item &&
614                         (*b->method->compare_item)(max_item, lookahead_item) <= 0)
615                     {
616                         /* max_item 1 */
617                         lookahead_item = 0;
618                     }
619                     
620                     p->dirty = 1;
621                 }
622             }
623             else if (d == 0)
624             {
625                 lookahead_next = lookahead_item;
626                 if (!(*stream->read_item)(stream->clientData,
627                                           &lookahead_next, lookahead_mode))
628                 {
629                     lookahead_item = 0;
630                     more = 0;
631                 }
632                 if (src == endp)
633                     break;
634                 file_item = file_item_buf;
635                 (*b->method->code_item)(ISAMC_DECODE, c1, &file_item, &src);
636             }
637             else
638             {
639                 if (src == endp)
640                     break;
641                 file_item = file_item_buf;
642                 (*b->method->code_item)(ISAMC_DECODE, c1, &file_item, &src);
643             }
644         }
645     }
646     maxp = dst_buf + b->file[b->no_cat-1].head.block_max + quater;
647     while (lookahead_item)
648     {
649         char *dst_item = lookahead_item;
650         char *dst_0 = dst;
651         
652         if (max_item &&
653             (*b->method->compare_item)(max_item, lookahead_item) <= 0)
654         {
655             /* max_item 2 */
656             break;
657         }
658         if (!*lookahead_mode)
659         {
660             yaz_log (LOG_WARN, "Inconsistent register (2)");
661             abort();
662         }
663         else if (!half1 && dst > cut)   
664         {
665             char *dst_item_0 = dst_item;
666             half1 = dst; /* candidate for splitting */
667             
668             (*b->method->code_item)(ISAMC_ENCODE, c2, &dst, &dst_item);
669             
670             cut_item_size = dst_item - dst_item_0;
671             memcpy (cut_item_buf, dst_item_0, cut_item_size);
672             
673             half2 = dst;
674         }
675         else
676             (*b->method->code_item)(ISAMC_ENCODE, c2, &dst, &dst_item);
677
678         if (dst > maxp)
679         {
680             dst = dst_0;
681             break;
682         }
683         if (p)
684             p->dirty = 1;
685         dst_item = lookahead_item;
686         if (!(*stream->read_item)(stream->clientData, &dst_item,
687                                   lookahead_mode))
688         {
689             lookahead_item = 0;
690             more = 0;
691         }
692     }
693     new_size = dst - dst_buf;
694     if (p && p->cat != b->no_cat-1 && 
695         new_size > b->file[p->cat].head.block_max)
696     {
697         /* non-btree block will be removed */
698         p->deleted = 1;
699         close_block (b, p);
700         /* delete it too!! */
701         p = 0; /* make a new one anyway */
702     }
703     if (!p)
704     {   /* must create a new one */
705         int i;
706         for (i = 0; i < b->no_cat; i++)
707             if (new_size <= b->file[i].head.block_max)
708                 break;
709         if (i == b->no_cat)
710             i = b->no_cat - 1;
711         p = new_leaf (b, i);
712     }
713     if (new_size > b->file[p->cat].head.block_max)
714     {
715         char *first_dst;
716         char *cut_item = cut_item_buf;
717
718         assert (half1);
719         assert (half2);
720
721        /* first half */
722         p->size = half1 - dst_buf;
723         memcpy (p->bytes, dst_buf, half1 - dst_buf);
724
725         /* second half */
726         *sp2 = new_leaf (b, p->cat);
727
728         (*b->method->code_reset)(c2);
729
730         first_dst = (*sp2)->bytes;
731
732         (*b->method->code_item)(ISAMC_ENCODE, c2, &first_dst, &cut_item);
733
734         memcpy (first_dst, half2, dst - half2);
735
736         (*sp2)->size = (first_dst - (*sp2)->bytes) + (dst - half2);
737         (*sp2)->dirty = 1;
738         p->dirty = 1;
739         memcpy (sub_item, cut_item_buf, cut_item_size);
740         *sub_size = cut_item_size;
741     }
742     else
743     {
744         memcpy (p->bytes, dst_buf, dst - dst_buf);
745         p->size = new_size;
746     }
747     (*b->method->code_stop)(ISAMC_DECODE, c1);
748     (*b->method->code_stop)(ISAMC_ENCODE, c2);
749     *sp1 = p;
750     return more;
751 }
752
753 int insert_sub (ISAMB b, struct ISAMB_block **p, void *new_item,
754                 int *mode,
755                 ISAMC_I stream,
756                 struct ISAMB_block **sp,
757                 void *sub_item, int *sub_size,
758                 void *max_item)
759 {
760     if (!*p || (*p)->leaf)
761         return insert_leaf (b, p, new_item, mode, stream, sp, sub_item, 
762                             sub_size, max_item);
763     else
764         return insert_int (b, *p, new_item, mode, stream, sp, sub_item,
765                            sub_size, max_item);
766 }
767
768 int isamb_merge (ISAMB b, ISAMC_P pos, ISAMC_I stream)
769 {
770     char item_buf[DST_ITEM_MAX];
771     char *item_ptr;
772     int i_mode;
773     int more;
774
775     if (b->cache < 0)
776     {
777         int more = 1;
778         while (more)
779         {
780             item_ptr = item_buf;
781             more =
782                 (*stream->read_item)(stream->clientData, &item_ptr, &i_mode);
783         }
784         return 1;
785     }
786     item_ptr = item_buf;
787     more = (*stream->read_item)(stream->clientData, &item_ptr, &i_mode);
788     while (more)
789     {
790         struct ISAMB_block *p = 0, *sp = 0;
791         char sub_item[DST_ITEM_MAX];
792         int sub_size;
793         
794         if (pos)
795             p = open_block (b, pos);
796         more = insert_sub (b, &p, item_buf, &i_mode, stream, &sp,
797                             sub_item, &sub_size, 0);
798         if (sp)
799         {    /* increase level of tree by one */
800             struct ISAMB_block *p2 = new_int (b, p->cat);
801             char *dst = p2->bytes + p2->size;
802             
803             encode_ptr (&dst, p->pos);
804             assert (sub_size < 20);
805             encode_ptr (&dst, sub_size);
806             memcpy (dst, sub_item, sub_size);
807             dst += sub_size;
808             encode_ptr (&dst, sp->pos);
809             
810             p2->size = dst - p2->bytes;
811             pos = p2->pos;  /* return new super page */
812             close_block (b, sp);
813             close_block (b, p2);
814         }
815         else
816             pos = p->pos;   /* return current one (again) */
817         close_block (b, p);
818     }
819     return pos;
820 }
821
822 ISAMB_PP isamb_pp_open_x (ISAMB isamb, ISAMB_P pos, int *level)
823 {
824     ISAMB_PP pp = xmalloc (sizeof(*pp));
825
826     pp->isamb = isamb;
827     pp->block = xmalloc (10 * sizeof(*pp->block));
828
829     pp->pos = pos;
830     pp->level = 0;
831     pp->total_size = 0;
832     pp->no_blocks = 0;
833     while (1)
834     {
835         struct ISAMB_block *p = open_block (isamb, pos);
836         char *src = p->bytes + p->offset;
837         pp->block[pp->level] = p;
838
839         pp->total_size += p->size;
840         pp->no_blocks++;
841
842         if (p->leaf)
843             break;
844
845         decode_ptr (&src, &pos);
846         p->offset = src - p->bytes;
847         pp->level++;
848     }
849     pp->block[pp->level+1] = 0;
850     if (level)
851         *level = pp->level;
852     return pp;
853 }
854
855 ISAMB_PP isamb_pp_open (ISAMB isamb, ISAMB_P pos)
856 {
857     return isamb_pp_open_x (isamb, pos, 0);
858 }
859
860 void isamb_pp_close_x (ISAMB_PP pp, int *size, int *blocks)
861 {
862     int i;
863     if (!pp)
864         return;
865     if (size)
866         *size = pp->total_size;
867     if (blocks)
868         *blocks = pp->no_blocks;
869     for (i = 0; i <= pp->level; i++)
870         close_block (pp->isamb, pp->block[i]);
871     xfree (pp->block);
872     xfree (pp);
873 }
874
875 int isamb_block_info (ISAMB isamb, int cat)
876 {
877     if (cat >= 0 && cat < isamb->no_cat)
878         return isamb->file[cat].head.block_size;
879     return -1;
880 }
881
882 void isamb_pp_close (ISAMB_PP pp)
883 {
884     isamb_pp_close_x (pp, 0, 0);
885 }
886
887 int isamb_pp_read (ISAMB_PP pp, void *buf)
888 {
889     char *dst = buf;
890     char *src;
891     struct ISAMB_block *p = pp->block[pp->level];
892     if (!p)
893         return 0;
894
895     while (p->offset == p->size)
896     {
897         int pos, item_len;
898         while (p->offset == p->size)
899         {
900             if (pp->level == 0)
901                 return 0;
902             close_block (pp->isamb, pp->block[pp->level]);
903             pp->block[pp->level] = 0;
904             (pp->level)--;
905             p = pp->block[pp->level];
906             assert (!p->leaf);  /* must be int */
907         }
908         src = p->bytes + p->offset;
909         
910         decode_ptr (&src, &item_len);
911         src += item_len;
912         decode_ptr (&src, &pos);
913         
914         p->offset = src - (char*) p->bytes;
915
916         ++(pp->level);
917         
918         while (1)
919         {
920             pp->block[pp->level] = p = open_block (pp->isamb, pos);
921
922             pp->total_size += p->size;
923             pp->no_blocks++;
924             
925             if (p->leaf) /* leaf */
926             {
927                 break;
928             }
929             src = p->bytes + p->offset;
930             decode_ptr (&src, &pos);
931             p->offset = src - (char*) p->bytes;
932             pp->level++;
933         }
934     }
935     assert (p->offset < p->size);
936     assert (p->leaf);
937     src = p->bytes + p->offset;
938     (*pp->isamb->method->code_item)(ISAMC_DECODE, p->decodeClientData,
939                                     &dst, &src);
940     p->offset = src - (char*) p->bytes;
941     return 1;
942 }
943
944 int isamb_pp_num (ISAMB_PP pp)
945 {
946     return 1;
947 }