Change recd{A,B} block sizes
[idzebra-moved-to-github.git] / index / records.c
1 /* This file is part of the Zebra server.
2    Copyright (C) 1994-2009 Index Data
3
4 Zebra is free software; you can redistribute it and/or modify it under
5 the terms of the GNU General Public License as published by the Free
6 Software Foundation; either version 2, or (at your option) any later
7 version.
8
9 Zebra is distributed in the hope that it will be useful, but WITHOUT ANY
10 WARRANTY; without even the implied warranty of MERCHANTABILITY or
11 FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
12 for more details.
13
14 You should have received a copy of the GNU General Public License
15 along with this program; if not, write to the Free Software
16 Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA  02110-1301  USA
17
18 */
19
20 /*
21  *  Format of first block (assumes a 512 block size)
22  *      next       (8 bytes)
23  *      ref_count  (2 bytes)
24  *      block      (500 bytes)
25  *
26  *  Format of subsequent blocks 
27  *      next  (8 bytes)
28  *      block (502 bytes)
29  *
30  *  Format of each record
31  *      sysno
32  *      (length, data) - pairs
33  *      length = 0 if same as previous
34  */
35 #include <stdio.h>
36 #include <stdlib.h>
37 #include <assert.h>
38 #include <string.h>
39
40 #include <yaz/yaz-util.h>
41 #include <idzebra/bfile.h>
42 #include "recindex.h"
43
44 #if HAVE_BZLIB_H
45 #include <bzlib.h>
46 #endif
47
48 #define REC_BLOCK_TYPES 2
49 #define REC_HEAD_MAGIC "recindex"
50 #define REC_VERSION 5
51
52 struct records_info {
53     int rw;
54     int compression_method;
55
56     recindex_t recindex;
57
58     char *data_fname[REC_BLOCK_TYPES];
59     BFile data_BFile[REC_BLOCK_TYPES];
60
61     char *tmp_buf;
62     int tmp_size;
63
64     struct record_cache_entry *record_cache;
65     int cache_size;
66     int cache_cur;
67     int cache_max;
68
69     int compression_chunk_size;
70
71     Zebra_mutex mutex;
72
73     struct records_head {
74         char magic[8];
75         char version[4];
76         zint block_size[REC_BLOCK_TYPES];
77         zint block_free[REC_BLOCK_TYPES];
78         zint block_last[REC_BLOCK_TYPES];
79         zint block_used[REC_BLOCK_TYPES];
80         zint block_move[REC_BLOCK_TYPES];
81
82         zint total_bytes;
83         zint index_last;
84         zint index_free;
85         zint no_records;
86
87     } head;
88 };
89
90 enum recordCacheFlag { recordFlagNop, recordFlagWrite, recordFlagNew,
91                        recordFlagDelete };
92
93 struct record_cache_entry {
94     Record rec;
95     enum recordCacheFlag flag;
96 };
97
98 struct record_index_entry {
99     zint next;         /* first block of record info / next free entry */
100     int size;          /* size of record or 0 if free entry */
101 };
102
103 Record rec_cp(Record rec);
104
105 /* Modify argument to if below: 1=normal, 0=sysno testing */
106 #if 1
107 /* If this is used sysno are not converted (no testing) */
108 #define FAKE_OFFSET 0
109 #define USUAL_RANGE 6000000000LL
110
111 #else
112 /* Use a fake > 2^32 offset so we can test for proper 64-bit handling */
113 #define FAKE_OFFSET 6000000000LL
114 #define USUAL_RANGE 2000000000LL
115 #endif
116
117 static zint rec_sysno_to_ext(zint sysno)
118 {
119     assert(sysno >= 0 && sysno <= USUAL_RANGE);
120     return sysno + FAKE_OFFSET;
121 }
122
123 zint rec_sysno_to_int(zint sysno)
124 {
125     assert(sysno >= FAKE_OFFSET && sysno <= FAKE_OFFSET + USUAL_RANGE);
126     return sysno - FAKE_OFFSET;
127 }
128
129 static void rec_tmp_expand(Records p, int size)
130 {
131     if (p->tmp_size < size + 2048 ||
132         p->tmp_size < p->head.block_size[REC_BLOCK_TYPES-1]*2)
133     {
134         xfree(p->tmp_buf);
135         p->tmp_size = size + (int)
136                         (p->head.block_size[REC_BLOCK_TYPES-1])*2 + 2048;
137         p->tmp_buf = (char *) xmalloc(p->tmp_size);
138     }
139 }
140
141 static ZEBRA_RES rec_release_blocks(Records p, zint sysno)
142 {
143     struct record_index_entry entry;
144     zint freeblock;
145     char block_and_ref[sizeof(zint) + sizeof(short)];
146     int dst_type;
147     int first = 1;
148
149     if (recindex_read_indx(p->recindex, sysno, &entry, sizeof(entry), 1) != 1)
150         return ZEBRA_FAIL;
151
152     freeblock = entry.next;
153     assert(freeblock > 0);
154     dst_type = CAST_ZINT_TO_INT(freeblock & 7);
155     assert(dst_type < REC_BLOCK_TYPES);
156     freeblock = freeblock / 8;
157     while (freeblock)
158     {
159         if (bf_read(p->data_BFile[dst_type], freeblock, 0,
160                      first ? sizeof(block_and_ref) : sizeof(zint),
161                      block_and_ref) != 1)
162         {
163             yaz_log(YLOG_FATAL|YLOG_ERRNO, "read in rec_del_single");
164             return ZEBRA_FAIL;
165         }
166         if (first)
167         {
168             short ref;
169             memcpy(&ref, block_and_ref + sizeof(freeblock), sizeof(ref));
170             --ref;
171             memcpy(block_and_ref + sizeof(freeblock), &ref, sizeof(ref));
172             if (ref)
173             {
174                 /* there is still a reference to this block.. */
175                 if (bf_write(p->data_BFile[dst_type], freeblock, 0,
176                               sizeof(block_and_ref), block_and_ref))
177                 {
178                     yaz_log(YLOG_FATAL|YLOG_ERRNO, "write in rec_del_single");
179                     return ZEBRA_FAIL;
180                 }
181                 return ZEBRA_OK;
182             }
183             /* the list of blocks can all be removed (ref == 0) */
184             first = 0;
185         }
186         
187         if (bf_write(p->data_BFile[dst_type], freeblock, 0, sizeof(freeblock),
188                       &p->head.block_free[dst_type]))
189         {
190             yaz_log(YLOG_FATAL|YLOG_ERRNO, "write in rec_del_single");
191             return ZEBRA_FAIL;
192         }
193         p->head.block_free[dst_type] = freeblock;
194         memcpy(&freeblock, block_and_ref, sizeof(freeblock));
195
196         p->head.block_used[dst_type]--;
197     }
198     p->head.total_bytes -= entry.size;
199     return ZEBRA_OK;
200 }
201
202 static ZEBRA_RES rec_delete_single(Records p, Record rec)
203 {
204     struct record_index_entry entry;
205
206     /* all data in entry must be reset, since it's written verbatim */
207     memset(&entry, '\0', sizeof(entry));
208     if (rec_release_blocks(p, rec_sysno_to_int(rec->sysno)) != ZEBRA_OK)
209         return ZEBRA_FAIL;
210
211     entry.next = p->head.index_free;
212     entry.size = 0;
213     p->head.index_free = rec_sysno_to_int(rec->sysno);
214     recindex_write_indx(p->recindex, rec_sysno_to_int(rec->sysno), &entry, sizeof(entry));
215     return ZEBRA_OK;
216 }
217
218 static ZEBRA_RES rec_write_tmp_buf(Records p, int size, zint *sysnos)
219 {
220     struct record_index_entry entry;
221     int no_written = 0;
222     char *cptr = p->tmp_buf;
223     zint block_prev = -1, block_free;
224     int dst_type = 0;
225     int i;
226
227     /* all data in entry must be reset, since it's written verbatim */
228     memset(&entry, '\0', sizeof(entry));
229
230     for (i = 1; i<REC_BLOCK_TYPES; i++)
231         if (size >= p->head.block_move[i])
232             dst_type = i;
233     while (no_written < size)
234     {
235         block_free = p->head.block_free[dst_type];
236         if (block_free)
237         {
238             if (bf_read(p->data_BFile[dst_type],
239                          block_free, 0, sizeof(*p->head.block_free),
240                          &p->head.block_free[dst_type]) != 1)
241             {
242                 yaz_log(YLOG_FATAL|YLOG_ERRNO, "read in %s at free block "
243                          ZINT_FORMAT,
244                          p->data_fname[dst_type], block_free);
245                 return ZEBRA_FAIL;
246             }
247         }
248         else
249             block_free = p->head.block_last[dst_type]++;
250         if (block_prev == -1)
251         {
252             entry.next = block_free*8 + dst_type;
253             entry.size = size;
254             p->head.total_bytes += size;
255             while (*sysnos > 0)
256             {
257                 recindex_write_indx(p->recindex, *sysnos, &entry, sizeof(entry));
258                 sysnos++;
259             }
260         }
261         else
262         {
263             memcpy(cptr, &block_free, sizeof(block_free));
264             bf_write(p->data_BFile[dst_type], block_prev, 0, 0, cptr);
265             cptr = p->tmp_buf + no_written;
266         }
267         block_prev = block_free;
268         no_written += CAST_ZINT_TO_INT(p->head.block_size[dst_type]) 
269             - sizeof(zint);
270         p->head.block_used[dst_type]++;
271     }
272     assert(block_prev != -1);
273     block_free = 0;
274     memcpy(cptr, &block_free, sizeof(block_free));
275     bf_write(p->data_BFile[dst_type], block_prev, 0,
276               sizeof(block_free) + (p->tmp_buf+size) - cptr, cptr);
277     return ZEBRA_OK;
278 }
279
280 Records rec_open(BFiles bfs, int rw, int compression_method)
281 {
282     Records p;
283     int i, r;
284     int version;
285     ZEBRA_RES ret = ZEBRA_OK;
286
287     p = (Records) xmalloc(sizeof(*p));
288     memset(&p->head, '\0', sizeof(p->head));
289     p->compression_method = compression_method;
290     p->rw = rw;
291     p->tmp_size = 4096;
292     p->tmp_buf = (char *) xmalloc(p->tmp_size);
293     p->compression_chunk_size = 0;
294     if (compression_method == REC_COMPRESS_BZIP2)
295         p->compression_chunk_size = 90000;
296     p->recindex = recindex_open(bfs, rw, 0 /* 1=isamb for recindex */);
297     r = recindex_read_head(p->recindex, p->tmp_buf);
298     switch (r)
299     {
300     case 0:
301         memcpy(p->head.magic, REC_HEAD_MAGIC, sizeof(p->head.magic));
302         sprintf(p->head.version, "%3d", REC_VERSION);
303         p->head.index_free = 0;
304         p->head.index_last = 1;
305         p->head.no_records = 0;
306         p->head.total_bytes = 0;
307         for (i = 0; i<REC_BLOCK_TYPES; i++)
308         {
309             p->head.block_free[i] = 0;
310             p->head.block_last[i] = 1;
311             p->head.block_used[i] = 0;
312         }
313         p->head.block_size[0] = 256;
314         p->head.block_move[0] = 0;
315         for (i = 1; i<REC_BLOCK_TYPES; i++)
316         {
317             p->head.block_size[i] = p->head.block_size[i-1] * 8;
318             p->head.block_move[i] = p->head.block_size[i] * 2;
319         }
320         if (rw)
321         {
322             if (recindex_write_head(p->recindex, 
323                                     &p->head, sizeof(p->head)) != ZEBRA_OK)
324                 ret = ZEBRA_FAIL;
325         }
326         break;
327     case 1:
328         memcpy(&p->head, p->tmp_buf, sizeof(p->head));
329         if (memcmp(p->head.magic, REC_HEAD_MAGIC, sizeof(p->head.magic)))
330         {
331             yaz_log(YLOG_FATAL, "file %s has bad format",
332                     recindex_get_fname(p->recindex));
333             ret = ZEBRA_FAIL;
334         }
335         version = atoi(p->head.version);
336         if (version != REC_VERSION)
337         {
338             yaz_log(YLOG_FATAL, "file %s is version %d, but version"
339                   " %d is required",
340                     recindex_get_fname(p->recindex), version, REC_VERSION);
341             ret = ZEBRA_FAIL;
342         }
343         break;
344     }
345     for (i = 0; i<REC_BLOCK_TYPES; i++)
346     {
347         char str[80];
348         sprintf(str, "recd%c", i + 'A');
349         p->data_fname[i] = (char *) xmalloc(strlen(str)+1);
350         strcpy(p->data_fname[i], str);
351         p->data_BFile[i] = NULL;
352     }
353     for (i = 0; i<REC_BLOCK_TYPES; i++)
354     {
355         if (!(p->data_BFile[i] =
356               bf_open(bfs, p->data_fname[i],
357                       CAST_ZINT_TO_INT(p->head.block_size[i]), rw)))
358         {
359             yaz_log(YLOG_FATAL|YLOG_ERRNO, "bf_open %s", p->data_fname[i]);
360             ret = ZEBRA_FAIL;
361             break;
362         }
363     }
364     p->cache_max = 400;
365     p->cache_cur = 0;
366     p->record_cache = (struct record_cache_entry *)
367         xmalloc(sizeof(*p->record_cache)*p->cache_max);
368     zebra_mutex_init(&p->mutex);
369     if (ret == ZEBRA_FAIL)
370         rec_close(&p);
371     return p;
372 }
373
374 static void rec_encode_unsigned(unsigned n, unsigned char *buf, int *len)
375 {
376     (*len) = 0;
377     while (n > 127)
378     {
379         buf[*len] = 128 + (n & 127);
380         n = n >> 7;
381         (*len)++;
382     }
383     buf[*len] = n;
384     (*len)++;
385 }
386
387 static void rec_decode_unsigned(unsigned *np, unsigned char *buf, int *len)
388 {
389     unsigned n = 0;
390     unsigned w = 1;
391     (*len) = 0;
392
393     while (buf[*len] > 127)
394     {
395         n += w*(buf[*len] & 127);
396         w = w << 7;
397         (*len)++;
398     }
399     n += w * buf[*len];
400     (*len)++;
401     *np = n;
402 }
403
404 static void rec_encode_zint(zint n, unsigned char *buf, int *len)
405 {
406     (*len) = 0;
407     while (n > 127)
408     {
409         buf[*len] = (unsigned) (128 + (n & 127));
410         n = n >> 7;
411         (*len)++;
412     }
413     buf[*len] = (unsigned) n;
414     (*len)++;
415 }
416
417 static void rec_decode_zint(zint *np, unsigned char *buf, int *len)
418 {
419     zint  n = 0;
420     zint w = 1;
421     (*len) = 0;
422
423     while (buf[*len] > 127)
424     {
425         n += w*(buf[*len] & 127);
426         w = w << 7;
427         (*len)++;
428     }
429     n += w * buf[*len];
430     (*len)++;
431     *np = n;
432 }
433
434 static void rec_cache_flush_block1(Records p, Record rec, Record last_rec,
435                                    char **out_buf, int *out_size,
436                                    int *out_offset)
437 {
438     int i;
439     int len;
440
441     for (i = 0; i<REC_NO_INFO; i++)
442     {
443         if (*out_offset + CAST_ZINT_TO_INT(rec->size[i]) + 20 > *out_size)
444         {
445             int new_size = *out_offset + rec->size[i] + 65536;
446             char *np = (char *) xmalloc(new_size);
447             if (*out_offset)
448                 memcpy(np, *out_buf, *out_offset);
449             xfree(*out_buf);
450             *out_size = new_size;
451             *out_buf = np;
452         }
453         if (i == 0)
454         {
455             rec_encode_zint(rec_sysno_to_int(rec->sysno), 
456                             (unsigned char *) *out_buf + *out_offset, &len);
457             (*out_offset) += len;
458         }
459         if (rec->size[i] == 0)
460         {
461             rec_encode_unsigned(1, (unsigned char *) *out_buf + *out_offset,
462                                 &len);
463             (*out_offset) += len;
464         }
465         else if (last_rec && rec->size[i] == last_rec->size[i] &&
466                  !memcmp(rec->info[i], last_rec->info[i], rec->size[i]))
467         {
468             rec_encode_unsigned(0, (unsigned char *) *out_buf + *out_offset,
469                                 &len);
470             (*out_offset) += len;
471         }
472         else
473         {
474             rec_encode_unsigned(rec->size[i]+1,
475                                 (unsigned char *) *out_buf + *out_offset,
476                                 &len);
477             (*out_offset) += len;
478             memcpy(*out_buf + *out_offset, rec->info[i], rec->size[i]);
479             (*out_offset) += rec->size[i];
480         }
481     }
482 }
483
484 static ZEBRA_RES rec_flush_shared(Records p, short ref_count, zint *sysnos,
485                                   char *out_buf, int out_offset)
486 {
487     ZEBRA_RES ret = ZEBRA_OK;
488     if (ref_count)
489     {
490         int i;
491         unsigned int csize = 0;  /* indicate compression "not performed yet" */
492         char compression_method = p->compression_method;
493         switch (compression_method)
494         {
495         case REC_COMPRESS_BZIP2:
496 #if HAVE_BZLIB_H        
497             csize = out_offset + (out_offset >> 6) + 620;
498             rec_tmp_expand(p, csize);
499 #ifdef BZ_CONFIG_ERROR
500             i = BZ2_bzBuffToBuffCompress 
501 #else
502             i = bzBuffToBuffCompress 
503 #endif
504                                     (p->tmp_buf+sizeof(zint)+sizeof(short)+
505                                       sizeof(char),
506                                       &csize, out_buf, out_offset, 1, 0, 30);
507             if (i != BZ_OK)
508             {
509                 yaz_log(YLOG_WARN, "bzBuffToBuffCompress error code=%d", i);
510                 csize = 0;
511             }
512             yaz_log(YLOG_LOG, "compress %4d %5d %5d", ref_count, out_offset,
513                   csize);
514 #endif
515             break;
516         case REC_COMPRESS_NONE:
517             break;
518         }
519         if (!csize)  
520         {
521             /* either no compression or compression not supported ... */
522             csize = out_offset;
523             rec_tmp_expand(p, csize);
524             memcpy(p->tmp_buf + sizeof(zint) + sizeof(short) + sizeof(char),
525                     out_buf, out_offset);
526             csize = out_offset;
527             compression_method = REC_COMPRESS_NONE;
528         }
529         memcpy(p->tmp_buf + sizeof(zint), &ref_count, sizeof(ref_count));
530         memcpy(p->tmp_buf + sizeof(zint)+sizeof(short),
531                 &compression_method, sizeof(compression_method));
532                 
533         /* -------- compression */
534         if (rec_write_tmp_buf(p, csize + sizeof(short) + sizeof(char), sysnos)
535             != ZEBRA_OK)
536             ret = ZEBRA_FAIL;
537     }
538     return ret;
539 }
540
541 static ZEBRA_RES rec_write_multiple(Records p, int saveCount)
542 {
543     int i;
544     short ref_count = 0;
545     Record last_rec = 0;
546     int out_size = 1000;
547     int out_offset = 0;
548     char *out_buf = (char *) xmalloc(out_size);
549     zint *sysnos = (zint *) xmalloc(sizeof(*sysnos) * (p->cache_cur + 1));
550     zint *sysnop = sysnos;
551     ZEBRA_RES ret = ZEBRA_OK;
552
553     for (i = 0; i<p->cache_cur - saveCount; i++)
554     {
555         struct record_cache_entry *e = p->record_cache + i;
556         switch (e->flag)
557         {
558         case recordFlagNew:
559             rec_cache_flush_block1(p, e->rec, last_rec, &out_buf,
560                                     &out_size, &out_offset);
561             *sysnop++ = rec_sysno_to_int(e->rec->sysno);
562             ref_count++;
563             e->flag = recordFlagNop;
564             last_rec = e->rec;
565             break;
566         case recordFlagWrite:
567             if (rec_release_blocks(p, rec_sysno_to_int(e->rec->sysno))
568                 != ZEBRA_OK)
569                 ret = ZEBRA_FAIL;
570
571             rec_cache_flush_block1(p, e->rec, last_rec, &out_buf,
572                                     &out_size, &out_offset);
573             *sysnop++ = rec_sysno_to_int(e->rec->sysno);
574             ref_count++;
575             e->flag = recordFlagNop;
576             last_rec = e->rec;
577             break;
578         case recordFlagDelete:
579             if (rec_delete_single(p, e->rec) != ZEBRA_OK)
580                 ret = ZEBRA_FAIL;
581
582             e->flag = recordFlagNop;
583             break;
584         case recordFlagNop:
585             break;
586         default:
587             break;
588         }
589     }
590
591     *sysnop = -1;
592     rec_flush_shared(p, ref_count, sysnos, out_buf, out_offset);
593     xfree(out_buf);
594     xfree(sysnos);
595     return ret;
596 }
597
598 static ZEBRA_RES rec_cache_flush(Records p, int saveCount)
599 {
600     int i, j;
601     ZEBRA_RES ret;
602
603     if (saveCount >= p->cache_cur)
604         saveCount = 0;
605
606     ret = rec_write_multiple(p, saveCount);
607
608     for (i = 0; i<p->cache_cur - saveCount; i++)
609     {
610         struct record_cache_entry *e = p->record_cache + i;
611         rec_free(&e->rec);
612     } 
613     /* i still being used ... */
614     for (j = 0; j<saveCount; j++, i++)
615         memcpy(p->record_cache+j, p->record_cache+i,
616                 sizeof(*p->record_cache));
617     p->cache_cur = saveCount;
618     return ret;
619 }
620
621 static Record *rec_cache_lookup(Records p, zint sysno,
622                                 enum recordCacheFlag flag)
623 {
624     int i;
625     for (i = 0; i<p->cache_cur; i++)
626     {
627         struct record_cache_entry *e = p->record_cache + i;
628         if (e->rec->sysno == sysno)
629         {
630             if (flag != recordFlagNop && e->flag == recordFlagNop)
631                 e->flag = flag;
632             return &e->rec;
633         }
634     }
635     return NULL;
636 }
637
638 static ZEBRA_RES rec_cache_insert(Records p, Record rec, enum recordCacheFlag flag)
639 {
640     struct record_cache_entry *e;
641     ZEBRA_RES ret = ZEBRA_OK;
642
643     if (p->cache_cur == p->cache_max)
644         ret = rec_cache_flush(p, 1);
645     else if (p->cache_cur > 0)
646     {
647         int i, j;
648         int used = 0;
649         for (i = 0; i<p->cache_cur; i++)
650         {
651             Record r = (p->record_cache + i)->rec;
652             for (j = 0; j<REC_NO_INFO; j++)
653                 used += r->size[j];
654         }
655         if (used > p->compression_chunk_size)
656             ret = rec_cache_flush(p, 1);
657     }
658     assert(p->cache_cur < p->cache_max);
659
660     e = p->record_cache + (p->cache_cur)++;
661     e->flag = flag;
662     e->rec = rec_cp(rec);
663     return ret;
664 }
665
666 ZEBRA_RES rec_close(Records *pp)
667 {
668     Records p = *pp;
669     int i;
670     ZEBRA_RES ret = ZEBRA_OK;
671
672     if (!p)
673         return ret;
674
675     zebra_mutex_destroy(&p->mutex);
676     if (rec_cache_flush(p, 0) != ZEBRA_OK)
677         ret = ZEBRA_FAIL;
678
679     xfree(p->record_cache);
680
681     if (p->rw)
682     {
683         if (recindex_write_head(p->recindex, &p->head, sizeof(p->head)) != ZEBRA_OK)
684             ret = ZEBRA_FAIL;
685     }
686
687     recindex_close(p->recindex);
688
689     for (i = 0; i<REC_BLOCK_TYPES; i++)
690     {
691         if (p->data_BFile[i])
692             bf_close(p->data_BFile[i]);
693         xfree(p->data_fname[i]);
694     }
695     xfree(p->tmp_buf);
696     xfree(p);
697     *pp = NULL;
698     return ret;
699 }
700
701 static Record rec_get_int(Records p, zint sysno)
702 {
703     int i, in_size, r;
704     Record rec, *recp;
705     struct record_index_entry entry;
706     zint freeblock;
707     int dst_type;
708     char *nptr, *cptr;
709     char *in_buf = 0;
710     char *bz_buf = 0;
711 #if HAVE_BZLIB_H
712     unsigned int bz_size;
713 #endif
714     char compression_method;
715
716     assert(sysno > 0);
717     assert(p);
718
719     if ((recp = rec_cache_lookup(p, sysno, recordFlagNop)))
720         return rec_cp(*recp);
721
722     if (recindex_read_indx(p->recindex, rec_sysno_to_int(sysno), &entry, sizeof(entry), 1) < 1)
723         return NULL;       /* record is not there! */
724
725     if (!entry.size)
726         return NULL;       /* record is deleted */
727
728     dst_type = (int) (entry.next & 7);
729     assert(dst_type < REC_BLOCK_TYPES);
730     freeblock = entry.next / 8;
731
732     assert(freeblock > 0);
733     
734     rec_tmp_expand(p, entry.size);
735
736     cptr = p->tmp_buf;
737     r = bf_read(p->data_BFile[dst_type], freeblock, 0, 0, cptr);
738     if (r < 0)
739         return 0;
740     memcpy(&freeblock, cptr, sizeof(freeblock));
741
742     while (freeblock)
743     {
744         zint tmp;
745
746         cptr += p->head.block_size[dst_type] - sizeof(freeblock);
747         
748         memcpy(&tmp, cptr, sizeof(tmp));
749         r = bf_read(p->data_BFile[dst_type], freeblock, 0, 0, cptr);
750         if (r < 0)
751             return 0;
752         memcpy(&freeblock, cptr, sizeof(freeblock));
753         memcpy(cptr, &tmp, sizeof(tmp));
754     }
755
756     rec = (Record) xmalloc(sizeof(*rec));
757     rec->sysno = sysno;
758     memcpy(&compression_method, p->tmp_buf + sizeof(zint) + sizeof(short),
759             sizeof(compression_method));
760     in_buf = p->tmp_buf + sizeof(zint) + sizeof(short) + sizeof(char);
761     in_size = entry.size - sizeof(short) - sizeof(char);
762     switch (compression_method)
763     {
764     case REC_COMPRESS_BZIP2:
765 #if HAVE_BZLIB_H
766         bz_size = entry.size * 20 + 100;
767         while (1)
768         {
769             bz_buf = (char *) xmalloc(bz_size);
770 #ifdef BZ_CONFIG_ERROR
771             i = BZ2_bzBuffToBuffDecompress
772 #else
773             i = bzBuffToBuffDecompress
774 #endif
775                 (bz_buf, &bz_size, in_buf, in_size, 0, 0);
776             yaz_log(YLOG_LOG, "decompress %5d %5d", in_size, bz_size);
777             if (i == BZ_OK)
778                 break;
779             yaz_log(YLOG_LOG, "failed");
780             xfree(bz_buf);
781             bz_size *= 2;
782         }
783         in_buf = bz_buf;
784         in_size = bz_size;
785 #else
786         yaz_log(YLOG_FATAL, "cannot decompress record(s) in BZIP2 format");
787         return 0;
788 #endif
789         break;
790     case REC_COMPRESS_NONE:
791         break;
792     }
793     for (i = 0; i<REC_NO_INFO; i++)
794         rec->info[i] = 0;
795
796     nptr = in_buf;                /* skip ref count */
797     while (nptr < in_buf + in_size)
798     {
799         zint this_sysno;
800         int len;
801         rec_decode_zint(&this_sysno, (unsigned char *) nptr, &len);
802         nptr += len;
803
804         for (i = 0; i < REC_NO_INFO; i++)
805         {
806             unsigned int this_size;
807             rec_decode_unsigned(&this_size, (unsigned char *) nptr, &len);
808             nptr += len;
809
810             if (this_size == 0)
811                 continue;
812             rec->size[i] = this_size-1;
813
814             if (rec->size[i])
815             {
816                 rec->info[i] = nptr;
817                 nptr += rec->size[i];
818             }
819             else
820                 rec->info[i] = NULL;
821         }
822         if (this_sysno == rec_sysno_to_int(sysno))
823             break;
824     }
825     for (i = 0; i<REC_NO_INFO; i++)
826     {
827         if (rec->info[i] && rec->size[i])
828         {
829             char *np = xmalloc(rec->size[i]+1);
830             memcpy(np, rec->info[i], rec->size[i]);
831             np[rec->size[i]] = '\0';
832             rec->info[i] = np;
833         }
834         else
835         {
836             assert(rec->info[i] == 0);
837             assert(rec->size[i] == 0);
838         }
839     }
840     xfree(bz_buf);
841     if (rec_cache_insert(p, rec, recordFlagNop) != ZEBRA_OK)
842         return 0;
843     return rec;
844 }
845
846 Record rec_get(Records p, zint sysno)
847 {
848     Record rec;
849     zebra_mutex_lock(&p->mutex);
850
851     rec = rec_get_int(p, sysno);
852     zebra_mutex_unlock(&p->mutex);
853     return rec;
854 }
855
856 Record rec_get_root(Records p)
857 {
858     return rec_get(p, rec_sysno_to_ext(1));
859 }
860
861 static Record rec_new_int(Records p)
862 {
863     int i;
864     zint sysno;
865     Record rec;
866
867     assert(p);
868     rec = (Record) xmalloc(sizeof(*rec));
869     if (1 || p->head.index_free == 0)
870         sysno = (p->head.index_last)++;
871     else
872     {
873         struct record_index_entry entry;
874
875         if (recindex_read_indx(p->recindex, p->head.index_free, &entry, sizeof(entry), 0) < 1)
876         {
877             xfree(rec);
878             return 0;
879         }
880         sysno = p->head.index_free;
881         p->head.index_free = entry.next;
882     }
883     (p->head.no_records)++;
884     rec->sysno = rec_sysno_to_ext(sysno);
885     for (i = 0; i < REC_NO_INFO; i++)
886     {
887         rec->info[i] = NULL;
888         rec->size[i] = 0;
889     }
890     rec_cache_insert(p, rec, recordFlagNew);
891     return rec;
892 }
893
894 Record rec_new(Records p)
895 {
896     Record rec;
897     zebra_mutex_lock(&p->mutex);
898
899     rec = rec_new_int(p);
900     zebra_mutex_unlock(&p->mutex);
901     return rec;
902 }
903
904 ZEBRA_RES rec_del(Records p, Record *recpp)
905 {
906     Record *recp;
907     ZEBRA_RES ret = ZEBRA_OK;
908
909     zebra_mutex_lock(&p->mutex);
910     (p->head.no_records)--;
911     if ((recp = rec_cache_lookup(p, (*recpp)->sysno, recordFlagDelete)))
912     {
913         rec_free(recp);
914         *recp = *recpp;
915     }
916     else
917     {
918         ret = rec_cache_insert(p, *recpp, recordFlagDelete);
919         rec_free(recpp);
920     }
921     zebra_mutex_unlock(&p->mutex);
922     *recpp = NULL;
923     return ret;
924 }
925
926 ZEBRA_RES rec_put(Records p, Record *recpp)
927 {
928     Record *recp;
929     ZEBRA_RES ret = ZEBRA_OK;
930
931     zebra_mutex_lock(&p->mutex);
932     if ((recp = rec_cache_lookup(p, (*recpp)->sysno, recordFlagWrite)))
933     {
934         rec_free(recp);
935         *recp = *recpp;
936     }
937     else
938     {
939         ret = rec_cache_insert(p, *recpp, recordFlagWrite);
940         rec_free(recpp);
941     }
942     zebra_mutex_unlock(&p->mutex);
943     *recpp = NULL;
944     return ret;
945 }
946
947 void rec_free(Record *recpp)
948 {
949     int i;
950
951     if (!*recpp)
952         return ;
953     for (i = 0; i < REC_NO_INFO; i++)
954         xfree((*recpp)->info[i]);
955     xfree(*recpp);
956     *recpp = NULL;
957 }
958
959 Record rec_cp(Record rec)
960 {
961     Record n;
962     int i;
963
964     n = (Record) xmalloc(sizeof(*n));
965     n->sysno = rec->sysno;
966     for (i = 0; i < REC_NO_INFO; i++)
967         if (!rec->info[i])
968         {
969             n->info[i] = NULL;
970             n->size[i] = 0;
971         }
972         else
973         {
974             n->size[i] = rec->size[i];
975             n->info[i] = (char *) xmalloc(rec->size[i]+1);
976             memcpy(n->info[i], rec->info[i], rec->size[i]);
977             n->info[i][rec->size[i]] = '\0';
978         }
979     return n;
980 }
981
982
983 char *rec_strdup(const char *s, size_t *len)
984 {
985     char *p;
986
987     if (!s)
988     {
989         *len = 0;
990         return NULL;
991     }
992     *len = strlen(s)+1;
993     p = (char *) xmalloc(*len);
994     strcpy(p, s);
995     return p;
996 }
997
998 void rec_prstat(Records records, int verbose)
999 {
1000     int i;
1001     zint total_bytes = 0;
1002     
1003     yaz_log (YLOG_LOG,
1004           "Total records                        %8" ZINT_FORMAT0,
1005           records->head.no_records);
1006
1007     for (i = 0; i< REC_BLOCK_TYPES; i++)
1008     {
1009         yaz_log (YLOG_LOG, "Record blocks of size "ZINT_FORMAT,
1010               records->head.block_size[i]);
1011         yaz_log (YLOG_LOG,
1012           " Used/Total/Bytes used            "
1013               ZINT_FORMAT "/" ZINT_FORMAT "/" ZINT_FORMAT,
1014               records->head.block_used[i], records->head.block_last[i]-1,
1015               records->head.block_used[i] * records->head.block_size[i]);
1016         total_bytes +=
1017             records->head.block_used[i] * records->head.block_size[i];
1018
1019         yaz_log(YLOG_LOG, " Block Last " ZINT_FORMAT, records->head.block_last[i]);
1020         if (verbose)
1021         {   /* analyse free lists */
1022             zint no_free = 0;
1023             zint block_free = records->head.block_free[i];
1024             WRBUF w = wrbuf_alloc();
1025             while (block_free)
1026             {
1027                 zint nblock;
1028                 no_free++;
1029                 wrbuf_printf(w, " " ZINT_FORMAT, block_free);
1030                 if (bf_read(records->data_BFile[i],
1031                             block_free, 0, sizeof(nblock), &nblock) != 1)
1032                 {
1033                     yaz_log(YLOG_FATAL|YLOG_ERRNO, "read in %s at free block "
1034                             ZINT_FORMAT,
1035                             records->data_fname[i], block_free);
1036                     break;
1037                 }
1038                 block_free = nblock;
1039             }
1040             yaz_log (YLOG_LOG,
1041                      " Number in free list       %8" ZINT_FORMAT0, no_free);
1042             if (no_free)
1043                 yaz_log(YLOG_LOG, "%s", wrbuf_cstr(w));
1044             wrbuf_destroy(w);
1045         }
1046     }
1047     yaz_log (YLOG_LOG,
1048           "Total size of record index in bytes  %8" ZINT_FORMAT0,
1049           records->head.total_bytes);
1050     yaz_log (YLOG_LOG,
1051           "Total size with overhead             %8" ZINT_FORMAT0,
1052           total_bytes);
1053 }
1054
1055 /*
1056  * Local variables:
1057  * c-basic-offset: 4
1058  * c-file-style: "Stroustrup"
1059  * indent-tabs-mode: nil
1060  * End:
1061  * vim: shiftwidth=4 tabstop=8 expandtab
1062  */
1063