1 /* This file is part of the Zebra server.
2 Copyright (C) 1994-2009 Index Data
4 Zebra is free software; you can redistribute it and/or modify it under
5 the terms of the GNU General Public License as published by the Free
6 Software Foundation; either version 2, or (at your option) any later
9 Zebra is distributed in the hope that it will be useful, but WITHOUT ANY
10 WARRANTY; without even the implied warranty of MERCHANTABILITY or
11 FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
14 You should have received a copy of the GNU General Public License
15 along with this program; if not, write to the Free Software
16 Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
23 #include <yaz/xmalloc.h>
24 #include <idzebra/isamb.h>
32 #define ISAMB_MAJOR_VERSION 3
33 #define ISAMB_MINOR_VERSION_NO_ROOT 0
34 #define ISAMB_MINOR_VERSION_WITH_ROOT 1
46 /* if 1, upper nodes items are encoded; 0 if not encoded */
49 /* maximum size of encoded buffer */
50 #define DST_ITEM_MAX 5000
52 /* max page size for _any_ isamb use */
53 #define ISAMB_MAX_PAGE 32768
55 #define ISAMB_MAX_LEVEL 10
56 /* approx 2*max page + max size of item */
57 #define DST_BUF_SIZE (2*ISAMB_MAX_PAGE+DST_ITEM_MAX+100)
59 /* should be maximum block size of multiple thereof */
60 #define ISAMB_CACHE_ENTRY_SIZE ISAMB_MAX_PAGE
62 /* CAT_MAX: _must_ be power of 2 */
64 #define CAT_MASK (CAT_MAX-1)
65 /* CAT_NO: <= CAT_MAX */
68 /* Smallest block size */
69 #define ISAMB_MIN_SIZE 32
71 #define ISAMB_FAC_SIZE 4
73 /* ISAMB_PTR_CODEC = 1 var, =0 fixed */
74 #define ISAMB_PTR_CODEC 1
76 struct ISAMB_cache_entry {
81 struct ISAMB_cache_entry *next;
87 struct ISAMB_head head;
88 struct ISAMB_cache_entry *cache_entries;
95 struct ISAMB_file *file;
97 int cache; /* 0 = no cache, 1 = use cache, -1 = dummy isam (for testing only) */
98 int log_io; /* log level for bf_read/bf_write calls */
99 int log_freelist; /* log level for freelist handling */
100 zint skipped_numbers; /* on a leaf node */
101 zint returned_numbers;
102 zint skipped_nodes[ISAMB_MAX_LEVEL]; /* [0]=skipped leaves, 1 = higher etc */
103 zint accessed_nodes[ISAMB_MAX_LEVEL]; /* nodes we did not skip */
104 zint number_of_int_splits;
105 zint number_of_leaf_splits;
106 int enable_int_count; /* whether we count nodes (or not) */
107 int cache_size; /* size of blocks to cache (if cache=1) */
120 zint no_items; /* number of nodes in this + children */
124 void *decodeClientData;
132 int maxlevel; /* total depth */
135 zint skipped_numbers; /* on a leaf node */
136 zint returned_numbers;
137 zint skipped_nodes[ISAMB_MAX_LEVEL]; /* [0]=skipped leaves, 1 = higher etc */
138 zint accessed_nodes[ISAMB_MAX_LEVEL]; /* nodes we did not skip */
139 struct ISAMB_block **block;
140 int scope; /* on what level we forward */
144 #define encode_item_len encode_ptr
146 static void encode_ptr(char **dst, zint pos)
148 unsigned char *bp = (unsigned char*) *dst;
152 *bp++ = (unsigned char) (128 | (pos & 127));
155 *bp++ = (unsigned char) pos;
159 static void encode_ptr(char **dst, zint pos)
161 memcpy(*dst, &pos, sizeof(pos));
162 (*dst) += sizeof(pos);
166 #define decode_item_len decode_ptr
168 static void decode_ptr(const char **src, zint *pos)
174 while (((c = *(const unsigned char *)((*src)++)) & 128))
176 d += ((zint) (c & 127) << r);
179 d += ((zint) c << r);
183 static void decode_ptr(const char **src, zint *pos)
185 memcpy(pos, *src, sizeof(*pos));
186 (*src) += sizeof(*pos);
191 void isamb_set_int_count(ISAMB b, int v)
193 b->enable_int_count = v;
196 void isamb_set_cache_size(ISAMB b, int v)
201 ISAMB isamb_open2(BFiles bfs, const char *name, int writeflag, ISAMC_M *method,
202 int cache, int no_cat, int *sizes, int use_root_ptr)
204 ISAMB isamb = xmalloc(sizeof(*isamb));
207 assert(no_cat <= CAT_MAX);
210 isamb->method = (ISAMC_M *) xmalloc(sizeof(*method));
211 memcpy(isamb->method, method, sizeof(*method));
212 isamb->no_cat = no_cat;
214 isamb->log_freelist = 0;
215 isamb->cache = cache;
216 isamb->skipped_numbers = 0;
217 isamb->returned_numbers = 0;
218 isamb->number_of_int_splits = 0;
219 isamb->number_of_leaf_splits = 0;
220 isamb->enable_int_count = 1;
221 isamb->cache_size = 40;
224 isamb->minor_version = ISAMB_MINOR_VERSION_WITH_ROOT;
226 isamb->minor_version = ISAMB_MINOR_VERSION_NO_ROOT;
230 for (i = 0; i<ISAMB_MAX_LEVEL; i++)
231 isamb->skipped_nodes[i] = isamb->accessed_nodes[i] = 0;
235 yaz_log(YLOG_WARN, "isamb_open %s. Degraded TEST mode", name);
239 assert(cache == 0 || cache == 1);
241 isamb->file = xmalloc(sizeof(*isamb->file) * isamb->no_cat);
243 for (i = 0; i < isamb->no_cat; i++)
245 isamb->file[i].bf = 0;
246 isamb->file[i].head_dirty = 0;
247 isamb->file[i].cache_entries = 0;
250 for (i = 0; i < isamb->no_cat; i++)
252 char fname[DST_BUF_SIZE];
253 char hbuf[DST_BUF_SIZE];
255 sprintf(fname, "%s%c", name, i+'A');
257 isamb->file[i].bf = bf_open(bfs, fname, ISAMB_CACHE_ENTRY_SIZE,
260 isamb->file[i].bf = bf_open(bfs, fname, sizes[i], writeflag);
262 if (!isamb->file[i].bf)
268 /* fill-in default values (for empty isamb) */
269 isamb->file[i].head.first_block = ISAMB_CACHE_ENTRY_SIZE/sizes[i]+1;
270 isamb->file[i].head.last_block = isamb->file[i].head.first_block;
271 isamb->file[i].head.block_size = sizes[i];
272 assert(sizes[i] <= ISAMB_CACHE_ENTRY_SIZE);
274 if (i == isamb->no_cat-1 || sizes[i] > 128)
275 isamb->file[i].head.block_offset = 8;
277 isamb->file[i].head.block_offset = 4;
279 isamb->file[i].head.block_offset = 11;
281 isamb->file[i].head.block_max =
282 sizes[i] - isamb->file[i].head.block_offset;
283 isamb->file[i].head.free_list = 0;
284 if (bf_read(isamb->file[i].bf, 0, 0, 0, hbuf))
286 /* got header assume "isamb"major minor len can fit in 16 bytes */
288 int major, minor, len, pos = 0;
291 if (memcmp(hbuf, "isamb", 5))
293 yaz_log(YLOG_WARN, "bad isamb header for file %s", fname);
297 if (sscanf(hbuf+5, "%d %d %d", &major, &minor, &len) != 3)
299 yaz_log(YLOG_WARN, "bad isamb header for file %s", fname);
303 if (major != ISAMB_MAJOR_VERSION)
305 yaz_log(YLOG_WARN, "bad major version for file %s %d, must be %d",
306 fname, major, ISAMB_MAJOR_VERSION);
310 for (left = len - sizes[i]; left > 0; left = left - sizes[i])
313 if (!bf_read(isamb->file[i].bf, pos, 0, 0, hbuf + pos*sizes[i]))
315 yaz_log(YLOG_WARN, "truncated isamb header for "
316 "file=%s len=%d pos=%d",
323 decode_ptr(&src, &isamb->file[i].head.first_block);
324 decode_ptr(&src, &isamb->file[i].head.last_block);
325 decode_ptr(&src, &zint_tmp);
326 isamb->file[i].head.block_size = (int) zint_tmp;
327 decode_ptr(&src, &zint_tmp);
328 isamb->file[i].head.block_max = (int) zint_tmp;
329 decode_ptr(&src, &isamb->file[i].head.free_list);
330 if (isamb->minor_version >= ISAMB_MINOR_VERSION_WITH_ROOT)
331 decode_ptr(&src, &isamb->root_ptr);
333 assert(isamb->file[i].head.block_size >= isamb->file[i].head.block_offset);
334 /* must rewrite the header if root ptr is in use (bug #1017) */
335 if (use_root_ptr && writeflag)
336 isamb->file[i].head_dirty = 1;
338 isamb->file[i].head_dirty = 0;
339 assert(isamb->file[i].head.block_size == sizes[i]);
342 yaz_log(YLOG_WARN, "isamb debug enabled. Things will be slower than usual");
347 ISAMB isamb_open(BFiles bfs, const char *name, int writeflag, ISAMC_M *method,
351 int i, b_size = ISAMB_MIN_SIZE;
353 for (i = 0; i<CAT_NO; i++)
356 b_size = b_size * ISAMB_FAC_SIZE;
358 return isamb_open2(bfs, name, writeflag, method, cache,
362 static void flush_blocks(ISAMB b, int cat)
364 while (b->file[cat].cache_entries)
366 struct ISAMB_cache_entry *ce_this = b->file[cat].cache_entries;
367 b->file[cat].cache_entries = ce_this->next;
371 yaz_log(b->log_io, "bf_write: flush_blocks");
372 bf_write(b->file[cat].bf, ce_this->pos, 0, 0, ce_this->buf);
379 static int cache_block(ISAMB b, ISAM_P pos, unsigned char *userbuf, int wr)
381 int cat = (int) (pos&CAT_MASK);
382 int off = (int) (((pos/CAT_MAX) &
383 (ISAMB_CACHE_ENTRY_SIZE / b->file[cat].head.block_size - 1))
384 * b->file[cat].head.block_size);
385 zint norm = pos / (CAT_MASK*ISAMB_CACHE_ENTRY_SIZE / b->file[cat].head.block_size);
387 struct ISAMB_cache_entry **ce, *ce_this = 0, **ce_last = 0;
392 assert(ISAMB_CACHE_ENTRY_SIZE >= b->file[cat].head.block_size);
393 for (ce = &b->file[cat].cache_entries; *ce; ce = &(*ce)->next, no++)
396 if ((*ce)->pos == norm)
399 *ce = (*ce)->next; /* remove from list */
401 ce_this->next = b->file[cat].cache_entries; /* move to front */
402 b->file[cat].cache_entries = ce_this;
406 memcpy(ce_this->buf + off, userbuf,
407 b->file[cat].head.block_size);
411 memcpy(userbuf, ce_this->buf + off,
412 b->file[cat].head.block_size);
416 if (no >= b->cache_size)
418 assert(ce_last && *ce_last);
420 *ce_last = 0; /* remove the last entry from list */
423 yaz_log(b->log_io, "bf_write: cache_block");
424 bf_write(b->file[cat].bf, ce_this->pos, 0, 0, ce_this->buf);
429 ce_this = xmalloc(sizeof(*ce_this));
430 ce_this->next = b->file[cat].cache_entries;
431 b->file[cat].cache_entries = ce_this;
432 ce_this->buf = xmalloc(ISAMB_CACHE_ENTRY_SIZE);
434 yaz_log(b->log_io, "bf_read: cache_block");
435 if (!bf_read(b->file[cat].bf, norm, 0, 0, ce_this->buf))
436 memset(ce_this->buf, 0, ISAMB_CACHE_ENTRY_SIZE);
439 memcpy(ce_this->buf + off, userbuf, b->file[cat].head.block_size);
445 memcpy(userbuf, ce_this->buf + off, b->file[cat].head.block_size);
451 void isamb_close(ISAMB isamb)
454 for (i = 0; isamb->accessed_nodes[i]; i++)
455 yaz_log(YLOG_DEBUG, "isamb_close level leaf-%d: "ZINT_FORMAT" read, "
456 ZINT_FORMAT" skipped",
457 i, isamb->accessed_nodes[i], isamb->skipped_nodes[i]);
458 yaz_log(YLOG_DEBUG, "isamb_close returned "ZINT_FORMAT" values, "
459 "skipped "ZINT_FORMAT,
460 isamb->skipped_numbers, isamb->returned_numbers);
462 for (i = 0; i<isamb->no_cat; i++)
464 flush_blocks(isamb, i);
465 if (isamb->file[i].head_dirty)
467 char hbuf[DST_BUF_SIZE];
468 int major = ISAMB_MAJOR_VERSION;
470 char *dst = hbuf + 16;
472 int b_size = isamb->file[i].head.block_size;
474 encode_ptr(&dst, isamb->file[i].head.first_block);
475 encode_ptr(&dst, isamb->file[i].head.last_block);
476 encode_ptr(&dst, isamb->file[i].head.block_size);
477 encode_ptr(&dst, isamb->file[i].head.block_max);
478 encode_ptr(&dst, isamb->file[i].head.free_list);
480 if (isamb->minor_version >= ISAMB_MINOR_VERSION_WITH_ROOT)
481 encode_ptr(&dst, isamb->root_ptr);
483 memset(dst, '\0', b_size); /* ensure no random bytes are written */
487 /* print exactly 16 bytes (including trailing 0) */
488 sprintf(hbuf, "isamb%02d %02d %02d\r\n", major,
489 isamb->minor_version, len);
491 bf_write(isamb->file[i].bf, pos, 0, 0, hbuf);
493 for (left = len - b_size; left > 0; left = left - b_size)
496 bf_write(isamb->file[i].bf, pos, 0, 0, hbuf + pos*b_size);
499 if (isamb->file[i].bf)
500 bf_close (isamb->file[i].bf);
503 xfree(isamb->method);
507 /* open_block: read one block at pos.
508 Decode leading sys bytes .. consisting of
510 0: leader byte, != 0 leaf, == 0, non-leaf
511 1-2: used size of block
512 3-7*: number of items and all children
514 * Reserve 5 bytes for large block sizes. 1 for small ones .. Number
515 of items. We can thus have at most 2^40 nodes.
517 static struct ISAMB_block *open_block(ISAMB b, ISAM_P pos)
519 int cat = (int) (pos&CAT_MASK);
521 int offset = b->file[cat].head.block_offset;
522 struct ISAMB_block *p;
525 p = xmalloc(sizeof(*p));
527 p->cat = (int) (pos & CAT_MASK);
528 p->buf = xmalloc(b->file[cat].head.block_size);
531 if (!cache_block (b, pos, p->buf, 0))
533 yaz_log(b->log_io, "bf_read: open_block");
534 if (bf_read(b->file[cat].bf, pos/CAT_MAX, 0, 0, p->buf) != 1)
536 yaz_log(YLOG_FATAL, "isamb: read fail for pos=%ld block=%ld",
537 (long) pos, (long) pos/CAT_MAX);
538 zebra_exit("isamb:open_block");
541 p->bytes = (char *)p->buf + offset;
543 p->size = (p->buf[1] + 256 * p->buf[2]) - offset;
546 yaz_log(YLOG_FATAL, "Bad block size %d in pos=" ZINT_FORMAT "\n",
549 assert(p->size >= 0);
550 src = (char*) p->buf + 3;
551 decode_ptr(&src, &p->no_items);
556 p->decodeClientData = (*b->method->codec.start)();
560 struct ISAMB_block *new_block(ISAMB b, int leaf, int cat)
562 struct ISAMB_block *p;
564 p = xmalloc(sizeof(*p));
565 p->buf = xmalloc(b->file[cat].head.block_size);
567 if (!b->file[cat].head.free_list)
570 block_no = b->file[cat].head.last_block++;
571 p->pos = block_no * CAT_MAX + cat;
573 yaz_log(b->log_freelist, "got block "
574 ZINT_FORMAT " from last %d:" ZINT_FORMAT, p->pos,
575 cat, p->pos/CAT_MAX);
579 p->pos = b->file[cat].head.free_list;
580 assert((p->pos & CAT_MASK) == cat);
581 if (!cache_block(b, p->pos, p->buf, 0))
583 yaz_log(b->log_io, "bf_read: new_block");
584 if (!bf_read(b->file[cat].bf, p->pos/CAT_MAX, 0, 0, p->buf))
586 yaz_log(YLOG_FATAL, "isamb: read fail for pos=%ld block=%ld",
587 (long) p->pos/CAT_MAX, (long) p->pos/CAT_MAX);
588 zebra_exit("isamb:new_block");
592 yaz_log(b->log_freelist, "got block "
593 ZINT_FORMAT " from freelist %d:" ZINT_FORMAT, p->pos,
594 cat, p->pos/CAT_MAX);
595 memcpy(&b->file[cat].head.free_list, p->buf, sizeof(zint));
598 b->file[cat].head_dirty = 1;
599 memset(p->buf, 0, b->file[cat].head.block_size);
600 p->bytes = (char*)p->buf + b->file[cat].head.block_offset;
607 p->decodeClientData = (*b->method->codec.start)();
611 struct ISAMB_block *new_leaf(ISAMB b, int cat)
613 return new_block(b, 1, cat);
617 struct ISAMB_block *new_int(ISAMB b, int cat)
619 return new_block(b, 0, cat);
622 static void check_block(ISAMB b, struct ISAMB_block *p)
624 assert(b); /* mostly to make the compiler shut up about unused b */
632 char *startp = p->bytes;
633 const char *src = startp;
634 char *endp = p->bytes + p->size;
636 void *c1 = (*b->method->codec.start)();
638 decode_ptr(&src, &pos);
639 assert((pos&CAT_MASK) == p->cat);
643 char file_item_buf[DST_ITEM_MAX];
644 char *file_item = file_item_buf;
645 (*b->method->codec.reset)(c1);
646 (*b->method->codec.decode)(c1, &file_item, &src);
649 decode_item_len(&src, &item_len);
650 assert(item_len > 0 && item_len < 128);
653 decode_ptr(&src, &pos);
654 if ((pos&CAT_MASK) != p->cat)
656 assert((pos&CAT_MASK) == p->cat);
659 (*b->method->codec.stop)(c1);
663 void close_block(ISAMB b, struct ISAMB_block *p)
669 yaz_log(b->log_freelist, "release block " ZINT_FORMAT " from freelist %d:" ZINT_FORMAT,
670 p->pos, p->cat, p->pos/CAT_MAX);
671 memcpy(p->buf, &b->file[p->cat].head.free_list, sizeof(zint));
672 b->file[p->cat].head.free_list = p->pos;
673 if (!cache_block(b, p->pos, p->buf, 1))
675 yaz_log(b->log_io, "bf_write: close_block (deleted)");
676 bf_write(b->file[p->cat].bf, p->pos/CAT_MAX, 0, 0, p->buf);
681 int offset = b->file[p->cat].head.block_offset;
682 int size = p->size + offset;
683 char *dst = (char*)p->buf + 3;
684 assert(p->size >= 0);
686 /* memset becuase encode_ptr usually does not write all bytes */
687 memset(p->buf, 0, b->file[p->cat].head.block_offset);
689 p->buf[1] = size & 255;
690 p->buf[2] = size >> 8;
691 encode_ptr(&dst, p->no_items);
693 if (!cache_block(b, p->pos, p->buf, 1))
695 yaz_log(b->log_io, "bf_write: close_block");
696 bf_write(b->file[p->cat].bf, p->pos/CAT_MAX, 0, 0, p->buf);
699 (*b->method->codec.stop)(p->decodeClientData);
704 int insert_sub(ISAMB b, struct ISAMB_block **p,
705 void *new_item, int *mode,
707 struct ISAMB_block **sp,
708 void *sub_item, int *sub_size,
709 const void *max_item);
711 int insert_int(ISAMB b, struct ISAMB_block *p, void *lookahead_item,
713 ISAMC_I *stream, struct ISAMB_block **sp,
714 void *split_item, int *split_size, const void *last_max_item)
716 char *startp = p->bytes;
717 const char *src = startp;
718 char *endp = p->bytes + p->size;
720 struct ISAMB_block *sub_p1 = 0, *sub_p2 = 0;
721 char sub_item[DST_ITEM_MAX];
725 void *c1 = (*b->method->codec.start)();
729 assert(p->size >= 0);
730 decode_ptr(&src, &pos);
734 const char *src0 = src;
736 char file_item_buf[DST_ITEM_MAX];
737 char *file_item = file_item_buf;
738 (*b->method->codec.reset)(c1);
739 (*b->method->codec.decode)(c1, &file_item, &src);
740 d = (*b->method->compare_item)(file_item_buf, lookahead_item);
743 sub_p1 = open_block(b, pos);
745 diff_terms -= sub_p1->no_items;
746 more = insert_sub(b, &sub_p1, lookahead_item, mode,
748 sub_item, &sub_size, file_item_buf);
749 diff_terms += sub_p1->no_items;
755 decode_item_len(&src, &item_len);
756 d = (*b->method->compare_item)(src, lookahead_item);
759 sub_p1 = open_block(b, pos);
761 diff_terms -= sub_p1->no_items;
762 more = insert_sub(b, &sub_p1, lookahead_item, mode,
764 sub_item, &sub_size, src);
765 diff_terms += sub_p1->no_items;
771 decode_ptr(&src, &pos);
775 /* we reached the end. So lookahead > last item */
776 sub_p1 = open_block(b, pos);
778 diff_terms -= sub_p1->no_items;
779 more = insert_sub(b, &sub_p1, lookahead_item, mode, stream, &sub_p2,
780 sub_item, &sub_size, last_max_item);
781 diff_terms += sub_p1->no_items;
784 diff_terms += sub_p2->no_items;
788 p->no_items += diff_terms;
792 /* there was a split - must insert pointer in this one */
793 char dst_buf[DST_BUF_SIZE];
796 const char *sub_item_ptr = sub_item;
798 assert(sub_size < DST_ITEM_MAX && sub_size > 1);
800 memcpy(dst, startp, src - startp);
805 (*b->method->codec.reset)(c1);
806 (*b->method->codec.encode)(c1, &dst, &sub_item_ptr);
808 encode_item_len(&dst, sub_size); /* sub length and item */
809 memcpy(dst, sub_item, sub_size);
813 encode_ptr(&dst, sub_p2->pos); /* pos */
815 if (endp - src) /* remaining data */
817 memcpy(dst, src, endp - src);
820 p->size = dst - dst_buf;
821 assert(p->size >= 0);
823 if (p->size <= b->file[p->cat].head.block_max)
825 /* it fits OK in this block */
826 memcpy(startp, dst_buf, dst - dst_buf);
828 close_block(b, sub_p2);
832 /* must split _this_ block as well .. */
833 struct ISAMB_block *sub_p3;
835 char file_item_buf[DST_ITEM_MAX];
836 char *file_item = file_item_buf;
840 zint no_items_first_half = 0;
846 b->number_of_int_splits++;
849 close_block(b, sub_p2);
851 half = src + b->file[p->cat].head.block_size/2;
852 decode_ptr(&src, &pos);
854 if (b->enable_int_count)
856 /* read sub block so we can get no_items for it */
857 sub_p3 = open_block(b, pos);
858 no_items_first_half += sub_p3->no_items;
859 close_block(b, sub_p3);
865 file_item = file_item_buf;
866 (*b->method->codec.reset)(c1);
867 (*b->method->codec.decode)(c1, &file_item, &src);
869 decode_item_len(&src, &split_size_tmp);
870 *split_size = (int) split_size_tmp;
873 decode_ptr(&src, &pos);
875 if (b->enable_int_count)
877 /* read sub block so we can get no_items for it */
878 sub_p3 = open_block(b, pos);
879 no_items_first_half += sub_p3->no_items;
880 close_block(b, sub_p3);
883 /* p is first half */
884 p_new_size = src - dst_buf;
885 memcpy(p->bytes, dst_buf, p_new_size);
888 file_item = file_item_buf;
889 (*b->method->codec.reset)(c1);
890 (*b->method->codec.decode)(c1, &file_item, &src);
891 *split_size = file_item - file_item_buf;
892 memcpy(split_item, file_item_buf, *split_size);
894 decode_item_len(&src, &split_size_tmp);
895 *split_size = (int) split_size_tmp;
896 memcpy(split_item, src, *split_size);
899 /* *sp is second half */
900 *sp = new_int(b, p->cat);
901 (*sp)->size = endp - src;
902 memcpy((*sp)->bytes, src, (*sp)->size);
904 p->size = p_new_size;
906 /* adjust no_items in first&second half */
907 (*sp)->no_items = p->no_items - no_items_first_half;
908 p->no_items = no_items_first_half;
912 close_block(b, sub_p1);
913 (*b->method->codec.stop)(c1);
917 int insert_leaf(ISAMB b, struct ISAMB_block **sp1, void *lookahead_item,
918 int *lookahead_mode, ISAMC_I *stream,
919 struct ISAMB_block **sp2,
920 void *sub_item, int *sub_size,
921 const void *max_item)
923 struct ISAMB_block *p = *sp1;
926 char dst_buf[DST_BUF_SIZE], *dst = dst_buf;
928 void *c1 = (*b->method->codec.start)();
929 void *c2 = (*b->method->codec.start)();
931 int quater = b->file[b->no_cat-1].head.block_max / 4;
932 char *mid_cut = dst_buf + quater * 2;
933 char *tail_cut = dst_buf + quater * 3;
934 char *maxp = dst_buf + b->file[b->no_cat-1].head.block_max;
937 char cut_item_buf[DST_ITEM_MAX];
938 int cut_item_size = 0;
939 int no_items = 0; /* number of items (total) */
940 int no_items_1 = 0; /* number of items (first half) */
941 int inserted_dst_bytes = 0;
945 char file_item_buf[DST_ITEM_MAX];
946 char *file_item = file_item_buf;
949 endp = p->bytes + p->size;
950 (*b->method->codec.decode)(c1, &file_item, &src);
953 const char *dst_item = 0; /* resulting item to be inserted */
954 char *lookahead_next;
959 d = (*b->method->compare_item)(file_item_buf, lookahead_item);
961 /* d now holds comparison between existing file item and
964 d > 0: lookahead before file
965 d < 0: lookahead after file
969 /* lookahead must be inserted */
970 dst_item = lookahead_item;
971 /* if this is not an insertion, it's really bad .. */
972 if (!*lookahead_mode)
974 yaz_log(YLOG_WARN, "isamb: Inconsistent register (1)");
975 assert(*lookahead_mode);
978 else if (d == 0 && *lookahead_mode == 2)
980 /* For mode == 2, we insert the new key anyway - even
981 though the comparison is 0. */
982 dst_item = lookahead_item;
986 dst_item = file_item_buf;
988 if (d == 0 && !*lookahead_mode)
990 /* it's a deletion and they match so there is nothing
991 to be inserted anyway .. But mark the thing dirty
992 (file item was part of input.. The item will not be
996 else if (!half1 && dst > mid_cut)
998 /* we have reached the splitting point for the first time */
999 const char *dst_item_0 = dst_item;
1000 half1 = dst; /* candidate for splitting */
1002 /* encode the resulting item */
1003 (*b->method->codec.encode)(c2, &dst, &dst_item);
1005 cut_item_size = dst_item - dst_item_0;
1006 assert(cut_item_size > 0);
1007 memcpy(cut_item_buf, dst_item_0, cut_item_size);
1010 no_items_1 = no_items;
1015 /* encode the resulting item */
1016 (*b->method->codec.encode)(c2, &dst, &dst_item);
1020 /* now move "pointers" .. result has been encoded .. */
1023 /* we must move the lookahead pointer */
1025 inserted_dst_bytes += (dst - dst_0);
1026 if (inserted_dst_bytes >= quater)
1027 /* no more room. Mark lookahead as "gone".. */
1031 /* move it really.. */
1032 lookahead_next = lookahead_item;
1033 if (!(*stream->read_item)(stream->clientData,
1037 /* end of stream reached: no "more" and no lookahead */
1041 if (lookahead_item && max_item &&
1042 (*b->method->compare_item)(max_item, lookahead_item) <= 0)
1044 /* the lookahead goes beyond what we allow in this
1045 leaf. Mark it as "gone" */
1054 /* exact match .. move both pointers */
1056 lookahead_next = lookahead_item;
1057 if (!(*stream->read_item)(stream->clientData,
1058 &lookahead_next, lookahead_mode))
1064 break; /* end of file stream reached .. */
1065 file_item = file_item_buf; /* move file pointer */
1066 (*b->method->codec.decode)(c1, &file_item, &src);
1070 /* file pointer must be moved */
1073 file_item = file_item_buf;
1074 (*b->method->codec.decode)(c1, &file_item, &src);
1079 /* this loop runs when we are "appending" to a leaf page. That is
1080 either it's empty (new) or all file items have been read in
1083 maxp = dst_buf + b->file[b->no_cat-1].head.block_max + quater;
1084 while (lookahead_item)
1087 const char *src = lookahead_item;
1090 /* if we have a lookahead item, we stop if we exceed the value of it */
1092 (*b->method->compare_item)(max_item, lookahead_item) <= 0)
1094 /* stop if we have reached the value of max item */
1097 if (!*lookahead_mode)
1099 /* this is append. So a delete is bad */
1100 yaz_log(YLOG_WARN, "isamb: Inconsistent register (2)");
1101 assert(*lookahead_mode);
1103 else if (!half1 && dst > tail_cut)
1105 const char *src_0 = src;
1106 half1 = dst; /* candidate for splitting */
1108 (*b->method->codec.encode)(c2, &dst, &src);
1110 cut_item_size = src - src_0;
1111 assert(cut_item_size > 0);
1112 memcpy(cut_item_buf, src_0, cut_item_size);
1114 no_items_1 = no_items;
1118 (*b->method->codec.encode)(c2, &dst, &src);
1128 dst_item = lookahead_item;
1129 if (!(*stream->read_item)(stream->clientData, &dst_item,
1136 new_size = dst - dst_buf;
1137 if (p && p->cat != b->no_cat-1 &&
1138 new_size > b->file[p->cat].head.block_max)
1140 /* non-btree block will be removed */
1143 /* delete it too!! */
1144 p = 0; /* make a new one anyway */
1147 { /* must create a new one */
1149 for (i = 0; i < b->no_cat; i++)
1150 if (new_size <= b->file[i].head.block_max)
1156 if (new_size > b->file[p->cat].head.block_max)
1159 const char *cut_item = cut_item_buf;
1164 assert(cut_item_size > 0);
1167 p->size = half1 - dst_buf;
1168 assert(p->size <= b->file[p->cat].head.block_max);
1169 memcpy(p->bytes, dst_buf, half1 - dst_buf);
1170 p->no_items = no_items_1;
1173 *sp2 = new_leaf(b, p->cat);
1175 (*b->method->codec.reset)(c2);
1177 b->number_of_leaf_splits++;
1179 first_dst = (*sp2)->bytes;
1181 (*b->method->codec.encode)(c2, &first_dst, &cut_item);
1183 memcpy(first_dst, half2, dst - half2);
1185 (*sp2)->size = (first_dst - (*sp2)->bytes) + (dst - half2);
1186 assert((*sp2)->size <= b->file[p->cat].head.block_max);
1187 (*sp2)->no_items = no_items - no_items_1;
1190 memcpy(sub_item, cut_item_buf, cut_item_size);
1191 *sub_size = cut_item_size;
1195 memcpy(p->bytes, dst_buf, dst - dst_buf);
1197 p->no_items = no_items;
1199 (*b->method->codec.stop)(c1);
1200 (*b->method->codec.stop)(c2);
1205 int insert_sub(ISAMB b, struct ISAMB_block **p, void *new_item,
1208 struct ISAMB_block **sp,
1209 void *sub_item, int *sub_size,
1210 const void *max_item)
1212 if (!*p || (*p)->leaf)
1213 return insert_leaf(b, p, new_item, mode, stream, sp, sub_item,
1214 sub_size, max_item);
1216 return insert_int(b, *p, new_item, mode, stream, sp, sub_item,
1217 sub_size, max_item);
1220 int isamb_unlink(ISAMB b, ISAM_P pos)
1222 struct ISAMB_block *p1;
1226 p1 = open_block(b, pos);
1231 const char *src = p1->bytes + p1->offset;
1233 void *c1 = (*b->method->codec.start)();
1235 decode_ptr(&src, &sub_p);
1236 isamb_unlink(b, sub_p);
1238 while (src != p1->bytes + p1->size)
1241 char file_item_buf[DST_ITEM_MAX];
1242 char *file_item = file_item_buf;
1243 (*b->method->codec.reset)(c1);
1244 (*b->method->codec.decode)(c1, &file_item, &src);
1247 decode_item_len(&src, &item_len);
1250 decode_ptr(&src, &sub_p);
1251 isamb_unlink(b, sub_p);
1254 (*b->method->codec.stop)(c1);
1261 void isamb_merge(ISAMB b, ISAM_P *pos, ISAMC_I *stream)
1263 char item_buf[DST_ITEM_MAX];
1267 int must_delete = 0;
1274 item_ptr = item_buf;
1276 (*stream->read_item)(stream->clientData, &item_ptr, &i_mode);
1281 item_ptr = item_buf;
1282 more = (*stream->read_item)(stream->clientData, &item_ptr, &i_mode);
1285 struct ISAMB_block *p = 0, *sp = 0;
1286 char sub_item[DST_ITEM_MAX];
1290 p = open_block(b, *pos);
1291 more = insert_sub(b, &p, item_buf, &i_mode, stream, &sp,
1292 sub_item, &sub_size, 0);
1294 { /* increase level of tree by one */
1295 struct ISAMB_block *p2 = new_int(b, p->cat);
1296 char *dst = p2->bytes + p2->size;
1298 void *c1 = (*b->method->codec.start)();
1299 const char *sub_item_ptr = sub_item;
1302 encode_ptr(&dst, p->pos);
1303 assert(sub_size < DST_ITEM_MAX && sub_size > 1);
1305 (*b->method->codec.reset)(c1);
1306 (*b->method->codec.encode)(c1, &dst, &sub_item_ptr);
1308 encode_item_len(&dst, sub_size);
1309 memcpy(dst, sub_item, sub_size);
1312 encode_ptr(&dst, sp->pos);
1314 p2->size = dst - p2->bytes;
1315 p2->no_items = p->no_items + sp->no_items;
1316 *pos = p2->pos; /* return new super page */
1320 (*b->method->codec.stop)(c1);
1325 *pos = p->pos; /* return current one (again) */
1327 if (p->no_items == 0)
1335 isamb_unlink(b, *pos);
1340 ISAMB_PP isamb_pp_open_x(ISAMB isamb, ISAM_P pos, int *level, int scope)
1342 ISAMB_PP pp = xmalloc(sizeof(*pp));
1348 pp->block = xmalloc(ISAMB_MAX_LEVEL * sizeof(*pp->block));
1355 pp->skipped_numbers = 0;
1356 pp->returned_numbers = 0;
1358 for (i = 0; i<ISAMB_MAX_LEVEL; i++)
1359 pp->skipped_nodes[i] = pp->accessed_nodes[i] = 0;
1362 struct ISAMB_block *p = open_block(isamb, pos);
1363 const char *src = p->bytes + p->offset;
1364 pp->block[pp->level] = p;
1366 pp->total_size += p->size;
1370 decode_ptr(&src, &pos);
1371 p->offset = src - p->bytes;
1373 pp->accessed_nodes[pp->level]++;
1375 pp->block[pp->level+1] = 0;
1376 pp->maxlevel = pp->level;
1382 ISAMB_PP isamb_pp_open(ISAMB isamb, ISAM_P pos, int scope)
1384 return isamb_pp_open_x(isamb, pos, 0, scope);
1387 void isamb_pp_close_x(ISAMB_PP pp, zint *size, zint *blocks)
1392 yaz_log(YLOG_DEBUG, "isamb_pp_close lev=%d returned "ZINT_FORMAT" values, "
1393 "skipped "ZINT_FORMAT,
1394 pp->maxlevel, pp->skipped_numbers, pp->returned_numbers);
1395 for (i = pp->maxlevel; i>=0; i--)
1396 if (pp->skipped_nodes[i] || pp->accessed_nodes[i])
1397 yaz_log(YLOG_DEBUG, "isamb_pp_close level leaf-%d: "
1398 ZINT_FORMAT" read, "ZINT_FORMAT" skipped", i,
1399 pp->accessed_nodes[i], pp->skipped_nodes[i]);
1400 pp->isamb->skipped_numbers += pp->skipped_numbers;
1401 pp->isamb->returned_numbers += pp->returned_numbers;
1402 for (i = pp->maxlevel; i>=0; i--)
1404 pp->isamb->accessed_nodes[i] += pp->accessed_nodes[i];
1405 pp->isamb->skipped_nodes[i] += pp->skipped_nodes[i];
1408 *size = pp->total_size;
1410 *blocks = pp->no_blocks;
1411 for (i = 0; i <= pp->level; i++)
1412 close_block(pp->isamb, pp->block[i]);
1417 int isamb_block_info(ISAMB isamb, int cat)
1419 if (cat >= 0 && cat < isamb->no_cat)
1420 return isamb->file[cat].head.block_size;
1424 void isamb_pp_close(ISAMB_PP pp)
1426 isamb_pp_close_x(pp, 0, 0);
1429 /* simple recursive dumper .. */
1430 static void isamb_dump_r(ISAMB b, ISAM_P pos, void (*pr)(const char *str),
1434 char prefix_str[1024];
1437 struct ISAMB_block *p = open_block(b, pos);
1438 sprintf(prefix_str, "%*s " ZINT_FORMAT " cat=%d size=%d max=%d items="
1439 ZINT_FORMAT, level*2, "",
1440 pos, p->cat, p->size, b->file[p->cat].head.block_max,
1443 sprintf(prefix_str, "%*s " ZINT_FORMAT, level*2, "", pos);
1446 while (p->offset < p->size)
1448 const char *src = p->bytes + p->offset;
1450 (*b->method->codec.decode)(p->decodeClientData, &dst, &src);
1451 (*b->method->log_item)(YLOG_DEBUG, buf, prefix_str);
1452 p->offset = src - (char*) p->bytes;
1454 assert(p->offset == p->size);
1458 const char *src = p->bytes + p->offset;
1461 decode_ptr(&src, &sub);
1462 p->offset = src - (char*) p->bytes;
1464 isamb_dump_r(b, sub, pr, level+1);
1466 while (p->offset < p->size)
1469 char file_item_buf[DST_ITEM_MAX];
1470 char *file_item = file_item_buf;
1471 void *c1 = (*b->method->codec.start)();
1472 (*b->method->codec.decode)(c1, &file_item, &src);
1473 (*b->method->codec.stop)(c1);
1474 (*b->method->log_item)(YLOG_DEBUG, file_item_buf, prefix_str);
1477 decode_item_len(&src, &item_len);
1478 (*b->method->log_item)(YLOG_DEBUG, src, prefix_str);
1481 decode_ptr(&src, &sub);
1483 p->offset = src - (char*) p->bytes;
1485 isamb_dump_r(b, sub, pr, level+1);
1492 void isamb_dump(ISAMB b, ISAM_P pos, void (*pr)(const char *str))
1494 isamb_dump_r(b, pos, pr, 0);
1497 int isamb_pp_read(ISAMB_PP pp, void *buf)
1499 return isamb_pp_forward(pp, buf, 0);
1503 void isamb_pp_pos(ISAMB_PP pp, double *current, double *total)
1504 { /* return an estimate of the current position and of the total number of */
1505 /* occureences in the isam tree, based on the current leaf */
1509 /* if end-of-stream PP may not be leaf */
1511 *total = (double) (pp->block[0]->no_items);
1512 *current = (double) pp->returned_numbers;
1514 yaz_log(YLOG_LOG, "isamb_pp_pos returning: cur= %0.1f tot=%0.1f rn="
1515 ZINT_FORMAT, *current, *total, pp->returned_numbers);
1519 int isamb_pp_forward(ISAMB_PP pp, void *buf, const void *untilb)
1523 struct ISAMB_block *p = pp->block[pp->level];
1524 ISAMB b = pp->isamb;
1528 while (p->offset == p->size)
1534 char file_item_buf[DST_ITEM_MAX];
1535 char *file_item = file_item_buf;
1539 while (p->offset == p->size)
1543 close_block(pp->isamb, pp->block[pp->level]);
1544 pp->block[pp->level] = 0;
1546 p = pp->block[pp->level];
1551 src = p->bytes + p->offset;
1554 c1 = (*b->method->codec.start)();
1555 (*b->method->codec.decode)(c1, &file_item, &src);
1557 decode_ptr(&src, &item_len);
1560 decode_ptr(&src, &pos);
1561 p->offset = src - (char*) p->bytes;
1563 src = p->bytes + p->offset;
1567 if (!untilb || p->offset == p->size)
1569 assert(p->offset < p->size);
1572 file_item = file_item_buf;
1573 (*b->method->codec.reset)(c1);
1574 (*b->method->codec.decode)(c1, &file_item, &src);
1575 if ((*b->method->compare_item)(untilb, file_item_buf) < pp->scope)
1581 decode_item_len(&src, &item_len);
1582 if ((*b->method->compare_item)(untilb, src) < pp->scope)
1586 decode_ptr(&src, &pos);
1587 p->offset = src - (char*) p->bytes;
1594 pp->block[pp->level] = p = open_block(pp->isamb, pos);
1596 pp->total_size += p->size;
1604 src = p->bytes + p->offset;
1607 decode_ptr(&src, &pos);
1608 p->offset = src - (char*) p->bytes;
1610 if (!untilb || p->offset == p->size)
1612 assert(p->offset < p->size);
1615 file_item = file_item_buf;
1616 (*b->method->codec.reset)(c1);
1617 (*b->method->codec.decode)(c1, &file_item, &src);
1618 if ((*b->method->compare_item)(untilb, file_item_buf) < pp->scope)
1624 decode_ptr(&src, &item_len);
1625 if ((*b->method->compare_item)(untilb, src) <= pp->scope)
1633 (*b->method->codec.stop)(c1);
1636 assert(p->offset < p->size);
1641 src = p->bytes + p->offset;
1642 (*pp->isamb->method->codec.decode)(p->decodeClientData, &dst, &src);
1643 p->offset = src - (char*) p->bytes;
1644 if (!untilb || (*pp->isamb->method->compare_item)(untilb, dst0) < pp->scope)
1647 if (p->offset == p->size) goto again;
1649 pp->returned_numbers++;
1653 zint isamb_get_int_splits(ISAMB b)
1655 return b->number_of_int_splits;
1658 zint isamb_get_leaf_splits(ISAMB b)
1660 return b->number_of_leaf_splits;
1663 zint isamb_get_root_ptr(ISAMB b)
1668 void isamb_set_root_ptr(ISAMB b, zint root_ptr)
1670 b->root_ptr = root_ptr;
1677 * c-file-style: "Stroustrup"
1678 * indent-tabs-mode: nil
1680 * vim: shiftwidth=4 tabstop=8 expandtab