Much faster merge for b-tree
[idzebra-moved-to-github.git] / isamb / isamb.c
1 /*
2  *  Copyright (c) 1995-1998, Index Data.
3  *  See the file LICENSE for details.
4  *
5  *  $Id: isamb.c,v 1.8 2002-04-23 13:39:10 adam Exp $
6  */
7 #include <yaz/xmalloc.h>
8 #include <yaz/log.h>
9 #include <isamb.h>
10 #include <assert.h>
11
12 struct ISAMB_head {
13     int first_block;
14     int last_block;
15     int block_size;
16     int block_max;
17 };
18
19 #define ISAMB_DATA_OFFSET 3
20
21 #define DST_ITEM_MAX 256
22
23 /* approx 2*4 K + max size of item */
24 #define DST_BUF_SIZE 8448
25
26
27 struct ISAMB_file {
28     BFile bf;
29     int head_dirty;
30     struct ISAMB_head head;
31 };
32
33 struct ISAMB_s {
34     BFiles bfs;
35     ISAMC_M method;
36
37     struct ISAMB_file *file;
38     int no_cat;
39 };
40
41 struct ISAMB_block {
42     int pos;
43     int cat;
44     int size;
45     int leaf;
46     int dirty;
47     int offset;
48     char *bytes;
49     unsigned char *buf;
50     void *decodeClientData;
51 };
52
53 struct ISAMB_PP_s {
54     ISAMB isamb;
55     int level;
56     struct ISAMB_block **block;
57 };
58
59 void encode_ptr (char **dst, int pos)
60 {
61     memcpy (*dst, &pos, sizeof(pos));
62     (*dst) += sizeof(pos);
63 }
64
65 void decode_ptr (char **src, int *pos)
66 {
67     memcpy (pos, *src, sizeof(*pos));
68     (*src) += sizeof(*pos);
69 }
70
71 ISAMB isamb_open (BFiles bfs, const char *name, int writeflag, ISAMC_M method)
72 {
73     ISAMB isamb = xmalloc (sizeof(*isamb));
74     int i, b_size = 64;
75
76     isamb->bfs = bfs;
77     isamb->method = (ISAMC_M) xmalloc (sizeof(*method));
78     memcpy (isamb->method, method, sizeof(*method));
79     isamb->no_cat = 4;
80
81     isamb->file = xmalloc (sizeof(*isamb->file) * isamb->no_cat);
82     for (i = 0; i<isamb->no_cat; i++)
83     {
84         char fname[DST_BUF_SIZE];
85         isamb->file[i].head.first_block = 1;
86         isamb->file[i].head.last_block = 1;
87         isamb->file[i].head.block_size = b_size;
88         isamb->file[i].head.block_max = b_size - ISAMB_DATA_OFFSET;
89         b_size = b_size * 4;
90         isamb->file[i].head_dirty = 0;
91         sprintf (fname, "%s-%d", name, i);
92         isamb->file[i].bf =
93             bf_open (bfs, fname, isamb->file[i].head.block_size, writeflag);
94     
95         bf_read (isamb->file[i].bf, 0, 0, sizeof(struct ISAMB_head),
96                  &isamb->file[i].head);
97     }
98     return isamb;
99 }
100
101 void isamb_close (ISAMB isamb)
102 {
103     int i;
104     for (i = 0; i<isamb->no_cat; i++)
105     {
106         if (isamb->file[i].head_dirty)
107             bf_write (isamb->file[i].bf, 0, 0,
108                       sizeof(struct ISAMB_head), &isamb->file[i].head);
109     }
110     xfree (isamb->file);
111     xfree (isamb->method);
112     xfree (isamb);
113 }
114
115 struct ISAMB_block *open_block (ISAMB b, ISAMC_P pos)
116 {
117     int cat = pos&3;
118     struct ISAMB_block *p;
119     if (!pos)
120         return 0;
121     p = xmalloc (sizeof(*p));
122     p->pos = pos;
123     p->cat = pos & 3;
124     p->buf = xmalloc (b->file[cat].head.block_size);
125     bf_read (b->file[cat].bf, pos/4, 0, 0, p->buf);
126     p->bytes = p->buf + ISAMB_DATA_OFFSET;
127     p->leaf = p->buf[0];
128     p->size = p->buf[1] + 256 * p->buf[2] - ISAMB_DATA_OFFSET;
129     p->offset = 0;
130     p->dirty = 0;
131     p->decodeClientData = (*b->method->code_start)(ISAMC_DECODE);
132     return p;
133 }
134
135 struct ISAMB_block *new_block (ISAMB b, int leaf, int cat)
136 {
137     struct ISAMB_block *p;
138     int block_no;
139     
140     p = xmalloc (sizeof(*p));
141     block_no = b->file[cat].head.last_block++;
142     p->cat = cat;
143     p->pos = block_no * 4 + cat;
144     p->cat = cat;
145     b->file[cat].head_dirty = 1;
146     p->buf = xmalloc (b->file[cat].head.block_size);
147     memset (p->buf, 0, b->file[cat].head.block_size);
148     p->bytes = p->buf + ISAMB_DATA_OFFSET;
149     p->leaf = leaf;
150     p->size = 0;
151     p->dirty = 1;
152     p->offset = 0;
153     p->decodeClientData = (*b->method->code_start)(ISAMC_DECODE);
154     return p;
155 }
156
157 struct ISAMB_block *new_leaf (ISAMB b, int cat)
158 {
159     return new_block (b, 1, cat);
160 }
161
162
163 struct ISAMB_block *new_int (ISAMB b, int cat)
164 {
165     return new_block (b, 0, cat);
166 }
167
168 void close_block (ISAMB b, struct ISAMB_block *p)
169 {
170     if (!p)
171         return;
172     if (p->dirty)
173     {
174         int size = p->size + ISAMB_DATA_OFFSET;
175         p->buf[0] = p->leaf;
176         p->buf[1] = size & 255;
177         p->buf[2] = size >> 8;
178         bf_write (b->file[p->cat].bf, p->pos/4, 0, 0, p->buf);
179     }
180     (*b->method->code_stop)(ISAMC_DECODE, p->decodeClientData);
181     xfree (p->buf);
182     xfree (p);
183 }
184
185 int insert_sub (ISAMB b, struct ISAMB_block **p,
186                 void *new_item,
187                 ISAMC_I stream,
188                 struct ISAMB_block **sp,
189                 void *sub_item, int *sub_size,
190                 void *max_item);
191
192 int insert_int (ISAMB b, struct ISAMB_block *p, void *lookahead_item,
193                 ISAMC_I stream, struct ISAMB_block **sp,
194                 void *split_item, int *split_size)
195 {
196     char *startp = p->bytes;
197     char *src = startp;
198     char *endp = p->bytes + p->size;
199     int pos;
200     struct ISAMB_block *sub_p1 = 0, *sub_p2 = 0;
201     char sub_item[DST_ITEM_MAX];
202     int sub_size;
203     int more;
204
205     *sp = 0;
206
207     decode_ptr (&src, &pos);
208     while (src != endp)
209     {
210         int item_len;
211         int d;
212         decode_ptr (&src, &item_len);
213         d = (*b->method->compare_item)(src, lookahead_item);
214         if (d > 0)
215         {
216             sub_p1 = open_block (b, pos);
217             assert (sub_p1);
218             more = insert_sub (b, &sub_p1, lookahead_item, stream, &sub_p2, 
219                                sub_item, &sub_size, src);
220             break;
221         }
222         src += item_len;
223         decode_ptr (&src, &pos);
224     }
225     if (!sub_p1)
226     {
227         sub_p1 = open_block (b, pos);
228         assert (sub_p1);
229         more = insert_sub (b, &sub_p1, lookahead_item, stream, &sub_p2, 
230                            sub_item, &sub_size, 0);
231     }
232     if (sub_p2)
233     {
234         /* there was a split - must insert pointer in this one */
235         char dst_buf[DST_BUF_SIZE];
236         char *dst = dst_buf;
237
238         assert (sub_size < 30 && sub_size > 1);
239
240         memcpy (dst, startp, src - startp);
241                 
242         dst += src - startp;
243
244         encode_ptr (&dst, sub_size);      /* sub length and item */
245         memcpy (dst, sub_item, sub_size);
246         dst += sub_size;
247
248         encode_ptr (&dst, sub_p2->pos);   /* pos */
249
250         if (endp - src)                   /* remaining data */
251         {
252             memcpy (dst, src, endp - src);
253             dst += endp - src;
254         }
255         p->size = dst - dst_buf;
256         if (p->size <= b->file[p->cat].head.block_max)
257         {
258             memcpy (startp, dst_buf, dst - dst_buf);
259         }
260         else
261         {
262             int p_new_size;
263             char *half;
264             src = dst_buf;
265             endp = dst;
266
267             half = src + b->file[p->cat].head.block_size/2;
268             decode_ptr (&src, &pos);
269             while (src <= half)
270             {
271                 decode_ptr (&src, split_size);
272                 src += *split_size;
273                 decode_ptr (&src, &pos);
274             }
275             p_new_size = src - dst_buf;
276             memcpy (p->bytes, dst_buf, p_new_size);
277
278             decode_ptr (&src, split_size);
279             memcpy (split_item, src, *split_size);
280             src += *split_size;
281
282             *sp = new_int (b, p->cat);
283             (*sp)->size = endp - src;
284             memcpy ((*sp)->bytes, src, (*sp)->size);
285
286             yaz_log (LOG_LOG, "i split %d -> %d %d",
287                      p->size, p_new_size, (*sp)->size);
288             p->size = p_new_size;
289         }
290         p->dirty = 1;
291         close_block (b, sub_p2);
292     }
293     close_block (b, sub_p1);
294     return more;
295 }
296
297
298 int insert_leaf (ISAMB b, struct ISAMB_block **sp1, void *lookahead_item,
299                  ISAMC_I stream, struct ISAMB_block **sp2,
300                  void *sub_item, int *sub_size,
301                  void *max_item)
302 {
303     struct ISAMB_block *p = *sp1;
304     char *src = 0, *endp = 0;
305     char dst_buf[DST_BUF_SIZE], *dst = dst_buf;
306     int new_size;
307     void *c1 = (*b->method->code_start)(ISAMC_DECODE);
308     void *c2 = (*b->method->code_start)(ISAMC_ENCODE);
309     int more = 1;
310     int quater = b->file[b->no_cat-1].head.block_max / 4;
311     char *cut = dst_buf + quater * 2;
312     char *maxp = dst_buf + b->file[b->no_cat-1].head.block_max;
313     char *half1 = 0;
314     char *half2 = 0;
315     char cut_item_buf[DST_ITEM_MAX];
316     int cut_item_size = 0;
317
318     if (p && p->size)
319     {
320         char file_item_buf[DST_ITEM_MAX];
321         char *file_item = file_item_buf;
322             
323         src = p->bytes;
324         endp = p->bytes + p->size;
325         (*b->method->code_item)(ISAMC_DECODE, c1, &file_item, &src);
326         while (1)
327         {
328             char *dst_item = 0;
329             char *dst_0 = dst;
330             char *lookahead_next;
331             int lookahead_mode;
332             int d = -1;
333             
334             if (lookahead_item)
335                 d = (*b->method->compare_item)(file_item_buf, lookahead_item);
336             
337             if (d > 0)  
338                 dst_item = lookahead_item;
339             else
340                 dst_item = file_item_buf;
341             if (!half1 && dst > cut)   
342             {
343                 char *dst_item_0 = dst_item;
344                 half1 = dst; /* candidate for splitting */
345                 
346                 (*b->method->code_item)(ISAMC_ENCODE, c2, &dst, &dst_item);
347                 
348                 cut_item_size = dst_item - dst_item_0;
349                 memcpy (cut_item_buf, dst_item_0, cut_item_size);
350                 
351                 half2 = dst;
352             }
353             else
354                 (*b->method->code_item)(ISAMC_ENCODE, c2, &dst, &dst_item);
355             if (d > 0)  
356             {
357                 if (dst > maxp)
358                 {
359                     dst = dst_0;
360                     lookahead_item = 0;
361                 }
362                 else
363                 {
364                     lookahead_next = lookahead_item;
365                     if (!(*stream->read_item)(stream->clientData,
366                                               &lookahead_next,
367                                               &lookahead_mode))
368                     {
369                         lookahead_item = 0;
370                         more = 0;
371                     }
372                     if (max_item &&
373                         (*b->method->compare_item)(max_item, lookahead_item) <= 0)
374                     {
375                         assert (0);
376                         lookahead_item = 0;
377                     }
378                     
379                     p->dirty = 1;
380                 }
381             }
382             else if (d == 0)
383             {
384                 lookahead_next = lookahead_item;
385                 if (!(*stream->read_item)(stream->clientData,
386                                           &lookahead_next, &lookahead_mode))
387                 {
388                     lookahead_item = 0;
389                     more = 0;
390                 }
391                 if (src == endp)
392                     break;
393                 file_item = file_item_buf;
394                 (*b->method->code_item)(ISAMC_DECODE, c1, &file_item, &src);
395             }
396             else
397             {
398                 if (src == endp)
399                     break;
400                 file_item = file_item_buf;
401                 (*b->method->code_item)(ISAMC_DECODE, c1, &file_item, &src);
402             }
403         }
404     }
405     maxp = dst_buf + b->file[b->no_cat-1].head.block_max + quater;
406     while (lookahead_item)
407     {
408         int lookahead_mode;
409         char *dst_item = lookahead_item;
410         char *dst_0 = dst;
411         
412         if (max_item &&
413             (*b->method->compare_item)(max_item, lookahead_item) <= 0)
414         {
415             assert (0);
416             break;
417         }
418         if (!half1 && dst > cut)   
419         {
420             char *dst_item_0 = dst_item;
421             half1 = dst; /* candidate for splitting */
422             
423             (*b->method->code_item)(ISAMC_ENCODE, c2, &dst, &dst_item);
424             
425             cut_item_size = dst_item - dst_item_0;
426             memcpy (cut_item_buf, dst_item_0, cut_item_size);
427             
428             half2 = dst;
429         }
430         else
431             (*b->method->code_item)(ISAMC_ENCODE, c2, &dst, &dst_item);
432
433         if (dst > maxp)
434         {
435             dst = dst_0;
436             break;
437         }
438         if (p)
439             p->dirty = 1;
440         dst_item = lookahead_item;
441         if (!(*stream->read_item)(stream->clientData, &dst_item,
442                                   &lookahead_mode))
443         {
444             lookahead_item = 0;
445             more = 0;
446         }
447     }
448     new_size = dst - dst_buf;
449     if (p && p->cat != b->no_cat-1 && 
450         new_size > b->file[p->cat].head.block_max)
451     {
452         /* non-btree block will be removed */
453         close_block (b, p);
454         /* delete it too!! */
455         p = 0; /* make a new one anyway */
456     }
457     if (!p)
458     {   /* must create a new one */
459         int i;
460         for (i = 0; i < b->no_cat; i++)
461             if (new_size <= b->file[i].head.block_max)
462                 break;
463         if (i == b->no_cat)
464             i = b->no_cat - 1;
465         p = new_leaf (b, i);
466     }
467     if (new_size > b->file[p->cat].head.block_max)
468     {
469         char *first_dst;
470         char *cut_item = cut_item_buf;
471
472         assert (half1);
473         assert (half2);
474
475        /* first half */
476         p->size = half1 - dst_buf;
477         memcpy (p->bytes, dst_buf, half1 - dst_buf);
478
479         /* second half */
480         *sp2 = new_leaf (b, p->cat);
481
482         (*b->method->code_reset)(c2);
483
484         first_dst = (*sp2)->bytes;
485
486         (*b->method->code_item)(ISAMC_ENCODE, c2, &first_dst, &cut_item);
487
488         memcpy (first_dst, half2, dst - half2);
489
490         (*sp2)->size = (first_dst - (*sp2)->bytes) + (dst - half2);
491         (*sp2)->dirty = 1;
492         p->dirty = 1;
493         memcpy (sub_item, cut_item_buf, cut_item_size);
494         *sub_size = cut_item_size;
495
496         yaz_log (LOG_LOG, "l split %d / %d", p->size, (*sp2)->size);
497     }
498     else
499     {
500         memcpy (p->bytes, dst_buf, dst - dst_buf);
501         p->size = new_size;
502     }
503     (*b->method->code_stop)(ISAMC_DECODE, c1);
504     (*b->method->code_stop)(ISAMC_ENCODE, c2);
505     *sp1 = p;
506     return more;
507 }
508
509 int insert_sub (ISAMB b, struct ISAMB_block **p, void *new_item,
510                 ISAMC_I stream,
511                 struct ISAMB_block **sp,
512                 void *sub_item, int *sub_size,
513                 void *max_item)
514 {
515     if (!*p || (*p)->leaf)
516         return insert_leaf (b, p, new_item, stream, sp, sub_item, sub_size, 
517                             max_item);
518     else
519         return insert_int (b, *p, new_item, stream, sp, sub_item, sub_size);
520 }
521
522 int isamb_merge (ISAMB b, ISAMC_P pos, ISAMC_I stream)
523 {
524     char item_buf[DST_ITEM_MAX];
525     char *item_ptr;
526     int i_mode;
527     int more;
528
529     item_ptr = item_buf;
530     more = (*stream->read_item)(stream->clientData, &item_ptr, &i_mode);
531     while (more)
532     {
533         struct ISAMB_block *p = 0, *sp = 0;
534         char sub_item[DST_ITEM_MAX];
535         int sub_size;
536         
537         if (pos)
538             p = open_block (b, pos);
539         more = insert_sub (b, &p, item_buf, stream, &sp,
540                             sub_item, &sub_size, 0);
541         if (sp)
542         {    /* increase level of tree by one */
543             struct ISAMB_block *p2 = new_int (b, p->cat);
544             char *dst = p2->bytes + p2->size;
545             
546             encode_ptr (&dst, p->pos);
547             assert (sub_size < 20);
548             encode_ptr (&dst, sub_size);
549             memcpy (dst, sub_item, sub_size);
550             dst += sub_size;
551             encode_ptr (&dst, sp->pos);
552             
553             p2->size = dst - p2->bytes;
554             pos = p2->pos;  /* return new super page */
555             close_block (b, sp);
556             close_block (b, p2);
557         }
558         else
559             pos = p->pos;   /* return current one (again) */
560         close_block (b, p);
561     }
562     return pos;
563 }
564
565 ISAMB_PP isamb_pp_open (ISAMB isamb, ISAMB_P pos)
566 {
567     ISAMB_PP pp = xmalloc (sizeof(*pp));
568
569     pp->isamb = isamb;
570     pp->block = xmalloc (10 * sizeof(*pp->block));
571
572     pp->level = 0;
573     while (1)
574     {
575         struct ISAMB_block *p = open_block (isamb, pos);
576         char *src = p->bytes + p->offset;
577         pp->block[pp->level] = p;
578
579         if (p->bytes[0]) /* leaf */
580             break;
581
582         decode_ptr (&src, &pos);
583         p->offset = src - p->bytes;
584         pp->level++;
585     }
586     pp->block[pp->level+1] = 0;
587     return pp;
588 }
589
590 void isamb_pp_close (ISAMB_PP pp)
591 {
592     int i;
593     if (!pp)
594         return;
595     for (i = 0; i <= pp->level; i++)
596         close_block (pp->isamb, pp->block[i]);
597     xfree (pp->block);
598     xfree (pp);
599 }
600
601 int isamb_pp_read (ISAMB_PP pp, void *buf)
602 {
603     char *dst = buf;
604     char *src;
605     struct ISAMB_block *p = pp->block[pp->level];
606     if (!p)
607         return 0;
608
609     while (p->offset == p->size)
610     {
611         int pos, item_len;
612         while (p->offset == p->size)
613         {
614             if (pp->level == 0)
615                 return 0;
616             close_block (pp->isamb, pp->block[pp->level]);
617             pp->block[pp->level] = 0;
618             (pp->level)--;
619             p = pp->block[pp->level];
620             assert (p->bytes[0] == 0);  /* must be int */
621         }
622         src = p->bytes + p->offset;
623         
624         decode_ptr (&src, &item_len);
625         src += item_len;
626         decode_ptr (&src, &pos);
627         
628         p->offset = src - (char*) p->bytes;
629
630         ++(pp->level);
631         
632         while (1)
633         {
634             pp->block[pp->level] = p = open_block (pp->isamb, pos);
635             
636             if (p->leaf) /* leaf */
637             {
638                 break;
639             }
640             src = p->bytes + p->offset;
641             decode_ptr (&src, &pos);
642             p->offset = src - (char*) p->bytes;
643             pp->level++;
644         }
645     }
646     assert (p->offset < p->size);
647     assert (p->bytes[0]);
648     src = p->bytes + p->offset;
649     (*pp->isamb->method->code_item)(ISAMC_DECODE, p->decodeClientData,
650                                     &dst, &src);
651     p->offset = src - (char*) p->bytes;
652     return 1;
653 }
654
655 int isamb_pp_num (ISAMB_PP pp)
656 {
657     return 1;
658 }