Break long lines in debian/control
[idzebra-moved-to-github.git] / index / records.c
1 /* This file is part of the Zebra server.
2    Copyright (C) 1994-2011 Index Data
3
4 Zebra is free software; you can redistribute it and/or modify it under
5 the terms of the GNU General Public License as published by the Free
6 Software Foundation; either version 2, or (at your option) any later
7 version.
8
9 Zebra is distributed in the hope that it will be useful, but WITHOUT ANY
10 WARRANTY; without even the implied warranty of MERCHANTABILITY or
11 FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
12 for more details.
13
14 You should have received a copy of the GNU General Public License
15 along with this program; if not, write to the Free Software
16 Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA  02110-1301  USA
17
18 */
19
20 /*
21  *  Format of first block (assumes a 512 block size)
22  *      next       (8 bytes)
23  *      ref_count  (2 bytes)
24  *      block      (500 bytes)
25  *
26  *  Format of subsequent blocks 
27  *      next  (8 bytes)
28  *      block (502 bytes)
29  *
30  *  Format of each record
31  *      sysno
32  *      (length, data) - pairs
33  *      length = 0 if same as previous
34  */
35 #if HAVE_CONFIG_H
36 #include <config.h>
37 #endif
38 #include <stdio.h>
39 #include <stdlib.h>
40 #include <assert.h>
41 #include <string.h>
42
43 #include <yaz/yaz-util.h>
44 #include <idzebra/bfile.h>
45 #include "recindex.h"
46
47 #if HAVE_BZLIB_H
48 #include <bzlib.h>
49 #endif
50 #if HAVE_ZLIB_H
51 #include <zlib.h>
52 #endif
53
54 #define REC_BLOCK_TYPES 2
55 #define REC_HEAD_MAGIC "recindex"
56 #define REC_VERSION 5
57
58 struct records_info {
59     int rw;
60     int compression_method;
61
62     recindex_t recindex;
63
64     char *data_fname[REC_BLOCK_TYPES];
65     BFile data_BFile[REC_BLOCK_TYPES];
66
67     char *tmp_buf;
68     int tmp_size;
69
70     struct record_cache_entry *record_cache;
71     int cache_size;
72     int cache_cur;
73     int cache_max;
74
75     int compression_chunk_size;
76
77     Zebra_mutex mutex;
78
79     struct records_head {
80         char magic[8];
81         char version[4];
82         zint block_size[REC_BLOCK_TYPES];
83         zint block_free[REC_BLOCK_TYPES];
84         zint block_last[REC_BLOCK_TYPES];
85         zint block_used[REC_BLOCK_TYPES];
86         zint block_move[REC_BLOCK_TYPES];
87
88         zint total_bytes;
89         zint index_last;
90         zint index_free;
91         zint no_records;
92
93     } head;
94 };
95
96 enum recordCacheFlag { recordFlagNop, recordFlagWrite, recordFlagNew,
97                        recordFlagDelete };
98
99 struct record_cache_entry {
100     Record rec;
101     enum recordCacheFlag flag;
102 };
103
104 struct record_index_entry {
105     zint next;         /* first block of record info / next free entry */
106     int size;          /* size of record or 0 if free entry */
107 };
108
109 Record rec_cp(Record rec);
110
111 /* Modify argument to if below: 1=normal, 0=sysno testing */
112 #if 1
113 /* If this is used sysno are not converted (no testing) */
114 #define FAKE_OFFSET 0
115 #define USUAL_RANGE 6000000000LL
116
117 #else
118 /* Use a fake > 2^32 offset so we can test for proper 64-bit handling */
119 #define FAKE_OFFSET 6000000000LL
120 #define USUAL_RANGE 2000000000LL
121 #endif
122
123 static zint rec_sysno_to_ext(zint sysno)
124 {
125     assert(sysno >= 0 && sysno <= USUAL_RANGE);
126     return sysno + FAKE_OFFSET;
127 }
128
129 zint rec_sysno_to_int(zint sysno)
130 {
131     assert(sysno >= FAKE_OFFSET && sysno <= FAKE_OFFSET + USUAL_RANGE);
132     return sysno - FAKE_OFFSET;
133 }
134
135 static void rec_tmp_expand(Records p, int size)
136 {
137     if (p->tmp_size < size + 2048 ||
138         p->tmp_size < p->head.block_size[REC_BLOCK_TYPES-1]*2)
139     {
140         xfree(p->tmp_buf);
141         p->tmp_size = size + (int)
142                         (p->head.block_size[REC_BLOCK_TYPES-1])*2 + 2048;
143         p->tmp_buf = (char *) xmalloc(p->tmp_size);
144     }
145 }
146
147 static ZEBRA_RES rec_release_blocks(Records p, zint sysno)
148 {
149     struct record_index_entry entry;
150     zint freeblock;
151     char block_and_ref[sizeof(zint) + sizeof(short)];
152     int dst_type;
153     int first = 1;
154
155     if (recindex_read_indx(p->recindex, sysno, &entry, sizeof(entry), 1) != 1)
156         return ZEBRA_FAIL;
157
158     freeblock = entry.next;
159     assert(freeblock > 0);
160     dst_type = CAST_ZINT_TO_INT(freeblock & 7);
161     assert(dst_type < REC_BLOCK_TYPES);
162     freeblock = freeblock / 8;
163     while (freeblock)
164     {
165         if (bf_read(p->data_BFile[dst_type], freeblock, 0,
166                      first ? sizeof(block_and_ref) : sizeof(zint),
167                      block_and_ref) != 1)
168         {
169             yaz_log(YLOG_FATAL|YLOG_ERRNO, "read in rec_del_single");
170             return ZEBRA_FAIL;
171         }
172         if (first)
173         {
174             short ref;
175             memcpy(&ref, block_and_ref + sizeof(freeblock), sizeof(ref));
176             --ref;
177             memcpy(block_and_ref + sizeof(freeblock), &ref, sizeof(ref));
178             if (ref)
179             {
180                 /* there is still a reference to this block.. */
181                 if (bf_write(p->data_BFile[dst_type], freeblock, 0,
182                               sizeof(block_and_ref), block_and_ref))
183                 {
184                     yaz_log(YLOG_FATAL|YLOG_ERRNO, "write in rec_del_single");
185                     return ZEBRA_FAIL;
186                 }
187                 return ZEBRA_OK;
188             }
189             /* the list of blocks can all be removed (ref == 0) */
190             first = 0;
191         }
192         
193         if (bf_write(p->data_BFile[dst_type], freeblock, 0, sizeof(freeblock),
194                       &p->head.block_free[dst_type]))
195         {
196             yaz_log(YLOG_FATAL|YLOG_ERRNO, "write in rec_del_single");
197             return ZEBRA_FAIL;
198         }
199         p->head.block_free[dst_type] = freeblock;
200         memcpy(&freeblock, block_and_ref, sizeof(freeblock));
201
202         p->head.block_used[dst_type]--;
203     }
204     p->head.total_bytes -= entry.size;
205     return ZEBRA_OK;
206 }
207
208 static ZEBRA_RES rec_delete_single(Records p, Record rec)
209 {
210     struct record_index_entry entry;
211
212     /* all data in entry must be reset, since it's written verbatim */
213     memset(&entry, '\0', sizeof(entry));
214     if (rec_release_blocks(p, rec_sysno_to_int(rec->sysno)) != ZEBRA_OK)
215         return ZEBRA_FAIL;
216
217     entry.next = p->head.index_free;
218     entry.size = 0;
219     p->head.index_free = rec_sysno_to_int(rec->sysno);
220     recindex_write_indx(p->recindex, rec_sysno_to_int(rec->sysno), &entry, sizeof(entry));
221     return ZEBRA_OK;
222 }
223
224 static ZEBRA_RES rec_write_tmp_buf(Records p, int size, zint *sysnos)
225 {
226     struct record_index_entry entry;
227     int no_written = 0;
228     char *cptr = p->tmp_buf;
229     zint block_prev = -1, block_free;
230     int dst_type = 0;
231     int i;
232
233     /* all data in entry must be reset, since it's written verbatim */
234     memset(&entry, '\0', sizeof(entry));
235
236     for (i = 1; i<REC_BLOCK_TYPES; i++)
237         if (size >= p->head.block_move[i])
238             dst_type = i;
239     while (no_written < size)
240     {
241         block_free = p->head.block_free[dst_type];
242         if (block_free)
243         {
244             if (bf_read(p->data_BFile[dst_type],
245                          block_free, 0, sizeof(*p->head.block_free),
246                          &p->head.block_free[dst_type]) != 1)
247             {
248                 yaz_log(YLOG_FATAL|YLOG_ERRNO, "read in %s at free block "
249                          ZINT_FORMAT,
250                          p->data_fname[dst_type], block_free);
251                 return ZEBRA_FAIL;
252             }
253         }
254         else
255             block_free = p->head.block_last[dst_type]++;
256         if (block_prev == -1)
257         {
258             entry.next = block_free*8 + dst_type;
259             entry.size = size;
260             p->head.total_bytes += size;
261             while (*sysnos > 0)
262             {
263                 recindex_write_indx(p->recindex, *sysnos, &entry, sizeof(entry));
264                 sysnos++;
265             }
266         }
267         else
268         {
269             memcpy(cptr, &block_free, sizeof(block_free));
270             bf_write(p->data_BFile[dst_type], block_prev, 0, 0, cptr);
271             cptr = p->tmp_buf + no_written;
272         }
273         block_prev = block_free;
274         no_written += CAST_ZINT_TO_INT(p->head.block_size[dst_type]) 
275             - sizeof(zint);
276         p->head.block_used[dst_type]++;
277     }
278     assert(block_prev != -1);
279     block_free = 0;
280     memcpy(cptr, &block_free, sizeof(block_free));
281     bf_write(p->data_BFile[dst_type], block_prev, 0,
282               sizeof(block_free) + (p->tmp_buf+size) - cptr, cptr);
283     return ZEBRA_OK;
284 }
285
286 int rec_check_compression_method(int compression_method)
287 {
288     switch(compression_method)
289     {
290     case REC_COMPRESS_ZLIB:
291 #if HAVE_ZLIB_H
292         return 1;
293 #else
294         return 0;
295 #endif
296     case REC_COMPRESS_BZIP2:
297 #if HAVE_BZLIB_H
298         return 1;
299 #else
300         return 0;
301 #endif
302     case REC_COMPRESS_NONE:
303         return 1;
304     }
305     return 0;
306 }
307
308 Records rec_open(BFiles bfs, int rw, int compression_method)
309 {
310     Records p;
311     int i, r;
312     int version;
313     ZEBRA_RES ret = ZEBRA_OK;
314
315     p = (Records) xmalloc(sizeof(*p));
316     memset(&p->head, '\0', sizeof(p->head));
317     p->compression_method = compression_method;
318     p->rw = rw;
319     p->tmp_size = 4096;
320     p->tmp_buf = (char *) xmalloc(p->tmp_size);
321     p->compression_chunk_size = 0;
322     if (compression_method == REC_COMPRESS_BZIP2)
323         p->compression_chunk_size = 90000;
324     p->recindex = recindex_open(bfs, rw, 0 /* 1=isamb for recindex */);
325     r = recindex_read_head(p->recindex, p->tmp_buf);
326     switch (r)
327     {
328     case 0:
329         memcpy(p->head.magic, REC_HEAD_MAGIC, sizeof(p->head.magic));
330         sprintf(p->head.version, "%3d", REC_VERSION);
331         p->head.index_free = 0;
332         p->head.index_last = 1;
333         p->head.no_records = 0;
334         p->head.total_bytes = 0;
335         for (i = 0; i<REC_BLOCK_TYPES; i++)
336         {
337             p->head.block_free[i] = 0;
338             p->head.block_last[i] = 1;
339             p->head.block_used[i] = 0;
340         }
341         p->head.block_size[0] = 256;
342         p->head.block_move[0] = 0;
343         for (i = 1; i<REC_BLOCK_TYPES; i++)
344         {
345             p->head.block_size[i] = p->head.block_size[i-1] * 8;
346             p->head.block_move[i] = p->head.block_size[i] * 2;
347         }
348         if (rw)
349         {
350             if (recindex_write_head(p->recindex, 
351                                     &p->head, sizeof(p->head)) != ZEBRA_OK)
352                 ret = ZEBRA_FAIL;
353         }
354         break;
355     case 1:
356         memcpy(&p->head, p->tmp_buf, sizeof(p->head));
357         if (memcmp(p->head.magic, REC_HEAD_MAGIC, sizeof(p->head.magic)))
358         {
359             yaz_log(YLOG_FATAL, "file %s has bad format",
360                     recindex_get_fname(p->recindex));
361             ret = ZEBRA_FAIL;
362         }
363         version = atoi(p->head.version);
364         if (version != REC_VERSION)
365         {
366             yaz_log(YLOG_FATAL, "file %s is version %d, but version"
367                   " %d is required",
368                     recindex_get_fname(p->recindex), version, REC_VERSION);
369             ret = ZEBRA_FAIL;
370         }
371         break;
372     }
373     for (i = 0; i<REC_BLOCK_TYPES; i++)
374     {
375         char str[80];
376         sprintf(str, "recd%c", i + 'A');
377         p->data_fname[i] = (char *) xmalloc(strlen(str)+1);
378         strcpy(p->data_fname[i], str);
379         p->data_BFile[i] = NULL;
380     }
381     for (i = 0; i<REC_BLOCK_TYPES; i++)
382     {
383         if (!(p->data_BFile[i] =
384               bf_open(bfs, p->data_fname[i],
385                       CAST_ZINT_TO_INT(p->head.block_size[i]), rw)))
386         {
387             yaz_log(YLOG_FATAL|YLOG_ERRNO, "bf_open %s", p->data_fname[i]);
388             ret = ZEBRA_FAIL;
389             break;
390         }
391     }
392     p->cache_max = 400;
393     p->cache_cur = 0;
394     p->record_cache = (struct record_cache_entry *)
395         xmalloc(sizeof(*p->record_cache)*p->cache_max);
396     zebra_mutex_init(&p->mutex);
397     if (ret == ZEBRA_FAIL)
398         rec_close(&p);
399     return p;
400 }
401
402 static void rec_encode_unsigned(unsigned n, unsigned char *buf, int *len)
403 {
404     (*len) = 0;
405     while (n > 127)
406     {
407         buf[*len] = 128 + (n & 127);
408         n = n >> 7;
409         (*len)++;
410     }
411     buf[*len] = n;
412     (*len)++;
413 }
414
415 static void rec_decode_unsigned(unsigned *np, unsigned char *buf, int *len)
416 {
417     unsigned n = 0;
418     unsigned w = 1;
419     (*len) = 0;
420
421     while (buf[*len] > 127)
422     {
423         n += w*(buf[*len] & 127);
424         w = w << 7;
425         (*len)++;
426     }
427     n += w * buf[*len];
428     (*len)++;
429     *np = n;
430 }
431
432 static void rec_encode_zint(zint n, unsigned char *buf, int *len)
433 {
434     (*len) = 0;
435     while (n > 127)
436     {
437         buf[*len] = (unsigned) (128 + (n & 127));
438         n = n >> 7;
439         (*len)++;
440     }
441     buf[*len] = (unsigned) n;
442     (*len)++;
443 }
444
445 static void rec_decode_zint(zint *np, unsigned char *buf, int *len)
446 {
447     zint  n = 0;
448     zint w = 1;
449     (*len) = 0;
450
451     while (buf[*len] > 127)
452     {
453         n += w*(buf[*len] & 127);
454         w = w << 7;
455         (*len)++;
456     }
457     n += w * buf[*len];
458     (*len)++;
459     *np = n;
460 }
461
462 static void rec_cache_flush_block1(Records p, Record rec, Record last_rec,
463                                    char **out_buf, int *out_size,
464                                    int *out_offset)
465 {
466     int i;
467     int len;
468
469     for (i = 0; i<REC_NO_INFO; i++)
470     {
471         if (*out_offset + CAST_ZINT_TO_INT(rec->size[i]) + 20 > *out_size)
472         {
473             int new_size = *out_offset + rec->size[i] + 65536;
474             char *np = (char *) xmalloc(new_size);
475             if (*out_offset)
476                 memcpy(np, *out_buf, *out_offset);
477             xfree(*out_buf);
478             *out_size = new_size;
479             *out_buf = np;
480         }
481         if (i == 0)
482         {
483             rec_encode_zint(rec_sysno_to_int(rec->sysno), 
484                             (unsigned char *) *out_buf + *out_offset, &len);
485             (*out_offset) += len;
486         }
487         if (rec->size[i] == 0)
488         {
489             rec_encode_unsigned(1, (unsigned char *) *out_buf + *out_offset,
490                                 &len);
491             (*out_offset) += len;
492         }
493         else if (last_rec && rec->size[i] == last_rec->size[i] &&
494                  !memcmp(rec->info[i], last_rec->info[i], rec->size[i]))
495         {
496             rec_encode_unsigned(0, (unsigned char *) *out_buf + *out_offset,
497                                 &len);
498             (*out_offset) += len;
499         }
500         else
501         {
502             rec_encode_unsigned(rec->size[i]+1,
503                                 (unsigned char *) *out_buf + *out_offset,
504                                 &len);
505             (*out_offset) += len;
506             memcpy(*out_buf + *out_offset, rec->info[i], rec->size[i]);
507             (*out_offset) += rec->size[i];
508         }
509     }
510 }
511
512 static ZEBRA_RES rec_flush_shared(Records p, short ref_count, zint *sysnos,
513                                   char *out_buf, int out_offset)
514 {
515     ZEBRA_RES ret = ZEBRA_OK;
516     if (ref_count)
517     {
518         int i;
519         unsigned int csize = 0;  /* indicate compression "not performed yet" */
520         char compression_method = p->compression_method;
521         switch (compression_method)
522         {
523         case REC_COMPRESS_ZLIB:
524 #if HAVE_ZLIB_H
525             csize = out_offset + (out_offset >> 6) + 620;
526             while (1)
527             {
528                 int r;
529                 uLongf destLen = csize;
530                 rec_tmp_expand(p, csize);
531                 r = compress((Bytef *) p->tmp_buf+sizeof(zint)+sizeof(short)+
532                              sizeof(char),
533                              &destLen, (const Bytef *) out_buf, out_offset);
534                 csize = destLen;
535                 if (r == Z_OK)
536                 {
537                     break;
538                 }
539                 if (r != Z_MEM_ERROR)
540                 {
541                     yaz_log(YLOG_WARN, "compress error: %d", r);
542                     csize = 0;
543                     break;
544                 }
545                 csize = csize * 2;
546             }
547 #endif
548             break;
549         case REC_COMPRESS_BZIP2:
550 #if HAVE_BZLIB_H        
551             csize = out_offset + (out_offset >> 6) + 620;
552             rec_tmp_expand(p, csize);
553 #ifdef BZ_CONFIG_ERROR
554             i = BZ2_bzBuffToBuffCompress 
555 #else
556             i = bzBuffToBuffCompress 
557 #endif
558                                     (p->tmp_buf+sizeof(zint)+sizeof(short)+
559                                       sizeof(char),
560                                       &csize, out_buf, out_offset, 1, 0, 30);
561             if (i != BZ_OK)
562             {
563                 yaz_log(YLOG_WARN, "bzBuffToBuffCompress error code=%d", i);
564                 csize = 0;
565             }
566 #endif
567             break;
568         case REC_COMPRESS_NONE:
569             break;
570         }
571         if (!csize)  
572         {
573             /* either no compression or compression not supported ... */
574             csize = out_offset;
575             rec_tmp_expand(p, csize);
576             memcpy(p->tmp_buf + sizeof(zint) + sizeof(short) + sizeof(char),
577                     out_buf, out_offset);
578             csize = out_offset;
579             compression_method = REC_COMPRESS_NONE;
580         }
581         memcpy(p->tmp_buf + sizeof(zint), &ref_count, sizeof(ref_count));
582         memcpy(p->tmp_buf + sizeof(zint)+sizeof(short),
583                 &compression_method, sizeof(compression_method));
584                 
585         /* -------- compression */
586         if (rec_write_tmp_buf(p, csize + sizeof(short) + sizeof(char), sysnos)
587             != ZEBRA_OK)
588             ret = ZEBRA_FAIL;
589     }
590     return ret;
591 }
592
593 static ZEBRA_RES rec_write_multiple(Records p, int saveCount)
594 {
595     int i;
596     short ref_count = 0;
597     Record last_rec = 0;
598     int out_size = 1000;
599     int out_offset = 0;
600     char *out_buf = (char *) xmalloc(out_size);
601     zint *sysnos = (zint *) xmalloc(sizeof(*sysnos) * (p->cache_cur + 1));
602     zint *sysnop = sysnos;
603     ZEBRA_RES ret = ZEBRA_OK;
604
605     for (i = 0; i<p->cache_cur - saveCount; i++)
606     {
607         struct record_cache_entry *e = p->record_cache + i;
608         switch (e->flag)
609         {
610         case recordFlagNew:
611             rec_cache_flush_block1(p, e->rec, last_rec, &out_buf,
612                                     &out_size, &out_offset);
613             *sysnop++ = rec_sysno_to_int(e->rec->sysno);
614             ref_count++;
615             e->flag = recordFlagNop;
616             last_rec = e->rec;
617             break;
618         case recordFlagWrite:
619             if (rec_release_blocks(p, rec_sysno_to_int(e->rec->sysno))
620                 != ZEBRA_OK)
621                 ret = ZEBRA_FAIL;
622
623             rec_cache_flush_block1(p, e->rec, last_rec, &out_buf,
624                                     &out_size, &out_offset);
625             *sysnop++ = rec_sysno_to_int(e->rec->sysno);
626             ref_count++;
627             e->flag = recordFlagNop;
628             last_rec = e->rec;
629             break;
630         case recordFlagDelete:
631             if (rec_delete_single(p, e->rec) != ZEBRA_OK)
632                 ret = ZEBRA_FAIL;
633
634             e->flag = recordFlagNop;
635             break;
636         case recordFlagNop:
637             break;
638         default:
639             break;
640         }
641     }
642
643     *sysnop = -1;
644     rec_flush_shared(p, ref_count, sysnos, out_buf, out_offset);
645     xfree(out_buf);
646     xfree(sysnos);
647     return ret;
648 }
649
650 static ZEBRA_RES rec_cache_flush(Records p, int saveCount)
651 {
652     int i, j;
653     ZEBRA_RES ret;
654
655     if (saveCount >= p->cache_cur)
656         saveCount = 0;
657
658     ret = rec_write_multiple(p, saveCount);
659
660     for (i = 0; i<p->cache_cur - saveCount; i++)
661     {
662         struct record_cache_entry *e = p->record_cache + i;
663         rec_free(&e->rec);
664     } 
665     /* i still being used ... */
666     for (j = 0; j<saveCount; j++, i++)
667         memcpy(p->record_cache+j, p->record_cache+i,
668                 sizeof(*p->record_cache));
669     p->cache_cur = saveCount;
670     return ret;
671 }
672
673 static Record *rec_cache_lookup(Records p, zint sysno,
674                                 enum recordCacheFlag flag)
675 {
676     int i;
677     for (i = 0; i<p->cache_cur; i++)
678     {
679         struct record_cache_entry *e = p->record_cache + i;
680         if (e->rec->sysno == sysno)
681         {
682             if (flag != recordFlagNop && e->flag == recordFlagNop)
683                 e->flag = flag;
684             return &e->rec;
685         }
686     }
687     return NULL;
688 }
689
690 static ZEBRA_RES rec_cache_insert(Records p, Record rec, enum recordCacheFlag flag)
691 {
692     struct record_cache_entry *e;
693     ZEBRA_RES ret = ZEBRA_OK;
694
695     if (p->cache_cur == p->cache_max)
696         ret = rec_cache_flush(p, 1);
697     else if (p->cache_cur > 0)
698     {
699         int i, j;
700         int used = 0;
701         for (i = 0; i<p->cache_cur; i++)
702         {
703             Record r = (p->record_cache + i)->rec;
704             for (j = 0; j<REC_NO_INFO; j++)
705                 used += r->size[j];
706         }
707         if (used > p->compression_chunk_size)
708             ret = rec_cache_flush(p, 1);
709     }
710     assert(p->cache_cur < p->cache_max);
711
712     e = p->record_cache + (p->cache_cur)++;
713     e->flag = flag;
714     e->rec = rec_cp(rec);
715     return ret;
716 }
717
718 ZEBRA_RES rec_close(Records *pp)
719 {
720     Records p = *pp;
721     int i;
722     ZEBRA_RES ret = ZEBRA_OK;
723
724     if (!p)
725         return ret;
726
727     zebra_mutex_destroy(&p->mutex);
728     if (rec_cache_flush(p, 0) != ZEBRA_OK)
729         ret = ZEBRA_FAIL;
730
731     xfree(p->record_cache);
732
733     if (p->rw)
734     {
735         if (recindex_write_head(p->recindex, &p->head, sizeof(p->head)) != ZEBRA_OK)
736             ret = ZEBRA_FAIL;
737     }
738
739     recindex_close(p->recindex);
740
741     for (i = 0; i<REC_BLOCK_TYPES; i++)
742     {
743         if (p->data_BFile[i])
744             bf_close(p->data_BFile[i]);
745         xfree(p->data_fname[i]);
746     }
747     xfree(p->tmp_buf);
748     xfree(p);
749     *pp = NULL;
750     return ret;
751 }
752
753 static Record rec_get_int(Records p, zint sysno)
754 {
755     int i, in_size, r;
756     Record rec, *recp;
757     struct record_index_entry entry;
758     zint freeblock;
759     int dst_type;
760     char *nptr, *cptr;
761     char *in_buf = 0;
762     char *bz_buf = 0;
763     char compression_method;
764
765     assert(sysno > 0);
766     assert(p);
767
768     if ((recp = rec_cache_lookup(p, sysno, recordFlagNop)))
769         return rec_cp(*recp);
770
771     if (recindex_read_indx(p->recindex, rec_sysno_to_int(sysno), &entry, sizeof(entry), 1) < 1)
772         return NULL;       /* record is not there! */
773
774     if (!entry.size)
775         return NULL;       /* record is deleted */
776
777     dst_type = (int) (entry.next & 7);
778     assert(dst_type < REC_BLOCK_TYPES);
779     freeblock = entry.next / 8;
780
781     assert(freeblock > 0);
782     
783     rec_tmp_expand(p, entry.size);
784
785     cptr = p->tmp_buf;
786     r = bf_read(p->data_BFile[dst_type], freeblock, 0, 0, cptr);
787     if (r < 0)
788         return 0;
789     memcpy(&freeblock, cptr, sizeof(freeblock));
790
791     while (freeblock)
792     {
793         zint tmp;
794
795         cptr += p->head.block_size[dst_type] - sizeof(freeblock);
796         
797         memcpy(&tmp, cptr, sizeof(tmp));
798         r = bf_read(p->data_BFile[dst_type], freeblock, 0, 0, cptr);
799         if (r < 0)
800             return 0;
801         memcpy(&freeblock, cptr, sizeof(freeblock));
802         memcpy(cptr, &tmp, sizeof(tmp));
803     }
804
805     rec = (Record) xmalloc(sizeof(*rec));
806     rec->sysno = sysno;
807     memcpy(&compression_method, p->tmp_buf + sizeof(zint) + sizeof(short),
808             sizeof(compression_method));
809     in_buf = p->tmp_buf + sizeof(zint) + sizeof(short) + sizeof(char);
810     in_size = entry.size - sizeof(short) - sizeof(char);
811     switch (compression_method)
812     {
813     case REC_COMPRESS_ZLIB:
814 #if HAVE_ZLIB_H
815         if (1)
816         {
817             unsigned int bz_size = entry.size * 20 + 100;
818             while (1)
819             {
820                 uLongf destLen = bz_size;
821                 bz_buf = (char *) xmalloc(bz_size);
822                 i = uncompress((Bytef *) bz_buf, &destLen,
823                                (const Bytef *) in_buf, in_size);
824                 if (i == Z_OK)
825                 {
826                     bz_size = destLen; 
827                     break;
828                 }
829                 yaz_log(YLOG_LOG, "failed");
830                 xfree(bz_buf);
831                 bz_size *= 2;
832             }
833             in_buf = bz_buf;
834             in_size = bz_size;
835         }
836 #else
837         yaz_log(YLOG_FATAL, "cannot decompress record(s) in ZLIB format");
838         return 0;
839 #endif
840         break;
841     case REC_COMPRESS_BZIP2:
842 #if HAVE_BZLIB_H
843         if (1)
844         {
845             unsigned int bz_size = entry.size * 20 + 100;
846             while (1)
847             {
848                 bz_buf = (char *) xmalloc(bz_size);
849 #ifdef BZ_CONFIG_ERROR
850                 i = BZ2_bzBuffToBuffDecompress
851 #else
852                     i = bzBuffToBuffDecompress
853 #endif
854                     (bz_buf, &bz_size, in_buf, in_size, 0, 0);
855                 if (i == BZ_OK)
856                     break;
857                 yaz_log(YLOG_LOG, "failed");
858                 xfree(bz_buf);
859                 bz_size *= 2;
860             }
861             in_buf = bz_buf;
862             in_size = bz_size;
863         }
864 #else
865         yaz_log(YLOG_FATAL, "cannot decompress record(s) in BZIP2 format");
866         return 0;
867 #endif
868         break;
869     case REC_COMPRESS_NONE:
870         break;
871     }
872     for (i = 0; i<REC_NO_INFO; i++)
873         rec->info[i] = 0;
874
875     nptr = in_buf;                /* skip ref count */
876     while (nptr < in_buf + in_size)
877     {
878         zint this_sysno;
879         int len;
880         rec_decode_zint(&this_sysno, (unsigned char *) nptr, &len);
881         nptr += len;
882
883         for (i = 0; i < REC_NO_INFO; i++)
884         {
885             unsigned int this_size;
886             rec_decode_unsigned(&this_size, (unsigned char *) nptr, &len);
887             nptr += len;
888
889             if (this_size == 0)
890                 continue;
891             rec->size[i] = this_size-1;
892
893             if (rec->size[i])
894             {
895                 rec->info[i] = nptr;
896                 nptr += rec->size[i];
897             }
898             else
899                 rec->info[i] = NULL;
900         }
901         if (this_sysno == rec_sysno_to_int(sysno))
902             break;
903     }
904     for (i = 0; i<REC_NO_INFO; i++)
905     {
906         if (rec->info[i] && rec->size[i])
907         {
908             char *np = xmalloc(rec->size[i]+1);
909             memcpy(np, rec->info[i], rec->size[i]);
910             np[rec->size[i]] = '\0';
911             rec->info[i] = np;
912         }
913         else
914         {
915             assert(rec->info[i] == 0);
916             assert(rec->size[i] == 0);
917         }
918     }
919     xfree(bz_buf);
920     if (rec_cache_insert(p, rec, recordFlagNop) != ZEBRA_OK)
921         return 0;
922     return rec;
923 }
924
925 Record rec_get(Records p, zint sysno)
926 {
927     Record rec;
928     zebra_mutex_lock(&p->mutex);
929
930     rec = rec_get_int(p, sysno);
931     zebra_mutex_unlock(&p->mutex);
932     return rec;
933 }
934
935 Record rec_get_root(Records p)
936 {
937     return rec_get(p, rec_sysno_to_ext(1));
938 }
939
940 Record rec_get_next(Records p, Record rec)
941 {
942     Record next = 0;
943     zint next_sysno_int = rec_sysno_to_int(rec->sysno);
944
945     while (!next)
946     {
947          ++next_sysno_int;
948         if (next_sysno_int == p->head.index_last)
949             break;
950         next = rec_get(p, rec_sysno_to_ext(next_sysno_int));
951     }
952     return next;
953 }
954
955 static Record rec_new_int(Records p)
956 {
957     int i;
958     zint sysno;
959     Record rec;
960
961     assert(p);
962     rec = (Record) xmalloc(sizeof(*rec));
963     if (1 || p->head.index_free == 0)
964         sysno = (p->head.index_last)++;
965     else
966     {
967         struct record_index_entry entry;
968
969         if (recindex_read_indx(p->recindex, p->head.index_free, &entry, sizeof(entry), 0) < 1)
970         {
971             xfree(rec);
972             return 0;
973         }
974         sysno = p->head.index_free;
975         p->head.index_free = entry.next;
976     }
977     (p->head.no_records)++;
978     rec->sysno = rec_sysno_to_ext(sysno);
979     for (i = 0; i < REC_NO_INFO; i++)
980     {
981         rec->info[i] = NULL;
982         rec->size[i] = 0;
983     }
984     rec_cache_insert(p, rec, recordFlagNew);
985     return rec;
986 }
987
988 Record rec_new(Records p)
989 {
990     Record rec;
991     zebra_mutex_lock(&p->mutex);
992
993     rec = rec_new_int(p);
994     zebra_mutex_unlock(&p->mutex);
995     return rec;
996 }
997
998 ZEBRA_RES rec_del(Records p, Record *recpp)
999 {
1000     Record *recp;
1001     ZEBRA_RES ret = ZEBRA_OK;
1002
1003     zebra_mutex_lock(&p->mutex);
1004     (p->head.no_records)--;
1005     if ((recp = rec_cache_lookup(p, (*recpp)->sysno, recordFlagDelete)))
1006     {
1007         rec_free(recp);
1008         *recp = *recpp;
1009     }
1010     else
1011     {
1012         ret = rec_cache_insert(p, *recpp, recordFlagDelete);
1013         rec_free(recpp);
1014     }
1015     zebra_mutex_unlock(&p->mutex);
1016     *recpp = NULL;
1017     return ret;
1018 }
1019
1020 ZEBRA_RES rec_put(Records p, Record *recpp)
1021 {
1022     Record *recp;
1023     ZEBRA_RES ret = ZEBRA_OK;
1024
1025     zebra_mutex_lock(&p->mutex);
1026     if ((recp = rec_cache_lookup(p, (*recpp)->sysno, recordFlagWrite)))
1027     {
1028         rec_free(recp);
1029         *recp = *recpp;
1030     }
1031     else
1032     {
1033         ret = rec_cache_insert(p, *recpp, recordFlagWrite);
1034         rec_free(recpp);
1035     }
1036     zebra_mutex_unlock(&p->mutex);
1037     *recpp = NULL;
1038     return ret;
1039 }
1040
1041 void rec_free(Record *recpp)
1042 {
1043     int i;
1044
1045     if (!*recpp)
1046         return ;
1047     for (i = 0; i < REC_NO_INFO; i++)
1048         xfree((*recpp)->info[i]);
1049     xfree(*recpp);
1050     *recpp = NULL;
1051 }
1052
1053 Record rec_cp(Record rec)
1054 {
1055     Record n;
1056     int i;
1057
1058     n = (Record) xmalloc(sizeof(*n));
1059     n->sysno = rec->sysno;
1060     for (i = 0; i < REC_NO_INFO; i++)
1061         if (!rec->info[i])
1062         {
1063             n->info[i] = NULL;
1064             n->size[i] = 0;
1065         }
1066         else
1067         {
1068             n->size[i] = rec->size[i];
1069             n->info[i] = (char *) xmalloc(rec->size[i]+1);
1070             memcpy(n->info[i], rec->info[i], rec->size[i]);
1071             n->info[i][rec->size[i]] = '\0';
1072         }
1073     return n;
1074 }
1075
1076
1077 char *rec_strdup(const char *s, size_t *len)
1078 {
1079     char *p;
1080
1081     if (!s)
1082     {
1083         *len = 0;
1084         return NULL;
1085     }
1086     *len = strlen(s)+1;
1087     p = (char *) xmalloc(*len);
1088     strcpy(p, s);
1089     return p;
1090 }
1091
1092 void rec_prstat(Records records, int verbose)
1093 {
1094     int i;
1095     zint total_bytes = 0;
1096     
1097     yaz_log (YLOG_LOG,
1098           "Total records                        %8" ZINT_FORMAT0,
1099           records->head.no_records);
1100
1101     for (i = 0; i< REC_BLOCK_TYPES; i++)
1102     {
1103         yaz_log (YLOG_LOG, "Record blocks of size "ZINT_FORMAT,
1104               records->head.block_size[i]);
1105         yaz_log (YLOG_LOG,
1106           " Used/Total/Bytes used            "
1107               ZINT_FORMAT "/" ZINT_FORMAT "/" ZINT_FORMAT,
1108               records->head.block_used[i], records->head.block_last[i]-1,
1109               records->head.block_used[i] * records->head.block_size[i]);
1110         total_bytes +=
1111             records->head.block_used[i] * records->head.block_size[i];
1112
1113         yaz_log(YLOG_LOG, " Block Last " ZINT_FORMAT, records->head.block_last[i]);
1114         if (verbose)
1115         {   /* analyse free lists */
1116             zint no_free = 0;
1117             zint block_free = records->head.block_free[i];
1118             WRBUF w = wrbuf_alloc();
1119             while (block_free)
1120             {
1121                 zint nblock;
1122                 no_free++;
1123                 wrbuf_printf(w, " " ZINT_FORMAT, block_free);
1124                 if (bf_read(records->data_BFile[i],
1125                             block_free, 0, sizeof(nblock), &nblock) != 1)
1126                 {
1127                     yaz_log(YLOG_FATAL|YLOG_ERRNO, "read in %s at free block "
1128                             ZINT_FORMAT,
1129                             records->data_fname[i], block_free);
1130                     break;
1131                 }
1132                 block_free = nblock;
1133             }
1134             yaz_log (YLOG_LOG,
1135                      " Number in free list       %8" ZINT_FORMAT0, no_free);
1136             if (no_free)
1137                 yaz_log(YLOG_LOG, "%s", wrbuf_cstr(w));
1138             wrbuf_destroy(w);
1139         }
1140     }
1141     yaz_log (YLOG_LOG,
1142           "Total size of record index in bytes  %8" ZINT_FORMAT0,
1143           records->head.total_bytes);
1144     yaz_log (YLOG_LOG,
1145           "Total size with overhead             %8" ZINT_FORMAT0,
1146           total_bytes);
1147 }
1148
1149 /*
1150  * Local variables:
1151  * c-basic-offset: 4
1152  * c-file-style: "Stroustrup"
1153  * indent-tabs-mode: nil
1154  * End:
1155  * vim: shiftwidth=4 tabstop=8 expandtab
1156  */
1157