Avoid logging of all compression/decompression
[idzebra-moved-to-github.git] / index / records.c
1 /* This file is part of the Zebra server.
2    Copyright (C) 1994-2009 Index Data
3
4 Zebra is free software; you can redistribute it and/or modify it under
5 the terms of the GNU General Public License as published by the Free
6 Software Foundation; either version 2, or (at your option) any later
7 version.
8
9 Zebra is distributed in the hope that it will be useful, but WITHOUT ANY
10 WARRANTY; without even the implied warranty of MERCHANTABILITY or
11 FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
12 for more details.
13
14 You should have received a copy of the GNU General Public License
15 along with this program; if not, write to the Free Software
16 Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA  02110-1301  USA
17
18 */
19
20 /*
21  *  Format of first block (assumes a 512 block size)
22  *      next       (8 bytes)
23  *      ref_count  (2 bytes)
24  *      block      (500 bytes)
25  *
26  *  Format of subsequent blocks 
27  *      next  (8 bytes)
28  *      block (502 bytes)
29  *
30  *  Format of each record
31  *      sysno
32  *      (length, data) - pairs
33  *      length = 0 if same as previous
34  */
35 #include <stdio.h>
36 #include <stdlib.h>
37 #include <assert.h>
38 #include <string.h>
39
40 #include <yaz/yaz-util.h>
41 #include <idzebra/bfile.h>
42 #include "recindex.h"
43
44 #if HAVE_BZLIB_H
45 #include <bzlib.h>
46 #endif
47 #if HAVE_ZLIB_H
48 #include <zlib.h>
49 #endif
50
51 #define REC_BLOCK_TYPES 2
52 #define REC_HEAD_MAGIC "recindex"
53 #define REC_VERSION 5
54
55 struct records_info {
56     int rw;
57     int compression_method;
58
59     recindex_t recindex;
60
61     char *data_fname[REC_BLOCK_TYPES];
62     BFile data_BFile[REC_BLOCK_TYPES];
63
64     char *tmp_buf;
65     int tmp_size;
66
67     struct record_cache_entry *record_cache;
68     int cache_size;
69     int cache_cur;
70     int cache_max;
71
72     int compression_chunk_size;
73
74     Zebra_mutex mutex;
75
76     struct records_head {
77         char magic[8];
78         char version[4];
79         zint block_size[REC_BLOCK_TYPES];
80         zint block_free[REC_BLOCK_TYPES];
81         zint block_last[REC_BLOCK_TYPES];
82         zint block_used[REC_BLOCK_TYPES];
83         zint block_move[REC_BLOCK_TYPES];
84
85         zint total_bytes;
86         zint index_last;
87         zint index_free;
88         zint no_records;
89
90     } head;
91 };
92
93 enum recordCacheFlag { recordFlagNop, recordFlagWrite, recordFlagNew,
94                        recordFlagDelete };
95
96 struct record_cache_entry {
97     Record rec;
98     enum recordCacheFlag flag;
99 };
100
101 struct record_index_entry {
102     zint next;         /* first block of record info / next free entry */
103     int size;          /* size of record or 0 if free entry */
104 };
105
106 Record rec_cp(Record rec);
107
108 /* Modify argument to if below: 1=normal, 0=sysno testing */
109 #if 1
110 /* If this is used sysno are not converted (no testing) */
111 #define FAKE_OFFSET 0
112 #define USUAL_RANGE 6000000000LL
113
114 #else
115 /* Use a fake > 2^32 offset so we can test for proper 64-bit handling */
116 #define FAKE_OFFSET 6000000000LL
117 #define USUAL_RANGE 2000000000LL
118 #endif
119
120 static zint rec_sysno_to_ext(zint sysno)
121 {
122     assert(sysno >= 0 && sysno <= USUAL_RANGE);
123     return sysno + FAKE_OFFSET;
124 }
125
126 zint rec_sysno_to_int(zint sysno)
127 {
128     assert(sysno >= FAKE_OFFSET && sysno <= FAKE_OFFSET + USUAL_RANGE);
129     return sysno - FAKE_OFFSET;
130 }
131
132 static void rec_tmp_expand(Records p, int size)
133 {
134     if (p->tmp_size < size + 2048 ||
135         p->tmp_size < p->head.block_size[REC_BLOCK_TYPES-1]*2)
136     {
137         xfree(p->tmp_buf);
138         p->tmp_size = size + (int)
139                         (p->head.block_size[REC_BLOCK_TYPES-1])*2 + 2048;
140         p->tmp_buf = (char *) xmalloc(p->tmp_size);
141     }
142 }
143
144 static ZEBRA_RES rec_release_blocks(Records p, zint sysno)
145 {
146     struct record_index_entry entry;
147     zint freeblock;
148     char block_and_ref[sizeof(zint) + sizeof(short)];
149     int dst_type;
150     int first = 1;
151
152     if (recindex_read_indx(p->recindex, sysno, &entry, sizeof(entry), 1) != 1)
153         return ZEBRA_FAIL;
154
155     freeblock = entry.next;
156     assert(freeblock > 0);
157     dst_type = CAST_ZINT_TO_INT(freeblock & 7);
158     assert(dst_type < REC_BLOCK_TYPES);
159     freeblock = freeblock / 8;
160     while (freeblock)
161     {
162         if (bf_read(p->data_BFile[dst_type], freeblock, 0,
163                      first ? sizeof(block_and_ref) : sizeof(zint),
164                      block_and_ref) != 1)
165         {
166             yaz_log(YLOG_FATAL|YLOG_ERRNO, "read in rec_del_single");
167             return ZEBRA_FAIL;
168         }
169         if (first)
170         {
171             short ref;
172             memcpy(&ref, block_and_ref + sizeof(freeblock), sizeof(ref));
173             --ref;
174             memcpy(block_and_ref + sizeof(freeblock), &ref, sizeof(ref));
175             if (ref)
176             {
177                 /* there is still a reference to this block.. */
178                 if (bf_write(p->data_BFile[dst_type], freeblock, 0,
179                               sizeof(block_and_ref), block_and_ref))
180                 {
181                     yaz_log(YLOG_FATAL|YLOG_ERRNO, "write in rec_del_single");
182                     return ZEBRA_FAIL;
183                 }
184                 return ZEBRA_OK;
185             }
186             /* the list of blocks can all be removed (ref == 0) */
187             first = 0;
188         }
189         
190         if (bf_write(p->data_BFile[dst_type], freeblock, 0, sizeof(freeblock),
191                       &p->head.block_free[dst_type]))
192         {
193             yaz_log(YLOG_FATAL|YLOG_ERRNO, "write in rec_del_single");
194             return ZEBRA_FAIL;
195         }
196         p->head.block_free[dst_type] = freeblock;
197         memcpy(&freeblock, block_and_ref, sizeof(freeblock));
198
199         p->head.block_used[dst_type]--;
200     }
201     p->head.total_bytes -= entry.size;
202     return ZEBRA_OK;
203 }
204
205 static ZEBRA_RES rec_delete_single(Records p, Record rec)
206 {
207     struct record_index_entry entry;
208
209     /* all data in entry must be reset, since it's written verbatim */
210     memset(&entry, '\0', sizeof(entry));
211     if (rec_release_blocks(p, rec_sysno_to_int(rec->sysno)) != ZEBRA_OK)
212         return ZEBRA_FAIL;
213
214     entry.next = p->head.index_free;
215     entry.size = 0;
216     p->head.index_free = rec_sysno_to_int(rec->sysno);
217     recindex_write_indx(p->recindex, rec_sysno_to_int(rec->sysno), &entry, sizeof(entry));
218     return ZEBRA_OK;
219 }
220
221 static ZEBRA_RES rec_write_tmp_buf(Records p, int size, zint *sysnos)
222 {
223     struct record_index_entry entry;
224     int no_written = 0;
225     char *cptr = p->tmp_buf;
226     zint block_prev = -1, block_free;
227     int dst_type = 0;
228     int i;
229
230     /* all data in entry must be reset, since it's written verbatim */
231     memset(&entry, '\0', sizeof(entry));
232
233     for (i = 1; i<REC_BLOCK_TYPES; i++)
234         if (size >= p->head.block_move[i])
235             dst_type = i;
236     while (no_written < size)
237     {
238         block_free = p->head.block_free[dst_type];
239         if (block_free)
240         {
241             if (bf_read(p->data_BFile[dst_type],
242                          block_free, 0, sizeof(*p->head.block_free),
243                          &p->head.block_free[dst_type]) != 1)
244             {
245                 yaz_log(YLOG_FATAL|YLOG_ERRNO, "read in %s at free block "
246                          ZINT_FORMAT,
247                          p->data_fname[dst_type], block_free);
248                 return ZEBRA_FAIL;
249             }
250         }
251         else
252             block_free = p->head.block_last[dst_type]++;
253         if (block_prev == -1)
254         {
255             entry.next = block_free*8 + dst_type;
256             entry.size = size;
257             p->head.total_bytes += size;
258             while (*sysnos > 0)
259             {
260                 recindex_write_indx(p->recindex, *sysnos, &entry, sizeof(entry));
261                 sysnos++;
262             }
263         }
264         else
265         {
266             memcpy(cptr, &block_free, sizeof(block_free));
267             bf_write(p->data_BFile[dst_type], block_prev, 0, 0, cptr);
268             cptr = p->tmp_buf + no_written;
269         }
270         block_prev = block_free;
271         no_written += CAST_ZINT_TO_INT(p->head.block_size[dst_type]) 
272             - sizeof(zint);
273         p->head.block_used[dst_type]++;
274     }
275     assert(block_prev != -1);
276     block_free = 0;
277     memcpy(cptr, &block_free, sizeof(block_free));
278     bf_write(p->data_BFile[dst_type], block_prev, 0,
279               sizeof(block_free) + (p->tmp_buf+size) - cptr, cptr);
280     return ZEBRA_OK;
281 }
282
283 Records rec_open(BFiles bfs, int rw, int compression_method)
284 {
285     Records p;
286     int i, r;
287     int version;
288     ZEBRA_RES ret = ZEBRA_OK;
289
290     p = (Records) xmalloc(sizeof(*p));
291     memset(&p->head, '\0', sizeof(p->head));
292     p->compression_method = compression_method;
293     p->rw = rw;
294     p->tmp_size = 4096;
295     p->tmp_buf = (char *) xmalloc(p->tmp_size);
296     p->compression_chunk_size = 0;
297     if (compression_method == REC_COMPRESS_BZIP2)
298         p->compression_chunk_size = 90000;
299     p->recindex = recindex_open(bfs, rw, 0 /* 1=isamb for recindex */);
300     r = recindex_read_head(p->recindex, p->tmp_buf);
301     switch (r)
302     {
303     case 0:
304         memcpy(p->head.magic, REC_HEAD_MAGIC, sizeof(p->head.magic));
305         sprintf(p->head.version, "%3d", REC_VERSION);
306         p->head.index_free = 0;
307         p->head.index_last = 1;
308         p->head.no_records = 0;
309         p->head.total_bytes = 0;
310         for (i = 0; i<REC_BLOCK_TYPES; i++)
311         {
312             p->head.block_free[i] = 0;
313             p->head.block_last[i] = 1;
314             p->head.block_used[i] = 0;
315         }
316         p->head.block_size[0] = 256;
317         p->head.block_move[0] = 0;
318         for (i = 1; i<REC_BLOCK_TYPES; i++)
319         {
320             p->head.block_size[i] = p->head.block_size[i-1] * 8;
321             p->head.block_move[i] = p->head.block_size[i] * 2;
322         }
323         if (rw)
324         {
325             if (recindex_write_head(p->recindex, 
326                                     &p->head, sizeof(p->head)) != ZEBRA_OK)
327                 ret = ZEBRA_FAIL;
328         }
329         break;
330     case 1:
331         memcpy(&p->head, p->tmp_buf, sizeof(p->head));
332         if (memcmp(p->head.magic, REC_HEAD_MAGIC, sizeof(p->head.magic)))
333         {
334             yaz_log(YLOG_FATAL, "file %s has bad format",
335                     recindex_get_fname(p->recindex));
336             ret = ZEBRA_FAIL;
337         }
338         version = atoi(p->head.version);
339         if (version != REC_VERSION)
340         {
341             yaz_log(YLOG_FATAL, "file %s is version %d, but version"
342                   " %d is required",
343                     recindex_get_fname(p->recindex), version, REC_VERSION);
344             ret = ZEBRA_FAIL;
345         }
346         break;
347     }
348     for (i = 0; i<REC_BLOCK_TYPES; i++)
349     {
350         char str[80];
351         sprintf(str, "recd%c", i + 'A');
352         p->data_fname[i] = (char *) xmalloc(strlen(str)+1);
353         strcpy(p->data_fname[i], str);
354         p->data_BFile[i] = NULL;
355     }
356     for (i = 0; i<REC_BLOCK_TYPES; i++)
357     {
358         if (!(p->data_BFile[i] =
359               bf_open(bfs, p->data_fname[i],
360                       CAST_ZINT_TO_INT(p->head.block_size[i]), rw)))
361         {
362             yaz_log(YLOG_FATAL|YLOG_ERRNO, "bf_open %s", p->data_fname[i]);
363             ret = ZEBRA_FAIL;
364             break;
365         }
366     }
367     p->cache_max = 400;
368     p->cache_cur = 0;
369     p->record_cache = (struct record_cache_entry *)
370         xmalloc(sizeof(*p->record_cache)*p->cache_max);
371     zebra_mutex_init(&p->mutex);
372     if (ret == ZEBRA_FAIL)
373         rec_close(&p);
374     return p;
375 }
376
377 static void rec_encode_unsigned(unsigned n, unsigned char *buf, int *len)
378 {
379     (*len) = 0;
380     while (n > 127)
381     {
382         buf[*len] = 128 + (n & 127);
383         n = n >> 7;
384         (*len)++;
385     }
386     buf[*len] = n;
387     (*len)++;
388 }
389
390 static void rec_decode_unsigned(unsigned *np, unsigned char *buf, int *len)
391 {
392     unsigned n = 0;
393     unsigned w = 1;
394     (*len) = 0;
395
396     while (buf[*len] > 127)
397     {
398         n += w*(buf[*len] & 127);
399         w = w << 7;
400         (*len)++;
401     }
402     n += w * buf[*len];
403     (*len)++;
404     *np = n;
405 }
406
407 static void rec_encode_zint(zint n, unsigned char *buf, int *len)
408 {
409     (*len) = 0;
410     while (n > 127)
411     {
412         buf[*len] = (unsigned) (128 + (n & 127));
413         n = n >> 7;
414         (*len)++;
415     }
416     buf[*len] = (unsigned) n;
417     (*len)++;
418 }
419
420 static void rec_decode_zint(zint *np, unsigned char *buf, int *len)
421 {
422     zint  n = 0;
423     zint w = 1;
424     (*len) = 0;
425
426     while (buf[*len] > 127)
427     {
428         n += w*(buf[*len] & 127);
429         w = w << 7;
430         (*len)++;
431     }
432     n += w * buf[*len];
433     (*len)++;
434     *np = n;
435 }
436
437 static void rec_cache_flush_block1(Records p, Record rec, Record last_rec,
438                                    char **out_buf, int *out_size,
439                                    int *out_offset)
440 {
441     int i;
442     int len;
443
444     for (i = 0; i<REC_NO_INFO; i++)
445     {
446         if (*out_offset + CAST_ZINT_TO_INT(rec->size[i]) + 20 > *out_size)
447         {
448             int new_size = *out_offset + rec->size[i] + 65536;
449             char *np = (char *) xmalloc(new_size);
450             if (*out_offset)
451                 memcpy(np, *out_buf, *out_offset);
452             xfree(*out_buf);
453             *out_size = new_size;
454             *out_buf = np;
455         }
456         if (i == 0)
457         {
458             rec_encode_zint(rec_sysno_to_int(rec->sysno), 
459                             (unsigned char *) *out_buf + *out_offset, &len);
460             (*out_offset) += len;
461         }
462         if (rec->size[i] == 0)
463         {
464             rec_encode_unsigned(1, (unsigned char *) *out_buf + *out_offset,
465                                 &len);
466             (*out_offset) += len;
467         }
468         else if (last_rec && rec->size[i] == last_rec->size[i] &&
469                  !memcmp(rec->info[i], last_rec->info[i], rec->size[i]))
470         {
471             rec_encode_unsigned(0, (unsigned char *) *out_buf + *out_offset,
472                                 &len);
473             (*out_offset) += len;
474         }
475         else
476         {
477             rec_encode_unsigned(rec->size[i]+1,
478                                 (unsigned char *) *out_buf + *out_offset,
479                                 &len);
480             (*out_offset) += len;
481             memcpy(*out_buf + *out_offset, rec->info[i], rec->size[i]);
482             (*out_offset) += rec->size[i];
483         }
484     }
485 }
486
487 static ZEBRA_RES rec_flush_shared(Records p, short ref_count, zint *sysnos,
488                                   char *out_buf, int out_offset)
489 {
490     ZEBRA_RES ret = ZEBRA_OK;
491     if (ref_count)
492     {
493         int i;
494         unsigned int csize = 0;  /* indicate compression "not performed yet" */
495         char compression_method = p->compression_method;
496         switch (compression_method)
497         {
498         case REC_COMPRESS_ZLIB:
499 #if HAVE_ZLIB_H
500             csize = out_offset + (out_offset >> 6) + 620;
501             while (1)
502             {
503                 int r;
504                 uLongf destLen = csize;
505                 rec_tmp_expand(p, csize);
506                 r = compress((Bytef *) p->tmp_buf+sizeof(zint)+sizeof(short)+
507                              sizeof(char),
508                              &destLen, (const Bytef *) out_buf, out_offset);
509                 csize = destLen;
510                 if (r == Z_OK)
511                 {
512                     break;
513                 }
514                 if (r != Z_MEM_ERROR)
515                 {
516                     yaz_log(YLOG_WARN, "compress error: %d", r);
517                     csize = 0;
518                     break;
519                 }
520                 csize = csize * 2;
521             }
522 #endif
523             break;
524         case REC_COMPRESS_BZIP2:
525 #if HAVE_BZLIB_H        
526             csize = out_offset + (out_offset >> 6) + 620;
527             rec_tmp_expand(p, csize);
528 #ifdef BZ_CONFIG_ERROR
529             i = BZ2_bzBuffToBuffCompress 
530 #else
531             i = bzBuffToBuffCompress 
532 #endif
533                                     (p->tmp_buf+sizeof(zint)+sizeof(short)+
534                                       sizeof(char),
535                                       &csize, out_buf, out_offset, 1, 0, 30);
536             if (i != BZ_OK)
537             {
538                 yaz_log(YLOG_WARN, "bzBuffToBuffCompress error code=%d", i);
539                 csize = 0;
540             }
541 #endif
542             break;
543         case REC_COMPRESS_NONE:
544             break;
545         }
546         if (!csize)  
547         {
548             /* either no compression or compression not supported ... */
549             csize = out_offset;
550             rec_tmp_expand(p, csize);
551             memcpy(p->tmp_buf + sizeof(zint) + sizeof(short) + sizeof(char),
552                     out_buf, out_offset);
553             csize = out_offset;
554             compression_method = REC_COMPRESS_NONE;
555         }
556         memcpy(p->tmp_buf + sizeof(zint), &ref_count, sizeof(ref_count));
557         memcpy(p->tmp_buf + sizeof(zint)+sizeof(short),
558                 &compression_method, sizeof(compression_method));
559                 
560         /* -------- compression */
561         if (rec_write_tmp_buf(p, csize + sizeof(short) + sizeof(char), sysnos)
562             != ZEBRA_OK)
563             ret = ZEBRA_FAIL;
564     }
565     return ret;
566 }
567
568 static ZEBRA_RES rec_write_multiple(Records p, int saveCount)
569 {
570     int i;
571     short ref_count = 0;
572     Record last_rec = 0;
573     int out_size = 1000;
574     int out_offset = 0;
575     char *out_buf = (char *) xmalloc(out_size);
576     zint *sysnos = (zint *) xmalloc(sizeof(*sysnos) * (p->cache_cur + 1));
577     zint *sysnop = sysnos;
578     ZEBRA_RES ret = ZEBRA_OK;
579
580     for (i = 0; i<p->cache_cur - saveCount; i++)
581     {
582         struct record_cache_entry *e = p->record_cache + i;
583         switch (e->flag)
584         {
585         case recordFlagNew:
586             rec_cache_flush_block1(p, e->rec, last_rec, &out_buf,
587                                     &out_size, &out_offset);
588             *sysnop++ = rec_sysno_to_int(e->rec->sysno);
589             ref_count++;
590             e->flag = recordFlagNop;
591             last_rec = e->rec;
592             break;
593         case recordFlagWrite:
594             if (rec_release_blocks(p, rec_sysno_to_int(e->rec->sysno))
595                 != ZEBRA_OK)
596                 ret = ZEBRA_FAIL;
597
598             rec_cache_flush_block1(p, e->rec, last_rec, &out_buf,
599                                     &out_size, &out_offset);
600             *sysnop++ = rec_sysno_to_int(e->rec->sysno);
601             ref_count++;
602             e->flag = recordFlagNop;
603             last_rec = e->rec;
604             break;
605         case recordFlagDelete:
606             if (rec_delete_single(p, e->rec) != ZEBRA_OK)
607                 ret = ZEBRA_FAIL;
608
609             e->flag = recordFlagNop;
610             break;
611         case recordFlagNop:
612             break;
613         default:
614             break;
615         }
616     }
617
618     *sysnop = -1;
619     rec_flush_shared(p, ref_count, sysnos, out_buf, out_offset);
620     xfree(out_buf);
621     xfree(sysnos);
622     return ret;
623 }
624
625 static ZEBRA_RES rec_cache_flush(Records p, int saveCount)
626 {
627     int i, j;
628     ZEBRA_RES ret;
629
630     if (saveCount >= p->cache_cur)
631         saveCount = 0;
632
633     ret = rec_write_multiple(p, saveCount);
634
635     for (i = 0; i<p->cache_cur - saveCount; i++)
636     {
637         struct record_cache_entry *e = p->record_cache + i;
638         rec_free(&e->rec);
639     } 
640     /* i still being used ... */
641     for (j = 0; j<saveCount; j++, i++)
642         memcpy(p->record_cache+j, p->record_cache+i,
643                 sizeof(*p->record_cache));
644     p->cache_cur = saveCount;
645     return ret;
646 }
647
648 static Record *rec_cache_lookup(Records p, zint sysno,
649                                 enum recordCacheFlag flag)
650 {
651     int i;
652     for (i = 0; i<p->cache_cur; i++)
653     {
654         struct record_cache_entry *e = p->record_cache + i;
655         if (e->rec->sysno == sysno)
656         {
657             if (flag != recordFlagNop && e->flag == recordFlagNop)
658                 e->flag = flag;
659             return &e->rec;
660         }
661     }
662     return NULL;
663 }
664
665 static ZEBRA_RES rec_cache_insert(Records p, Record rec, enum recordCacheFlag flag)
666 {
667     struct record_cache_entry *e;
668     ZEBRA_RES ret = ZEBRA_OK;
669
670     if (p->cache_cur == p->cache_max)
671         ret = rec_cache_flush(p, 1);
672     else if (p->cache_cur > 0)
673     {
674         int i, j;
675         int used = 0;
676         for (i = 0; i<p->cache_cur; i++)
677         {
678             Record r = (p->record_cache + i)->rec;
679             for (j = 0; j<REC_NO_INFO; j++)
680                 used += r->size[j];
681         }
682         if (used > p->compression_chunk_size)
683             ret = rec_cache_flush(p, 1);
684     }
685     assert(p->cache_cur < p->cache_max);
686
687     e = p->record_cache + (p->cache_cur)++;
688     e->flag = flag;
689     e->rec = rec_cp(rec);
690     return ret;
691 }
692
693 ZEBRA_RES rec_close(Records *pp)
694 {
695     Records p = *pp;
696     int i;
697     ZEBRA_RES ret = ZEBRA_OK;
698
699     if (!p)
700         return ret;
701
702     zebra_mutex_destroy(&p->mutex);
703     if (rec_cache_flush(p, 0) != ZEBRA_OK)
704         ret = ZEBRA_FAIL;
705
706     xfree(p->record_cache);
707
708     if (p->rw)
709     {
710         if (recindex_write_head(p->recindex, &p->head, sizeof(p->head)) != ZEBRA_OK)
711             ret = ZEBRA_FAIL;
712     }
713
714     recindex_close(p->recindex);
715
716     for (i = 0; i<REC_BLOCK_TYPES; i++)
717     {
718         if (p->data_BFile[i])
719             bf_close(p->data_BFile[i]);
720         xfree(p->data_fname[i]);
721     }
722     xfree(p->tmp_buf);
723     xfree(p);
724     *pp = NULL;
725     return ret;
726 }
727
728 static Record rec_get_int(Records p, zint sysno)
729 {
730     int i, in_size, r;
731     Record rec, *recp;
732     struct record_index_entry entry;
733     zint freeblock;
734     int dst_type;
735     char *nptr, *cptr;
736     char *in_buf = 0;
737     char *bz_buf = 0;
738 #if HAVE_BZLIB_H
739     unsigned int bz_size;
740 #endif
741     char compression_method;
742
743     assert(sysno > 0);
744     assert(p);
745
746     if ((recp = rec_cache_lookup(p, sysno, recordFlagNop)))
747         return rec_cp(*recp);
748
749     if (recindex_read_indx(p->recindex, rec_sysno_to_int(sysno), &entry, sizeof(entry), 1) < 1)
750         return NULL;       /* record is not there! */
751
752     if (!entry.size)
753         return NULL;       /* record is deleted */
754
755     dst_type = (int) (entry.next & 7);
756     assert(dst_type < REC_BLOCK_TYPES);
757     freeblock = entry.next / 8;
758
759     assert(freeblock > 0);
760     
761     rec_tmp_expand(p, entry.size);
762
763     cptr = p->tmp_buf;
764     r = bf_read(p->data_BFile[dst_type], freeblock, 0, 0, cptr);
765     if (r < 0)
766         return 0;
767     memcpy(&freeblock, cptr, sizeof(freeblock));
768
769     while (freeblock)
770     {
771         zint tmp;
772
773         cptr += p->head.block_size[dst_type] - sizeof(freeblock);
774         
775         memcpy(&tmp, cptr, sizeof(tmp));
776         r = bf_read(p->data_BFile[dst_type], freeblock, 0, 0, cptr);
777         if (r < 0)
778             return 0;
779         memcpy(&freeblock, cptr, sizeof(freeblock));
780         memcpy(cptr, &tmp, sizeof(tmp));
781     }
782
783     rec = (Record) xmalloc(sizeof(*rec));
784     rec->sysno = sysno;
785     memcpy(&compression_method, p->tmp_buf + sizeof(zint) + sizeof(short),
786             sizeof(compression_method));
787     in_buf = p->tmp_buf + sizeof(zint) + sizeof(short) + sizeof(char);
788     in_size = entry.size - sizeof(short) - sizeof(char);
789     switch (compression_method)
790     {
791     case REC_COMPRESS_ZLIB:
792 #if HAVE_ZLIB_H
793         bz_size = entry.size * 20 + 100;
794         while (1)
795         {
796             uLongf destLen = bz_size;
797             bz_buf = (char *) xmalloc(bz_size);
798             i = uncompress((Bytef *) bz_buf, &destLen,
799                            (const Bytef *) in_buf, in_size);
800             if (i == Z_OK)
801             {
802                 bz_size = destLen; 
803                 break;
804             }
805             yaz_log(YLOG_LOG, "failed");
806             xfree(bz_buf);
807             bz_size *= 2;
808         }
809         in_buf = bz_buf;
810         in_size = bz_size;
811 #else
812         yaz_log(YLOG_FATAL, "cannot decompress record(s) in ZLIB format");
813         return 0;
814 #endif
815         break;
816     case REC_COMPRESS_BZIP2:
817 #if HAVE_BZLIB_H
818         bz_size = entry.size * 20 + 100;
819         while (1)
820         {
821             bz_buf = (char *) xmalloc(bz_size);
822 #ifdef BZ_CONFIG_ERROR
823             i = BZ2_bzBuffToBuffDecompress
824 #else
825             i = bzBuffToBuffDecompress
826 #endif
827                 (bz_buf, &bz_size, in_buf, in_size, 0, 0);
828             if (i == BZ_OK)
829                 break;
830             yaz_log(YLOG_LOG, "failed");
831             xfree(bz_buf);
832             bz_size *= 2;
833         }
834         in_buf = bz_buf;
835         in_size = bz_size;
836 #else
837         yaz_log(YLOG_FATAL, "cannot decompress record(s) in BZIP2 format");
838         return 0;
839 #endif
840         break;
841     case REC_COMPRESS_NONE:
842         break;
843     }
844     for (i = 0; i<REC_NO_INFO; i++)
845         rec->info[i] = 0;
846
847     nptr = in_buf;                /* skip ref count */
848     while (nptr < in_buf + in_size)
849     {
850         zint this_sysno;
851         int len;
852         rec_decode_zint(&this_sysno, (unsigned char *) nptr, &len);
853         nptr += len;
854
855         for (i = 0; i < REC_NO_INFO; i++)
856         {
857             unsigned int this_size;
858             rec_decode_unsigned(&this_size, (unsigned char *) nptr, &len);
859             nptr += len;
860
861             if (this_size == 0)
862                 continue;
863             rec->size[i] = this_size-1;
864
865             if (rec->size[i])
866             {
867                 rec->info[i] = nptr;
868                 nptr += rec->size[i];
869             }
870             else
871                 rec->info[i] = NULL;
872         }
873         if (this_sysno == rec_sysno_to_int(sysno))
874             break;
875     }
876     for (i = 0; i<REC_NO_INFO; i++)
877     {
878         if (rec->info[i] && rec->size[i])
879         {
880             char *np = xmalloc(rec->size[i]+1);
881             memcpy(np, rec->info[i], rec->size[i]);
882             np[rec->size[i]] = '\0';
883             rec->info[i] = np;
884         }
885         else
886         {
887             assert(rec->info[i] == 0);
888             assert(rec->size[i] == 0);
889         }
890     }
891     xfree(bz_buf);
892     if (rec_cache_insert(p, rec, recordFlagNop) != ZEBRA_OK)
893         return 0;
894     return rec;
895 }
896
897 Record rec_get(Records p, zint sysno)
898 {
899     Record rec;
900     zebra_mutex_lock(&p->mutex);
901
902     rec = rec_get_int(p, sysno);
903     zebra_mutex_unlock(&p->mutex);
904     return rec;
905 }
906
907 Record rec_get_root(Records p)
908 {
909     return rec_get(p, rec_sysno_to_ext(1));
910 }
911
912 static Record rec_new_int(Records p)
913 {
914     int i;
915     zint sysno;
916     Record rec;
917
918     assert(p);
919     rec = (Record) xmalloc(sizeof(*rec));
920     if (1 || p->head.index_free == 0)
921         sysno = (p->head.index_last)++;
922     else
923     {
924         struct record_index_entry entry;
925
926         if (recindex_read_indx(p->recindex, p->head.index_free, &entry, sizeof(entry), 0) < 1)
927         {
928             xfree(rec);
929             return 0;
930         }
931         sysno = p->head.index_free;
932         p->head.index_free = entry.next;
933     }
934     (p->head.no_records)++;
935     rec->sysno = rec_sysno_to_ext(sysno);
936     for (i = 0; i < REC_NO_INFO; i++)
937     {
938         rec->info[i] = NULL;
939         rec->size[i] = 0;
940     }
941     rec_cache_insert(p, rec, recordFlagNew);
942     return rec;
943 }
944
945 Record rec_new(Records p)
946 {
947     Record rec;
948     zebra_mutex_lock(&p->mutex);
949
950     rec = rec_new_int(p);
951     zebra_mutex_unlock(&p->mutex);
952     return rec;
953 }
954
955 ZEBRA_RES rec_del(Records p, Record *recpp)
956 {
957     Record *recp;
958     ZEBRA_RES ret = ZEBRA_OK;
959
960     zebra_mutex_lock(&p->mutex);
961     (p->head.no_records)--;
962     if ((recp = rec_cache_lookup(p, (*recpp)->sysno, recordFlagDelete)))
963     {
964         rec_free(recp);
965         *recp = *recpp;
966     }
967     else
968     {
969         ret = rec_cache_insert(p, *recpp, recordFlagDelete);
970         rec_free(recpp);
971     }
972     zebra_mutex_unlock(&p->mutex);
973     *recpp = NULL;
974     return ret;
975 }
976
977 ZEBRA_RES rec_put(Records p, Record *recpp)
978 {
979     Record *recp;
980     ZEBRA_RES ret = ZEBRA_OK;
981
982     zebra_mutex_lock(&p->mutex);
983     if ((recp = rec_cache_lookup(p, (*recpp)->sysno, recordFlagWrite)))
984     {
985         rec_free(recp);
986         *recp = *recpp;
987     }
988     else
989     {
990         ret = rec_cache_insert(p, *recpp, recordFlagWrite);
991         rec_free(recpp);
992     }
993     zebra_mutex_unlock(&p->mutex);
994     *recpp = NULL;
995     return ret;
996 }
997
998 void rec_free(Record *recpp)
999 {
1000     int i;
1001
1002     if (!*recpp)
1003         return ;
1004     for (i = 0; i < REC_NO_INFO; i++)
1005         xfree((*recpp)->info[i]);
1006     xfree(*recpp);
1007     *recpp = NULL;
1008 }
1009
1010 Record rec_cp(Record rec)
1011 {
1012     Record n;
1013     int i;
1014
1015     n = (Record) xmalloc(sizeof(*n));
1016     n->sysno = rec->sysno;
1017     for (i = 0; i < REC_NO_INFO; i++)
1018         if (!rec->info[i])
1019         {
1020             n->info[i] = NULL;
1021             n->size[i] = 0;
1022         }
1023         else
1024         {
1025             n->size[i] = rec->size[i];
1026             n->info[i] = (char *) xmalloc(rec->size[i]+1);
1027             memcpy(n->info[i], rec->info[i], rec->size[i]);
1028             n->info[i][rec->size[i]] = '\0';
1029         }
1030     return n;
1031 }
1032
1033
1034 char *rec_strdup(const char *s, size_t *len)
1035 {
1036     char *p;
1037
1038     if (!s)
1039     {
1040         *len = 0;
1041         return NULL;
1042     }
1043     *len = strlen(s)+1;
1044     p = (char *) xmalloc(*len);
1045     strcpy(p, s);
1046     return p;
1047 }
1048
1049 void rec_prstat(Records records, int verbose)
1050 {
1051     int i;
1052     zint total_bytes = 0;
1053     
1054     yaz_log (YLOG_LOG,
1055           "Total records                        %8" ZINT_FORMAT0,
1056           records->head.no_records);
1057
1058     for (i = 0; i< REC_BLOCK_TYPES; i++)
1059     {
1060         yaz_log (YLOG_LOG, "Record blocks of size "ZINT_FORMAT,
1061               records->head.block_size[i]);
1062         yaz_log (YLOG_LOG,
1063           " Used/Total/Bytes used            "
1064               ZINT_FORMAT "/" ZINT_FORMAT "/" ZINT_FORMAT,
1065               records->head.block_used[i], records->head.block_last[i]-1,
1066               records->head.block_used[i] * records->head.block_size[i]);
1067         total_bytes +=
1068             records->head.block_used[i] * records->head.block_size[i];
1069
1070         yaz_log(YLOG_LOG, " Block Last " ZINT_FORMAT, records->head.block_last[i]);
1071         if (verbose)
1072         {   /* analyse free lists */
1073             zint no_free = 0;
1074             zint block_free = records->head.block_free[i];
1075             WRBUF w = wrbuf_alloc();
1076             while (block_free)
1077             {
1078                 zint nblock;
1079                 no_free++;
1080                 wrbuf_printf(w, " " ZINT_FORMAT, block_free);
1081                 if (bf_read(records->data_BFile[i],
1082                             block_free, 0, sizeof(nblock), &nblock) != 1)
1083                 {
1084                     yaz_log(YLOG_FATAL|YLOG_ERRNO, "read in %s at free block "
1085                             ZINT_FORMAT,
1086                             records->data_fname[i], block_free);
1087                     break;
1088                 }
1089                 block_free = nblock;
1090             }
1091             yaz_log (YLOG_LOG,
1092                      " Number in free list       %8" ZINT_FORMAT0, no_free);
1093             if (no_free)
1094                 yaz_log(YLOG_LOG, "%s", wrbuf_cstr(w));
1095             wrbuf_destroy(w);
1096         }
1097     }
1098     yaz_log (YLOG_LOG,
1099           "Total size of record index in bytes  %8" ZINT_FORMAT0,
1100           records->head.total_bytes);
1101     yaz_log (YLOG_LOG,
1102           "Total size with overhead             %8" ZINT_FORMAT0,
1103           total_bytes);
1104 }
1105
1106 /*
1107  * Local variables:
1108  * c-basic-offset: 4
1109  * c-file-style: "Stroustrup"
1110  * indent-tabs-mode: nil
1111  * End:
1112  * vim: shiftwidth=4 tabstop=8 expandtab
1113  */
1114