Happy new year.
[idzebra-moved-to-github.git] / index / records.c
1 /* This file is part of the Zebra server.
2    Copyright (C) 1994-2011 Index Data
3
4 Zebra is free software; you can redistribute it and/or modify it under
5 the terms of the GNU General Public License as published by the Free
6 Software Foundation; either version 2, or (at your option) any later
7 version.
8
9 Zebra is distributed in the hope that it will be useful, but WITHOUT ANY
10 WARRANTY; without even the implied warranty of MERCHANTABILITY or
11 FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
12 for more details.
13
14 You should have received a copy of the GNU General Public License
15 along with this program; if not, write to the Free Software
16 Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA  02110-1301  USA
17
18 */
19
20 /*
21  *  Format of first block (assumes a 512 block size)
22  *      next       (8 bytes)
23  *      ref_count  (2 bytes)
24  *      block      (500 bytes)
25  *
26  *  Format of subsequent blocks 
27  *      next  (8 bytes)
28  *      block (502 bytes)
29  *
30  *  Format of each record
31  *      sysno
32  *      (length, data) - pairs
33  *      length = 0 if same as previous
34  */
35 #include <stdio.h>
36 #include <stdlib.h>
37 #include <assert.h>
38 #include <string.h>
39
40 #include <yaz/yaz-util.h>
41 #include <idzebra/bfile.h>
42 #include "recindex.h"
43
44 #if HAVE_BZLIB_H
45 #include <bzlib.h>
46 #endif
47 #if HAVE_ZLIB_H
48 #include <zlib.h>
49 #endif
50
51 #define REC_BLOCK_TYPES 2
52 #define REC_HEAD_MAGIC "recindex"
53 #define REC_VERSION 5
54
55 struct records_info {
56     int rw;
57     int compression_method;
58
59     recindex_t recindex;
60
61     char *data_fname[REC_BLOCK_TYPES];
62     BFile data_BFile[REC_BLOCK_TYPES];
63
64     char *tmp_buf;
65     int tmp_size;
66
67     struct record_cache_entry *record_cache;
68     int cache_size;
69     int cache_cur;
70     int cache_max;
71
72     int compression_chunk_size;
73
74     Zebra_mutex mutex;
75
76     struct records_head {
77         char magic[8];
78         char version[4];
79         zint block_size[REC_BLOCK_TYPES];
80         zint block_free[REC_BLOCK_TYPES];
81         zint block_last[REC_BLOCK_TYPES];
82         zint block_used[REC_BLOCK_TYPES];
83         zint block_move[REC_BLOCK_TYPES];
84
85         zint total_bytes;
86         zint index_last;
87         zint index_free;
88         zint no_records;
89
90     } head;
91 };
92
93 enum recordCacheFlag { recordFlagNop, recordFlagWrite, recordFlagNew,
94                        recordFlagDelete };
95
96 struct record_cache_entry {
97     Record rec;
98     enum recordCacheFlag flag;
99 };
100
101 struct record_index_entry {
102     zint next;         /* first block of record info / next free entry */
103     int size;          /* size of record or 0 if free entry */
104 };
105
106 Record rec_cp(Record rec);
107
108 /* Modify argument to if below: 1=normal, 0=sysno testing */
109 #if 1
110 /* If this is used sysno are not converted (no testing) */
111 #define FAKE_OFFSET 0
112 #define USUAL_RANGE 6000000000LL
113
114 #else
115 /* Use a fake > 2^32 offset so we can test for proper 64-bit handling */
116 #define FAKE_OFFSET 6000000000LL
117 #define USUAL_RANGE 2000000000LL
118 #endif
119
120 static zint rec_sysno_to_ext(zint sysno)
121 {
122     assert(sysno >= 0 && sysno <= USUAL_RANGE);
123     return sysno + FAKE_OFFSET;
124 }
125
126 zint rec_sysno_to_int(zint sysno)
127 {
128     assert(sysno >= FAKE_OFFSET && sysno <= FAKE_OFFSET + USUAL_RANGE);
129     return sysno - FAKE_OFFSET;
130 }
131
132 static void rec_tmp_expand(Records p, int size)
133 {
134     if (p->tmp_size < size + 2048 ||
135         p->tmp_size < p->head.block_size[REC_BLOCK_TYPES-1]*2)
136     {
137         xfree(p->tmp_buf);
138         p->tmp_size = size + (int)
139                         (p->head.block_size[REC_BLOCK_TYPES-1])*2 + 2048;
140         p->tmp_buf = (char *) xmalloc(p->tmp_size);
141     }
142 }
143
144 static ZEBRA_RES rec_release_blocks(Records p, zint sysno)
145 {
146     struct record_index_entry entry;
147     zint freeblock;
148     char block_and_ref[sizeof(zint) + sizeof(short)];
149     int dst_type;
150     int first = 1;
151
152     if (recindex_read_indx(p->recindex, sysno, &entry, sizeof(entry), 1) != 1)
153         return ZEBRA_FAIL;
154
155     freeblock = entry.next;
156     assert(freeblock > 0);
157     dst_type = CAST_ZINT_TO_INT(freeblock & 7);
158     assert(dst_type < REC_BLOCK_TYPES);
159     freeblock = freeblock / 8;
160     while (freeblock)
161     {
162         if (bf_read(p->data_BFile[dst_type], freeblock, 0,
163                      first ? sizeof(block_and_ref) : sizeof(zint),
164                      block_and_ref) != 1)
165         {
166             yaz_log(YLOG_FATAL|YLOG_ERRNO, "read in rec_del_single");
167             return ZEBRA_FAIL;
168         }
169         if (first)
170         {
171             short ref;
172             memcpy(&ref, block_and_ref + sizeof(freeblock), sizeof(ref));
173             --ref;
174             memcpy(block_and_ref + sizeof(freeblock), &ref, sizeof(ref));
175             if (ref)
176             {
177                 /* there is still a reference to this block.. */
178                 if (bf_write(p->data_BFile[dst_type], freeblock, 0,
179                               sizeof(block_and_ref), block_and_ref))
180                 {
181                     yaz_log(YLOG_FATAL|YLOG_ERRNO, "write in rec_del_single");
182                     return ZEBRA_FAIL;
183                 }
184                 return ZEBRA_OK;
185             }
186             /* the list of blocks can all be removed (ref == 0) */
187             first = 0;
188         }
189         
190         if (bf_write(p->data_BFile[dst_type], freeblock, 0, sizeof(freeblock),
191                       &p->head.block_free[dst_type]))
192         {
193             yaz_log(YLOG_FATAL|YLOG_ERRNO, "write in rec_del_single");
194             return ZEBRA_FAIL;
195         }
196         p->head.block_free[dst_type] = freeblock;
197         memcpy(&freeblock, block_and_ref, sizeof(freeblock));
198
199         p->head.block_used[dst_type]--;
200     }
201     p->head.total_bytes -= entry.size;
202     return ZEBRA_OK;
203 }
204
205 static ZEBRA_RES rec_delete_single(Records p, Record rec)
206 {
207     struct record_index_entry entry;
208
209     /* all data in entry must be reset, since it's written verbatim */
210     memset(&entry, '\0', sizeof(entry));
211     if (rec_release_blocks(p, rec_sysno_to_int(rec->sysno)) != ZEBRA_OK)
212         return ZEBRA_FAIL;
213
214     entry.next = p->head.index_free;
215     entry.size = 0;
216     p->head.index_free = rec_sysno_to_int(rec->sysno);
217     recindex_write_indx(p->recindex, rec_sysno_to_int(rec->sysno), &entry, sizeof(entry));
218     return ZEBRA_OK;
219 }
220
221 static ZEBRA_RES rec_write_tmp_buf(Records p, int size, zint *sysnos)
222 {
223     struct record_index_entry entry;
224     int no_written = 0;
225     char *cptr = p->tmp_buf;
226     zint block_prev = -1, block_free;
227     int dst_type = 0;
228     int i;
229
230     /* all data in entry must be reset, since it's written verbatim */
231     memset(&entry, '\0', sizeof(entry));
232
233     for (i = 1; i<REC_BLOCK_TYPES; i++)
234         if (size >= p->head.block_move[i])
235             dst_type = i;
236     while (no_written < size)
237     {
238         block_free = p->head.block_free[dst_type];
239         if (block_free)
240         {
241             if (bf_read(p->data_BFile[dst_type],
242                          block_free, 0, sizeof(*p->head.block_free),
243                          &p->head.block_free[dst_type]) != 1)
244             {
245                 yaz_log(YLOG_FATAL|YLOG_ERRNO, "read in %s at free block "
246                          ZINT_FORMAT,
247                          p->data_fname[dst_type], block_free);
248                 return ZEBRA_FAIL;
249             }
250         }
251         else
252             block_free = p->head.block_last[dst_type]++;
253         if (block_prev == -1)
254         {
255             entry.next = block_free*8 + dst_type;
256             entry.size = size;
257             p->head.total_bytes += size;
258             while (*sysnos > 0)
259             {
260                 recindex_write_indx(p->recindex, *sysnos, &entry, sizeof(entry));
261                 sysnos++;
262             }
263         }
264         else
265         {
266             memcpy(cptr, &block_free, sizeof(block_free));
267             bf_write(p->data_BFile[dst_type], block_prev, 0, 0, cptr);
268             cptr = p->tmp_buf + no_written;
269         }
270         block_prev = block_free;
271         no_written += CAST_ZINT_TO_INT(p->head.block_size[dst_type]) 
272             - sizeof(zint);
273         p->head.block_used[dst_type]++;
274     }
275     assert(block_prev != -1);
276     block_free = 0;
277     memcpy(cptr, &block_free, sizeof(block_free));
278     bf_write(p->data_BFile[dst_type], block_prev, 0,
279               sizeof(block_free) + (p->tmp_buf+size) - cptr, cptr);
280     return ZEBRA_OK;
281 }
282
283 int rec_check_compression_method(int compression_method)
284 {
285     switch(compression_method)
286     {
287     case REC_COMPRESS_ZLIB:
288 #if HAVE_ZLIB_H
289         return 1;
290 #else
291         return 0;
292 #endif
293     case REC_COMPRESS_BZIP2:
294 #if HAVE_BZLIB_H
295         return 1;
296 #else
297         return 0;
298 #endif
299     case REC_COMPRESS_NONE:
300         return 1;
301     }
302     return 0;
303 }
304
305 Records rec_open(BFiles bfs, int rw, int compression_method)
306 {
307     Records p;
308     int i, r;
309     int version;
310     ZEBRA_RES ret = ZEBRA_OK;
311
312     p = (Records) xmalloc(sizeof(*p));
313     memset(&p->head, '\0', sizeof(p->head));
314     p->compression_method = compression_method;
315     p->rw = rw;
316     p->tmp_size = 4096;
317     p->tmp_buf = (char *) xmalloc(p->tmp_size);
318     p->compression_chunk_size = 0;
319     if (compression_method == REC_COMPRESS_BZIP2)
320         p->compression_chunk_size = 90000;
321     p->recindex = recindex_open(bfs, rw, 0 /* 1=isamb for recindex */);
322     r = recindex_read_head(p->recindex, p->tmp_buf);
323     switch (r)
324     {
325     case 0:
326         memcpy(p->head.magic, REC_HEAD_MAGIC, sizeof(p->head.magic));
327         sprintf(p->head.version, "%3d", REC_VERSION);
328         p->head.index_free = 0;
329         p->head.index_last = 1;
330         p->head.no_records = 0;
331         p->head.total_bytes = 0;
332         for (i = 0; i<REC_BLOCK_TYPES; i++)
333         {
334             p->head.block_free[i] = 0;
335             p->head.block_last[i] = 1;
336             p->head.block_used[i] = 0;
337         }
338         p->head.block_size[0] = 256;
339         p->head.block_move[0] = 0;
340         for (i = 1; i<REC_BLOCK_TYPES; i++)
341         {
342             p->head.block_size[i] = p->head.block_size[i-1] * 8;
343             p->head.block_move[i] = p->head.block_size[i] * 2;
344         }
345         if (rw)
346         {
347             if (recindex_write_head(p->recindex, 
348                                     &p->head, sizeof(p->head)) != ZEBRA_OK)
349                 ret = ZEBRA_FAIL;
350         }
351         break;
352     case 1:
353         memcpy(&p->head, p->tmp_buf, sizeof(p->head));
354         if (memcmp(p->head.magic, REC_HEAD_MAGIC, sizeof(p->head.magic)))
355         {
356             yaz_log(YLOG_FATAL, "file %s has bad format",
357                     recindex_get_fname(p->recindex));
358             ret = ZEBRA_FAIL;
359         }
360         version = atoi(p->head.version);
361         if (version != REC_VERSION)
362         {
363             yaz_log(YLOG_FATAL, "file %s is version %d, but version"
364                   " %d is required",
365                     recindex_get_fname(p->recindex), version, REC_VERSION);
366             ret = ZEBRA_FAIL;
367         }
368         break;
369     }
370     for (i = 0; i<REC_BLOCK_TYPES; i++)
371     {
372         char str[80];
373         sprintf(str, "recd%c", i + 'A');
374         p->data_fname[i] = (char *) xmalloc(strlen(str)+1);
375         strcpy(p->data_fname[i], str);
376         p->data_BFile[i] = NULL;
377     }
378     for (i = 0; i<REC_BLOCK_TYPES; i++)
379     {
380         if (!(p->data_BFile[i] =
381               bf_open(bfs, p->data_fname[i],
382                       CAST_ZINT_TO_INT(p->head.block_size[i]), rw)))
383         {
384             yaz_log(YLOG_FATAL|YLOG_ERRNO, "bf_open %s", p->data_fname[i]);
385             ret = ZEBRA_FAIL;
386             break;
387         }
388     }
389     p->cache_max = 400;
390     p->cache_cur = 0;
391     p->record_cache = (struct record_cache_entry *)
392         xmalloc(sizeof(*p->record_cache)*p->cache_max);
393     zebra_mutex_init(&p->mutex);
394     if (ret == ZEBRA_FAIL)
395         rec_close(&p);
396     return p;
397 }
398
399 static void rec_encode_unsigned(unsigned n, unsigned char *buf, int *len)
400 {
401     (*len) = 0;
402     while (n > 127)
403     {
404         buf[*len] = 128 + (n & 127);
405         n = n >> 7;
406         (*len)++;
407     }
408     buf[*len] = n;
409     (*len)++;
410 }
411
412 static void rec_decode_unsigned(unsigned *np, unsigned char *buf, int *len)
413 {
414     unsigned n = 0;
415     unsigned w = 1;
416     (*len) = 0;
417
418     while (buf[*len] > 127)
419     {
420         n += w*(buf[*len] & 127);
421         w = w << 7;
422         (*len)++;
423     }
424     n += w * buf[*len];
425     (*len)++;
426     *np = n;
427 }
428
429 static void rec_encode_zint(zint n, unsigned char *buf, int *len)
430 {
431     (*len) = 0;
432     while (n > 127)
433     {
434         buf[*len] = (unsigned) (128 + (n & 127));
435         n = n >> 7;
436         (*len)++;
437     }
438     buf[*len] = (unsigned) n;
439     (*len)++;
440 }
441
442 static void rec_decode_zint(zint *np, unsigned char *buf, int *len)
443 {
444     zint  n = 0;
445     zint w = 1;
446     (*len) = 0;
447
448     while (buf[*len] > 127)
449     {
450         n += w*(buf[*len] & 127);
451         w = w << 7;
452         (*len)++;
453     }
454     n += w * buf[*len];
455     (*len)++;
456     *np = n;
457 }
458
459 static void rec_cache_flush_block1(Records p, Record rec, Record last_rec,
460                                    char **out_buf, int *out_size,
461                                    int *out_offset)
462 {
463     int i;
464     int len;
465
466     for (i = 0; i<REC_NO_INFO; i++)
467     {
468         if (*out_offset + CAST_ZINT_TO_INT(rec->size[i]) + 20 > *out_size)
469         {
470             int new_size = *out_offset + rec->size[i] + 65536;
471             char *np = (char *) xmalloc(new_size);
472             if (*out_offset)
473                 memcpy(np, *out_buf, *out_offset);
474             xfree(*out_buf);
475             *out_size = new_size;
476             *out_buf = np;
477         }
478         if (i == 0)
479         {
480             rec_encode_zint(rec_sysno_to_int(rec->sysno), 
481                             (unsigned char *) *out_buf + *out_offset, &len);
482             (*out_offset) += len;
483         }
484         if (rec->size[i] == 0)
485         {
486             rec_encode_unsigned(1, (unsigned char *) *out_buf + *out_offset,
487                                 &len);
488             (*out_offset) += len;
489         }
490         else if (last_rec && rec->size[i] == last_rec->size[i] &&
491                  !memcmp(rec->info[i], last_rec->info[i], rec->size[i]))
492         {
493             rec_encode_unsigned(0, (unsigned char *) *out_buf + *out_offset,
494                                 &len);
495             (*out_offset) += len;
496         }
497         else
498         {
499             rec_encode_unsigned(rec->size[i]+1,
500                                 (unsigned char *) *out_buf + *out_offset,
501                                 &len);
502             (*out_offset) += len;
503             memcpy(*out_buf + *out_offset, rec->info[i], rec->size[i]);
504             (*out_offset) += rec->size[i];
505         }
506     }
507 }
508
509 static ZEBRA_RES rec_flush_shared(Records p, short ref_count, zint *sysnos,
510                                   char *out_buf, int out_offset)
511 {
512     ZEBRA_RES ret = ZEBRA_OK;
513     if (ref_count)
514     {
515         int i;
516         unsigned int csize = 0;  /* indicate compression "not performed yet" */
517         char compression_method = p->compression_method;
518         switch (compression_method)
519         {
520         case REC_COMPRESS_ZLIB:
521 #if HAVE_ZLIB_H
522             csize = out_offset + (out_offset >> 6) + 620;
523             while (1)
524             {
525                 int r;
526                 uLongf destLen = csize;
527                 rec_tmp_expand(p, csize);
528                 r = compress((Bytef *) p->tmp_buf+sizeof(zint)+sizeof(short)+
529                              sizeof(char),
530                              &destLen, (const Bytef *) out_buf, out_offset);
531                 csize = destLen;
532                 if (r == Z_OK)
533                 {
534                     break;
535                 }
536                 if (r != Z_MEM_ERROR)
537                 {
538                     yaz_log(YLOG_WARN, "compress error: %d", r);
539                     csize = 0;
540                     break;
541                 }
542                 csize = csize * 2;
543             }
544 #endif
545             break;
546         case REC_COMPRESS_BZIP2:
547 #if HAVE_BZLIB_H        
548             csize = out_offset + (out_offset >> 6) + 620;
549             rec_tmp_expand(p, csize);
550 #ifdef BZ_CONFIG_ERROR
551             i = BZ2_bzBuffToBuffCompress 
552 #else
553             i = bzBuffToBuffCompress 
554 #endif
555                                     (p->tmp_buf+sizeof(zint)+sizeof(short)+
556                                       sizeof(char),
557                                       &csize, out_buf, out_offset, 1, 0, 30);
558             if (i != BZ_OK)
559             {
560                 yaz_log(YLOG_WARN, "bzBuffToBuffCompress error code=%d", i);
561                 csize = 0;
562             }
563 #endif
564             break;
565         case REC_COMPRESS_NONE:
566             break;
567         }
568         if (!csize)  
569         {
570             /* either no compression or compression not supported ... */
571             csize = out_offset;
572             rec_tmp_expand(p, csize);
573             memcpy(p->tmp_buf + sizeof(zint) + sizeof(short) + sizeof(char),
574                     out_buf, out_offset);
575             csize = out_offset;
576             compression_method = REC_COMPRESS_NONE;
577         }
578         memcpy(p->tmp_buf + sizeof(zint), &ref_count, sizeof(ref_count));
579         memcpy(p->tmp_buf + sizeof(zint)+sizeof(short),
580                 &compression_method, sizeof(compression_method));
581                 
582         /* -------- compression */
583         if (rec_write_tmp_buf(p, csize + sizeof(short) + sizeof(char), sysnos)
584             != ZEBRA_OK)
585             ret = ZEBRA_FAIL;
586     }
587     return ret;
588 }
589
590 static ZEBRA_RES rec_write_multiple(Records p, int saveCount)
591 {
592     int i;
593     short ref_count = 0;
594     Record last_rec = 0;
595     int out_size = 1000;
596     int out_offset = 0;
597     char *out_buf = (char *) xmalloc(out_size);
598     zint *sysnos = (zint *) xmalloc(sizeof(*sysnos) * (p->cache_cur + 1));
599     zint *sysnop = sysnos;
600     ZEBRA_RES ret = ZEBRA_OK;
601
602     for (i = 0; i<p->cache_cur - saveCount; i++)
603     {
604         struct record_cache_entry *e = p->record_cache + i;
605         switch (e->flag)
606         {
607         case recordFlagNew:
608             rec_cache_flush_block1(p, e->rec, last_rec, &out_buf,
609                                     &out_size, &out_offset);
610             *sysnop++ = rec_sysno_to_int(e->rec->sysno);
611             ref_count++;
612             e->flag = recordFlagNop;
613             last_rec = e->rec;
614             break;
615         case recordFlagWrite:
616             if (rec_release_blocks(p, rec_sysno_to_int(e->rec->sysno))
617                 != ZEBRA_OK)
618                 ret = ZEBRA_FAIL;
619
620             rec_cache_flush_block1(p, e->rec, last_rec, &out_buf,
621                                     &out_size, &out_offset);
622             *sysnop++ = rec_sysno_to_int(e->rec->sysno);
623             ref_count++;
624             e->flag = recordFlagNop;
625             last_rec = e->rec;
626             break;
627         case recordFlagDelete:
628             if (rec_delete_single(p, e->rec) != ZEBRA_OK)
629                 ret = ZEBRA_FAIL;
630
631             e->flag = recordFlagNop;
632             break;
633         case recordFlagNop:
634             break;
635         default:
636             break;
637         }
638     }
639
640     *sysnop = -1;
641     rec_flush_shared(p, ref_count, sysnos, out_buf, out_offset);
642     xfree(out_buf);
643     xfree(sysnos);
644     return ret;
645 }
646
647 static ZEBRA_RES rec_cache_flush(Records p, int saveCount)
648 {
649     int i, j;
650     ZEBRA_RES ret;
651
652     if (saveCount >= p->cache_cur)
653         saveCount = 0;
654
655     ret = rec_write_multiple(p, saveCount);
656
657     for (i = 0; i<p->cache_cur - saveCount; i++)
658     {
659         struct record_cache_entry *e = p->record_cache + i;
660         rec_free(&e->rec);
661     } 
662     /* i still being used ... */
663     for (j = 0; j<saveCount; j++, i++)
664         memcpy(p->record_cache+j, p->record_cache+i,
665                 sizeof(*p->record_cache));
666     p->cache_cur = saveCount;
667     return ret;
668 }
669
670 static Record *rec_cache_lookup(Records p, zint sysno,
671                                 enum recordCacheFlag flag)
672 {
673     int i;
674     for (i = 0; i<p->cache_cur; i++)
675     {
676         struct record_cache_entry *e = p->record_cache + i;
677         if (e->rec->sysno == sysno)
678         {
679             if (flag != recordFlagNop && e->flag == recordFlagNop)
680                 e->flag = flag;
681             return &e->rec;
682         }
683     }
684     return NULL;
685 }
686
687 static ZEBRA_RES rec_cache_insert(Records p, Record rec, enum recordCacheFlag flag)
688 {
689     struct record_cache_entry *e;
690     ZEBRA_RES ret = ZEBRA_OK;
691
692     if (p->cache_cur == p->cache_max)
693         ret = rec_cache_flush(p, 1);
694     else if (p->cache_cur > 0)
695     {
696         int i, j;
697         int used = 0;
698         for (i = 0; i<p->cache_cur; i++)
699         {
700             Record r = (p->record_cache + i)->rec;
701             for (j = 0; j<REC_NO_INFO; j++)
702                 used += r->size[j];
703         }
704         if (used > p->compression_chunk_size)
705             ret = rec_cache_flush(p, 1);
706     }
707     assert(p->cache_cur < p->cache_max);
708
709     e = p->record_cache + (p->cache_cur)++;
710     e->flag = flag;
711     e->rec = rec_cp(rec);
712     return ret;
713 }
714
715 ZEBRA_RES rec_close(Records *pp)
716 {
717     Records p = *pp;
718     int i;
719     ZEBRA_RES ret = ZEBRA_OK;
720
721     if (!p)
722         return ret;
723
724     zebra_mutex_destroy(&p->mutex);
725     if (rec_cache_flush(p, 0) != ZEBRA_OK)
726         ret = ZEBRA_FAIL;
727
728     xfree(p->record_cache);
729
730     if (p->rw)
731     {
732         if (recindex_write_head(p->recindex, &p->head, sizeof(p->head)) != ZEBRA_OK)
733             ret = ZEBRA_FAIL;
734     }
735
736     recindex_close(p->recindex);
737
738     for (i = 0; i<REC_BLOCK_TYPES; i++)
739     {
740         if (p->data_BFile[i])
741             bf_close(p->data_BFile[i]);
742         xfree(p->data_fname[i]);
743     }
744     xfree(p->tmp_buf);
745     xfree(p);
746     *pp = NULL;
747     return ret;
748 }
749
750 static Record rec_get_int(Records p, zint sysno)
751 {
752     int i, in_size, r;
753     Record rec, *recp;
754     struct record_index_entry entry;
755     zint freeblock;
756     int dst_type;
757     char *nptr, *cptr;
758     char *in_buf = 0;
759     char *bz_buf = 0;
760     char compression_method;
761
762     assert(sysno > 0);
763     assert(p);
764
765     if ((recp = rec_cache_lookup(p, sysno, recordFlagNop)))
766         return rec_cp(*recp);
767
768     if (recindex_read_indx(p->recindex, rec_sysno_to_int(sysno), &entry, sizeof(entry), 1) < 1)
769         return NULL;       /* record is not there! */
770
771     if (!entry.size)
772         return NULL;       /* record is deleted */
773
774     dst_type = (int) (entry.next & 7);
775     assert(dst_type < REC_BLOCK_TYPES);
776     freeblock = entry.next / 8;
777
778     assert(freeblock > 0);
779     
780     rec_tmp_expand(p, entry.size);
781
782     cptr = p->tmp_buf;
783     r = bf_read(p->data_BFile[dst_type], freeblock, 0, 0, cptr);
784     if (r < 0)
785         return 0;
786     memcpy(&freeblock, cptr, sizeof(freeblock));
787
788     while (freeblock)
789     {
790         zint tmp;
791
792         cptr += p->head.block_size[dst_type] - sizeof(freeblock);
793         
794         memcpy(&tmp, cptr, sizeof(tmp));
795         r = bf_read(p->data_BFile[dst_type], freeblock, 0, 0, cptr);
796         if (r < 0)
797             return 0;
798         memcpy(&freeblock, cptr, sizeof(freeblock));
799         memcpy(cptr, &tmp, sizeof(tmp));
800     }
801
802     rec = (Record) xmalloc(sizeof(*rec));
803     rec->sysno = sysno;
804     memcpy(&compression_method, p->tmp_buf + sizeof(zint) + sizeof(short),
805             sizeof(compression_method));
806     in_buf = p->tmp_buf + sizeof(zint) + sizeof(short) + sizeof(char);
807     in_size = entry.size - sizeof(short) - sizeof(char);
808     switch (compression_method)
809     {
810     case REC_COMPRESS_ZLIB:
811 #if HAVE_ZLIB_H
812         if (1)
813         {
814             unsigned int bz_size = entry.size * 20 + 100;
815             while (1)
816             {
817                 uLongf destLen = bz_size;
818                 bz_buf = (char *) xmalloc(bz_size);
819                 i = uncompress((Bytef *) bz_buf, &destLen,
820                                (const Bytef *) in_buf, in_size);
821                 if (i == Z_OK)
822                 {
823                     bz_size = destLen; 
824                     break;
825                 }
826                 yaz_log(YLOG_LOG, "failed");
827                 xfree(bz_buf);
828                 bz_size *= 2;
829             }
830             in_buf = bz_buf;
831             in_size = bz_size;
832         }
833 #else
834         yaz_log(YLOG_FATAL, "cannot decompress record(s) in ZLIB format");
835         return 0;
836 #endif
837         break;
838     case REC_COMPRESS_BZIP2:
839 #if HAVE_BZLIB_H
840         if (1)
841         {
842             unsigned int bz_size = entry.size * 20 + 100;
843             while (1)
844             {
845                 bz_buf = (char *) xmalloc(bz_size);
846 #ifdef BZ_CONFIG_ERROR
847                 i = BZ2_bzBuffToBuffDecompress
848 #else
849                     i = bzBuffToBuffDecompress
850 #endif
851                     (bz_buf, &bz_size, in_buf, in_size, 0, 0);
852                 if (i == BZ_OK)
853                     break;
854                 yaz_log(YLOG_LOG, "failed");
855                 xfree(bz_buf);
856                 bz_size *= 2;
857             }
858             in_buf = bz_buf;
859             in_size = bz_size;
860         }
861 #else
862         yaz_log(YLOG_FATAL, "cannot decompress record(s) in BZIP2 format");
863         return 0;
864 #endif
865         break;
866     case REC_COMPRESS_NONE:
867         break;
868     }
869     for (i = 0; i<REC_NO_INFO; i++)
870         rec->info[i] = 0;
871
872     nptr = in_buf;                /* skip ref count */
873     while (nptr < in_buf + in_size)
874     {
875         zint this_sysno;
876         int len;
877         rec_decode_zint(&this_sysno, (unsigned char *) nptr, &len);
878         nptr += len;
879
880         for (i = 0; i < REC_NO_INFO; i++)
881         {
882             unsigned int this_size;
883             rec_decode_unsigned(&this_size, (unsigned char *) nptr, &len);
884             nptr += len;
885
886             if (this_size == 0)
887                 continue;
888             rec->size[i] = this_size-1;
889
890             if (rec->size[i])
891             {
892                 rec->info[i] = nptr;
893                 nptr += rec->size[i];
894             }
895             else
896                 rec->info[i] = NULL;
897         }
898         if (this_sysno == rec_sysno_to_int(sysno))
899             break;
900     }
901     for (i = 0; i<REC_NO_INFO; i++)
902     {
903         if (rec->info[i] && rec->size[i])
904         {
905             char *np = xmalloc(rec->size[i]+1);
906             memcpy(np, rec->info[i], rec->size[i]);
907             np[rec->size[i]] = '\0';
908             rec->info[i] = np;
909         }
910         else
911         {
912             assert(rec->info[i] == 0);
913             assert(rec->size[i] == 0);
914         }
915     }
916     xfree(bz_buf);
917     if (rec_cache_insert(p, rec, recordFlagNop) != ZEBRA_OK)
918         return 0;
919     return rec;
920 }
921
922 Record rec_get(Records p, zint sysno)
923 {
924     Record rec;
925     zebra_mutex_lock(&p->mutex);
926
927     rec = rec_get_int(p, sysno);
928     zebra_mutex_unlock(&p->mutex);
929     return rec;
930 }
931
932 Record rec_get_root(Records p)
933 {
934     return rec_get(p, rec_sysno_to_ext(1));
935 }
936
937 Record rec_get_next(Records p, Record rec)
938 {
939     Record next = 0;
940     zint next_sysno_int = rec_sysno_to_int(rec->sysno);
941
942     while (!next)
943     {
944          ++next_sysno_int;
945         if (next_sysno_int == p->head.index_last)
946             break;
947         next = rec_get(p, rec_sysno_to_ext(next_sysno_int));
948     }
949     return next;
950 }
951
952 static Record rec_new_int(Records p)
953 {
954     int i;
955     zint sysno;
956     Record rec;
957
958     assert(p);
959     rec = (Record) xmalloc(sizeof(*rec));
960     if (1 || p->head.index_free == 0)
961         sysno = (p->head.index_last)++;
962     else
963     {
964         struct record_index_entry entry;
965
966         if (recindex_read_indx(p->recindex, p->head.index_free, &entry, sizeof(entry), 0) < 1)
967         {
968             xfree(rec);
969             return 0;
970         }
971         sysno = p->head.index_free;
972         p->head.index_free = entry.next;
973     }
974     (p->head.no_records)++;
975     rec->sysno = rec_sysno_to_ext(sysno);
976     for (i = 0; i < REC_NO_INFO; i++)
977     {
978         rec->info[i] = NULL;
979         rec->size[i] = 0;
980     }
981     rec_cache_insert(p, rec, recordFlagNew);
982     return rec;
983 }
984
985 Record rec_new(Records p)
986 {
987     Record rec;
988     zebra_mutex_lock(&p->mutex);
989
990     rec = rec_new_int(p);
991     zebra_mutex_unlock(&p->mutex);
992     return rec;
993 }
994
995 ZEBRA_RES rec_del(Records p, Record *recpp)
996 {
997     Record *recp;
998     ZEBRA_RES ret = ZEBRA_OK;
999
1000     zebra_mutex_lock(&p->mutex);
1001     (p->head.no_records)--;
1002     if ((recp = rec_cache_lookup(p, (*recpp)->sysno, recordFlagDelete)))
1003     {
1004         rec_free(recp);
1005         *recp = *recpp;
1006     }
1007     else
1008     {
1009         ret = rec_cache_insert(p, *recpp, recordFlagDelete);
1010         rec_free(recpp);
1011     }
1012     zebra_mutex_unlock(&p->mutex);
1013     *recpp = NULL;
1014     return ret;
1015 }
1016
1017 ZEBRA_RES rec_put(Records p, Record *recpp)
1018 {
1019     Record *recp;
1020     ZEBRA_RES ret = ZEBRA_OK;
1021
1022     zebra_mutex_lock(&p->mutex);
1023     if ((recp = rec_cache_lookup(p, (*recpp)->sysno, recordFlagWrite)))
1024     {
1025         rec_free(recp);
1026         *recp = *recpp;
1027     }
1028     else
1029     {
1030         ret = rec_cache_insert(p, *recpp, recordFlagWrite);
1031         rec_free(recpp);
1032     }
1033     zebra_mutex_unlock(&p->mutex);
1034     *recpp = NULL;
1035     return ret;
1036 }
1037
1038 void rec_free(Record *recpp)
1039 {
1040     int i;
1041
1042     if (!*recpp)
1043         return ;
1044     for (i = 0; i < REC_NO_INFO; i++)
1045         xfree((*recpp)->info[i]);
1046     xfree(*recpp);
1047     *recpp = NULL;
1048 }
1049
1050 Record rec_cp(Record rec)
1051 {
1052     Record n;
1053     int i;
1054
1055     n = (Record) xmalloc(sizeof(*n));
1056     n->sysno = rec->sysno;
1057     for (i = 0; i < REC_NO_INFO; i++)
1058         if (!rec->info[i])
1059         {
1060             n->info[i] = NULL;
1061             n->size[i] = 0;
1062         }
1063         else
1064         {
1065             n->size[i] = rec->size[i];
1066             n->info[i] = (char *) xmalloc(rec->size[i]+1);
1067             memcpy(n->info[i], rec->info[i], rec->size[i]);
1068             n->info[i][rec->size[i]] = '\0';
1069         }
1070     return n;
1071 }
1072
1073
1074 char *rec_strdup(const char *s, size_t *len)
1075 {
1076     char *p;
1077
1078     if (!s)
1079     {
1080         *len = 0;
1081         return NULL;
1082     }
1083     *len = strlen(s)+1;
1084     p = (char *) xmalloc(*len);
1085     strcpy(p, s);
1086     return p;
1087 }
1088
1089 void rec_prstat(Records records, int verbose)
1090 {
1091     int i;
1092     zint total_bytes = 0;
1093     
1094     yaz_log (YLOG_LOG,
1095           "Total records                        %8" ZINT_FORMAT0,
1096           records->head.no_records);
1097
1098     for (i = 0; i< REC_BLOCK_TYPES; i++)
1099     {
1100         yaz_log (YLOG_LOG, "Record blocks of size "ZINT_FORMAT,
1101               records->head.block_size[i]);
1102         yaz_log (YLOG_LOG,
1103           " Used/Total/Bytes used            "
1104               ZINT_FORMAT "/" ZINT_FORMAT "/" ZINT_FORMAT,
1105               records->head.block_used[i], records->head.block_last[i]-1,
1106               records->head.block_used[i] * records->head.block_size[i]);
1107         total_bytes +=
1108             records->head.block_used[i] * records->head.block_size[i];
1109
1110         yaz_log(YLOG_LOG, " Block Last " ZINT_FORMAT, records->head.block_last[i]);
1111         if (verbose)
1112         {   /* analyse free lists */
1113             zint no_free = 0;
1114             zint block_free = records->head.block_free[i];
1115             WRBUF w = wrbuf_alloc();
1116             while (block_free)
1117             {
1118                 zint nblock;
1119                 no_free++;
1120                 wrbuf_printf(w, " " ZINT_FORMAT, block_free);
1121                 if (bf_read(records->data_BFile[i],
1122                             block_free, 0, sizeof(nblock), &nblock) != 1)
1123                 {
1124                     yaz_log(YLOG_FATAL|YLOG_ERRNO, "read in %s at free block "
1125                             ZINT_FORMAT,
1126                             records->data_fname[i], block_free);
1127                     break;
1128                 }
1129                 block_free = nblock;
1130             }
1131             yaz_log (YLOG_LOG,
1132                      " Number in free list       %8" ZINT_FORMAT0, no_free);
1133             if (no_free)
1134                 yaz_log(YLOG_LOG, "%s", wrbuf_cstr(w));
1135             wrbuf_destroy(w);
1136         }
1137     }
1138     yaz_log (YLOG_LOG,
1139           "Total size of record index in bytes  %8" ZINT_FORMAT0,
1140           records->head.total_bytes);
1141     yaz_log (YLOG_LOG,
1142           "Total size with overhead             %8" ZINT_FORMAT0,
1143           total_bytes);
1144 }
1145
1146 /*
1147  * Local variables:
1148  * c-basic-offset: 4
1149  * c-file-style: "Stroustrup"
1150  * indent-tabs-mode: nil
1151  * End:
1152  * vim: shiftwidth=4 tabstop=8 expandtab
1153  */
1154