Prelimiary work on bug 2338.
[idzebra-moved-to-github.git] / index / records.c
1 /* This file is part of the Zebra server.
2    Copyright (C) 1995-2008 Index Data
3
4 Zebra is free software; you can redistribute it and/or modify it under
5 the terms of the GNU General Public License as published by the Free
6 Software Foundation; either version 2, or (at your option) any later
7 version.
8
9 Zebra is distributed in the hope that it will be useful, but WITHOUT ANY
10 WARRANTY; without even the implied warranty of MERCHANTABILITY or
11 FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
12 for more details.
13
14 You should have received a copy of the GNU General Public License
15 along with this program; if not, write to the Free Software
16 Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA  02110-1301  USA
17
18 */
19
20 /*
21  *  Format of first block
22  *      next       (8 bytes)
23  *      ref_count  (2 bytes)
24  *      block      (500 bytes)
25  *
26  *  Format of subsequent blocks 
27  *      next  (8 bytes)
28  *      block (502 bytes)
29  *
30  *  Format of each record
31  *      sysno
32  *      (length, data) - pairs
33  *      length = 0 if same as previous
34  */
35 #include <stdio.h>
36 #include <stdlib.h>
37 #include <assert.h>
38 #include <string.h>
39
40 #include <yaz/yaz-util.h>
41 #include <idzebra/bfile.h>
42 #include "recindex.h"
43
44 #if HAVE_BZLIB_H
45 #include <bzlib.h>
46 #endif
47
48 #define REC_BLOCK_TYPES 2
49 #define REC_HEAD_MAGIC "recindex"
50 #define REC_VERSION 5
51
52 struct records_info {
53     int rw;
54     int compression_method;
55
56     recindex_t recindex;
57
58     char *data_fname[REC_BLOCK_TYPES];
59     BFile data_BFile[REC_BLOCK_TYPES];
60
61     char *tmp_buf;
62     int tmp_size;
63
64     struct record_cache_entry *record_cache;
65     int cache_size;
66     int cache_cur;
67     int cache_max;
68
69     int compression_chunk_size;
70
71     Zebra_mutex mutex;
72
73     struct records_head {
74         char magic[8];
75         char version[4];
76         zint block_size[REC_BLOCK_TYPES];
77         zint block_free[REC_BLOCK_TYPES];
78         zint block_last[REC_BLOCK_TYPES];
79         zint block_used[REC_BLOCK_TYPES];
80         zint block_move[REC_BLOCK_TYPES];
81
82         zint total_bytes;
83         zint index_last;
84         zint index_free;
85         zint no_records;
86
87     } head;
88 };
89
90 enum recordCacheFlag { recordFlagNop, recordFlagWrite, recordFlagNew,
91                        recordFlagDelete };
92
93 struct record_cache_entry {
94     Record rec;
95     enum recordCacheFlag flag;
96 };
97
98 struct record_index_entry {
99     zint next;         /* first block of record info / next free entry */
100     int size;          /* size of record or 0 if free entry */
101 };
102
103 Record rec_cp(Record rec);
104
105 /* Modify argument to if below: 1=normal, 0=sysno testing */
106 #if 1
107 /* If this is used sysno are not converted (no testing) */
108 #define FAKE_OFFSET 0
109 #define USUAL_RANGE 6000000000LL
110
111 #else
112 /* Use a fake > 2^32 offset so we can test for proper 64-bit handling */
113 #define FAKE_OFFSET 6000000000LL
114 #define USUAL_RANGE 2000000000LL
115 #endif
116
117 static zint rec_sysno_to_ext(zint sysno)
118 {
119     assert(sysno >= 0 && sysno <= USUAL_RANGE);
120     return sysno + FAKE_OFFSET;
121 }
122
123 zint rec_sysno_to_int(zint sysno)
124 {
125     assert(sysno >= FAKE_OFFSET && sysno <= FAKE_OFFSET + USUAL_RANGE);
126     return sysno - FAKE_OFFSET;
127 }
128
129 static void rec_tmp_expand(Records p, int size)
130 {
131     if (p->tmp_size < size + 2048 ||
132         p->tmp_size < p->head.block_size[REC_BLOCK_TYPES-1]*2)
133     {
134         xfree(p->tmp_buf);
135         p->tmp_size = size + (int)
136                         (p->head.block_size[REC_BLOCK_TYPES-1])*2 + 2048;
137         p->tmp_buf = (char *) xmalloc(p->tmp_size);
138     }
139 }
140
141 static ZEBRA_RES rec_release_blocks(Records p, zint sysno)
142 {
143     struct record_index_entry entry;
144     zint freeblock;
145     char block_and_ref[sizeof(zint) + sizeof(short)];
146     int dst_type;
147     int first = 1;
148
149     if (recindex_read_indx(p->recindex, sysno, &entry, sizeof(entry), 1) != 1)
150         return ZEBRA_FAIL;
151
152     freeblock = entry.next;
153     assert(freeblock > 0);
154     dst_type = CAST_ZINT_TO_INT(freeblock & 7);
155     assert(dst_type < REC_BLOCK_TYPES);
156     freeblock = freeblock / 8;
157     while (freeblock)
158     {
159         if (bf_read(p->data_BFile[dst_type], freeblock, 0,
160                      first ? sizeof(block_and_ref) : sizeof(zint),
161                      block_and_ref) != 1)
162         {
163             yaz_log(YLOG_FATAL|YLOG_ERRNO, "read in rec_del_single");
164             return ZEBRA_FAIL;
165         }
166         if (first)
167         {
168             short ref;
169             memcpy(&ref, block_and_ref + sizeof(freeblock), sizeof(ref));
170             --ref;
171             memcpy(block_and_ref + sizeof(freeblock), &ref, sizeof(ref));
172             if (ref)
173             {
174                 if (bf_write(p->data_BFile[dst_type], freeblock, 0,
175                               sizeof(block_and_ref), block_and_ref))
176                 {
177                     yaz_log(YLOG_FATAL|YLOG_ERRNO, "write in rec_del_single");
178                     return ZEBRA_FAIL;
179                 }
180                 return ZEBRA_OK;
181             }
182             first = 0;
183         }
184         
185         if (bf_write(p->data_BFile[dst_type], freeblock, 0, sizeof(freeblock),
186                       &p->head.block_free[dst_type]))
187         {
188             yaz_log(YLOG_FATAL|YLOG_ERRNO, "write in rec_del_single");
189             return ZEBRA_FAIL;
190         }
191         p->head.block_free[dst_type] = freeblock;
192         memcpy(&freeblock, block_and_ref, sizeof(freeblock));
193
194         p->head.block_used[dst_type]--;
195     }
196     p->head.total_bytes -= entry.size;
197     return ZEBRA_OK;
198 }
199
200 static ZEBRA_RES rec_delete_single(Records p, Record rec)
201 {
202     struct record_index_entry entry;
203
204     /* all data in entry must be reset, since it's written verbatim */
205     memset(&entry, '\0', sizeof(entry));
206     if (rec_release_blocks(p, rec_sysno_to_int(rec->sysno)) != ZEBRA_OK)
207         return ZEBRA_FAIL;
208
209     yaz_log(YLOG_LOG, "rec_delete_single sysno=" ZINT_FORMAT, rec->sysno);
210     entry.next = p->head.index_free;
211     entry.size = 0;
212     p->head.index_free = rec_sysno_to_int(rec->sysno);
213     recindex_write_indx(p->recindex, rec_sysno_to_int(rec->sysno), &entry, sizeof(entry));
214     return ZEBRA_OK;
215 }
216
217 static ZEBRA_RES rec_write_tmp_buf(Records p, int size, zint *sysnos)
218 {
219     struct record_index_entry entry;
220     int no_written = 0;
221     char *cptr = p->tmp_buf;
222     zint block_prev = -1, block_free;
223     int dst_type = 0;
224     int i;
225
226     /* all data in entry must be reset, since it's written verbatim */
227     memset(&entry, '\0', sizeof(entry));
228
229     for (i = 1; i<REC_BLOCK_TYPES; i++)
230         if (size >= p->head.block_move[i])
231             dst_type = i;
232     while (no_written < size)
233     {
234         block_free = p->head.block_free[dst_type];
235         if (block_free)
236         {
237             if (bf_read(p->data_BFile[dst_type],
238                          block_free, 0, sizeof(*p->head.block_free),
239                          &p->head.block_free[dst_type]) != 1)
240             {
241                 yaz_log(YLOG_FATAL|YLOG_ERRNO, "read in %s at free block "
242                          ZINT_FORMAT,
243                          p->data_fname[dst_type], block_free);
244                 return ZEBRA_FAIL;
245             }
246         }
247         else
248             block_free = p->head.block_last[dst_type]++;
249         if (block_prev == -1)
250         {
251             entry.next = block_free*8 + dst_type;
252             entry.size = size;
253             p->head.total_bytes += size;
254             while (*sysnos > 0)
255             {
256                 recindex_write_indx(p->recindex, *sysnos, &entry, sizeof(entry));
257                 sysnos++;
258             }
259         }
260         else
261         {
262             memcpy(cptr, &block_free, sizeof(block_free));
263             bf_write(p->data_BFile[dst_type], block_prev, 0, 0, cptr);
264             cptr = p->tmp_buf + no_written;
265         }
266         block_prev = block_free;
267         no_written += CAST_ZINT_TO_INT(p->head.block_size[dst_type]) 
268             - sizeof(zint);
269         p->head.block_used[dst_type]++;
270     }
271     assert(block_prev != -1);
272     block_free = 0;
273     memcpy(cptr, &block_free, sizeof(block_free));
274     bf_write(p->data_BFile[dst_type], block_prev, 0,
275               sizeof(block_free) + (p->tmp_buf+size) - cptr, cptr);
276     return ZEBRA_OK;
277 }
278
279 Records rec_open(BFiles bfs, int rw, int compression_method)
280 {
281     Records p;
282     int i, r;
283     int version;
284     ZEBRA_RES ret = ZEBRA_OK;
285
286     p = (Records) xmalloc(sizeof(*p));
287     memset(&p->head, '\0', sizeof(p->head));
288     p->compression_method = compression_method;
289     p->rw = rw;
290     p->tmp_size = 1024;
291     p->tmp_buf = (char *) xmalloc(p->tmp_size);
292     p->compression_chunk_size = 0;
293     p->recindex = recindex_open(bfs, rw, 0 /* 1=isamb for recindex */);
294     r = recindex_read_head(p->recindex, p->tmp_buf);
295     switch (r)
296     {
297     case 0:
298         memcpy(p->head.magic, REC_HEAD_MAGIC, sizeof(p->head.magic));
299         sprintf(p->head.version, "%3d", REC_VERSION);
300         p->head.index_free = 0;
301         p->head.index_last = 1;
302         p->head.no_records = 0;
303         p->head.total_bytes = 0;
304         for (i = 0; i<REC_BLOCK_TYPES; i++)
305         {
306             p->head.block_free[i] = 0;
307             p->head.block_last[i] = 1;
308             p->head.block_used[i] = 0;
309         }
310         p->head.block_size[0] = 128;
311         p->head.block_move[0] = 0;
312         for (i = 1; i<REC_BLOCK_TYPES; i++)
313         {
314             p->head.block_size[i] = p->head.block_size[i-1] * 4;
315             p->head.block_move[i] = p->head.block_size[i] * 24;
316         }
317         if (rw)
318         {
319             if (recindex_write_head(p->recindex, 
320                                     &p->head, sizeof(p->head)) != ZEBRA_OK)
321                 ret = ZEBRA_FAIL;
322         }
323         break;
324     case 1:
325         memcpy(&p->head, p->tmp_buf, sizeof(p->head));
326         if (memcmp(p->head.magic, REC_HEAD_MAGIC, sizeof(p->head.magic)))
327         {
328             yaz_log(YLOG_FATAL, "file %s has bad format",
329                     recindex_get_fname(p->recindex));
330             ret = ZEBRA_FAIL;
331         }
332         version = atoi(p->head.version);
333         if (version != REC_VERSION)
334         {
335             yaz_log(YLOG_FATAL, "file %s is version %d, but version"
336                   " %d is required",
337                     recindex_get_fname(p->recindex), version, REC_VERSION);
338             ret = ZEBRA_FAIL;
339         }
340         p->compression_chunk_size = 90000; /* good for BZIP2 */
341         break;
342     }
343     for (i = 0; i<REC_BLOCK_TYPES; i++)
344     {
345         char str[80];
346         sprintf(str, "recd%c", i + 'A');
347         p->data_fname[i] = (char *) xmalloc(strlen(str)+1);
348         strcpy(p->data_fname[i], str);
349         p->data_BFile[i] = NULL;
350     }
351     for (i = 0; i<REC_BLOCK_TYPES; i++)
352     {
353         if (!(p->data_BFile[i] =
354               bf_open(bfs, p->data_fname[i],
355                       CAST_ZINT_TO_INT(p->head.block_size[i]), rw)))
356         {
357             yaz_log(YLOG_FATAL|YLOG_ERRNO, "bf_open %s", p->data_fname[i]);
358             ret = ZEBRA_FAIL;
359             break;
360         }
361     }
362     p->cache_max = 400;
363     p->cache_cur = 0;
364     p->record_cache = (struct record_cache_entry *)
365         xmalloc(sizeof(*p->record_cache)*p->cache_max);
366     zebra_mutex_init(&p->mutex);
367     if (ret == ZEBRA_FAIL)
368         rec_close(&p);
369     return p;
370 }
371
372 static void rec_encode_unsigned(unsigned n, unsigned char *buf, int *len)
373 {
374     (*len) = 0;
375     while (n > 127)
376     {
377         buf[*len] = 128 + (n & 127);
378         n = n >> 7;
379         (*len)++;
380     }
381     buf[*len] = n;
382     (*len)++;
383 }
384
385 static void rec_decode_unsigned(unsigned *np, unsigned char *buf, int *len)
386 {
387     unsigned n = 0;
388     unsigned w = 1;
389     (*len) = 0;
390
391     while (buf[*len] > 127)
392     {
393         n += w*(buf[*len] & 127);
394         w = w << 7;
395         (*len)++;
396     }
397     n += w * buf[*len];
398     (*len)++;
399     *np = n;
400 }
401
402 static void rec_encode_zint(zint n, unsigned char *buf, int *len)
403 {
404     (*len) = 0;
405     while (n > 127)
406     {
407         buf[*len] = (unsigned) (128 + (n & 127));
408         n = n >> 7;
409         (*len)++;
410     }
411     buf[*len] = (unsigned) n;
412     (*len)++;
413 }
414
415 static void rec_decode_zint(zint *np, unsigned char *buf, int *len)
416 {
417     zint  n = 0;
418     zint w = 1;
419     (*len) = 0;
420
421     while (buf[*len] > 127)
422     {
423         n += w*(buf[*len] & 127);
424         w = w << 7;
425         (*len)++;
426     }
427     n += w * buf[*len];
428     (*len)++;
429     *np = n;
430 }
431
432 static void rec_cache_flush_block1(Records p, Record rec, Record last_rec,
433                                    char **out_buf, int *out_size,
434                                    int *out_offset)
435 {
436     int i;
437     int len;
438
439     for (i = 0; i<REC_NO_INFO; i++)
440     {
441         if (*out_offset + CAST_ZINT_TO_INT(rec->size[i]) + 20 > *out_size)
442         {
443             int new_size = *out_offset + rec->size[i] + 65536;
444             char *np = (char *) xmalloc(new_size);
445             if (*out_offset)
446                 memcpy(np, *out_buf, *out_offset);
447             xfree(*out_buf);
448             *out_size = new_size;
449             *out_buf = np;
450         }
451         if (i == 0)
452         {
453             rec_encode_zint(rec_sysno_to_int(rec->sysno), 
454                             (unsigned char *) *out_buf + *out_offset, &len);
455             (*out_offset) += len;
456         }
457         if (rec->size[i] == 0)
458         {
459             rec_encode_unsigned(1, (unsigned char *) *out_buf + *out_offset,
460                                 &len);
461             (*out_offset) += len;
462         }
463         else if (last_rec && rec->size[i] == last_rec->size[i] &&
464                  !memcmp(rec->info[i], last_rec->info[i], rec->size[i]))
465         {
466             rec_encode_unsigned(0, (unsigned char *) *out_buf + *out_offset,
467                                 &len);
468             (*out_offset) += len;
469         }
470         else
471         {
472             rec_encode_unsigned(rec->size[i]+1,
473                                 (unsigned char *) *out_buf + *out_offset,
474                                 &len);
475             (*out_offset) += len;
476             memcpy(*out_buf + *out_offset, rec->info[i], rec->size[i]);
477             (*out_offset) += rec->size[i];
478         }
479     }
480 }
481
482 static ZEBRA_RES rec_write_multiple(Records p, int saveCount)
483 {
484     int i;
485     short ref_count = 0;
486     char compression_method;
487     Record last_rec = 0;
488     int out_size = 1000;
489     int out_offset = 0;
490     char *out_buf = (char *) xmalloc(out_size);
491     zint *sysnos = (zint *) xmalloc(sizeof(*sysnos) * (p->cache_cur + 1));
492     zint *sysnop = sysnos;
493     ZEBRA_RES ret = ZEBRA_OK;
494
495     for (i = 0; i<p->cache_cur - saveCount; i++)
496     {
497         struct record_cache_entry *e = p->record_cache + i;
498         switch (e->flag)
499         {
500         case recordFlagNew:
501             rec_cache_flush_block1(p, e->rec, last_rec, &out_buf,
502                                     &out_size, &out_offset);
503             *sysnop++ = rec_sysno_to_int(e->rec->sysno);
504             ref_count++;
505             e->flag = recordFlagNop;
506             last_rec = e->rec;
507             break;
508         case recordFlagWrite:
509             if (rec_release_blocks(p, rec_sysno_to_int(e->rec->sysno))
510                 != ZEBRA_OK)
511                 ret = ZEBRA_FAIL;
512
513             rec_cache_flush_block1(p, e->rec, last_rec, &out_buf,
514                                     &out_size, &out_offset);
515             *sysnop++ = rec_sysno_to_int(e->rec->sysno);
516             ref_count++;
517             e->flag = recordFlagNop;
518             last_rec = e->rec;
519             break;
520         case recordFlagDelete:
521             if (rec_delete_single(p, e->rec) != ZEBRA_OK)
522                 ret = ZEBRA_FAIL;
523
524             e->flag = recordFlagNop;
525             break;
526         default:
527             break;
528         }
529     }
530
531     *sysnop = -1;
532     if (ref_count)
533     {
534         unsigned int csize = 0;  /* indicate compression "not performed yet" */
535         compression_method = p->compression_method;
536         switch (compression_method)
537         {
538         case REC_COMPRESS_BZIP2:
539 #if HAVE_BZLIB_H        
540             csize = out_offset + (out_offset >> 6) + 620;
541             rec_tmp_expand(p, csize);
542 #ifdef BZ_CONFIG_ERROR
543             i = BZ2_bzBuffToBuffCompress 
544 #else
545             i = bzBuffToBuffCompress 
546 #endif
547                                     (p->tmp_buf+sizeof(zint)+sizeof(short)+
548                                       sizeof(char),
549                                       &csize, out_buf, out_offset, 1, 0, 30);
550             if (i != BZ_OK)
551             {
552                 yaz_log(YLOG_WARN, "bzBuffToBuffCompress error code=%d", i);
553                 csize = 0;
554             }
555             yaz_log(YLOG_LOG, "compress %4d %5d %5d", ref_count, out_offset,
556                   csize);
557 #endif
558             break;
559         case REC_COMPRESS_NONE:
560             break;
561         }
562         if (!csize)  
563         {
564             /* either no compression or compression not supported ... */
565             csize = out_offset;
566             rec_tmp_expand(p, csize);
567             memcpy(p->tmp_buf + sizeof(zint) + sizeof(short) + sizeof(char),
568                     out_buf, out_offset);
569             csize = out_offset;
570             compression_method = REC_COMPRESS_NONE;
571         }
572         memcpy(p->tmp_buf + sizeof(zint), &ref_count, sizeof(ref_count));
573         memcpy(p->tmp_buf + sizeof(zint)+sizeof(short),
574                 &compression_method, sizeof(compression_method));
575                 
576         /* -------- compression */
577         if (rec_write_tmp_buf(p, csize + sizeof(short) + sizeof(char), sysnos)
578             != ZEBRA_OK)
579             ret = ZEBRA_FAIL;
580     }
581     xfree(out_buf);
582     xfree(sysnos);
583     return ret;
584 }
585
586 static ZEBRA_RES rec_cache_flush(Records p, int saveCount)
587 {
588     int i, j;
589     ZEBRA_RES ret;
590
591     if (saveCount >= p->cache_cur)
592         saveCount = 0;
593
594     ret = rec_write_multiple(p, saveCount);
595
596     for (i = 0; i<p->cache_cur - saveCount; i++)
597     {
598         struct record_cache_entry *e = p->record_cache + i;
599         rec_free(&e->rec);
600     } 
601     /* i still being used ... */
602     for (j = 0; j<saveCount; j++, i++)
603         memcpy(p->record_cache+j, p->record_cache+i,
604                 sizeof(*p->record_cache));
605     p->cache_cur = saveCount;
606     return ret;
607 }
608
609 static Record *rec_cache_lookup(Records p, zint sysno,
610                                 enum recordCacheFlag flag)
611 {
612     int i;
613     for (i = 0; i<p->cache_cur; i++)
614     {
615         struct record_cache_entry *e = p->record_cache + i;
616         if (e->rec->sysno == sysno)
617         {
618             if (flag != recordFlagNop && e->flag == recordFlagNop)
619                 e->flag = flag;
620             return &e->rec;
621         }
622     }
623     return NULL;
624 }
625
626 static ZEBRA_RES rec_cache_insert(Records p, Record rec, enum recordCacheFlag flag)
627 {
628     struct record_cache_entry *e;
629     ZEBRA_RES ret = ZEBRA_OK;
630
631     if (p->cache_cur == p->cache_max)
632         ret = rec_cache_flush(p, 1);
633     else if (p->cache_cur > 0)
634     {
635         int i, j;
636         int used = 0;
637         for (i = 0; i<p->cache_cur; i++)
638         {
639             Record r = (p->record_cache + i)->rec;
640             for (j = 0; j<REC_NO_INFO; j++)
641                 used += r->size[j];
642         }
643         if (used > p->compression_chunk_size)
644             ret = rec_cache_flush(p, 1);
645     }
646     assert(p->cache_cur < p->cache_max);
647
648     e = p->record_cache + (p->cache_cur)++;
649     e->flag = flag;
650     e->rec = rec_cp(rec);
651     return ret;
652 }
653
654 ZEBRA_RES rec_close(Records *pp)
655 {
656     Records p = *pp;
657     int i;
658     ZEBRA_RES ret = ZEBRA_OK;
659
660     if (!p)
661         return ret;
662
663     zebra_mutex_destroy(&p->mutex);
664     if (rec_cache_flush(p, 0) != ZEBRA_OK)
665         ret = ZEBRA_FAIL;
666
667     xfree(p->record_cache);
668
669     if (p->rw)
670     {
671         if (recindex_write_head(p->recindex, &p->head, sizeof(p->head)) != ZEBRA_OK)
672             ret = ZEBRA_FAIL;
673     }
674
675     recindex_close(p->recindex);
676
677     for (i = 0; i<REC_BLOCK_TYPES; i++)
678     {
679         if (p->data_BFile[i])
680             bf_close(p->data_BFile[i]);
681         xfree(p->data_fname[i]);
682     }
683     xfree(p->tmp_buf);
684     xfree(p);
685     *pp = NULL;
686     return ret;
687 }
688
689 static Record rec_get_int(Records p, zint sysno)
690 {
691     int i, in_size, r;
692     Record rec, *recp;
693     struct record_index_entry entry;
694     zint freeblock;
695     int dst_type;
696     char *nptr, *cptr;
697     char *in_buf = 0;
698     char *bz_buf = 0;
699 #if HAVE_BZLIB_H
700     unsigned int bz_size;
701 #endif
702     char compression_method;
703
704     assert(sysno > 0);
705     assert(p);
706
707     if ((recp = rec_cache_lookup(p, sysno, recordFlagNop)))
708         return rec_cp(*recp);
709
710     if (recindex_read_indx(p->recindex, rec_sysno_to_int(sysno), &entry, sizeof(entry), 1) < 1)
711         return NULL;       /* record is not there! */
712
713     if (!entry.size)
714         return NULL;       /* record is deleted */
715
716     dst_type = (int) (entry.next & 7);
717     assert(dst_type < REC_BLOCK_TYPES);
718     freeblock = entry.next / 8;
719
720     assert(freeblock > 0);
721     
722     rec_tmp_expand(p, entry.size);
723
724     cptr = p->tmp_buf;
725     r = bf_read(p->data_BFile[dst_type], freeblock, 0, 0, cptr);
726     if (r < 0)
727         return 0;
728     memcpy(&freeblock, cptr, sizeof(freeblock));
729
730     while (freeblock)
731     {
732         zint tmp;
733
734         cptr += p->head.block_size[dst_type] - sizeof(freeblock);
735         
736         memcpy(&tmp, cptr, sizeof(tmp));
737         r = bf_read(p->data_BFile[dst_type], freeblock, 0, 0, cptr);
738         if (r < 0)
739             return 0;
740         memcpy(&freeblock, cptr, sizeof(freeblock));
741         memcpy(cptr, &tmp, sizeof(tmp));
742     }
743
744     rec = (Record) xmalloc(sizeof(*rec));
745     rec->sysno = sysno;
746     memcpy(&compression_method, p->tmp_buf + sizeof(zint) + sizeof(short),
747             sizeof(compression_method));
748     in_buf = p->tmp_buf + sizeof(zint) + sizeof(short) + sizeof(char);
749     in_size = entry.size - sizeof(short) - sizeof(char);
750     switch (compression_method)
751     {
752     case REC_COMPRESS_BZIP2:
753 #if HAVE_BZLIB_H
754         bz_size = entry.size * 20 + 100;
755         while (1)
756         {
757             bz_buf = (char *) xmalloc(bz_size);
758 #ifdef BZ_CONFIG_ERROR
759             i = BZ2_bzBuffToBuffDecompress
760 #else
761             i = bzBuffToBuffDecompress
762 #endif
763                 (bz_buf, &bz_size, in_buf, in_size, 0, 0);
764             yaz_log(YLOG_LOG, "decompress %5d %5d", in_size, bz_size);
765             if (i == BZ_OK)
766                 break;
767             yaz_log(YLOG_LOG, "failed");
768             xfree(bz_buf);
769             bz_size *= 2;
770         }
771         in_buf = bz_buf;
772         in_size = bz_size;
773 #else
774         yaz_log(YLOG_FATAL, "cannot decompress record(s) in BZIP2 format");
775         return 0;
776 #endif
777         break;
778     case REC_COMPRESS_NONE:
779         break;
780     }
781     for (i = 0; i<REC_NO_INFO; i++)
782         rec->info[i] = 0;
783
784     nptr = in_buf;                /* skip ref count */
785     while (nptr < in_buf + in_size)
786     {
787         zint this_sysno;
788         int len;
789         rec_decode_zint(&this_sysno, (unsigned char *) nptr, &len);
790         nptr += len;
791
792         for (i = 0; i < REC_NO_INFO; i++)
793         {
794             unsigned int this_size;
795             rec_decode_unsigned(&this_size, (unsigned char *) nptr, &len);
796             nptr += len;
797
798             if (this_size == 0)
799                 continue;
800             rec->size[i] = this_size-1;
801
802             if (rec->size[i])
803             {
804                 rec->info[i] = nptr;
805                 nptr += rec->size[i];
806             }
807             else
808                 rec->info[i] = NULL;
809         }
810         if (this_sysno == rec_sysno_to_int(sysno))
811             break;
812     }
813     for (i = 0; i<REC_NO_INFO; i++)
814     {
815         if (rec->info[i] && rec->size[i])
816         {
817             char *np = xmalloc(rec->size[i]+1);
818             memcpy(np, rec->info[i], rec->size[i]);
819             np[rec->size[i]] = '\0';
820             rec->info[i] = np;
821         }
822         else
823         {
824             assert(rec->info[i] == 0);
825             assert(rec->size[i] == 0);
826         }
827     }
828     xfree(bz_buf);
829     if (rec_cache_insert(p, rec, recordFlagNop) != ZEBRA_OK)
830         return 0;
831     return rec;
832 }
833
834 Record rec_get(Records p, zint sysno)
835 {
836     Record rec;
837     zebra_mutex_lock(&p->mutex);
838
839     rec = rec_get_int(p, sysno);
840     zebra_mutex_unlock(&p->mutex);
841     return rec;
842 }
843
844 Record rec_get_root(Records p)
845 {
846     return rec_get(p, rec_sysno_to_ext(1));
847 }
848
849 static Record rec_new_int(Records p)
850 {
851     int i;
852     zint sysno;
853     Record rec;
854
855     assert(p);
856     rec = (Record) xmalloc(sizeof(*rec));
857     if (1 || p->head.index_free == 0)
858         sysno = (p->head.index_last)++;
859     else
860     {
861         struct record_index_entry entry;
862
863         if (recindex_read_indx(p->recindex, p->head.index_free, &entry, sizeof(entry), 0) < 1)
864         {
865             xfree(rec);
866             return 0;
867         }
868         sysno = p->head.index_free;
869         p->head.index_free = entry.next;
870     }
871     (p->head.no_records)++;
872     rec->sysno = rec_sysno_to_ext(sysno);
873     for (i = 0; i < REC_NO_INFO; i++)
874     {
875         rec->info[i] = NULL;
876         rec->size[i] = 0;
877     }
878     rec_cache_insert(p, rec, recordFlagNew);
879     return rec;
880 }
881
882 Record rec_new(Records p)
883 {
884     Record rec;
885     zebra_mutex_lock(&p->mutex);
886
887     rec = rec_new_int(p);
888     zebra_mutex_unlock(&p->mutex);
889     return rec;
890 }
891
892 ZEBRA_RES rec_del(Records p, Record *recpp)
893 {
894     Record *recp;
895     ZEBRA_RES ret = ZEBRA_OK;
896
897     zebra_mutex_lock(&p->mutex);
898     (p->head.no_records)--;
899     if ((recp = rec_cache_lookup(p, (*recpp)->sysno, recordFlagDelete)))
900     {
901         rec_free(recp);
902         *recp = *recpp;
903     }
904     else
905     {
906         ret = rec_cache_insert(p, *recpp, recordFlagDelete);
907         rec_free(recpp);
908     }
909     zebra_mutex_unlock(&p->mutex);
910     *recpp = NULL;
911     return ret;
912 }
913
914 ZEBRA_RES rec_put(Records p, Record *recpp)
915 {
916     Record *recp;
917     ZEBRA_RES ret = ZEBRA_OK;
918
919     zebra_mutex_lock(&p->mutex);
920     if ((recp = rec_cache_lookup(p, (*recpp)->sysno, recordFlagWrite)))
921     {
922         rec_free(recp);
923         *recp = *recpp;
924     }
925     else
926     {
927         ret = rec_cache_insert(p, *recpp, recordFlagWrite);
928         rec_free(recpp);
929     }
930     zebra_mutex_unlock(&p->mutex);
931     *recpp = NULL;
932     return ret;
933 }
934
935 void rec_free(Record *recpp)
936 {
937     int i;
938
939     if (!*recpp)
940         return ;
941     for (i = 0; i < REC_NO_INFO; i++)
942         xfree((*recpp)->info[i]);
943     xfree(*recpp);
944     *recpp = NULL;
945 }
946
947 Record rec_cp(Record rec)
948 {
949     Record n;
950     int i;
951
952     n = (Record) xmalloc(sizeof(*n));
953     n->sysno = rec->sysno;
954     for (i = 0; i < REC_NO_INFO; i++)
955         if (!rec->info[i])
956         {
957             n->info[i] = NULL;
958             n->size[i] = 0;
959         }
960         else
961         {
962             n->size[i] = rec->size[i];
963             n->info[i] = (char *) xmalloc(rec->size[i]+1);
964             memcpy(n->info[i], rec->info[i], rec->size[i]);
965             n->info[i][rec->size[i]] = '\0';
966         }
967     return n;
968 }
969
970
971 char *rec_strdup(const char *s, size_t *len)
972 {
973     char *p;
974
975     if (!s)
976     {
977         *len = 0;
978         return NULL;
979     }
980     *len = strlen(s)+1;
981     p = (char *) xmalloc(*len);
982     strcpy(p, s);
983     return p;
984 }
985
986 void rec_prstat(Records records)
987 {
988     int i;
989     zint total_bytes = 0;
990     
991     yaz_log (YLOG_LOG,
992           "Total records                        %8" ZINT_FORMAT0,
993           records->head.no_records);
994
995     for (i = 0; i< REC_BLOCK_TYPES; i++)
996     {
997         yaz_log (YLOG_LOG, "Record blocks of size "ZINT_FORMAT,
998               records->head.block_size[i]);
999         yaz_log (YLOG_LOG,
1000           " Used/Total/Bytes used            "
1001               ZINT_FORMAT "/" ZINT_FORMAT "/" ZINT_FORMAT,
1002               records->head.block_used[i], records->head.block_last[i]-1,
1003               records->head.block_used[i] * records->head.block_size[i]);
1004         total_bytes +=
1005             records->head.block_used[i] * records->head.block_size[i];
1006     }
1007     yaz_log (YLOG_LOG,
1008           "Total size of record index in bytes  %8" ZINT_FORMAT0,
1009           records->head.total_bytes);
1010     yaz_log (YLOG_LOG,
1011           "Total size with overhead             %8" ZINT_FORMAT0,
1012           total_bytes);
1013 }
1014
1015 /*
1016  * Local variables:
1017  * c-basic-offset: 4
1018  * indent-tabs-mode: nil
1019  * End:
1020  * vim: shiftwidth=4 tabstop=8 expandtab
1021  */
1022