isamb delete; more statistics
[idzebra-moved-to-github.git] / isamb / isamb.c
1 /*
2  *  Copyright (c) 2000-2002, Index Data.
3  *  See the file LICENSE for details.
4  *
5  *  $Id: isamb.c,v 1.13 2002-04-30 19:31:09 adam Exp $
6  */
7 #include <yaz/xmalloc.h>
8 #include <yaz/log.h>
9 #include <isamb.h>
10 #include <assert.h>
11
12 struct ISAMB_head {
13     int first_block;
14     int last_block;
15     int block_size;
16     int block_max;
17 };
18
19 #define ISAMB_DATA_OFFSET 3
20
21 #define DST_ITEM_MAX 256
22
23 /* approx 2*4 K + max size of item */
24 #define DST_BUF_SIZE 8448
25
26
27 struct ISAMB_file {
28     BFile bf;
29     int head_dirty;
30     struct ISAMB_head head;
31 };
32
33 struct ISAMB_s {
34     BFiles bfs;
35     ISAMC_M method;
36
37     struct ISAMB_file *file;
38     int no_cat;
39 };
40
41 struct ISAMB_block {
42     ISAMB_P pos;
43     int cat;
44     int size;
45     int leaf;
46     int dirty;
47     int offset;
48     char *bytes;
49     unsigned char *buf;
50     void *decodeClientData;
51 };
52
53 struct ISAMB_PP_s {
54     ISAMB isamb;
55     ISAMB_P pos;
56     int level;
57     int total_size;
58     int no_blocks;
59     struct ISAMB_block **block;
60 };
61
62 void encode_ptr (char **dst, int pos)
63 {
64     memcpy (*dst, &pos, sizeof(pos));
65     (*dst) += sizeof(pos);
66 }
67
68 void decode_ptr (char **src, int *pos)
69 {
70     memcpy (pos, *src, sizeof(*pos));
71     (*src) += sizeof(*pos);
72 }
73
74 ISAMB isamb_open (BFiles bfs, const char *name, int writeflag, ISAMC_M method)
75 {
76     ISAMB isamb = xmalloc (sizeof(*isamb));
77     int i, b_size = 64;
78
79     isamb->bfs = bfs;
80     isamb->method = (ISAMC_M) xmalloc (sizeof(*method));
81     memcpy (isamb->method, method, sizeof(*method));
82     isamb->no_cat = 3;
83
84     isamb->file = xmalloc (sizeof(*isamb->file) * isamb->no_cat);
85     for (i = 0; i<isamb->no_cat; i++)
86     {
87         char fname[DST_BUF_SIZE];
88         isamb->file[i].head.first_block = 1;
89         isamb->file[i].head.last_block = 1;
90         isamb->file[i].head.block_size = b_size;
91         isamb->file[i].head.block_max = b_size - ISAMB_DATA_OFFSET;
92         b_size = b_size * 4;
93         isamb->file[i].head_dirty = 0;
94         sprintf (fname, "%s%c", name, i+'A');
95         isamb->file[i].bf =
96             bf_open (bfs, fname, isamb->file[i].head.block_size, writeflag);
97     
98         bf_read (isamb->file[i].bf, 0, 0, sizeof(struct ISAMB_head),
99                  &isamb->file[i].head);
100     }
101     return isamb;
102 }
103
104 void isamb_close (ISAMB isamb)
105 {
106     int i;
107     for (i = 0; i<isamb->no_cat; i++)
108     {
109         if (isamb->file[i].head_dirty)
110             bf_write (isamb->file[i].bf, 0, 0,
111                       sizeof(struct ISAMB_head), &isamb->file[i].head);
112     }
113     xfree (isamb->file);
114     xfree (isamb->method);
115     xfree (isamb);
116 }
117
118 struct ISAMB_block *open_block (ISAMB b, ISAMC_P pos)
119 {
120     int cat = pos&3;
121     struct ISAMB_block *p;
122     if (!pos)
123         return 0;
124     p = xmalloc (sizeof(*p));
125     p->pos = pos;
126     p->cat = pos & 3;
127     p->buf = xmalloc (b->file[cat].head.block_size);
128     bf_read (b->file[cat].bf, pos/4, 0, 0, p->buf);
129     p->bytes = p->buf + ISAMB_DATA_OFFSET;
130     p->leaf = p->buf[0];
131     p->size = p->buf[1] + 256 * p->buf[2] - ISAMB_DATA_OFFSET;
132     p->offset = 0;
133     p->dirty = 0;
134     p->decodeClientData = (*b->method->code_start)(ISAMC_DECODE);
135     return p;
136 }
137
138 struct ISAMB_block *new_block (ISAMB b, int leaf, int cat)
139 {
140     struct ISAMB_block *p;
141     int block_no;
142     
143     p = xmalloc (sizeof(*p));
144     block_no = b->file[cat].head.last_block++;
145     p->cat = cat;
146     p->pos = block_no * 4 + cat;
147     p->cat = cat;
148     b->file[cat].head_dirty = 1;
149     p->buf = xmalloc (b->file[cat].head.block_size);
150     memset (p->buf, 0, b->file[cat].head.block_size);
151     p->bytes = p->buf + ISAMB_DATA_OFFSET;
152     p->leaf = leaf;
153     p->size = 0;
154     p->dirty = 1;
155     p->offset = 0;
156     p->decodeClientData = (*b->method->code_start)(ISAMC_DECODE);
157     return p;
158 }
159
160 struct ISAMB_block *new_leaf (ISAMB b, int cat)
161 {
162     return new_block (b, 1, cat);
163 }
164
165
166 struct ISAMB_block *new_int (ISAMB b, int cat)
167 {
168     return new_block (b, 0, cat);
169 }
170
171 void close_block (ISAMB b, struct ISAMB_block *p)
172 {
173     if (!p)
174         return;
175     if (p->dirty)
176     {
177         int size = p->size + ISAMB_DATA_OFFSET;
178         p->buf[0] = p->leaf;
179         p->buf[1] = size & 255;
180         p->buf[2] = size >> 8;
181         bf_write (b->file[p->cat].bf, p->pos/4, 0, 0, p->buf);
182     }
183     (*b->method->code_stop)(ISAMC_DECODE, p->decodeClientData);
184     xfree (p->buf);
185     xfree (p);
186 }
187
188 int insert_sub (ISAMB b, struct ISAMB_block **p,
189                 void *new_item, int *mode,
190                 ISAMC_I stream,
191                 struct ISAMB_block **sp,
192                 void *sub_item, int *sub_size,
193                 void *max_item);
194
195 int insert_int (ISAMB b, struct ISAMB_block *p, void *lookahead_item,
196                 int *mode,
197                 ISAMC_I stream, struct ISAMB_block **sp,
198                 void *split_item, int *split_size)
199 {
200     char *startp = p->bytes;
201     char *src = startp;
202     char *endp = p->bytes + p->size;
203     int pos;
204     struct ISAMB_block *sub_p1 = 0, *sub_p2 = 0;
205     char sub_item[DST_ITEM_MAX];
206     int sub_size;
207     int more;
208
209     *sp = 0;
210
211     decode_ptr (&src, &pos);
212     while (src != endp)
213     {
214         int item_len;
215         int d;
216         decode_ptr (&src, &item_len);
217         d = (*b->method->compare_item)(src, lookahead_item);
218         if (d > 0)
219         {
220             sub_p1 = open_block (b, pos);
221             assert (sub_p1);
222             more = insert_sub (b, &sub_p1, lookahead_item, mode,
223                                stream, &sub_p2, 
224                                sub_item, &sub_size, src);
225             break;
226         }
227         src += item_len;
228         decode_ptr (&src, &pos);
229     }
230     if (!sub_p1)
231     {
232         sub_p1 = open_block (b, pos);
233         assert (sub_p1);
234         more = insert_sub (b, &sub_p1, lookahead_item, mode, stream, &sub_p2, 
235                            sub_item, &sub_size, 0);
236     }
237     if (sub_p2)
238     {
239         /* there was a split - must insert pointer in this one */
240         char dst_buf[DST_BUF_SIZE];
241         char *dst = dst_buf;
242
243         assert (sub_size < 30 && sub_size > 1);
244
245         memcpy (dst, startp, src - startp);
246                 
247         dst += src - startp;
248
249         encode_ptr (&dst, sub_size);      /* sub length and item */
250         memcpy (dst, sub_item, sub_size);
251         dst += sub_size;
252
253         encode_ptr (&dst, sub_p2->pos);   /* pos */
254
255         if (endp - src)                   /* remaining data */
256         {
257             memcpy (dst, src, endp - src);
258             dst += endp - src;
259         }
260         p->size = dst - dst_buf;
261         if (p->size <= b->file[p->cat].head.block_max)
262         {
263             memcpy (startp, dst_buf, dst - dst_buf);
264         }
265         else
266         {
267             int p_new_size;
268             char *half;
269             src = dst_buf;
270             endp = dst;
271
272             half = src + b->file[p->cat].head.block_size/2;
273             decode_ptr (&src, &pos);
274             while (src <= half)
275             {
276                 decode_ptr (&src, split_size);
277                 src += *split_size;
278                 decode_ptr (&src, &pos);
279             }
280             p_new_size = src - dst_buf;
281             memcpy (p->bytes, dst_buf, p_new_size);
282
283             decode_ptr (&src, split_size);
284             memcpy (split_item, src, *split_size);
285             src += *split_size;
286
287             *sp = new_int (b, p->cat);
288             (*sp)->size = endp - src;
289             memcpy ((*sp)->bytes, src, (*sp)->size);
290
291             p->size = p_new_size;
292         }
293         p->dirty = 1;
294         close_block (b, sub_p2);
295     }
296     close_block (b, sub_p1);
297     return more;
298 }
299
300
301 int insert_leaf (ISAMB b, struct ISAMB_block **sp1, void *lookahead_item,
302                  int *lookahead_mode, ISAMC_I stream, struct ISAMB_block **sp2,
303                  void *sub_item, int *sub_size,
304                  void *max_item)
305 {
306     struct ISAMB_block *p = *sp1;
307     char *src = 0, *endp = 0;
308     char dst_buf[DST_BUF_SIZE], *dst = dst_buf;
309     int new_size;
310     void *c1 = (*b->method->code_start)(ISAMC_DECODE);
311     void *c2 = (*b->method->code_start)(ISAMC_ENCODE);
312     int more = 1;
313     int quater = b->file[b->no_cat-1].head.block_max / 4;
314     char *cut = dst_buf + quater * 2;
315     char *maxp = dst_buf + b->file[b->no_cat-1].head.block_max;
316     char *half1 = 0;
317     char *half2 = 0;
318     char cut_item_buf[DST_ITEM_MAX];
319     int cut_item_size = 0;
320
321     if (p && p->size)
322     {
323         char file_item_buf[DST_ITEM_MAX];
324         char *file_item = file_item_buf;
325             
326         src = p->bytes;
327         endp = p->bytes + p->size;
328         (*b->method->code_item)(ISAMC_DECODE, c1, &file_item, &src);
329         while (1)
330         {
331             char *dst_item = 0;
332             char *dst_0 = dst;
333             char *lookahead_next;
334             int d = -1;
335             
336             if (lookahead_item)
337                 d = (*b->method->compare_item)(file_item_buf, lookahead_item);
338             
339             if (d > 0)
340             {
341                 dst_item = lookahead_item;
342                 assert (*lookahead_mode);
343             }
344             else
345                 dst_item = file_item_buf;
346             if (!*lookahead_mode && d == 0)
347             {
348                 p->dirty = 1;
349             }
350             else if (!half1 && dst > cut)
351             {
352                 char *dst_item_0 = dst_item;
353                 half1 = dst; /* candidate for splitting */
354                 
355                 (*b->method->code_item)(ISAMC_ENCODE, c2, &dst, &dst_item);
356                 
357                 cut_item_size = dst_item - dst_item_0;
358                 memcpy (cut_item_buf, dst_item_0, cut_item_size);
359                 
360                 half2 = dst;
361             }
362             else
363                 (*b->method->code_item)(ISAMC_ENCODE, c2, &dst, &dst_item);
364             if (d > 0)  
365             {
366                 if (dst > maxp)
367                 {
368                     dst = dst_0;
369                     lookahead_item = 0;
370                 }
371                 else
372                 {
373                     lookahead_next = lookahead_item;
374                     if (!(*stream->read_item)(stream->clientData,
375                                               &lookahead_next,
376                                               lookahead_mode))
377                     {
378                         lookahead_item = 0;
379                         more = 0;
380                     }
381                     if (max_item &&
382                         (*b->method->compare_item)(max_item, lookahead_item) <= 0)
383                     {
384                         yaz_log (LOG_LOG, "max_item 1");
385                         lookahead_item = 0;
386                     }
387                     
388                     p->dirty = 1;
389                 }
390             }
391             else if (d == 0)
392             {
393                 lookahead_next = lookahead_item;
394                 if (!(*stream->read_item)(stream->clientData,
395                                           &lookahead_next, lookahead_mode))
396                 {
397                     lookahead_item = 0;
398                     more = 0;
399                 }
400                 if (src == endp)
401                     break;
402                 file_item = file_item_buf;
403                 (*b->method->code_item)(ISAMC_DECODE, c1, &file_item, &src);
404             }
405             else
406             {
407                 if (src == endp)
408                     break;
409                 file_item = file_item_buf;
410                 (*b->method->code_item)(ISAMC_DECODE, c1, &file_item, &src);
411             }
412         }
413     }
414     maxp = dst_buf + b->file[b->no_cat-1].head.block_max + quater;
415     while (lookahead_item)
416     {
417         char *dst_item = lookahead_item;
418         char *dst_0 = dst;
419         
420         if (max_item &&
421             (*b->method->compare_item)(max_item, lookahead_item) <= 0)
422         {
423             yaz_log (LOG_LOG, "max_item 2");
424             break;
425         }
426         if (!*lookahead_mode)
427         {
428             yaz_log (LOG_WARN, "Inconsistent register (2)");
429             abort();
430         }
431         else if (!half1 && dst > cut)   
432         {
433             char *dst_item_0 = dst_item;
434             half1 = dst; /* candidate for splitting */
435             
436             (*b->method->code_item)(ISAMC_ENCODE, c2, &dst, &dst_item);
437             
438             cut_item_size = dst_item - dst_item_0;
439             memcpy (cut_item_buf, dst_item_0, cut_item_size);
440             
441             half2 = dst;
442         }
443         else
444             (*b->method->code_item)(ISAMC_ENCODE, c2, &dst, &dst_item);
445
446         if (dst > maxp)
447         {
448             dst = dst_0;
449             break;
450         }
451         if (p)
452             p->dirty = 1;
453         dst_item = lookahead_item;
454         if (!(*stream->read_item)(stream->clientData, &dst_item,
455                                   lookahead_mode))
456         {
457             lookahead_item = 0;
458             more = 0;
459         }
460     }
461     new_size = dst - dst_buf;
462     if (p && p->cat != b->no_cat-1 && 
463         new_size > b->file[p->cat].head.block_max)
464     {
465         /* non-btree block will be removed */
466         close_block (b, p);
467         /* delete it too!! */
468         p = 0; /* make a new one anyway */
469     }
470     if (!p)
471     {   /* must create a new one */
472         int i;
473         for (i = 0; i < b->no_cat; i++)
474             if (new_size <= b->file[i].head.block_max)
475                 break;
476         if (i == b->no_cat)
477             i = b->no_cat - 1;
478         p = new_leaf (b, i);
479     }
480     if (new_size > b->file[p->cat].head.block_max)
481     {
482         char *first_dst;
483         char *cut_item = cut_item_buf;
484
485         assert (half1);
486         assert (half2);
487
488        /* first half */
489         p->size = half1 - dst_buf;
490         memcpy (p->bytes, dst_buf, half1 - dst_buf);
491
492         /* second half */
493         *sp2 = new_leaf (b, p->cat);
494
495         (*b->method->code_reset)(c2);
496
497         first_dst = (*sp2)->bytes;
498
499         (*b->method->code_item)(ISAMC_ENCODE, c2, &first_dst, &cut_item);
500
501         memcpy (first_dst, half2, dst - half2);
502
503         (*sp2)->size = (first_dst - (*sp2)->bytes) + (dst - half2);
504         (*sp2)->dirty = 1;
505         p->dirty = 1;
506         memcpy (sub_item, cut_item_buf, cut_item_size);
507         *sub_size = cut_item_size;
508     }
509     else
510     {
511         memcpy (p->bytes, dst_buf, dst - dst_buf);
512         p->size = new_size;
513     }
514     (*b->method->code_stop)(ISAMC_DECODE, c1);
515     (*b->method->code_stop)(ISAMC_ENCODE, c2);
516     *sp1 = p;
517     return more;
518 }
519
520 int insert_sub (ISAMB b, struct ISAMB_block **p, void *new_item,
521                 int *mode,
522                 ISAMC_I stream,
523                 struct ISAMB_block **sp,
524                 void *sub_item, int *sub_size,
525                 void *max_item)
526 {
527     if (!*p || (*p)->leaf)
528         return insert_leaf (b, p, new_item, mode, stream, sp, sub_item, 
529                             sub_size, max_item);
530     else
531         return insert_int (b, *p, new_item, mode, stream, sp, sub_item,
532                            sub_size);
533 }
534
535 int isamb_merge (ISAMB b, ISAMC_P pos, ISAMC_I stream)
536 {
537     char item_buf[DST_ITEM_MAX];
538     char *item_ptr;
539     int i_mode;
540     int more;
541
542     item_ptr = item_buf;
543     more = (*stream->read_item)(stream->clientData, &item_ptr, &i_mode);
544     while (more)
545     {
546         struct ISAMB_block *p = 0, *sp = 0;
547         char sub_item[DST_ITEM_MAX];
548         int sub_size;
549         
550         if (pos)
551             p = open_block (b, pos);
552         more = insert_sub (b, &p, item_buf, &i_mode, stream, &sp,
553                             sub_item, &sub_size, 0);
554         if (sp)
555         {    /* increase level of tree by one */
556             struct ISAMB_block *p2 = new_int (b, p->cat);
557             char *dst = p2->bytes + p2->size;
558             
559             encode_ptr (&dst, p->pos);
560             assert (sub_size < 20);
561             encode_ptr (&dst, sub_size);
562             memcpy (dst, sub_item, sub_size);
563             dst += sub_size;
564             encode_ptr (&dst, sp->pos);
565             
566             p2->size = dst - p2->bytes;
567             pos = p2->pos;  /* return new super page */
568             close_block (b, sp);
569             close_block (b, p2);
570         }
571         else
572             pos = p->pos;   /* return current one (again) */
573         close_block (b, p);
574     }
575     return pos;
576 }
577
578 ISAMB_PP isamb_pp_open_x (ISAMB isamb, ISAMB_P pos, int *level)
579 {
580     ISAMB_PP pp = xmalloc (sizeof(*pp));
581
582     pp->isamb = isamb;
583     pp->block = xmalloc (10 * sizeof(*pp->block));
584
585     pp->pos = pos;
586     pp->level = 0;
587     pp->total_size = 0;
588     pp->no_blocks = 0;
589     while (1)
590     {
591         struct ISAMB_block *p = open_block (isamb, pos);
592         char *src = p->bytes + p->offset;
593         pp->block[pp->level] = p;
594
595         pp->total_size += p->size;
596         pp->no_blocks++;
597
598         if (p->leaf)
599             break;
600
601         decode_ptr (&src, &pos);
602         p->offset = src - p->bytes;
603         pp->level++;
604     }
605     pp->block[pp->level+1] = 0;
606     if (level)
607         *level = pp->level;
608     return pp;
609 }
610
611 ISAMB_PP isamb_pp_open (ISAMB isamb, ISAMB_P pos)
612 {
613     return isamb_pp_open_x (isamb, pos, 0);
614 }
615
616 void isamb_pp_close_x (ISAMB_PP pp, int *size, int *blocks)
617 {
618     int i;
619     if (!pp)
620         return;
621     if (size)
622         *size = pp->total_size;
623     if (blocks)
624         *blocks = pp->no_blocks;
625     for (i = 0; i <= pp->level; i++)
626         close_block (pp->isamb, pp->block[i]);
627     xfree (pp->block);
628     xfree (pp);
629 }
630
631 int isamb_block_info (ISAMB isamb, int cat)
632 {
633     if (cat >= 0 && cat < isamb->no_cat)
634         return isamb->file[cat].head.block_size;
635     return -1;
636 }
637
638 void isamb_pp_close (ISAMB_PP pp)
639 {
640     return isamb_pp_close_x (pp, 0, 0);
641 }
642
643 int isamb_pp_read (ISAMB_PP pp, void *buf)
644 {
645     char *dst = buf;
646     char *src;
647     struct ISAMB_block *p = pp->block[pp->level];
648     if (!p)
649         return 0;
650
651     while (p->offset == p->size)
652     {
653         int pos, item_len;
654         while (p->offset == p->size)
655         {
656             if (pp->level == 0)
657                 return 0;
658             close_block (pp->isamb, pp->block[pp->level]);
659             pp->block[pp->level] = 0;
660             (pp->level)--;
661             p = pp->block[pp->level];
662             assert (!p->leaf);  /* must be int */
663         }
664         src = p->bytes + p->offset;
665         
666         decode_ptr (&src, &item_len);
667         src += item_len;
668         decode_ptr (&src, &pos);
669         
670         p->offset = src - (char*) p->bytes;
671
672         ++(pp->level);
673         
674         while (1)
675         {
676             pp->block[pp->level] = p = open_block (pp->isamb, pos);
677
678             pp->total_size += p->size;
679             pp->no_blocks++;
680             
681             if (p->leaf) /* leaf */
682             {
683                 break;
684             }
685             src = p->bytes + p->offset;
686             decode_ptr (&src, &pos);
687             p->offset = src - (char*) p->bytes;
688             pp->level++;
689         }
690     }
691     assert (p->offset < p->size);
692     assert (p->leaf);
693     src = p->bytes + p->offset;
694     (*pp->isamb->method->code_item)(ISAMC_DECODE, p->decodeClientData,
695                                     &dst, &src);
696     p->offset = src - (char*) p->bytes;
697     return 1;
698 }
699
700 int isamb_pp_num (ISAMB_PP pp)
701 {
702     return 1;
703 }