More isamb statistics
[idzebra-moved-to-github.git] / isamb / isamb.c
1 /*
2  *  Copyright (c) 2000-2002, Index Data.
3  *  See the file LICENSE for details.
4  *
5  *  $Id: isamb.c,v 1.11 2002-04-29 18:03:46 adam Exp $
6  */
7 #include <yaz/xmalloc.h>
8 #include <yaz/log.h>
9 #include <isamb.h>
10 #include <assert.h>
11
12 struct ISAMB_head {
13     int first_block;
14     int last_block;
15     int block_size;
16     int block_max;
17 };
18
19 #define ISAMB_DATA_OFFSET 3
20
21 #define DST_ITEM_MAX 256
22
23 /* approx 2*4 K + max size of item */
24 #define DST_BUF_SIZE 8448
25
26
27 struct ISAMB_file {
28     BFile bf;
29     int head_dirty;
30     struct ISAMB_head head;
31 };
32
33 struct ISAMB_s {
34     BFiles bfs;
35     ISAMC_M method;
36
37     struct ISAMB_file *file;
38     int no_cat;
39 };
40
41 struct ISAMB_block {
42     int pos;
43     int cat;
44     int size;
45     int leaf;
46     int dirty;
47     int offset;
48     char *bytes;
49     unsigned char *buf;
50     void *decodeClientData;
51 };
52
53 struct ISAMB_PP_s {
54     ISAMB isamb;
55     int level;
56     int total_size;
57     int no_blocks;
58     struct ISAMB_block **block;
59 };
60
61 void encode_ptr (char **dst, int pos)
62 {
63     memcpy (*dst, &pos, sizeof(pos));
64     (*dst) += sizeof(pos);
65 }
66
67 void decode_ptr (char **src, int *pos)
68 {
69     memcpy (pos, *src, sizeof(*pos));
70     (*src) += sizeof(*pos);
71 }
72
73 ISAMB isamb_open (BFiles bfs, const char *name, int writeflag, ISAMC_M method)
74 {
75     ISAMB isamb = xmalloc (sizeof(*isamb));
76     int i, b_size = 64;
77
78     isamb->bfs = bfs;
79     isamb->method = (ISAMC_M) xmalloc (sizeof(*method));
80     memcpy (isamb->method, method, sizeof(*method));
81     isamb->no_cat = 4;
82
83     isamb->file = xmalloc (sizeof(*isamb->file) * isamb->no_cat);
84     for (i = 0; i<isamb->no_cat; i++)
85     {
86         char fname[DST_BUF_SIZE];
87         isamb->file[i].head.first_block = 1;
88         isamb->file[i].head.last_block = 1;
89         isamb->file[i].head.block_size = b_size;
90         isamb->file[i].head.block_max = b_size - ISAMB_DATA_OFFSET;
91         b_size = b_size * 4;
92         isamb->file[i].head_dirty = 0;
93         sprintf (fname, "%s-%d", name, i);
94         isamb->file[i].bf =
95             bf_open (bfs, fname, isamb->file[i].head.block_size, writeflag);
96     
97         bf_read (isamb->file[i].bf, 0, 0, sizeof(struct ISAMB_head),
98                  &isamb->file[i].head);
99     }
100     return isamb;
101 }
102
103 void isamb_close (ISAMB isamb)
104 {
105     int i;
106     for (i = 0; i<isamb->no_cat; i++)
107     {
108         if (isamb->file[i].head_dirty)
109             bf_write (isamb->file[i].bf, 0, 0,
110                       sizeof(struct ISAMB_head), &isamb->file[i].head);
111     }
112     xfree (isamb->file);
113     xfree (isamb->method);
114     xfree (isamb);
115 }
116
117 struct ISAMB_block *open_block (ISAMB b, ISAMC_P pos)
118 {
119     int cat = pos&3;
120     struct ISAMB_block *p;
121     if (!pos)
122         return 0;
123     p = xmalloc (sizeof(*p));
124     p->pos = pos;
125     p->cat = pos & 3;
126     p->buf = xmalloc (b->file[cat].head.block_size);
127     bf_read (b->file[cat].bf, pos/4, 0, 0, p->buf);
128     p->bytes = p->buf + ISAMB_DATA_OFFSET;
129     p->leaf = p->buf[0];
130     p->size = p->buf[1] + 256 * p->buf[2] - ISAMB_DATA_OFFSET;
131     p->offset = 0;
132     p->dirty = 0;
133     p->decodeClientData = (*b->method->code_start)(ISAMC_DECODE);
134     return p;
135 }
136
137 struct ISAMB_block *new_block (ISAMB b, int leaf, int cat)
138 {
139     struct ISAMB_block *p;
140     int block_no;
141     
142     p = xmalloc (sizeof(*p));
143     block_no = b->file[cat].head.last_block++;
144     p->cat = cat;
145     p->pos = block_no * 4 + cat;
146     p->cat = cat;
147     b->file[cat].head_dirty = 1;
148     p->buf = xmalloc (b->file[cat].head.block_size);
149     memset (p->buf, 0, b->file[cat].head.block_size);
150     p->bytes = p->buf + ISAMB_DATA_OFFSET;
151     p->leaf = leaf;
152     p->size = 0;
153     p->dirty = 1;
154     p->offset = 0;
155     p->decodeClientData = (*b->method->code_start)(ISAMC_DECODE);
156     return p;
157 }
158
159 struct ISAMB_block *new_leaf (ISAMB b, int cat)
160 {
161     return new_block (b, 1, cat);
162 }
163
164
165 struct ISAMB_block *new_int (ISAMB b, int cat)
166 {
167     return new_block (b, 0, cat);
168 }
169
170 void close_block (ISAMB b, struct ISAMB_block *p)
171 {
172     if (!p)
173         return;
174     if (p->dirty)
175     {
176         int size = p->size + ISAMB_DATA_OFFSET;
177         p->buf[0] = p->leaf;
178         p->buf[1] = size & 255;
179         p->buf[2] = size >> 8;
180         bf_write (b->file[p->cat].bf, p->pos/4, 0, 0, p->buf);
181     }
182     (*b->method->code_stop)(ISAMC_DECODE, p->decodeClientData);
183     xfree (p->buf);
184     xfree (p);
185 }
186
187 int insert_sub (ISAMB b, struct ISAMB_block **p,
188                 void *new_item,
189                 ISAMC_I stream,
190                 struct ISAMB_block **sp,
191                 void *sub_item, int *sub_size,
192                 void *max_item);
193
194 int insert_int (ISAMB b, struct ISAMB_block *p, void *lookahead_item,
195                 ISAMC_I stream, struct ISAMB_block **sp,
196                 void *split_item, int *split_size)
197 {
198     char *startp = p->bytes;
199     char *src = startp;
200     char *endp = p->bytes + p->size;
201     int pos;
202     struct ISAMB_block *sub_p1 = 0, *sub_p2 = 0;
203     char sub_item[DST_ITEM_MAX];
204     int sub_size;
205     int more;
206
207     *sp = 0;
208
209     decode_ptr (&src, &pos);
210     while (src != endp)
211     {
212         int item_len;
213         int d;
214         decode_ptr (&src, &item_len);
215         d = (*b->method->compare_item)(src, lookahead_item);
216         if (d > 0)
217         {
218             sub_p1 = open_block (b, pos);
219             assert (sub_p1);
220             more = insert_sub (b, &sub_p1, lookahead_item, stream, &sub_p2, 
221                                sub_item, &sub_size, src);
222             break;
223         }
224         src += item_len;
225         decode_ptr (&src, &pos);
226     }
227     if (!sub_p1)
228     {
229         sub_p1 = open_block (b, pos);
230         assert (sub_p1);
231         more = insert_sub (b, &sub_p1, lookahead_item, stream, &sub_p2, 
232                            sub_item, &sub_size, 0);
233     }
234     if (sub_p2)
235     {
236         /* there was a split - must insert pointer in this one */
237         char dst_buf[DST_BUF_SIZE];
238         char *dst = dst_buf;
239
240         assert (sub_size < 30 && sub_size > 1);
241
242         memcpy (dst, startp, src - startp);
243                 
244         dst += src - startp;
245
246         encode_ptr (&dst, sub_size);      /* sub length and item */
247         memcpy (dst, sub_item, sub_size);
248         dst += sub_size;
249
250         encode_ptr (&dst, sub_p2->pos);   /* pos */
251
252         if (endp - src)                   /* remaining data */
253         {
254             memcpy (dst, src, endp - src);
255             dst += endp - src;
256         }
257         p->size = dst - dst_buf;
258         if (p->size <= b->file[p->cat].head.block_max)
259         {
260             memcpy (startp, dst_buf, dst - dst_buf);
261         }
262         else
263         {
264             int p_new_size;
265             char *half;
266             src = dst_buf;
267             endp = dst;
268
269             half = src + b->file[p->cat].head.block_size/2;
270             decode_ptr (&src, &pos);
271             while (src <= half)
272             {
273                 decode_ptr (&src, split_size);
274                 src += *split_size;
275                 decode_ptr (&src, &pos);
276             }
277             p_new_size = src - dst_buf;
278             memcpy (p->bytes, dst_buf, p_new_size);
279
280             decode_ptr (&src, split_size);
281             memcpy (split_item, src, *split_size);
282             src += *split_size;
283
284             *sp = new_int (b, p->cat);
285             (*sp)->size = endp - src;
286             memcpy ((*sp)->bytes, src, (*sp)->size);
287
288             p->size = p_new_size;
289         }
290         p->dirty = 1;
291         close_block (b, sub_p2);
292     }
293     close_block (b, sub_p1);
294     return more;
295 }
296
297
298 int insert_leaf (ISAMB b, struct ISAMB_block **sp1, void *lookahead_item,
299                  ISAMC_I stream, struct ISAMB_block **sp2,
300                  void *sub_item, int *sub_size,
301                  void *max_item)
302 {
303     struct ISAMB_block *p = *sp1;
304     char *src = 0, *endp = 0;
305     char dst_buf[DST_BUF_SIZE], *dst = dst_buf;
306     int new_size;
307     void *c1 = (*b->method->code_start)(ISAMC_DECODE);
308     void *c2 = (*b->method->code_start)(ISAMC_ENCODE);
309     int more = 1;
310     int quater = b->file[b->no_cat-1].head.block_max / 4;
311     char *cut = dst_buf + quater * 2;
312     char *maxp = dst_buf + b->file[b->no_cat-1].head.block_max;
313     char *half1 = 0;
314     char *half2 = 0;
315     char cut_item_buf[DST_ITEM_MAX];
316     int cut_item_size = 0;
317
318     if (p && p->size)
319     {
320         char file_item_buf[DST_ITEM_MAX];
321         char *file_item = file_item_buf;
322             
323         src = p->bytes;
324         endp = p->bytes + p->size;
325         (*b->method->code_item)(ISAMC_DECODE, c1, &file_item, &src);
326         while (1)
327         {
328             char *dst_item = 0;
329             char *dst_0 = dst;
330             char *lookahead_next;
331             int lookahead_mode;
332             int d = -1;
333             
334             if (lookahead_item)
335                 d = (*b->method->compare_item)(file_item_buf, lookahead_item);
336             
337             if (d > 0)  
338                 dst_item = lookahead_item;
339             else
340                 dst_item = file_item_buf;
341             if (!half1 && dst > cut)   
342             {
343                 char *dst_item_0 = dst_item;
344                 half1 = dst; /* candidate for splitting */
345                 
346                 (*b->method->code_item)(ISAMC_ENCODE, c2, &dst, &dst_item);
347                 
348                 cut_item_size = dst_item - dst_item_0;
349                 memcpy (cut_item_buf, dst_item_0, cut_item_size);
350                 
351                 half2 = dst;
352             }
353             else
354                 (*b->method->code_item)(ISAMC_ENCODE, c2, &dst, &dst_item);
355             if (d > 0)  
356             {
357                 if (dst > maxp)
358                 {
359                     dst = dst_0;
360                     lookahead_item = 0;
361                 }
362                 else
363                 {
364                     lookahead_next = lookahead_item;
365                     if (!(*stream->read_item)(stream->clientData,
366                                               &lookahead_next,
367                                               &lookahead_mode))
368                     {
369                         lookahead_item = 0;
370                         more = 0;
371                     }
372                     if (max_item &&
373                         (*b->method->compare_item)(max_item, lookahead_item) <= 0)
374                     {
375                         yaz_log (LOG_LOG, "max_item 1");
376                         lookahead_item = 0;
377                     }
378                     
379                     p->dirty = 1;
380                 }
381             }
382             else if (d == 0)
383             {
384                 lookahead_next = lookahead_item;
385                 if (!(*stream->read_item)(stream->clientData,
386                                           &lookahead_next, &lookahead_mode))
387                 {
388                     lookahead_item = 0;
389                     more = 0;
390                 }
391                 if (src == endp)
392                     break;
393                 file_item = file_item_buf;
394                 (*b->method->code_item)(ISAMC_DECODE, c1, &file_item, &src);
395             }
396             else
397             {
398                 if (src == endp)
399                     break;
400                 file_item = file_item_buf;
401                 (*b->method->code_item)(ISAMC_DECODE, c1, &file_item, &src);
402             }
403         }
404     }
405     maxp = dst_buf + b->file[b->no_cat-1].head.block_max + quater;
406     while (lookahead_item)
407     {
408         int lookahead_mode;
409         char *dst_item = lookahead_item;
410         char *dst_0 = dst;
411         
412         if (max_item &&
413             (*b->method->compare_item)(max_item, lookahead_item) <= 0)
414         {
415             yaz_log (LOG_LOG, "max_item 2");
416             break;
417         }
418         if (!half1 && dst > cut)   
419         {
420             char *dst_item_0 = dst_item;
421             half1 = dst; /* candidate for splitting */
422             
423             (*b->method->code_item)(ISAMC_ENCODE, c2, &dst, &dst_item);
424             
425             cut_item_size = dst_item - dst_item_0;
426             memcpy (cut_item_buf, dst_item_0, cut_item_size);
427             
428             half2 = dst;
429         }
430         else
431             (*b->method->code_item)(ISAMC_ENCODE, c2, &dst, &dst_item);
432
433         if (dst > maxp)
434         {
435             dst = dst_0;
436             break;
437         }
438         if (p)
439             p->dirty = 1;
440         dst_item = lookahead_item;
441         if (!(*stream->read_item)(stream->clientData, &dst_item,
442                                   &lookahead_mode))
443         {
444             lookahead_item = 0;
445             more = 0;
446         }
447     }
448     new_size = dst - dst_buf;
449     if (p && p->cat != b->no_cat-1 && 
450         new_size > b->file[p->cat].head.block_max)
451     {
452         /* non-btree block will be removed */
453         close_block (b, p);
454         /* delete it too!! */
455         p = 0; /* make a new one anyway */
456     }
457     if (!p)
458     {   /* must create a new one */
459         int i;
460         for (i = 0; i < b->no_cat; i++)
461             if (new_size <= b->file[i].head.block_max)
462                 break;
463         if (i == b->no_cat)
464             i = b->no_cat - 1;
465         p = new_leaf (b, i);
466     }
467     if (new_size > b->file[p->cat].head.block_max)
468     {
469         char *first_dst;
470         char *cut_item = cut_item_buf;
471
472         assert (half1);
473         assert (half2);
474
475        /* first half */
476         p->size = half1 - dst_buf;
477         memcpy (p->bytes, dst_buf, half1 - dst_buf);
478
479         /* second half */
480         *sp2 = new_leaf (b, p->cat);
481
482         (*b->method->code_reset)(c2);
483
484         first_dst = (*sp2)->bytes;
485
486         (*b->method->code_item)(ISAMC_ENCODE, c2, &first_dst, &cut_item);
487
488         memcpy (first_dst, half2, dst - half2);
489
490         (*sp2)->size = (first_dst - (*sp2)->bytes) + (dst - half2);
491         (*sp2)->dirty = 1;
492         p->dirty = 1;
493         memcpy (sub_item, cut_item_buf, cut_item_size);
494         *sub_size = cut_item_size;
495     }
496     else
497     {
498         memcpy (p->bytes, dst_buf, dst - dst_buf);
499         p->size = new_size;
500     }
501     (*b->method->code_stop)(ISAMC_DECODE, c1);
502     (*b->method->code_stop)(ISAMC_ENCODE, c2);
503     *sp1 = p;
504     return more;
505 }
506
507 int insert_sub (ISAMB b, struct ISAMB_block **p, void *new_item,
508                 ISAMC_I stream,
509                 struct ISAMB_block **sp,
510                 void *sub_item, int *sub_size,
511                 void *max_item)
512 {
513     if (!*p || (*p)->leaf)
514         return insert_leaf (b, p, new_item, stream, sp, sub_item, sub_size, 
515                             max_item);
516     else
517         return insert_int (b, *p, new_item, stream, sp, sub_item, sub_size);
518 }
519
520 int isamb_merge (ISAMB b, ISAMC_P pos, ISAMC_I stream)
521 {
522     char item_buf[DST_ITEM_MAX];
523     char *item_ptr;
524     int i_mode;
525     int more;
526
527     item_ptr = item_buf;
528     more = (*stream->read_item)(stream->clientData, &item_ptr, &i_mode);
529     while (more)
530     {
531         struct ISAMB_block *p = 0, *sp = 0;
532         char sub_item[DST_ITEM_MAX];
533         int sub_size;
534         
535         if (pos)
536             p = open_block (b, pos);
537         more = insert_sub (b, &p, item_buf, stream, &sp,
538                             sub_item, &sub_size, 0);
539         if (sp)
540         {    /* increase level of tree by one */
541             struct ISAMB_block *p2 = new_int (b, p->cat);
542             char *dst = p2->bytes + p2->size;
543             
544             encode_ptr (&dst, p->pos);
545             assert (sub_size < 20);
546             encode_ptr (&dst, sub_size);
547             memcpy (dst, sub_item, sub_size);
548             dst += sub_size;
549             encode_ptr (&dst, sp->pos);
550             
551             p2->size = dst - p2->bytes;
552             pos = p2->pos;  /* return new super page */
553             close_block (b, sp);
554             close_block (b, p2);
555         }
556         else
557             pos = p->pos;   /* return current one (again) */
558         close_block (b, p);
559     }
560     return pos;
561 }
562
563 ISAMB_PP isamb_pp_open_x (ISAMB isamb, ISAMB_P pos, int *level)
564 {
565     ISAMB_PP pp = xmalloc (sizeof(*pp));
566
567     pp->isamb = isamb;
568     pp->block = xmalloc (10 * sizeof(*pp->block));
569
570     pp->level = 0;
571     pp->total_size = 0;
572     pp->no_blocks = 0;
573     while (1)
574     {
575         struct ISAMB_block *p = open_block (isamb, pos);
576         char *src = p->bytes + p->offset;
577         pp->block[pp->level] = p;
578
579         pp->total_size += p->size;
580         pp->no_blocks++;
581
582         if (p->bytes[0]) /* leaf */
583             break;
584
585         decode_ptr (&src, &pos);
586         p->offset = src - p->bytes;
587         pp->level++;
588     }
589     pp->block[pp->level+1] = 0;
590     if (level)
591         *level = pp->level;
592     return pp;
593 }
594
595 ISAMB_PP isamb_pp_open (ISAMB isamb, ISAMB_P pos)
596 {
597     return isamb_pp_open_x (isamb, pos, 0);
598 }
599
600 void isamb_pp_close_x (ISAMB_PP pp, int *size, int *blocks)
601 {
602     int i;
603     if (!pp)
604         return;
605     if (size)
606         *size = pp->total_size;
607     if (blocks)
608         *blocks = pp->no_blocks;
609     for (i = 0; i <= pp->level; i++)
610         close_block (pp->isamb, pp->block[i]);
611     xfree (pp->block);
612     xfree (pp);
613 }
614
615 int isamb_block_info (ISAMB isamb, int cat)
616 {
617     if (cat >= 0 && cat < isamb->no_cat)
618         return isamb->file[cat].head.block_size;
619     return -1;
620 }
621
622 void isamb_pp_close (ISAMB_PP pp)
623 {
624     return isamb_pp_close_x (pp, 0, 0);
625 }
626
627 int isamb_pp_read (ISAMB_PP pp, void *buf)
628 {
629     char *dst = buf;
630     char *src;
631     struct ISAMB_block *p = pp->block[pp->level];
632     if (!p)
633         return 0;
634
635     while (p->offset == p->size)
636     {
637         int pos, item_len;
638         while (p->offset == p->size)
639         {
640             if (pp->level == 0)
641                 return 0;
642             close_block (pp->isamb, pp->block[pp->level]);
643             pp->block[pp->level] = 0;
644             (pp->level)--;
645             p = pp->block[pp->level];
646             assert (p->bytes[0] == 0);  /* must be int */
647         }
648         src = p->bytes + p->offset;
649         
650         decode_ptr (&src, &item_len);
651         src += item_len;
652         decode_ptr (&src, &pos);
653         
654         p->offset = src - (char*) p->bytes;
655
656         ++(pp->level);
657         
658         while (1)
659         {
660             pp->block[pp->level] = p = open_block (pp->isamb, pos);
661
662             pp->total_size += p->size;
663             pp->no_blocks++;
664             
665             if (p->leaf) /* leaf */
666             {
667                 break;
668             }
669             src = p->bytes + p->offset;
670             decode_ptr (&src, &pos);
671             p->offset = src - (char*) p->bytes;
672             pp->level++;
673         }
674     }
675     assert (p->offset < p->size);
676     assert (p->bytes[0]);
677     src = p->bytes + p->offset;
678     (*pp->isamb->method->code_item)(ISAMC_DECODE, p->decodeClientData,
679                                     &dst, &src);
680     p->offset = src - (char*) p->bytes;
681     return 1;
682 }
683
684 int isamb_pp_num (ISAMB_PP pp)
685 {
686     return 1;
687 }