isamb fixes for pp_read. Statistics
[idzebra-moved-to-github.git] / isamb / isamb.c
1 /*
2  *  Copyright (c) 2000-2002, Index Data.
3  *  See the file LICENSE for details.
4  *
5  *  $Id: isamb.c,v 1.12 2002-04-30 08:28:37 adam Exp $
6  */
7 #include <yaz/xmalloc.h>
8 #include <yaz/log.h>
9 #include <isamb.h>
10 #include <assert.h>
11
12 struct ISAMB_head {
13     int first_block;
14     int last_block;
15     int block_size;
16     int block_max;
17 };
18
19 #define ISAMB_DATA_OFFSET 3
20
21 #define DST_ITEM_MAX 256
22
23 /* approx 2*4 K + max size of item */
24 #define DST_BUF_SIZE 8448
25
26
27 struct ISAMB_file {
28     BFile bf;
29     int head_dirty;
30     struct ISAMB_head head;
31 };
32
33 struct ISAMB_s {
34     BFiles bfs;
35     ISAMC_M method;
36
37     struct ISAMB_file *file;
38     int no_cat;
39 };
40
41 struct ISAMB_block {
42     ISAMB_P pos;
43     int cat;
44     int size;
45     int leaf;
46     int dirty;
47     int offset;
48     char *bytes;
49     unsigned char *buf;
50     void *decodeClientData;
51 };
52
53 struct ISAMB_PP_s {
54     ISAMB isamb;
55     ISAMB_P pos;
56     int level;
57     int total_size;
58     int no_blocks;
59     struct ISAMB_block **block;
60 };
61
62 void encode_ptr (char **dst, int pos)
63 {
64     memcpy (*dst, &pos, sizeof(pos));
65     (*dst) += sizeof(pos);
66 }
67
68 void decode_ptr (char **src, int *pos)
69 {
70     memcpy (pos, *src, sizeof(*pos));
71     (*src) += sizeof(*pos);
72 }
73
74 ISAMB isamb_open (BFiles bfs, const char *name, int writeflag, ISAMC_M method)
75 {
76     ISAMB isamb = xmalloc (sizeof(*isamb));
77     int i, b_size = 64;
78
79     isamb->bfs = bfs;
80     isamb->method = (ISAMC_M) xmalloc (sizeof(*method));
81     memcpy (isamb->method, method, sizeof(*method));
82     isamb->no_cat = 3;
83
84     isamb->file = xmalloc (sizeof(*isamb->file) * isamb->no_cat);
85     for (i = 0; i<isamb->no_cat; i++)
86     {
87         char fname[DST_BUF_SIZE];
88         isamb->file[i].head.first_block = 1;
89         isamb->file[i].head.last_block = 1;
90         isamb->file[i].head.block_size = b_size;
91         isamb->file[i].head.block_max = b_size - ISAMB_DATA_OFFSET;
92         b_size = b_size * 4;
93         isamb->file[i].head_dirty = 0;
94         sprintf (fname, "%s-%d", name, i);
95         isamb->file[i].bf =
96             bf_open (bfs, fname, isamb->file[i].head.block_size, writeflag);
97     
98         bf_read (isamb->file[i].bf, 0, 0, sizeof(struct ISAMB_head),
99                  &isamb->file[i].head);
100     }
101     return isamb;
102 }
103
104 void isamb_close (ISAMB isamb)
105 {
106     int i;
107     for (i = 0; i<isamb->no_cat; i++)
108     {
109         if (isamb->file[i].head_dirty)
110             bf_write (isamb->file[i].bf, 0, 0,
111                       sizeof(struct ISAMB_head), &isamb->file[i].head);
112     }
113     xfree (isamb->file);
114     xfree (isamb->method);
115     xfree (isamb);
116 }
117
118 struct ISAMB_block *open_block (ISAMB b, ISAMC_P pos)
119 {
120     int cat = pos&3;
121     struct ISAMB_block *p;
122     if (!pos)
123         return 0;
124     p = xmalloc (sizeof(*p));
125     p->pos = pos;
126     p->cat = pos & 3;
127     p->buf = xmalloc (b->file[cat].head.block_size);
128     bf_read (b->file[cat].bf, pos/4, 0, 0, p->buf);
129     p->bytes = p->buf + ISAMB_DATA_OFFSET;
130     p->leaf = p->buf[0];
131     p->size = p->buf[1] + 256 * p->buf[2] - ISAMB_DATA_OFFSET;
132     p->offset = 0;
133     p->dirty = 0;
134     p->decodeClientData = (*b->method->code_start)(ISAMC_DECODE);
135     return p;
136 }
137
138 struct ISAMB_block *new_block (ISAMB b, int leaf, int cat)
139 {
140     struct ISAMB_block *p;
141     int block_no;
142     
143     p = xmalloc (sizeof(*p));
144     block_no = b->file[cat].head.last_block++;
145     p->cat = cat;
146     p->pos = block_no * 4 + cat;
147     p->cat = cat;
148     b->file[cat].head_dirty = 1;
149     p->buf = xmalloc (b->file[cat].head.block_size);
150     memset (p->buf, 0, b->file[cat].head.block_size);
151     p->bytes = p->buf + ISAMB_DATA_OFFSET;
152     p->leaf = leaf;
153     p->size = 0;
154     p->dirty = 1;
155     p->offset = 0;
156     p->decodeClientData = (*b->method->code_start)(ISAMC_DECODE);
157     return p;
158 }
159
160 struct ISAMB_block *new_leaf (ISAMB b, int cat)
161 {
162     return new_block (b, 1, cat);
163 }
164
165
166 struct ISAMB_block *new_int (ISAMB b, int cat)
167 {
168     return new_block (b, 0, cat);
169 }
170
171 void close_block (ISAMB b, struct ISAMB_block *p)
172 {
173     if (!p)
174         return;
175     if (p->dirty)
176     {
177         int size = p->size + ISAMB_DATA_OFFSET;
178         p->buf[0] = p->leaf;
179         p->buf[1] = size & 255;
180         p->buf[2] = size >> 8;
181         bf_write (b->file[p->cat].bf, p->pos/4, 0, 0, p->buf);
182     }
183     (*b->method->code_stop)(ISAMC_DECODE, p->decodeClientData);
184     xfree (p->buf);
185     xfree (p);
186 }
187
188 int insert_sub (ISAMB b, struct ISAMB_block **p,
189                 void *new_item,
190                 ISAMC_I stream,
191                 struct ISAMB_block **sp,
192                 void *sub_item, int *sub_size,
193                 void *max_item);
194
195 int insert_int (ISAMB b, struct ISAMB_block *p, void *lookahead_item,
196                 ISAMC_I stream, struct ISAMB_block **sp,
197                 void *split_item, int *split_size)
198 {
199     char *startp = p->bytes;
200     char *src = startp;
201     char *endp = p->bytes + p->size;
202     int pos;
203     struct ISAMB_block *sub_p1 = 0, *sub_p2 = 0;
204     char sub_item[DST_ITEM_MAX];
205     int sub_size;
206     int more;
207
208     *sp = 0;
209
210     decode_ptr (&src, &pos);
211     while (src != endp)
212     {
213         int item_len;
214         int d;
215         decode_ptr (&src, &item_len);
216         d = (*b->method->compare_item)(src, lookahead_item);
217         if (d > 0)
218         {
219             sub_p1 = open_block (b, pos);
220             assert (sub_p1);
221             more = insert_sub (b, &sub_p1, lookahead_item, stream, &sub_p2, 
222                                sub_item, &sub_size, src);
223             break;
224         }
225         src += item_len;
226         decode_ptr (&src, &pos);
227     }
228     if (!sub_p1)
229     {
230         sub_p1 = open_block (b, pos);
231         assert (sub_p1);
232         more = insert_sub (b, &sub_p1, lookahead_item, stream, &sub_p2, 
233                            sub_item, &sub_size, 0);
234     }
235     if (sub_p2)
236     {
237         /* there was a split - must insert pointer in this one */
238         char dst_buf[DST_BUF_SIZE];
239         char *dst = dst_buf;
240
241         assert (sub_size < 30 && sub_size > 1);
242
243         memcpy (dst, startp, src - startp);
244                 
245         dst += src - startp;
246
247         encode_ptr (&dst, sub_size);      /* sub length and item */
248         memcpy (dst, sub_item, sub_size);
249         dst += sub_size;
250
251         encode_ptr (&dst, sub_p2->pos);   /* pos */
252
253         if (endp - src)                   /* remaining data */
254         {
255             memcpy (dst, src, endp - src);
256             dst += endp - src;
257         }
258         p->size = dst - dst_buf;
259         if (p->size <= b->file[p->cat].head.block_max)
260         {
261             memcpy (startp, dst_buf, dst - dst_buf);
262         }
263         else
264         {
265             int p_new_size;
266             char *half;
267             src = dst_buf;
268             endp = dst;
269
270             half = src + b->file[p->cat].head.block_size/2;
271             decode_ptr (&src, &pos);
272             while (src <= half)
273             {
274                 decode_ptr (&src, split_size);
275                 src += *split_size;
276                 decode_ptr (&src, &pos);
277             }
278             p_new_size = src - dst_buf;
279             memcpy (p->bytes, dst_buf, p_new_size);
280
281             decode_ptr (&src, split_size);
282             memcpy (split_item, src, *split_size);
283             src += *split_size;
284
285             *sp = new_int (b, p->cat);
286             (*sp)->size = endp - src;
287             memcpy ((*sp)->bytes, src, (*sp)->size);
288
289             p->size = p_new_size;
290         }
291         p->dirty = 1;
292         close_block (b, sub_p2);
293     }
294     close_block (b, sub_p1);
295     return more;
296 }
297
298
299 int insert_leaf (ISAMB b, struct ISAMB_block **sp1, void *lookahead_item,
300                  ISAMC_I stream, struct ISAMB_block **sp2,
301                  void *sub_item, int *sub_size,
302                  void *max_item)
303 {
304     struct ISAMB_block *p = *sp1;
305     char *src = 0, *endp = 0;
306     char dst_buf[DST_BUF_SIZE], *dst = dst_buf;
307     int new_size;
308     void *c1 = (*b->method->code_start)(ISAMC_DECODE);
309     void *c2 = (*b->method->code_start)(ISAMC_ENCODE);
310     int more = 1;
311     int quater = b->file[b->no_cat-1].head.block_max / 4;
312     char *cut = dst_buf + quater * 2;
313     char *maxp = dst_buf + b->file[b->no_cat-1].head.block_max;
314     char *half1 = 0;
315     char *half2 = 0;
316     char cut_item_buf[DST_ITEM_MAX];
317     int cut_item_size = 0;
318
319     if (p && p->size)
320     {
321         char file_item_buf[DST_ITEM_MAX];
322         char *file_item = file_item_buf;
323             
324         src = p->bytes;
325         endp = p->bytes + p->size;
326         (*b->method->code_item)(ISAMC_DECODE, c1, &file_item, &src);
327         while (1)
328         {
329             char *dst_item = 0;
330             char *dst_0 = dst;
331             char *lookahead_next;
332             int lookahead_mode;
333             int d = -1;
334             
335             if (lookahead_item)
336                 d = (*b->method->compare_item)(file_item_buf, lookahead_item);
337             
338             if (d > 0)  
339                 dst_item = lookahead_item;
340             else
341                 dst_item = file_item_buf;
342             if (!half1 && dst > cut)   
343             {
344                 char *dst_item_0 = dst_item;
345                 half1 = dst; /* candidate for splitting */
346                 
347                 (*b->method->code_item)(ISAMC_ENCODE, c2, &dst, &dst_item);
348                 
349                 cut_item_size = dst_item - dst_item_0;
350                 memcpy (cut_item_buf, dst_item_0, cut_item_size);
351                 
352                 half2 = dst;
353             }
354             else
355                 (*b->method->code_item)(ISAMC_ENCODE, c2, &dst, &dst_item);
356             if (d > 0)  
357             {
358                 if (dst > maxp)
359                 {
360                     dst = dst_0;
361                     lookahead_item = 0;
362                 }
363                 else
364                 {
365                     lookahead_next = lookahead_item;
366                     if (!(*stream->read_item)(stream->clientData,
367                                               &lookahead_next,
368                                               &lookahead_mode))
369                     {
370                         lookahead_item = 0;
371                         more = 0;
372                     }
373                     if (max_item &&
374                         (*b->method->compare_item)(max_item, lookahead_item) <= 0)
375                     {
376                         yaz_log (LOG_LOG, "max_item 1");
377                         lookahead_item = 0;
378                     }
379                     
380                     p->dirty = 1;
381                 }
382             }
383             else if (d == 0)
384             {
385                 lookahead_next = lookahead_item;
386                 if (!(*stream->read_item)(stream->clientData,
387                                           &lookahead_next, &lookahead_mode))
388                 {
389                     lookahead_item = 0;
390                     more = 0;
391                 }
392                 if (src == endp)
393                     break;
394                 file_item = file_item_buf;
395                 (*b->method->code_item)(ISAMC_DECODE, c1, &file_item, &src);
396             }
397             else
398             {
399                 if (src == endp)
400                     break;
401                 file_item = file_item_buf;
402                 (*b->method->code_item)(ISAMC_DECODE, c1, &file_item, &src);
403             }
404         }
405     }
406     maxp = dst_buf + b->file[b->no_cat-1].head.block_max + quater;
407     while (lookahead_item)
408     {
409         int lookahead_mode;
410         char *dst_item = lookahead_item;
411         char *dst_0 = dst;
412         
413         if (max_item &&
414             (*b->method->compare_item)(max_item, lookahead_item) <= 0)
415         {
416             yaz_log (LOG_LOG, "max_item 2");
417             break;
418         }
419         if (!half1 && dst > cut)   
420         {
421             char *dst_item_0 = dst_item;
422             half1 = dst; /* candidate for splitting */
423             
424             (*b->method->code_item)(ISAMC_ENCODE, c2, &dst, &dst_item);
425             
426             cut_item_size = dst_item - dst_item_0;
427             memcpy (cut_item_buf, dst_item_0, cut_item_size);
428             
429             half2 = dst;
430         }
431         else
432             (*b->method->code_item)(ISAMC_ENCODE, c2, &dst, &dst_item);
433
434         if (dst > maxp)
435         {
436             dst = dst_0;
437             break;
438         }
439         if (p)
440             p->dirty = 1;
441         dst_item = lookahead_item;
442         if (!(*stream->read_item)(stream->clientData, &dst_item,
443                                   &lookahead_mode))
444         {
445             lookahead_item = 0;
446             more = 0;
447         }
448     }
449     new_size = dst - dst_buf;
450     if (p && p->cat != b->no_cat-1 && 
451         new_size > b->file[p->cat].head.block_max)
452     {
453         /* non-btree block will be removed */
454         close_block (b, p);
455         /* delete it too!! */
456         p = 0; /* make a new one anyway */
457     }
458     if (!p)
459     {   /* must create a new one */
460         int i;
461         for (i = 0; i < b->no_cat; i++)
462             if (new_size <= b->file[i].head.block_max)
463                 break;
464         if (i == b->no_cat)
465             i = b->no_cat - 1;
466         p = new_leaf (b, i);
467     }
468     if (new_size > b->file[p->cat].head.block_max)
469     {
470         char *first_dst;
471         char *cut_item = cut_item_buf;
472
473         assert (half1);
474         assert (half2);
475
476        /* first half */
477         p->size = half1 - dst_buf;
478         memcpy (p->bytes, dst_buf, half1 - dst_buf);
479
480         /* second half */
481         *sp2 = new_leaf (b, p->cat);
482
483         (*b->method->code_reset)(c2);
484
485         first_dst = (*sp2)->bytes;
486
487         (*b->method->code_item)(ISAMC_ENCODE, c2, &first_dst, &cut_item);
488
489         memcpy (first_dst, half2, dst - half2);
490
491         (*sp2)->size = (first_dst - (*sp2)->bytes) + (dst - half2);
492         (*sp2)->dirty = 1;
493         p->dirty = 1;
494         memcpy (sub_item, cut_item_buf, cut_item_size);
495         *sub_size = cut_item_size;
496     }
497     else
498     {
499         memcpy (p->bytes, dst_buf, dst - dst_buf);
500         p->size = new_size;
501     }
502     (*b->method->code_stop)(ISAMC_DECODE, c1);
503     (*b->method->code_stop)(ISAMC_ENCODE, c2);
504     *sp1 = p;
505     return more;
506 }
507
508 int insert_sub (ISAMB b, struct ISAMB_block **p, void *new_item,
509                 ISAMC_I stream,
510                 struct ISAMB_block **sp,
511                 void *sub_item, int *sub_size,
512                 void *max_item)
513 {
514     if (!*p || (*p)->leaf)
515         return insert_leaf (b, p, new_item, stream, sp, sub_item, sub_size, 
516                             max_item);
517     else
518         return insert_int (b, *p, new_item, stream, sp, sub_item, sub_size);
519 }
520
521 int isamb_merge (ISAMB b, ISAMC_P pos, ISAMC_I stream)
522 {
523     char item_buf[DST_ITEM_MAX];
524     char *item_ptr;
525     int i_mode;
526     int more;
527
528     item_ptr = item_buf;
529     more = (*stream->read_item)(stream->clientData, &item_ptr, &i_mode);
530     while (more)
531     {
532         struct ISAMB_block *p = 0, *sp = 0;
533         char sub_item[DST_ITEM_MAX];
534         int sub_size;
535         
536         if (pos)
537             p = open_block (b, pos);
538         more = insert_sub (b, &p, item_buf, stream, &sp,
539                             sub_item, &sub_size, 0);
540         if (sp)
541         {    /* increase level of tree by one */
542             struct ISAMB_block *p2 = new_int (b, p->cat);
543             char *dst = p2->bytes + p2->size;
544             
545             encode_ptr (&dst, p->pos);
546             assert (sub_size < 20);
547             encode_ptr (&dst, sub_size);
548             memcpy (dst, sub_item, sub_size);
549             dst += sub_size;
550             encode_ptr (&dst, sp->pos);
551             
552             p2->size = dst - p2->bytes;
553             pos = p2->pos;  /* return new super page */
554             close_block (b, sp);
555             close_block (b, p2);
556         }
557         else
558             pos = p->pos;   /* return current one (again) */
559         close_block (b, p);
560     }
561     return pos;
562 }
563
564 ISAMB_PP isamb_pp_open_x (ISAMB isamb, ISAMB_P pos, int *level)
565 {
566     ISAMB_PP pp = xmalloc (sizeof(*pp));
567
568     pp->isamb = isamb;
569     pp->block = xmalloc (10 * sizeof(*pp->block));
570
571     pp->pos = pos;
572     pp->level = 0;
573     pp->total_size = 0;
574     pp->no_blocks = 0;
575     while (1)
576     {
577         struct ISAMB_block *p = open_block (isamb, pos);
578         char *src = p->bytes + p->offset;
579         pp->block[pp->level] = p;
580
581         pp->total_size += p->size;
582         pp->no_blocks++;
583
584         if (p->leaf)
585             break;
586
587         decode_ptr (&src, &pos);
588         p->offset = src - p->bytes;
589         pp->level++;
590     }
591     pp->block[pp->level+1] = 0;
592     if (level)
593         *level = pp->level;
594     return pp;
595 }
596
597 ISAMB_PP isamb_pp_open (ISAMB isamb, ISAMB_P pos)
598 {
599     return isamb_pp_open_x (isamb, pos, 0);
600 }
601
602 void isamb_pp_close_x (ISAMB_PP pp, int *size, int *blocks)
603 {
604     int i;
605     if (!pp)
606         return;
607     if (size)
608         *size = pp->total_size;
609     if (blocks)
610         *blocks = pp->no_blocks;
611     for (i = 0; i <= pp->level; i++)
612         close_block (pp->isamb, pp->block[i]);
613     xfree (pp->block);
614     xfree (pp);
615 }
616
617 int isamb_block_info (ISAMB isamb, int cat)
618 {
619     if (cat >= 0 && cat < isamb->no_cat)
620         return isamb->file[cat].head.block_size;
621     return -1;
622 }
623
624 void isamb_pp_close (ISAMB_PP pp)
625 {
626     return isamb_pp_close_x (pp, 0, 0);
627 }
628
629 int isamb_pp_read (ISAMB_PP pp, void *buf)
630 {
631     char *dst = buf;
632     char *src;
633     struct ISAMB_block *p = pp->block[pp->level];
634     if (!p)
635         return 0;
636
637     while (p->offset == p->size)
638     {
639         int pos, item_len;
640         while (p->offset == p->size)
641         {
642             if (pp->level == 0)
643                 return 0;
644             close_block (pp->isamb, pp->block[pp->level]);
645             pp->block[pp->level] = 0;
646             (pp->level)--;
647             p = pp->block[pp->level];
648             assert (!p->leaf);  /* must be int */
649         }
650         src = p->bytes + p->offset;
651         
652         decode_ptr (&src, &item_len);
653         src += item_len;
654         decode_ptr (&src, &pos);
655         
656         p->offset = src - (char*) p->bytes;
657
658         ++(pp->level);
659         
660         while (1)
661         {
662             pp->block[pp->level] = p = open_block (pp->isamb, pos);
663
664             pp->total_size += p->size;
665             pp->no_blocks++;
666             
667             if (p->leaf) /* leaf */
668             {
669                 break;
670             }
671             src = p->bytes + p->offset;
672             decode_ptr (&src, &pos);
673             p->offset = src - (char*) p->bytes;
674             pp->level++;
675         }
676     }
677     assert (p->offset < p->size);
678     assert (p->leaf);
679     src = p->bytes + p->offset;
680     (*pp->isamb->method->code_item)(ISAMC_DECODE, p->decodeClientData,
681                                     &dst, &src);
682     p->offset = src - (char*) p->bytes;
683     return 1;
684 }
685
686 int isamb_pp_num (ISAMB_PP pp)
687 {
688     return 1;
689 }