Allow records to be zlib-compressed
[idzebra-moved-to-github.git] / index / records.c
1 /* This file is part of the Zebra server.
2    Copyright (C) 1994-2009 Index Data
3
4 Zebra is free software; you can redistribute it and/or modify it under
5 the terms of the GNU General Public License as published by the Free
6 Software Foundation; either version 2, or (at your option) any later
7 version.
8
9 Zebra is distributed in the hope that it will be useful, but WITHOUT ANY
10 WARRANTY; without even the implied warranty of MERCHANTABILITY or
11 FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
12 for more details.
13
14 You should have received a copy of the GNU General Public License
15 along with this program; if not, write to the Free Software
16 Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA  02110-1301  USA
17
18 */
19
20 /*
21  *  Format of first block (assumes a 512 block size)
22  *      next       (8 bytes)
23  *      ref_count  (2 bytes)
24  *      block      (500 bytes)
25  *
26  *  Format of subsequent blocks 
27  *      next  (8 bytes)
28  *      block (502 bytes)
29  *
30  *  Format of each record
31  *      sysno
32  *      (length, data) - pairs
33  *      length = 0 if same as previous
34  */
35 #include <stdio.h>
36 #include <stdlib.h>
37 #include <assert.h>
38 #include <string.h>
39
40 #include <yaz/yaz-util.h>
41 #include <idzebra/bfile.h>
42 #include "recindex.h"
43
44 #if HAVE_BZLIB_H
45 #include <bzlib.h>
46 #endif
47 #if HAVE_ZLIB_H
48 #include <zlib.h>
49 #endif
50
51 #define REC_BLOCK_TYPES 2
52 #define REC_HEAD_MAGIC "recindex"
53 #define REC_VERSION 5
54
55 struct records_info {
56     int rw;
57     int compression_method;
58
59     recindex_t recindex;
60
61     char *data_fname[REC_BLOCK_TYPES];
62     BFile data_BFile[REC_BLOCK_TYPES];
63
64     char *tmp_buf;
65     int tmp_size;
66
67     struct record_cache_entry *record_cache;
68     int cache_size;
69     int cache_cur;
70     int cache_max;
71
72     int compression_chunk_size;
73
74     Zebra_mutex mutex;
75
76     struct records_head {
77         char magic[8];
78         char version[4];
79         zint block_size[REC_BLOCK_TYPES];
80         zint block_free[REC_BLOCK_TYPES];
81         zint block_last[REC_BLOCK_TYPES];
82         zint block_used[REC_BLOCK_TYPES];
83         zint block_move[REC_BLOCK_TYPES];
84
85         zint total_bytes;
86         zint index_last;
87         zint index_free;
88         zint no_records;
89
90     } head;
91 };
92
93 enum recordCacheFlag { recordFlagNop, recordFlagWrite, recordFlagNew,
94                        recordFlagDelete };
95
96 struct record_cache_entry {
97     Record rec;
98     enum recordCacheFlag flag;
99 };
100
101 struct record_index_entry {
102     zint next;         /* first block of record info / next free entry */
103     int size;          /* size of record or 0 if free entry */
104 };
105
106 Record rec_cp(Record rec);
107
108 /* Modify argument to if below: 1=normal, 0=sysno testing */
109 #if 1
110 /* If this is used sysno are not converted (no testing) */
111 #define FAKE_OFFSET 0
112 #define USUAL_RANGE 6000000000LL
113
114 #else
115 /* Use a fake > 2^32 offset so we can test for proper 64-bit handling */
116 #define FAKE_OFFSET 6000000000LL
117 #define USUAL_RANGE 2000000000LL
118 #endif
119
120 static zint rec_sysno_to_ext(zint sysno)
121 {
122     assert(sysno >= 0 && sysno <= USUAL_RANGE);
123     return sysno + FAKE_OFFSET;
124 }
125
126 zint rec_sysno_to_int(zint sysno)
127 {
128     assert(sysno >= FAKE_OFFSET && sysno <= FAKE_OFFSET + USUAL_RANGE);
129     return sysno - FAKE_OFFSET;
130 }
131
132 static void rec_tmp_expand(Records p, int size)
133 {
134     if (p->tmp_size < size + 2048 ||
135         p->tmp_size < p->head.block_size[REC_BLOCK_TYPES-1]*2)
136     {
137         xfree(p->tmp_buf);
138         p->tmp_size = size + (int)
139                         (p->head.block_size[REC_BLOCK_TYPES-1])*2 + 2048;
140         p->tmp_buf = (char *) xmalloc(p->tmp_size);
141     }
142 }
143
144 static ZEBRA_RES rec_release_blocks(Records p, zint sysno)
145 {
146     struct record_index_entry entry;
147     zint freeblock;
148     char block_and_ref[sizeof(zint) + sizeof(short)];
149     int dst_type;
150     int first = 1;
151
152     if (recindex_read_indx(p->recindex, sysno, &entry, sizeof(entry), 1) != 1)
153         return ZEBRA_FAIL;
154
155     freeblock = entry.next;
156     assert(freeblock > 0);
157     dst_type = CAST_ZINT_TO_INT(freeblock & 7);
158     assert(dst_type < REC_BLOCK_TYPES);
159     freeblock = freeblock / 8;
160     while (freeblock)
161     {
162         if (bf_read(p->data_BFile[dst_type], freeblock, 0,
163                      first ? sizeof(block_and_ref) : sizeof(zint),
164                      block_and_ref) != 1)
165         {
166             yaz_log(YLOG_FATAL|YLOG_ERRNO, "read in rec_del_single");
167             return ZEBRA_FAIL;
168         }
169         if (first)
170         {
171             short ref;
172             memcpy(&ref, block_and_ref + sizeof(freeblock), sizeof(ref));
173             --ref;
174             memcpy(block_and_ref + sizeof(freeblock), &ref, sizeof(ref));
175             if (ref)
176             {
177                 /* there is still a reference to this block.. */
178                 if (bf_write(p->data_BFile[dst_type], freeblock, 0,
179                               sizeof(block_and_ref), block_and_ref))
180                 {
181                     yaz_log(YLOG_FATAL|YLOG_ERRNO, "write in rec_del_single");
182                     return ZEBRA_FAIL;
183                 }
184                 return ZEBRA_OK;
185             }
186             /* the list of blocks can all be removed (ref == 0) */
187             first = 0;
188         }
189         
190         if (bf_write(p->data_BFile[dst_type], freeblock, 0, sizeof(freeblock),
191                       &p->head.block_free[dst_type]))
192         {
193             yaz_log(YLOG_FATAL|YLOG_ERRNO, "write in rec_del_single");
194             return ZEBRA_FAIL;
195         }
196         p->head.block_free[dst_type] = freeblock;
197         memcpy(&freeblock, block_and_ref, sizeof(freeblock));
198
199         p->head.block_used[dst_type]--;
200     }
201     p->head.total_bytes -= entry.size;
202     return ZEBRA_OK;
203 }
204
205 static ZEBRA_RES rec_delete_single(Records p, Record rec)
206 {
207     struct record_index_entry entry;
208
209     /* all data in entry must be reset, since it's written verbatim */
210     memset(&entry, '\0', sizeof(entry));
211     if (rec_release_blocks(p, rec_sysno_to_int(rec->sysno)) != ZEBRA_OK)
212         return ZEBRA_FAIL;
213
214     entry.next = p->head.index_free;
215     entry.size = 0;
216     p->head.index_free = rec_sysno_to_int(rec->sysno);
217     recindex_write_indx(p->recindex, rec_sysno_to_int(rec->sysno), &entry, sizeof(entry));
218     return ZEBRA_OK;
219 }
220
221 static ZEBRA_RES rec_write_tmp_buf(Records p, int size, zint *sysnos)
222 {
223     struct record_index_entry entry;
224     int no_written = 0;
225     char *cptr = p->tmp_buf;
226     zint block_prev = -1, block_free;
227     int dst_type = 0;
228     int i;
229
230     /* all data in entry must be reset, since it's written verbatim */
231     memset(&entry, '\0', sizeof(entry));
232
233     for (i = 1; i<REC_BLOCK_TYPES; i++)
234         if (size >= p->head.block_move[i])
235             dst_type = i;
236     while (no_written < size)
237     {
238         block_free = p->head.block_free[dst_type];
239         if (block_free)
240         {
241             if (bf_read(p->data_BFile[dst_type],
242                          block_free, 0, sizeof(*p->head.block_free),
243                          &p->head.block_free[dst_type]) != 1)
244             {
245                 yaz_log(YLOG_FATAL|YLOG_ERRNO, "read in %s at free block "
246                          ZINT_FORMAT,
247                          p->data_fname[dst_type], block_free);
248                 return ZEBRA_FAIL;
249             }
250         }
251         else
252             block_free = p->head.block_last[dst_type]++;
253         if (block_prev == -1)
254         {
255             entry.next = block_free*8 + dst_type;
256             entry.size = size;
257             p->head.total_bytes += size;
258             while (*sysnos > 0)
259             {
260                 recindex_write_indx(p->recindex, *sysnos, &entry, sizeof(entry));
261                 sysnos++;
262             }
263         }
264         else
265         {
266             memcpy(cptr, &block_free, sizeof(block_free));
267             bf_write(p->data_BFile[dst_type], block_prev, 0, 0, cptr);
268             cptr = p->tmp_buf + no_written;
269         }
270         block_prev = block_free;
271         no_written += CAST_ZINT_TO_INT(p->head.block_size[dst_type]) 
272             - sizeof(zint);
273         p->head.block_used[dst_type]++;
274     }
275     assert(block_prev != -1);
276     block_free = 0;
277     memcpy(cptr, &block_free, sizeof(block_free));
278     bf_write(p->data_BFile[dst_type], block_prev, 0,
279               sizeof(block_free) + (p->tmp_buf+size) - cptr, cptr);
280     return ZEBRA_OK;
281 }
282
283 Records rec_open(BFiles bfs, int rw, int compression_method)
284 {
285     Records p;
286     int i, r;
287     int version;
288     ZEBRA_RES ret = ZEBRA_OK;
289
290     p = (Records) xmalloc(sizeof(*p));
291     memset(&p->head, '\0', sizeof(p->head));
292     p->compression_method = compression_method;
293     p->rw = rw;
294     p->tmp_size = 4096;
295     p->tmp_buf = (char *) xmalloc(p->tmp_size);
296     p->compression_chunk_size = 0;
297     if (compression_method == REC_COMPRESS_BZIP2)
298         p->compression_chunk_size = 90000;
299     p->recindex = recindex_open(bfs, rw, 0 /* 1=isamb for recindex */);
300     r = recindex_read_head(p->recindex, p->tmp_buf);
301     switch (r)
302     {
303     case 0:
304         memcpy(p->head.magic, REC_HEAD_MAGIC, sizeof(p->head.magic));
305         sprintf(p->head.version, "%3d", REC_VERSION);
306         p->head.index_free = 0;
307         p->head.index_last = 1;
308         p->head.no_records = 0;
309         p->head.total_bytes = 0;
310         for (i = 0; i<REC_BLOCK_TYPES; i++)
311         {
312             p->head.block_free[i] = 0;
313             p->head.block_last[i] = 1;
314             p->head.block_used[i] = 0;
315         }
316         p->head.block_size[0] = 256;
317         p->head.block_move[0] = 0;
318         for (i = 1; i<REC_BLOCK_TYPES; i++)
319         {
320             p->head.block_size[i] = p->head.block_size[i-1] * 8;
321             p->head.block_move[i] = p->head.block_size[i] * 2;
322         }
323         if (rw)
324         {
325             if (recindex_write_head(p->recindex, 
326                                     &p->head, sizeof(p->head)) != ZEBRA_OK)
327                 ret = ZEBRA_FAIL;
328         }
329         break;
330     case 1:
331         memcpy(&p->head, p->tmp_buf, sizeof(p->head));
332         if (memcmp(p->head.magic, REC_HEAD_MAGIC, sizeof(p->head.magic)))
333         {
334             yaz_log(YLOG_FATAL, "file %s has bad format",
335                     recindex_get_fname(p->recindex));
336             ret = ZEBRA_FAIL;
337         }
338         version = atoi(p->head.version);
339         if (version != REC_VERSION)
340         {
341             yaz_log(YLOG_FATAL, "file %s is version %d, but version"
342                   " %d is required",
343                     recindex_get_fname(p->recindex), version, REC_VERSION);
344             ret = ZEBRA_FAIL;
345         }
346         break;
347     }
348     for (i = 0; i<REC_BLOCK_TYPES; i++)
349     {
350         char str[80];
351         sprintf(str, "recd%c", i + 'A');
352         p->data_fname[i] = (char *) xmalloc(strlen(str)+1);
353         strcpy(p->data_fname[i], str);
354         p->data_BFile[i] = NULL;
355     }
356     for (i = 0; i<REC_BLOCK_TYPES; i++)
357     {
358         if (!(p->data_BFile[i] =
359               bf_open(bfs, p->data_fname[i],
360                       CAST_ZINT_TO_INT(p->head.block_size[i]), rw)))
361         {
362             yaz_log(YLOG_FATAL|YLOG_ERRNO, "bf_open %s", p->data_fname[i]);
363             ret = ZEBRA_FAIL;
364             break;
365         }
366     }
367     p->cache_max = 400;
368     p->cache_cur = 0;
369     p->record_cache = (struct record_cache_entry *)
370         xmalloc(sizeof(*p->record_cache)*p->cache_max);
371     zebra_mutex_init(&p->mutex);
372     if (ret == ZEBRA_FAIL)
373         rec_close(&p);
374     return p;
375 }
376
377 static void rec_encode_unsigned(unsigned n, unsigned char *buf, int *len)
378 {
379     (*len) = 0;
380     while (n > 127)
381     {
382         buf[*len] = 128 + (n & 127);
383         n = n >> 7;
384         (*len)++;
385     }
386     buf[*len] = n;
387     (*len)++;
388 }
389
390 static void rec_decode_unsigned(unsigned *np, unsigned char *buf, int *len)
391 {
392     unsigned n = 0;
393     unsigned w = 1;
394     (*len) = 0;
395
396     while (buf[*len] > 127)
397     {
398         n += w*(buf[*len] & 127);
399         w = w << 7;
400         (*len)++;
401     }
402     n += w * buf[*len];
403     (*len)++;
404     *np = n;
405 }
406
407 static void rec_encode_zint(zint n, unsigned char *buf, int *len)
408 {
409     (*len) = 0;
410     while (n > 127)
411     {
412         buf[*len] = (unsigned) (128 + (n & 127));
413         n = n >> 7;
414         (*len)++;
415     }
416     buf[*len] = (unsigned) n;
417     (*len)++;
418 }
419
420 static void rec_decode_zint(zint *np, unsigned char *buf, int *len)
421 {
422     zint  n = 0;
423     zint w = 1;
424     (*len) = 0;
425
426     while (buf[*len] > 127)
427     {
428         n += w*(buf[*len] & 127);
429         w = w << 7;
430         (*len)++;
431     }
432     n += w * buf[*len];
433     (*len)++;
434     *np = n;
435 }
436
437 static void rec_cache_flush_block1(Records p, Record rec, Record last_rec,
438                                    char **out_buf, int *out_size,
439                                    int *out_offset)
440 {
441     int i;
442     int len;
443
444     for (i = 0; i<REC_NO_INFO; i++)
445     {
446         if (*out_offset + CAST_ZINT_TO_INT(rec->size[i]) + 20 > *out_size)
447         {
448             int new_size = *out_offset + rec->size[i] + 65536;
449             char *np = (char *) xmalloc(new_size);
450             if (*out_offset)
451                 memcpy(np, *out_buf, *out_offset);
452             xfree(*out_buf);
453             *out_size = new_size;
454             *out_buf = np;
455         }
456         if (i == 0)
457         {
458             rec_encode_zint(rec_sysno_to_int(rec->sysno), 
459                             (unsigned char *) *out_buf + *out_offset, &len);
460             (*out_offset) += len;
461         }
462         if (rec->size[i] == 0)
463         {
464             rec_encode_unsigned(1, (unsigned char *) *out_buf + *out_offset,
465                                 &len);
466             (*out_offset) += len;
467         }
468         else if (last_rec && rec->size[i] == last_rec->size[i] &&
469                  !memcmp(rec->info[i], last_rec->info[i], rec->size[i]))
470         {
471             rec_encode_unsigned(0, (unsigned char *) *out_buf + *out_offset,
472                                 &len);
473             (*out_offset) += len;
474         }
475         else
476         {
477             rec_encode_unsigned(rec->size[i]+1,
478                                 (unsigned char *) *out_buf + *out_offset,
479                                 &len);
480             (*out_offset) += len;
481             memcpy(*out_buf + *out_offset, rec->info[i], rec->size[i]);
482             (*out_offset) += rec->size[i];
483         }
484     }
485 }
486
487 static ZEBRA_RES rec_flush_shared(Records p, short ref_count, zint *sysnos,
488                                   char *out_buf, int out_offset)
489 {
490     ZEBRA_RES ret = ZEBRA_OK;
491     if (ref_count)
492     {
493         int i;
494         unsigned int csize = 0;  /* indicate compression "not performed yet" */
495         char compression_method = p->compression_method;
496         switch (compression_method)
497         {
498         case REC_COMPRESS_ZLIB:
499 #if HAVE_ZLIB_H
500             csize = out_offset + (out_offset >> 6) + 620;
501             while (1)
502             {
503                 int r;
504                 uLongf destLen = csize;
505                 rec_tmp_expand(p, csize);
506                 r = compress((Bytef *) p->tmp_buf+sizeof(zint)+sizeof(short)+
507                              sizeof(char),
508                              &destLen, (const Bytef *) out_buf, out_offset);
509                 csize = destLen;
510                 if (r == Z_OK)
511                 {
512                     yaz_log(YLOG_LOG, "compress %4d %5d %5d", ref_count,
513                             out_offset, csize);
514                     break;
515                 }
516                 if (r != Z_MEM_ERROR)
517                 {
518                     yaz_log(YLOG_WARN, "compress error: %d", r);
519                     csize = 0;
520                     break;
521                 }
522                 csize = csize * 2;
523             }
524 #endif
525             break;
526         case REC_COMPRESS_BZIP2:
527 #if HAVE_BZLIB_H        
528             csize = out_offset + (out_offset >> 6) + 620;
529             rec_tmp_expand(p, csize);
530 #ifdef BZ_CONFIG_ERROR
531             i = BZ2_bzBuffToBuffCompress 
532 #else
533             i = bzBuffToBuffCompress 
534 #endif
535                                     (p->tmp_buf+sizeof(zint)+sizeof(short)+
536                                       sizeof(char),
537                                       &csize, out_buf, out_offset, 1, 0, 30);
538             if (i != BZ_OK)
539             {
540                 yaz_log(YLOG_WARN, "bzBuffToBuffCompress error code=%d", i);
541                 csize = 0;
542             }
543             yaz_log(YLOG_LOG, "compress %4d %5d %5d", ref_count,
544                     out_offset, csize);
545 #endif
546             break;
547         case REC_COMPRESS_NONE:
548             break;
549         }
550         if (!csize)  
551         {
552             /* either no compression or compression not supported ... */
553             csize = out_offset;
554             rec_tmp_expand(p, csize);
555             memcpy(p->tmp_buf + sizeof(zint) + sizeof(short) + sizeof(char),
556                     out_buf, out_offset);
557             csize = out_offset;
558             compression_method = REC_COMPRESS_NONE;
559         }
560         memcpy(p->tmp_buf + sizeof(zint), &ref_count, sizeof(ref_count));
561         memcpy(p->tmp_buf + sizeof(zint)+sizeof(short),
562                 &compression_method, sizeof(compression_method));
563                 
564         /* -------- compression */
565         if (rec_write_tmp_buf(p, csize + sizeof(short) + sizeof(char), sysnos)
566             != ZEBRA_OK)
567             ret = ZEBRA_FAIL;
568     }
569     return ret;
570 }
571
572 static ZEBRA_RES rec_write_multiple(Records p, int saveCount)
573 {
574     int i;
575     short ref_count = 0;
576     Record last_rec = 0;
577     int out_size = 1000;
578     int out_offset = 0;
579     char *out_buf = (char *) xmalloc(out_size);
580     zint *sysnos = (zint *) xmalloc(sizeof(*sysnos) * (p->cache_cur + 1));
581     zint *sysnop = sysnos;
582     ZEBRA_RES ret = ZEBRA_OK;
583
584     for (i = 0; i<p->cache_cur - saveCount; i++)
585     {
586         struct record_cache_entry *e = p->record_cache + i;
587         switch (e->flag)
588         {
589         case recordFlagNew:
590             rec_cache_flush_block1(p, e->rec, last_rec, &out_buf,
591                                     &out_size, &out_offset);
592             *sysnop++ = rec_sysno_to_int(e->rec->sysno);
593             ref_count++;
594             e->flag = recordFlagNop;
595             last_rec = e->rec;
596             break;
597         case recordFlagWrite:
598             if (rec_release_blocks(p, rec_sysno_to_int(e->rec->sysno))
599                 != ZEBRA_OK)
600                 ret = ZEBRA_FAIL;
601
602             rec_cache_flush_block1(p, e->rec, last_rec, &out_buf,
603                                     &out_size, &out_offset);
604             *sysnop++ = rec_sysno_to_int(e->rec->sysno);
605             ref_count++;
606             e->flag = recordFlagNop;
607             last_rec = e->rec;
608             break;
609         case recordFlagDelete:
610             if (rec_delete_single(p, e->rec) != ZEBRA_OK)
611                 ret = ZEBRA_FAIL;
612
613             e->flag = recordFlagNop;
614             break;
615         case recordFlagNop:
616             break;
617         default:
618             break;
619         }
620     }
621
622     *sysnop = -1;
623     rec_flush_shared(p, ref_count, sysnos, out_buf, out_offset);
624     xfree(out_buf);
625     xfree(sysnos);
626     return ret;
627 }
628
629 static ZEBRA_RES rec_cache_flush(Records p, int saveCount)
630 {
631     int i, j;
632     ZEBRA_RES ret;
633
634     if (saveCount >= p->cache_cur)
635         saveCount = 0;
636
637     ret = rec_write_multiple(p, saveCount);
638
639     for (i = 0; i<p->cache_cur - saveCount; i++)
640     {
641         struct record_cache_entry *e = p->record_cache + i;
642         rec_free(&e->rec);
643     } 
644     /* i still being used ... */
645     for (j = 0; j<saveCount; j++, i++)
646         memcpy(p->record_cache+j, p->record_cache+i,
647                 sizeof(*p->record_cache));
648     p->cache_cur = saveCount;
649     return ret;
650 }
651
652 static Record *rec_cache_lookup(Records p, zint sysno,
653                                 enum recordCacheFlag flag)
654 {
655     int i;
656     for (i = 0; i<p->cache_cur; i++)
657     {
658         struct record_cache_entry *e = p->record_cache + i;
659         if (e->rec->sysno == sysno)
660         {
661             if (flag != recordFlagNop && e->flag == recordFlagNop)
662                 e->flag = flag;
663             return &e->rec;
664         }
665     }
666     return NULL;
667 }
668
669 static ZEBRA_RES rec_cache_insert(Records p, Record rec, enum recordCacheFlag flag)
670 {
671     struct record_cache_entry *e;
672     ZEBRA_RES ret = ZEBRA_OK;
673
674     if (p->cache_cur == p->cache_max)
675         ret = rec_cache_flush(p, 1);
676     else if (p->cache_cur > 0)
677     {
678         int i, j;
679         int used = 0;
680         for (i = 0; i<p->cache_cur; i++)
681         {
682             Record r = (p->record_cache + i)->rec;
683             for (j = 0; j<REC_NO_INFO; j++)
684                 used += r->size[j];
685         }
686         if (used > p->compression_chunk_size)
687             ret = rec_cache_flush(p, 1);
688     }
689     assert(p->cache_cur < p->cache_max);
690
691     e = p->record_cache + (p->cache_cur)++;
692     e->flag = flag;
693     e->rec = rec_cp(rec);
694     return ret;
695 }
696
697 ZEBRA_RES rec_close(Records *pp)
698 {
699     Records p = *pp;
700     int i;
701     ZEBRA_RES ret = ZEBRA_OK;
702
703     if (!p)
704         return ret;
705
706     zebra_mutex_destroy(&p->mutex);
707     if (rec_cache_flush(p, 0) != ZEBRA_OK)
708         ret = ZEBRA_FAIL;
709
710     xfree(p->record_cache);
711
712     if (p->rw)
713     {
714         if (recindex_write_head(p->recindex, &p->head, sizeof(p->head)) != ZEBRA_OK)
715             ret = ZEBRA_FAIL;
716     }
717
718     recindex_close(p->recindex);
719
720     for (i = 0; i<REC_BLOCK_TYPES; i++)
721     {
722         if (p->data_BFile[i])
723             bf_close(p->data_BFile[i]);
724         xfree(p->data_fname[i]);
725     }
726     xfree(p->tmp_buf);
727     xfree(p);
728     *pp = NULL;
729     return ret;
730 }
731
732 static Record rec_get_int(Records p, zint sysno)
733 {
734     int i, in_size, r;
735     Record rec, *recp;
736     struct record_index_entry entry;
737     zint freeblock;
738     int dst_type;
739     char *nptr, *cptr;
740     char *in_buf = 0;
741     char *bz_buf = 0;
742 #if HAVE_BZLIB_H
743     unsigned int bz_size;
744 #endif
745     char compression_method;
746
747     assert(sysno > 0);
748     assert(p);
749
750     if ((recp = rec_cache_lookup(p, sysno, recordFlagNop)))
751         return rec_cp(*recp);
752
753     if (recindex_read_indx(p->recindex, rec_sysno_to_int(sysno), &entry, sizeof(entry), 1) < 1)
754         return NULL;       /* record is not there! */
755
756     if (!entry.size)
757         return NULL;       /* record is deleted */
758
759     dst_type = (int) (entry.next & 7);
760     assert(dst_type < REC_BLOCK_TYPES);
761     freeblock = entry.next / 8;
762
763     assert(freeblock > 0);
764     
765     rec_tmp_expand(p, entry.size);
766
767     cptr = p->tmp_buf;
768     r = bf_read(p->data_BFile[dst_type], freeblock, 0, 0, cptr);
769     if (r < 0)
770         return 0;
771     memcpy(&freeblock, cptr, sizeof(freeblock));
772
773     while (freeblock)
774     {
775         zint tmp;
776
777         cptr += p->head.block_size[dst_type] - sizeof(freeblock);
778         
779         memcpy(&tmp, cptr, sizeof(tmp));
780         r = bf_read(p->data_BFile[dst_type], freeblock, 0, 0, cptr);
781         if (r < 0)
782             return 0;
783         memcpy(&freeblock, cptr, sizeof(freeblock));
784         memcpy(cptr, &tmp, sizeof(tmp));
785     }
786
787     rec = (Record) xmalloc(sizeof(*rec));
788     rec->sysno = sysno;
789     memcpy(&compression_method, p->tmp_buf + sizeof(zint) + sizeof(short),
790             sizeof(compression_method));
791     in_buf = p->tmp_buf + sizeof(zint) + sizeof(short) + sizeof(char);
792     in_size = entry.size - sizeof(short) - sizeof(char);
793     switch (compression_method)
794     {
795     case REC_COMPRESS_ZLIB:
796 #if HAVE_ZLIB_H
797         bz_size = entry.size * 20 + 100;
798         while (1)
799         {
800             uLongf destLen = bz_size;
801             bz_buf = (char *) xmalloc(bz_size);
802             i = uncompress((Bytef *) bz_buf, &destLen,
803                            (const Bytef *) in_buf, in_size);
804             if (i == Z_OK)
805             {
806                 yaz_log(YLOG_LOG, "decompress %5d %5d", in_size, bz_size);
807                 bz_size = destLen; 
808                 break;
809             }
810             yaz_log(YLOG_LOG, "failed");
811             xfree(bz_buf);
812             bz_size *= 2;
813         }
814         in_buf = bz_buf;
815         in_size = bz_size;
816 #else
817         yaz_log(YLOG_FATAL, "cannot decompress record(s) in ZLIB format");
818         return 0;
819 #endif
820         break;
821     case REC_COMPRESS_BZIP2:
822 #if HAVE_BZLIB_H
823         bz_size = entry.size * 20 + 100;
824         while (1)
825         {
826             bz_buf = (char *) xmalloc(bz_size);
827 #ifdef BZ_CONFIG_ERROR
828             i = BZ2_bzBuffToBuffDecompress
829 #else
830             i = bzBuffToBuffDecompress
831 #endif
832                 (bz_buf, &bz_size, in_buf, in_size, 0, 0);
833             yaz_log(YLOG_LOG, "decompress %5d %5d", in_size, bz_size);
834             if (i == BZ_OK)
835                 break;
836             yaz_log(YLOG_LOG, "failed");
837             xfree(bz_buf);
838             bz_size *= 2;
839         }
840         in_buf = bz_buf;
841         in_size = bz_size;
842 #else
843         yaz_log(YLOG_FATAL, "cannot decompress record(s) in BZIP2 format");
844         return 0;
845 #endif
846         break;
847     case REC_COMPRESS_NONE:
848         break;
849     }
850     for (i = 0; i<REC_NO_INFO; i++)
851         rec->info[i] = 0;
852
853     nptr = in_buf;                /* skip ref count */
854     while (nptr < in_buf + in_size)
855     {
856         zint this_sysno;
857         int len;
858         rec_decode_zint(&this_sysno, (unsigned char *) nptr, &len);
859         nptr += len;
860
861         for (i = 0; i < REC_NO_INFO; i++)
862         {
863             unsigned int this_size;
864             rec_decode_unsigned(&this_size, (unsigned char *) nptr, &len);
865             nptr += len;
866
867             if (this_size == 0)
868                 continue;
869             rec->size[i] = this_size-1;
870
871             if (rec->size[i])
872             {
873                 rec->info[i] = nptr;
874                 nptr += rec->size[i];
875             }
876             else
877                 rec->info[i] = NULL;
878         }
879         if (this_sysno == rec_sysno_to_int(sysno))
880             break;
881     }
882     for (i = 0; i<REC_NO_INFO; i++)
883     {
884         if (rec->info[i] && rec->size[i])
885         {
886             char *np = xmalloc(rec->size[i]+1);
887             memcpy(np, rec->info[i], rec->size[i]);
888             np[rec->size[i]] = '\0';
889             rec->info[i] = np;
890         }
891         else
892         {
893             assert(rec->info[i] == 0);
894             assert(rec->size[i] == 0);
895         }
896     }
897     xfree(bz_buf);
898     if (rec_cache_insert(p, rec, recordFlagNop) != ZEBRA_OK)
899         return 0;
900     return rec;
901 }
902
903 Record rec_get(Records p, zint sysno)
904 {
905     Record rec;
906     zebra_mutex_lock(&p->mutex);
907
908     rec = rec_get_int(p, sysno);
909     zebra_mutex_unlock(&p->mutex);
910     return rec;
911 }
912
913 Record rec_get_root(Records p)
914 {
915     return rec_get(p, rec_sysno_to_ext(1));
916 }
917
918 static Record rec_new_int(Records p)
919 {
920     int i;
921     zint sysno;
922     Record rec;
923
924     assert(p);
925     rec = (Record) xmalloc(sizeof(*rec));
926     if (1 || p->head.index_free == 0)
927         sysno = (p->head.index_last)++;
928     else
929     {
930         struct record_index_entry entry;
931
932         if (recindex_read_indx(p->recindex, p->head.index_free, &entry, sizeof(entry), 0) < 1)
933         {
934             xfree(rec);
935             return 0;
936         }
937         sysno = p->head.index_free;
938         p->head.index_free = entry.next;
939     }
940     (p->head.no_records)++;
941     rec->sysno = rec_sysno_to_ext(sysno);
942     for (i = 0; i < REC_NO_INFO; i++)
943     {
944         rec->info[i] = NULL;
945         rec->size[i] = 0;
946     }
947     rec_cache_insert(p, rec, recordFlagNew);
948     return rec;
949 }
950
951 Record rec_new(Records p)
952 {
953     Record rec;
954     zebra_mutex_lock(&p->mutex);
955
956     rec = rec_new_int(p);
957     zebra_mutex_unlock(&p->mutex);
958     return rec;
959 }
960
961 ZEBRA_RES rec_del(Records p, Record *recpp)
962 {
963     Record *recp;
964     ZEBRA_RES ret = ZEBRA_OK;
965
966     zebra_mutex_lock(&p->mutex);
967     (p->head.no_records)--;
968     if ((recp = rec_cache_lookup(p, (*recpp)->sysno, recordFlagDelete)))
969     {
970         rec_free(recp);
971         *recp = *recpp;
972     }
973     else
974     {
975         ret = rec_cache_insert(p, *recpp, recordFlagDelete);
976         rec_free(recpp);
977     }
978     zebra_mutex_unlock(&p->mutex);
979     *recpp = NULL;
980     return ret;
981 }
982
983 ZEBRA_RES rec_put(Records p, Record *recpp)
984 {
985     Record *recp;
986     ZEBRA_RES ret = ZEBRA_OK;
987
988     zebra_mutex_lock(&p->mutex);
989     if ((recp = rec_cache_lookup(p, (*recpp)->sysno, recordFlagWrite)))
990     {
991         rec_free(recp);
992         *recp = *recpp;
993     }
994     else
995     {
996         ret = rec_cache_insert(p, *recpp, recordFlagWrite);
997         rec_free(recpp);
998     }
999     zebra_mutex_unlock(&p->mutex);
1000     *recpp = NULL;
1001     return ret;
1002 }
1003
1004 void rec_free(Record *recpp)
1005 {
1006     int i;
1007
1008     if (!*recpp)
1009         return ;
1010     for (i = 0; i < REC_NO_INFO; i++)
1011         xfree((*recpp)->info[i]);
1012     xfree(*recpp);
1013     *recpp = NULL;
1014 }
1015
1016 Record rec_cp(Record rec)
1017 {
1018     Record n;
1019     int i;
1020
1021     n = (Record) xmalloc(sizeof(*n));
1022     n->sysno = rec->sysno;
1023     for (i = 0; i < REC_NO_INFO; i++)
1024         if (!rec->info[i])
1025         {
1026             n->info[i] = NULL;
1027             n->size[i] = 0;
1028         }
1029         else
1030         {
1031             n->size[i] = rec->size[i];
1032             n->info[i] = (char *) xmalloc(rec->size[i]+1);
1033             memcpy(n->info[i], rec->info[i], rec->size[i]);
1034             n->info[i][rec->size[i]] = '\0';
1035         }
1036     return n;
1037 }
1038
1039
1040 char *rec_strdup(const char *s, size_t *len)
1041 {
1042     char *p;
1043
1044     if (!s)
1045     {
1046         *len = 0;
1047         return NULL;
1048     }
1049     *len = strlen(s)+1;
1050     p = (char *) xmalloc(*len);
1051     strcpy(p, s);
1052     return p;
1053 }
1054
1055 void rec_prstat(Records records, int verbose)
1056 {
1057     int i;
1058     zint total_bytes = 0;
1059     
1060     yaz_log (YLOG_LOG,
1061           "Total records                        %8" ZINT_FORMAT0,
1062           records->head.no_records);
1063
1064     for (i = 0; i< REC_BLOCK_TYPES; i++)
1065     {
1066         yaz_log (YLOG_LOG, "Record blocks of size "ZINT_FORMAT,
1067               records->head.block_size[i]);
1068         yaz_log (YLOG_LOG,
1069           " Used/Total/Bytes used            "
1070               ZINT_FORMAT "/" ZINT_FORMAT "/" ZINT_FORMAT,
1071               records->head.block_used[i], records->head.block_last[i]-1,
1072               records->head.block_used[i] * records->head.block_size[i]);
1073         total_bytes +=
1074             records->head.block_used[i] * records->head.block_size[i];
1075
1076         yaz_log(YLOG_LOG, " Block Last " ZINT_FORMAT, records->head.block_last[i]);
1077         if (verbose)
1078         {   /* analyse free lists */
1079             zint no_free = 0;
1080             zint block_free = records->head.block_free[i];
1081             WRBUF w = wrbuf_alloc();
1082             while (block_free)
1083             {
1084                 zint nblock;
1085                 no_free++;
1086                 wrbuf_printf(w, " " ZINT_FORMAT, block_free);
1087                 if (bf_read(records->data_BFile[i],
1088                             block_free, 0, sizeof(nblock), &nblock) != 1)
1089                 {
1090                     yaz_log(YLOG_FATAL|YLOG_ERRNO, "read in %s at free block "
1091                             ZINT_FORMAT,
1092                             records->data_fname[i], block_free);
1093                     break;
1094                 }
1095                 block_free = nblock;
1096             }
1097             yaz_log (YLOG_LOG,
1098                      " Number in free list       %8" ZINT_FORMAT0, no_free);
1099             if (no_free)
1100                 yaz_log(YLOG_LOG, "%s", wrbuf_cstr(w));
1101             wrbuf_destroy(w);
1102         }
1103     }
1104     yaz_log (YLOG_LOG,
1105           "Total size of record index in bytes  %8" ZINT_FORMAT0,
1106           records->head.total_bytes);
1107     yaz_log (YLOG_LOG,
1108           "Total size with overhead             %8" ZINT_FORMAT0,
1109           total_bytes);
1110 }
1111
1112 /*
1113  * Local variables:
1114  * c-basic-offset: 4
1115  * c-file-style: "Stroustrup"
1116  * indent-tabs-mode: nil
1117  * End:
1118  * vim: shiftwidth=4 tabstop=8 expandtab
1119  */
1120