Updated footer comment
[idzebra-moved-to-github.git] / index / records.c
1 /* This file is part of the Zebra server.
2    Copyright (C) 1994-2009 Index Data
3
4 Zebra is free software; you can redistribute it and/or modify it under
5 the terms of the GNU General Public License as published by the Free
6 Software Foundation; either version 2, or (at your option) any later
7 version.
8
9 Zebra is distributed in the hope that it will be useful, but WITHOUT ANY
10 WARRANTY; without even the implied warranty of MERCHANTABILITY or
11 FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
12 for more details.
13
14 You should have received a copy of the GNU General Public License
15 along with this program; if not, write to the Free Software
16 Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA  02110-1301  USA
17
18 */
19
20 /*
21  *  Format of first block
22  *      next       (8 bytes)
23  *      ref_count  (2 bytes)
24  *      block      (500 bytes)
25  *
26  *  Format of subsequent blocks 
27  *      next  (8 bytes)
28  *      block (502 bytes)
29  *
30  *  Format of each record
31  *      sysno
32  *      (length, data) - pairs
33  *      length = 0 if same as previous
34  */
35 #include <stdio.h>
36 #include <stdlib.h>
37 #include <assert.h>
38 #include <string.h>
39
40 #include <yaz/yaz-util.h>
41 #include <idzebra/bfile.h>
42 #include "recindex.h"
43
44 #if HAVE_BZLIB_H
45 #include <bzlib.h>
46 #endif
47
48 #define REC_BLOCK_TYPES 2
49 #define REC_HEAD_MAGIC "recindex"
50 #define REC_VERSION 5
51
52 struct records_info {
53     int rw;
54     int compression_method;
55
56     recindex_t recindex;
57
58     char *data_fname[REC_BLOCK_TYPES];
59     BFile data_BFile[REC_BLOCK_TYPES];
60
61     char *tmp_buf;
62     int tmp_size;
63
64     struct record_cache_entry *record_cache;
65     int cache_size;
66     int cache_cur;
67     int cache_max;
68
69     int compression_chunk_size;
70
71     Zebra_mutex mutex;
72
73     struct records_head {
74         char magic[8];
75         char version[4];
76         zint block_size[REC_BLOCK_TYPES];
77         zint block_free[REC_BLOCK_TYPES];
78         zint block_last[REC_BLOCK_TYPES];
79         zint block_used[REC_BLOCK_TYPES];
80         zint block_move[REC_BLOCK_TYPES];
81
82         zint total_bytes;
83         zint index_last;
84         zint index_free;
85         zint no_records;
86
87     } head;
88 };
89
90 enum recordCacheFlag { recordFlagNop, recordFlagWrite, recordFlagNew,
91                        recordFlagDelete };
92
93 struct record_cache_entry {
94     Record rec;
95     enum recordCacheFlag flag;
96 };
97
98 struct record_index_entry {
99     zint next;         /* first block of record info / next free entry */
100     int size;          /* size of record or 0 if free entry */
101 };
102
103 Record rec_cp(Record rec);
104
105 /* Modify argument to if below: 1=normal, 0=sysno testing */
106 #if 1
107 /* If this is used sysno are not converted (no testing) */
108 #define FAKE_OFFSET 0
109 #define USUAL_RANGE 6000000000LL
110
111 #else
112 /* Use a fake > 2^32 offset so we can test for proper 64-bit handling */
113 #define FAKE_OFFSET 6000000000LL
114 #define USUAL_RANGE 2000000000LL
115 #endif
116
117 static zint rec_sysno_to_ext(zint sysno)
118 {
119     assert(sysno >= 0 && sysno <= USUAL_RANGE);
120     return sysno + FAKE_OFFSET;
121 }
122
123 zint rec_sysno_to_int(zint sysno)
124 {
125     assert(sysno >= FAKE_OFFSET && sysno <= FAKE_OFFSET + USUAL_RANGE);
126     return sysno - FAKE_OFFSET;
127 }
128
129 static void rec_tmp_expand(Records p, int size)
130 {
131     if (p->tmp_size < size + 2048 ||
132         p->tmp_size < p->head.block_size[REC_BLOCK_TYPES-1]*2)
133     {
134         xfree(p->tmp_buf);
135         p->tmp_size = size + (int)
136                         (p->head.block_size[REC_BLOCK_TYPES-1])*2 + 2048;
137         p->tmp_buf = (char *) xmalloc(p->tmp_size);
138     }
139 }
140
141 static ZEBRA_RES rec_release_blocks(Records p, zint sysno)
142 {
143     struct record_index_entry entry;
144     zint freeblock;
145     char block_and_ref[sizeof(zint) + sizeof(short)];
146     int dst_type;
147     int first = 1;
148
149     if (recindex_read_indx(p->recindex, sysno, &entry, sizeof(entry), 1) != 1)
150         return ZEBRA_FAIL;
151
152     freeblock = entry.next;
153     assert(freeblock > 0);
154     dst_type = CAST_ZINT_TO_INT(freeblock & 7);
155     assert(dst_type < REC_BLOCK_TYPES);
156     freeblock = freeblock / 8;
157     while (freeblock)
158     {
159         if (bf_read(p->data_BFile[dst_type], freeblock, 0,
160                      first ? sizeof(block_and_ref) : sizeof(zint),
161                      block_and_ref) != 1)
162         {
163             yaz_log(YLOG_FATAL|YLOG_ERRNO, "read in rec_del_single");
164             return ZEBRA_FAIL;
165         }
166         if (first)
167         {
168             short ref;
169             memcpy(&ref, block_and_ref + sizeof(freeblock), sizeof(ref));
170             --ref;
171             memcpy(block_and_ref + sizeof(freeblock), &ref, sizeof(ref));
172             if (ref)
173             {
174                 /* there is still a reference to this block.. */
175                 if (bf_write(p->data_BFile[dst_type], freeblock, 0,
176                               sizeof(block_and_ref), block_and_ref))
177                 {
178                     yaz_log(YLOG_FATAL|YLOG_ERRNO, "write in rec_del_single");
179                     return ZEBRA_FAIL;
180                 }
181                 return ZEBRA_OK;
182             }
183             /* the list of blocks can all be removed (ref == 0) */
184             first = 0;
185         }
186         
187         if (bf_write(p->data_BFile[dst_type], freeblock, 0, sizeof(freeblock),
188                       &p->head.block_free[dst_type]))
189         {
190             yaz_log(YLOG_FATAL|YLOG_ERRNO, "write in rec_del_single");
191             return ZEBRA_FAIL;
192         }
193         p->head.block_free[dst_type] = freeblock;
194         memcpy(&freeblock, block_and_ref, sizeof(freeblock));
195
196         p->head.block_used[dst_type]--;
197     }
198     p->head.total_bytes -= entry.size;
199     return ZEBRA_OK;
200 }
201
202 static ZEBRA_RES rec_delete_single(Records p, Record rec)
203 {
204     struct record_index_entry entry;
205
206     /* all data in entry must be reset, since it's written verbatim */
207     memset(&entry, '\0', sizeof(entry));
208     if (rec_release_blocks(p, rec_sysno_to_int(rec->sysno)) != ZEBRA_OK)
209         return ZEBRA_FAIL;
210
211     entry.next = p->head.index_free;
212     entry.size = 0;
213     p->head.index_free = rec_sysno_to_int(rec->sysno);
214     recindex_write_indx(p->recindex, rec_sysno_to_int(rec->sysno), &entry, sizeof(entry));
215     return ZEBRA_OK;
216 }
217
218 static ZEBRA_RES rec_write_tmp_buf(Records p, int size, zint *sysnos)
219 {
220     struct record_index_entry entry;
221     int no_written = 0;
222     char *cptr = p->tmp_buf;
223     zint block_prev = -1, block_free;
224     int dst_type = 0;
225     int i;
226
227     /* all data in entry must be reset, since it's written verbatim */
228     memset(&entry, '\0', sizeof(entry));
229
230     for (i = 1; i<REC_BLOCK_TYPES; i++)
231         if (size >= p->head.block_move[i])
232             dst_type = i;
233     while (no_written < size)
234     {
235         block_free = p->head.block_free[dst_type];
236         if (block_free)
237         {
238             if (bf_read(p->data_BFile[dst_type],
239                          block_free, 0, sizeof(*p->head.block_free),
240                          &p->head.block_free[dst_type]) != 1)
241             {
242                 yaz_log(YLOG_FATAL|YLOG_ERRNO, "read in %s at free block "
243                          ZINT_FORMAT,
244                          p->data_fname[dst_type], block_free);
245                 return ZEBRA_FAIL;
246             }
247         }
248         else
249             block_free = p->head.block_last[dst_type]++;
250         if (block_prev == -1)
251         {
252             entry.next = block_free*8 + dst_type;
253             entry.size = size;
254             p->head.total_bytes += size;
255             while (*sysnos > 0)
256             {
257                 recindex_write_indx(p->recindex, *sysnos, &entry, sizeof(entry));
258                 sysnos++;
259             }
260         }
261         else
262         {
263             memcpy(cptr, &block_free, sizeof(block_free));
264             bf_write(p->data_BFile[dst_type], block_prev, 0, 0, cptr);
265             cptr = p->tmp_buf + no_written;
266         }
267         block_prev = block_free;
268         no_written += CAST_ZINT_TO_INT(p->head.block_size[dst_type]) 
269             - sizeof(zint);
270         p->head.block_used[dst_type]++;
271     }
272     assert(block_prev != -1);
273     block_free = 0;
274     memcpy(cptr, &block_free, sizeof(block_free));
275     bf_write(p->data_BFile[dst_type], block_prev, 0,
276               sizeof(block_free) + (p->tmp_buf+size) - cptr, cptr);
277     return ZEBRA_OK;
278 }
279
280 Records rec_open(BFiles bfs, int rw, int compression_method)
281 {
282     Records p;
283     int i, r;
284     int version;
285     ZEBRA_RES ret = ZEBRA_OK;
286
287     p = (Records) xmalloc(sizeof(*p));
288     memset(&p->head, '\0', sizeof(p->head));
289     p->compression_method = compression_method;
290     p->rw = rw;
291     p->tmp_size = 1024;
292     p->tmp_buf = (char *) xmalloc(p->tmp_size);
293     p->compression_chunk_size = 0;
294     p->recindex = recindex_open(bfs, rw, 0 /* 1=isamb for recindex */);
295     r = recindex_read_head(p->recindex, p->tmp_buf);
296     switch (r)
297     {
298     case 0:
299         memcpy(p->head.magic, REC_HEAD_MAGIC, sizeof(p->head.magic));
300         sprintf(p->head.version, "%3d", REC_VERSION);
301         p->head.index_free = 0;
302         p->head.index_last = 1;
303         p->head.no_records = 0;
304         p->head.total_bytes = 0;
305         for (i = 0; i<REC_BLOCK_TYPES; i++)
306         {
307             p->head.block_free[i] = 0;
308             p->head.block_last[i] = 1;
309             p->head.block_used[i] = 0;
310         }
311         p->head.block_size[0] = 128;
312         p->head.block_move[0] = 0;
313         for (i = 1; i<REC_BLOCK_TYPES; i++)
314         {
315             p->head.block_size[i] = p->head.block_size[i-1] * 4;
316             p->head.block_move[i] = p->head.block_size[i] * 24;
317         }
318         if (rw)
319         {
320             if (recindex_write_head(p->recindex, 
321                                     &p->head, sizeof(p->head)) != ZEBRA_OK)
322                 ret = ZEBRA_FAIL;
323         }
324         break;
325     case 1:
326         memcpy(&p->head, p->tmp_buf, sizeof(p->head));
327         if (memcmp(p->head.magic, REC_HEAD_MAGIC, sizeof(p->head.magic)))
328         {
329             yaz_log(YLOG_FATAL, "file %s has bad format",
330                     recindex_get_fname(p->recindex));
331             ret = ZEBRA_FAIL;
332         }
333         version = atoi(p->head.version);
334         if (version != REC_VERSION)
335         {
336             yaz_log(YLOG_FATAL, "file %s is version %d, but version"
337                   " %d is required",
338                     recindex_get_fname(p->recindex), version, REC_VERSION);
339             ret = ZEBRA_FAIL;
340         }
341         p->compression_chunk_size = 90000; /* good for BZIP2 */
342         break;
343     }
344     for (i = 0; i<REC_BLOCK_TYPES; i++)
345     {
346         char str[80];
347         sprintf(str, "recd%c", i + 'A');
348         p->data_fname[i] = (char *) xmalloc(strlen(str)+1);
349         strcpy(p->data_fname[i], str);
350         p->data_BFile[i] = NULL;
351     }
352     for (i = 0; i<REC_BLOCK_TYPES; i++)
353     {
354         if (!(p->data_BFile[i] =
355               bf_open(bfs, p->data_fname[i],
356                       CAST_ZINT_TO_INT(p->head.block_size[i]), rw)))
357         {
358             yaz_log(YLOG_FATAL|YLOG_ERRNO, "bf_open %s", p->data_fname[i]);
359             ret = ZEBRA_FAIL;
360             break;
361         }
362     }
363     p->cache_max = 400;
364     p->cache_cur = 0;
365     p->record_cache = (struct record_cache_entry *)
366         xmalloc(sizeof(*p->record_cache)*p->cache_max);
367     zebra_mutex_init(&p->mutex);
368     if (ret == ZEBRA_FAIL)
369         rec_close(&p);
370     return p;
371 }
372
373 static void rec_encode_unsigned(unsigned n, unsigned char *buf, int *len)
374 {
375     (*len) = 0;
376     while (n > 127)
377     {
378         buf[*len] = 128 + (n & 127);
379         n = n >> 7;
380         (*len)++;
381     }
382     buf[*len] = n;
383     (*len)++;
384 }
385
386 static void rec_decode_unsigned(unsigned *np, unsigned char *buf, int *len)
387 {
388     unsigned n = 0;
389     unsigned w = 1;
390     (*len) = 0;
391
392     while (buf[*len] > 127)
393     {
394         n += w*(buf[*len] & 127);
395         w = w << 7;
396         (*len)++;
397     }
398     n += w * buf[*len];
399     (*len)++;
400     *np = n;
401 }
402
403 static void rec_encode_zint(zint n, unsigned char *buf, int *len)
404 {
405     (*len) = 0;
406     while (n > 127)
407     {
408         buf[*len] = (unsigned) (128 + (n & 127));
409         n = n >> 7;
410         (*len)++;
411     }
412     buf[*len] = (unsigned) n;
413     (*len)++;
414 }
415
416 static void rec_decode_zint(zint *np, unsigned char *buf, int *len)
417 {
418     zint  n = 0;
419     zint w = 1;
420     (*len) = 0;
421
422     while (buf[*len] > 127)
423     {
424         n += w*(buf[*len] & 127);
425         w = w << 7;
426         (*len)++;
427     }
428     n += w * buf[*len];
429     (*len)++;
430     *np = n;
431 }
432
433 static void rec_cache_flush_block1(Records p, Record rec, Record last_rec,
434                                    char **out_buf, int *out_size,
435                                    int *out_offset)
436 {
437     int i;
438     int len;
439
440     for (i = 0; i<REC_NO_INFO; i++)
441     {
442         if (*out_offset + CAST_ZINT_TO_INT(rec->size[i]) + 20 > *out_size)
443         {
444             int new_size = *out_offset + rec->size[i] + 65536;
445             char *np = (char *) xmalloc(new_size);
446             if (*out_offset)
447                 memcpy(np, *out_buf, *out_offset);
448             xfree(*out_buf);
449             *out_size = new_size;
450             *out_buf = np;
451         }
452         if (i == 0)
453         {
454             rec_encode_zint(rec_sysno_to_int(rec->sysno), 
455                             (unsigned char *) *out_buf + *out_offset, &len);
456             (*out_offset) += len;
457         }
458         if (rec->size[i] == 0)
459         {
460             rec_encode_unsigned(1, (unsigned char *) *out_buf + *out_offset,
461                                 &len);
462             (*out_offset) += len;
463         }
464         else if (last_rec && rec->size[i] == last_rec->size[i] &&
465                  !memcmp(rec->info[i], last_rec->info[i], rec->size[i]))
466         {
467             rec_encode_unsigned(0, (unsigned char *) *out_buf + *out_offset,
468                                 &len);
469             (*out_offset) += len;
470         }
471         else
472         {
473             rec_encode_unsigned(rec->size[i]+1,
474                                 (unsigned char *) *out_buf + *out_offset,
475                                 &len);
476             (*out_offset) += len;
477             memcpy(*out_buf + *out_offset, rec->info[i], rec->size[i]);
478             (*out_offset) += rec->size[i];
479         }
480     }
481 }
482
483 static ZEBRA_RES rec_write_multiple(Records p, int saveCount)
484 {
485     int i;
486     short ref_count = 0;
487     char compression_method;
488     Record last_rec = 0;
489     int out_size = 1000;
490     int out_offset = 0;
491     char *out_buf = (char *) xmalloc(out_size);
492     zint *sysnos = (zint *) xmalloc(sizeof(*sysnos) * (p->cache_cur + 1));
493     zint *sysnop = sysnos;
494     ZEBRA_RES ret = ZEBRA_OK;
495
496     for (i = 0; i<p->cache_cur - saveCount; i++)
497     {
498         struct record_cache_entry *e = p->record_cache + i;
499         switch (e->flag)
500         {
501         case recordFlagNew:
502             rec_cache_flush_block1(p, e->rec, last_rec, &out_buf,
503                                     &out_size, &out_offset);
504             *sysnop++ = rec_sysno_to_int(e->rec->sysno);
505             ref_count++;
506             e->flag = recordFlagNop;
507             last_rec = e->rec;
508             break;
509         case recordFlagWrite:
510             if (rec_release_blocks(p, rec_sysno_to_int(e->rec->sysno))
511                 != ZEBRA_OK)
512                 ret = ZEBRA_FAIL;
513
514             rec_cache_flush_block1(p, e->rec, last_rec, &out_buf,
515                                     &out_size, &out_offset);
516             *sysnop++ = rec_sysno_to_int(e->rec->sysno);
517             ref_count++;
518             e->flag = recordFlagNop;
519             last_rec = e->rec;
520             break;
521         case recordFlagDelete:
522             if (rec_delete_single(p, e->rec) != ZEBRA_OK)
523                 ret = ZEBRA_FAIL;
524
525             e->flag = recordFlagNop;
526             break;
527         case recordFlagNop:
528             break;
529         default:
530             break;
531         }
532     }
533
534     *sysnop = -1;
535     if (ref_count)
536     {
537         unsigned int csize = 0;  /* indicate compression "not performed yet" */
538         compression_method = p->compression_method;
539         switch (compression_method)
540         {
541         case REC_COMPRESS_BZIP2:
542 #if HAVE_BZLIB_H        
543             csize = out_offset + (out_offset >> 6) + 620;
544             rec_tmp_expand(p, csize);
545 #ifdef BZ_CONFIG_ERROR
546             i = BZ2_bzBuffToBuffCompress 
547 #else
548             i = bzBuffToBuffCompress 
549 #endif
550                                     (p->tmp_buf+sizeof(zint)+sizeof(short)+
551                                       sizeof(char),
552                                       &csize, out_buf, out_offset, 1, 0, 30);
553             if (i != BZ_OK)
554             {
555                 yaz_log(YLOG_WARN, "bzBuffToBuffCompress error code=%d", i);
556                 csize = 0;
557             }
558             yaz_log(YLOG_LOG, "compress %4d %5d %5d", ref_count, out_offset,
559                   csize);
560 #endif
561             break;
562         case REC_COMPRESS_NONE:
563             break;
564         }
565         if (!csize)  
566         {
567             /* either no compression or compression not supported ... */
568             csize = out_offset;
569             rec_tmp_expand(p, csize);
570             memcpy(p->tmp_buf + sizeof(zint) + sizeof(short) + sizeof(char),
571                     out_buf, out_offset);
572             csize = out_offset;
573             compression_method = REC_COMPRESS_NONE;
574         }
575         memcpy(p->tmp_buf + sizeof(zint), &ref_count, sizeof(ref_count));
576         memcpy(p->tmp_buf + sizeof(zint)+sizeof(short),
577                 &compression_method, sizeof(compression_method));
578                 
579         /* -------- compression */
580         if (rec_write_tmp_buf(p, csize + sizeof(short) + sizeof(char), sysnos)
581             != ZEBRA_OK)
582             ret = ZEBRA_FAIL;
583     }
584     xfree(out_buf);
585     xfree(sysnos);
586     return ret;
587 }
588
589 static ZEBRA_RES rec_cache_flush(Records p, int saveCount)
590 {
591     int i, j;
592     ZEBRA_RES ret;
593
594     if (saveCount >= p->cache_cur)
595         saveCount = 0;
596
597     ret = rec_write_multiple(p, saveCount);
598
599     for (i = 0; i<p->cache_cur - saveCount; i++)
600     {
601         struct record_cache_entry *e = p->record_cache + i;
602         rec_free(&e->rec);
603     } 
604     /* i still being used ... */
605     for (j = 0; j<saveCount; j++, i++)
606         memcpy(p->record_cache+j, p->record_cache+i,
607                 sizeof(*p->record_cache));
608     p->cache_cur = saveCount;
609     return ret;
610 }
611
612 static Record *rec_cache_lookup(Records p, zint sysno,
613                                 enum recordCacheFlag flag)
614 {
615     int i;
616     for (i = 0; i<p->cache_cur; i++)
617     {
618         struct record_cache_entry *e = p->record_cache + i;
619         if (e->rec->sysno == sysno)
620         {
621             if (flag != recordFlagNop && e->flag == recordFlagNop)
622                 e->flag = flag;
623             return &e->rec;
624         }
625     }
626     return NULL;
627 }
628
629 static ZEBRA_RES rec_cache_insert(Records p, Record rec, enum recordCacheFlag flag)
630 {
631     struct record_cache_entry *e;
632     ZEBRA_RES ret = ZEBRA_OK;
633
634     if (p->cache_cur == p->cache_max)
635         ret = rec_cache_flush(p, 1);
636     else if (p->cache_cur > 0)
637     {
638         int i, j;
639         int used = 0;
640         for (i = 0; i<p->cache_cur; i++)
641         {
642             Record r = (p->record_cache + i)->rec;
643             for (j = 0; j<REC_NO_INFO; j++)
644                 used += r->size[j];
645         }
646         if (used > p->compression_chunk_size)
647             ret = rec_cache_flush(p, 1);
648     }
649     assert(p->cache_cur < p->cache_max);
650
651     e = p->record_cache + (p->cache_cur)++;
652     e->flag = flag;
653     e->rec = rec_cp(rec);
654     return ret;
655 }
656
657 ZEBRA_RES rec_close(Records *pp)
658 {
659     Records p = *pp;
660     int i;
661     ZEBRA_RES ret = ZEBRA_OK;
662
663     if (!p)
664         return ret;
665
666     zebra_mutex_destroy(&p->mutex);
667     if (rec_cache_flush(p, 0) != ZEBRA_OK)
668         ret = ZEBRA_FAIL;
669
670     xfree(p->record_cache);
671
672     if (p->rw)
673     {
674         if (recindex_write_head(p->recindex, &p->head, sizeof(p->head)) != ZEBRA_OK)
675             ret = ZEBRA_FAIL;
676     }
677
678     recindex_close(p->recindex);
679
680     for (i = 0; i<REC_BLOCK_TYPES; i++)
681     {
682         if (p->data_BFile[i])
683             bf_close(p->data_BFile[i]);
684         xfree(p->data_fname[i]);
685     }
686     xfree(p->tmp_buf);
687     xfree(p);
688     *pp = NULL;
689     return ret;
690 }
691
692 static Record rec_get_int(Records p, zint sysno)
693 {
694     int i, in_size, r;
695     Record rec, *recp;
696     struct record_index_entry entry;
697     zint freeblock;
698     int dst_type;
699     char *nptr, *cptr;
700     char *in_buf = 0;
701     char *bz_buf = 0;
702 #if HAVE_BZLIB_H
703     unsigned int bz_size;
704 #endif
705     char compression_method;
706
707     assert(sysno > 0);
708     assert(p);
709
710     if ((recp = rec_cache_lookup(p, sysno, recordFlagNop)))
711         return rec_cp(*recp);
712
713     if (recindex_read_indx(p->recindex, rec_sysno_to_int(sysno), &entry, sizeof(entry), 1) < 1)
714         return NULL;       /* record is not there! */
715
716     if (!entry.size)
717         return NULL;       /* record is deleted */
718
719     dst_type = (int) (entry.next & 7);
720     assert(dst_type < REC_BLOCK_TYPES);
721     freeblock = entry.next / 8;
722
723     assert(freeblock > 0);
724     
725     rec_tmp_expand(p, entry.size);
726
727     cptr = p->tmp_buf;
728     r = bf_read(p->data_BFile[dst_type], freeblock, 0, 0, cptr);
729     if (r < 0)
730         return 0;
731     memcpy(&freeblock, cptr, sizeof(freeblock));
732
733     while (freeblock)
734     {
735         zint tmp;
736
737         cptr += p->head.block_size[dst_type] - sizeof(freeblock);
738         
739         memcpy(&tmp, cptr, sizeof(tmp));
740         r = bf_read(p->data_BFile[dst_type], freeblock, 0, 0, cptr);
741         if (r < 0)
742             return 0;
743         memcpy(&freeblock, cptr, sizeof(freeblock));
744         memcpy(cptr, &tmp, sizeof(tmp));
745     }
746
747     rec = (Record) xmalloc(sizeof(*rec));
748     rec->sysno = sysno;
749     memcpy(&compression_method, p->tmp_buf + sizeof(zint) + sizeof(short),
750             sizeof(compression_method));
751     in_buf = p->tmp_buf + sizeof(zint) + sizeof(short) + sizeof(char);
752     in_size = entry.size - sizeof(short) - sizeof(char);
753     switch (compression_method)
754     {
755     case REC_COMPRESS_BZIP2:
756 #if HAVE_BZLIB_H
757         bz_size = entry.size * 20 + 100;
758         while (1)
759         {
760             bz_buf = (char *) xmalloc(bz_size);
761 #ifdef BZ_CONFIG_ERROR
762             i = BZ2_bzBuffToBuffDecompress
763 #else
764             i = bzBuffToBuffDecompress
765 #endif
766                 (bz_buf, &bz_size, in_buf, in_size, 0, 0);
767             yaz_log(YLOG_LOG, "decompress %5d %5d", in_size, bz_size);
768             if (i == BZ_OK)
769                 break;
770             yaz_log(YLOG_LOG, "failed");
771             xfree(bz_buf);
772             bz_size *= 2;
773         }
774         in_buf = bz_buf;
775         in_size = bz_size;
776 #else
777         yaz_log(YLOG_FATAL, "cannot decompress record(s) in BZIP2 format");
778         return 0;
779 #endif
780         break;
781     case REC_COMPRESS_NONE:
782         break;
783     }
784     for (i = 0; i<REC_NO_INFO; i++)
785         rec->info[i] = 0;
786
787     nptr = in_buf;                /* skip ref count */
788     while (nptr < in_buf + in_size)
789     {
790         zint this_sysno;
791         int len;
792         rec_decode_zint(&this_sysno, (unsigned char *) nptr, &len);
793         nptr += len;
794
795         for (i = 0; i < REC_NO_INFO; i++)
796         {
797             unsigned int this_size;
798             rec_decode_unsigned(&this_size, (unsigned char *) nptr, &len);
799             nptr += len;
800
801             if (this_size == 0)
802                 continue;
803             rec->size[i] = this_size-1;
804
805             if (rec->size[i])
806             {
807                 rec->info[i] = nptr;
808                 nptr += rec->size[i];
809             }
810             else
811                 rec->info[i] = NULL;
812         }
813         if (this_sysno == rec_sysno_to_int(sysno))
814             break;
815     }
816     for (i = 0; i<REC_NO_INFO; i++)
817     {
818         if (rec->info[i] && rec->size[i])
819         {
820             char *np = xmalloc(rec->size[i]+1);
821             memcpy(np, rec->info[i], rec->size[i]);
822             np[rec->size[i]] = '\0';
823             rec->info[i] = np;
824         }
825         else
826         {
827             assert(rec->info[i] == 0);
828             assert(rec->size[i] == 0);
829         }
830     }
831     xfree(bz_buf);
832     if (rec_cache_insert(p, rec, recordFlagNop) != ZEBRA_OK)
833         return 0;
834     return rec;
835 }
836
837 Record rec_get(Records p, zint sysno)
838 {
839     Record rec;
840     zebra_mutex_lock(&p->mutex);
841
842     rec = rec_get_int(p, sysno);
843     zebra_mutex_unlock(&p->mutex);
844     return rec;
845 }
846
847 Record rec_get_root(Records p)
848 {
849     return rec_get(p, rec_sysno_to_ext(1));
850 }
851
852 static Record rec_new_int(Records p)
853 {
854     int i;
855     zint sysno;
856     Record rec;
857
858     assert(p);
859     rec = (Record) xmalloc(sizeof(*rec));
860     if (1 || p->head.index_free == 0)
861         sysno = (p->head.index_last)++;
862     else
863     {
864         struct record_index_entry entry;
865
866         if (recindex_read_indx(p->recindex, p->head.index_free, &entry, sizeof(entry), 0) < 1)
867         {
868             xfree(rec);
869             return 0;
870         }
871         sysno = p->head.index_free;
872         p->head.index_free = entry.next;
873     }
874     (p->head.no_records)++;
875     rec->sysno = rec_sysno_to_ext(sysno);
876     for (i = 0; i < REC_NO_INFO; i++)
877     {
878         rec->info[i] = NULL;
879         rec->size[i] = 0;
880     }
881     rec_cache_insert(p, rec, recordFlagNew);
882     return rec;
883 }
884
885 Record rec_new(Records p)
886 {
887     Record rec;
888     zebra_mutex_lock(&p->mutex);
889
890     rec = rec_new_int(p);
891     zebra_mutex_unlock(&p->mutex);
892     return rec;
893 }
894
895 ZEBRA_RES rec_del(Records p, Record *recpp)
896 {
897     Record *recp;
898     ZEBRA_RES ret = ZEBRA_OK;
899
900     zebra_mutex_lock(&p->mutex);
901     (p->head.no_records)--;
902     if ((recp = rec_cache_lookup(p, (*recpp)->sysno, recordFlagDelete)))
903     {
904         rec_free(recp);
905         *recp = *recpp;
906     }
907     else
908     {
909         ret = rec_cache_insert(p, *recpp, recordFlagDelete);
910         rec_free(recpp);
911     }
912     zebra_mutex_unlock(&p->mutex);
913     *recpp = NULL;
914     return ret;
915 }
916
917 ZEBRA_RES rec_put(Records p, Record *recpp)
918 {
919     Record *recp;
920     ZEBRA_RES ret = ZEBRA_OK;
921
922     zebra_mutex_lock(&p->mutex);
923     if ((recp = rec_cache_lookup(p, (*recpp)->sysno, recordFlagWrite)))
924     {
925         rec_free(recp);
926         *recp = *recpp;
927     }
928     else
929     {
930         ret = rec_cache_insert(p, *recpp, recordFlagWrite);
931         rec_free(recpp);
932     }
933     zebra_mutex_unlock(&p->mutex);
934     *recpp = NULL;
935     return ret;
936 }
937
938 void rec_free(Record *recpp)
939 {
940     int i;
941
942     if (!*recpp)
943         return ;
944     for (i = 0; i < REC_NO_INFO; i++)
945         xfree((*recpp)->info[i]);
946     xfree(*recpp);
947     *recpp = NULL;
948 }
949
950 Record rec_cp(Record rec)
951 {
952     Record n;
953     int i;
954
955     n = (Record) xmalloc(sizeof(*n));
956     n->sysno = rec->sysno;
957     for (i = 0; i < REC_NO_INFO; i++)
958         if (!rec->info[i])
959         {
960             n->info[i] = NULL;
961             n->size[i] = 0;
962         }
963         else
964         {
965             n->size[i] = rec->size[i];
966             n->info[i] = (char *) xmalloc(rec->size[i]+1);
967             memcpy(n->info[i], rec->info[i], rec->size[i]);
968             n->info[i][rec->size[i]] = '\0';
969         }
970     return n;
971 }
972
973
974 char *rec_strdup(const char *s, size_t *len)
975 {
976     char *p;
977
978     if (!s)
979     {
980         *len = 0;
981         return NULL;
982     }
983     *len = strlen(s)+1;
984     p = (char *) xmalloc(*len);
985     strcpy(p, s);
986     return p;
987 }
988
989 void rec_prstat(Records records, int verbose)
990 {
991     int i;
992     zint total_bytes = 0;
993     
994     yaz_log (YLOG_LOG,
995           "Total records                        %8" ZINT_FORMAT0,
996           records->head.no_records);
997
998     for (i = 0; i< REC_BLOCK_TYPES; i++)
999     {
1000         yaz_log (YLOG_LOG, "Record blocks of size "ZINT_FORMAT,
1001               records->head.block_size[i]);
1002         yaz_log (YLOG_LOG,
1003           " Used/Total/Bytes used            "
1004               ZINT_FORMAT "/" ZINT_FORMAT "/" ZINT_FORMAT,
1005               records->head.block_used[i], records->head.block_last[i]-1,
1006               records->head.block_used[i] * records->head.block_size[i]);
1007         total_bytes +=
1008             records->head.block_used[i] * records->head.block_size[i];
1009
1010         yaz_log(YLOG_LOG, " Block Last " ZINT_FORMAT, records->head.block_last[i]);
1011         if (verbose)
1012         {   /* analyse free lists */
1013             zint no_free = 0;
1014             zint block_free = records->head.block_free[i];
1015             WRBUF w = wrbuf_alloc();
1016             while (block_free)
1017             {
1018                 zint nblock;
1019                 no_free++;
1020                 wrbuf_printf(w, " " ZINT_FORMAT, block_free);
1021                 if (bf_read(records->data_BFile[i],
1022                             block_free, 0, sizeof(nblock), &nblock) != 1)
1023                 {
1024                     yaz_log(YLOG_FATAL|YLOG_ERRNO, "read in %s at free block "
1025                             ZINT_FORMAT,
1026                             records->data_fname[i], block_free);
1027                     break;
1028                 }
1029                 block_free = nblock;
1030             }
1031             yaz_log (YLOG_LOG,
1032                      " Number in free list       %8" ZINT_FORMAT0, no_free);
1033             if (no_free)
1034                 yaz_log(YLOG_LOG, "%s", wrbuf_cstr(w));
1035             wrbuf_destroy(w);
1036         }
1037     }
1038     yaz_log (YLOG_LOG,
1039           "Total size of record index in bytes  %8" ZINT_FORMAT0,
1040           records->head.total_bytes);
1041     yaz_log (YLOG_LOG,
1042           "Total size with overhead             %8" ZINT_FORMAT0,
1043           total_bytes);
1044 }
1045
1046 /*
1047  * Local variables:
1048  * c-basic-offset: 4
1049  * c-file-style: "Stroustrup"
1050  * indent-tabs-mode: nil
1051  * End:
1052  * vim: shiftwidth=4 tabstop=8 expandtab
1053  */
1054