Moved file locking utilities from index/lockutil.c to util/flock.c
[idzebra-moved-to-github.git] / index / recindex.c
1 /* $Id: recindex.c,v 1.46 2005-08-22 08:18:43 adam Exp $
2    Copyright (C) 1995-2005
3    Index Data ApS
4
5 This file is part of the Zebra server.
6
7 Zebra is free software; you can redistribute it and/or modify it under
8 the terms of the GNU General Public License as published by the Free
9 Software Foundation; either version 2, or (at your option) any later
10 version.
11
12 Zebra is distributed in the hope that it will be useful, but WITHOUT ANY
13 WARRANTY; without even the implied warranty of MERCHANTABILITY or
14 FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
15 for more details.
16
17 You should have received a copy of the GNU General Public License
18 along with Zebra; see the file LICENSE.zebra.  If not, write to the
19 Free Software Foundation, 59 Temple Place - Suite 330, Boston, MA
20 02111-1307, USA.
21 */
22
23 #define RIDX_CHUNK 128
24
25 /*
26  *  Format of first block
27  *      next       (8 bytes)
28  *      ref_count  (2 bytes)
29  *      block      (500 bytes)
30  *
31  *  Format of subsequent blocks 
32  *      next  (8 bytes)
33  *      block (502 bytes)
34  *
35  *  Format of each record
36  *      sysno
37  *      (length, data) - pairs
38  *      length = 0 if same as previous
39  */
40 #include <stdio.h>
41 #include <stdlib.h>
42 #include <assert.h>
43 #include <string.h>
44
45 #include <yaz/yaz-util.h>
46 #include "recindxp.h"
47
48 #if HAVE_BZLIB_H
49 #include <bzlib.h>
50 #endif
51
52 /* Modify argument to if below: 1=normal, 0=sysno testing */
53 #if 1
54 /* If this is used sysno are not converted (no testing) */
55 #define FAKE_OFFSET 0
56 #define USUAL_RANGE 6000000000LL
57
58 #else
59 /* Use a fake > 2^32 offset so we can test for proper 64-bit handling */
60 #define FAKE_OFFSET 6000000000LL
61 #define USUAL_RANGE 2000000000LL
62 #endif
63
64 static SYSNO rec_sysno_to_ext(SYSNO sysno)
65 {
66     assert(sysno >= 0 && sysno <= USUAL_RANGE);
67     return sysno + FAKE_OFFSET;
68 }
69
70 SYSNO rec_sysno_to_int(SYSNO sysno)
71 {
72     assert(sysno >= FAKE_OFFSET && sysno <= FAKE_OFFSET + USUAL_RANGE);
73     return sysno - FAKE_OFFSET;
74 }
75
76 static void rec_write_head(Records p)
77 {
78     int r;
79
80     assert(p);
81     assert(p->index_BFile);
82
83     r = bf_write(p->index_BFile, 0, 0, sizeof(p->head), &p->head);    
84     if (r)
85     {
86         yaz_log(YLOG_FATAL|YLOG_ERRNO, "write head of %s", p->index_fname);
87         exit(1);
88     }
89 }
90
91 static void rec_tmp_expand(Records p, int size)
92 {
93     if (p->tmp_size < size + 2048 ||
94         p->tmp_size < p->head.block_size[REC_BLOCK_TYPES-1]*2)
95     {
96         xfree(p->tmp_buf);
97         p->tmp_size = size + (int)
98                         (p->head.block_size[REC_BLOCK_TYPES-1])*2 + 2048;
99         p->tmp_buf = (char *) xmalloc(p->tmp_size);
100     }
101 }
102
103 static int read_indx(Records p, SYSNO sysno, void *buf, int itemsize, 
104                       int ignoreError)
105 {
106     int r;
107     zint pos = (sysno-1)*itemsize;
108     int off = (int) (pos%RIDX_CHUNK);
109     int sz1 = RIDX_CHUNK - off;    /* sz1 is size of buffer to read.. */
110
111     if (sz1 > itemsize)
112         sz1 = itemsize;  /* no more than itemsize bytes */
113
114     r = bf_read(p->index_BFile, 1+pos/RIDX_CHUNK, off, sz1, buf);
115     if (r == 1 && sz1 < itemsize) /* boundary? - must read second part */
116         r = bf_read(p->index_BFile, 2+pos/RIDX_CHUNK, 0, itemsize - sz1,
117                         (char*) buf + sz1);
118     if (r != 1 && !ignoreError)
119     {
120         yaz_log(YLOG_FATAL|YLOG_ERRNO, "read in %s at pos %ld",
121               p->index_fname, (long) pos);
122         exit(1);
123     }
124     return r;
125 }
126
127 static void write_indx(Records p, SYSNO sysno, void *buf, int itemsize)
128 {
129     zint pos = (sysno-1)*itemsize;
130     int off = (int) (pos%RIDX_CHUNK);
131     int sz1 = RIDX_CHUNK - off;    /* sz1 is size of buffer to read.. */
132
133     if (sz1 > itemsize)
134         sz1 = itemsize;  /* no more than itemsize bytes */
135
136     bf_write(p->index_BFile, 1+pos/RIDX_CHUNK, off, sz1, buf);
137     if (sz1 < itemsize)   /* boundary? must write second part */
138         bf_write(p->index_BFile, 2+pos/RIDX_CHUNK, 0, itemsize - sz1,
139                 (char*) buf + sz1);
140 }
141
142 static void rec_release_blocks(Records p, SYSNO sysno)
143 {
144     struct record_index_entry entry;
145     zint freeblock;
146     char block_and_ref[sizeof(zint) + sizeof(short)];
147     int dst_type;
148     int first = 1;
149
150     if (read_indx(p, sysno, &entry, sizeof(entry), 1) != 1)
151         return ;
152
153     freeblock = entry.next;
154     assert(freeblock > 0);
155     dst_type = (int) (freeblock & 7);
156     assert(dst_type < REC_BLOCK_TYPES);
157     freeblock = freeblock / 8;
158     while (freeblock)
159     {
160         if (bf_read(p->data_BFile[dst_type], freeblock, 0,
161                      first ? sizeof(block_and_ref) : sizeof(zint),
162                      block_and_ref) != 1)
163         {
164             yaz_log(YLOG_FATAL|YLOG_ERRNO, "read in rec_del_single");
165             exit(1);
166         }
167         if (first)
168         {
169             short ref;
170             memcpy(&ref, block_and_ref + sizeof(freeblock), sizeof(ref));
171             --ref;
172             memcpy(block_and_ref + sizeof(freeblock), &ref, sizeof(ref));
173             if (ref)
174             {
175                 if (bf_write(p->data_BFile[dst_type], freeblock, 0,
176                               sizeof(block_and_ref), block_and_ref))
177                 {
178                     yaz_log(YLOG_FATAL|YLOG_ERRNO, "write in rec_del_single");
179                     exit(1);
180                 }
181                 return;
182             }
183             first = 0;
184         }
185         
186         if (bf_write(p->data_BFile[dst_type], freeblock, 0, sizeof(freeblock),
187                       &p->head.block_free[dst_type]))
188         {
189             yaz_log(YLOG_FATAL|YLOG_ERRNO, "write in rec_del_single");
190             exit(1);
191         }
192         p->head.block_free[dst_type] = freeblock;
193         memcpy(&freeblock, block_and_ref, sizeof(freeblock));
194
195         p->head.block_used[dst_type]--;
196     }
197     p->head.total_bytes -= entry.size;
198 }
199
200 static void rec_delete_single(Records p, Record rec)
201 {
202     struct record_index_entry entry;
203
204     rec_release_blocks(p, rec_sysno_to_int(rec->sysno));
205
206     entry.next = p->head.index_free;
207     entry.size = 0;
208     p->head.index_free = rec_sysno_to_int(rec->sysno);
209     write_indx(p, rec_sysno_to_int(rec->sysno), &entry, sizeof(entry));
210 }
211
212 static void rec_write_tmp_buf(Records p, int size, SYSNO *sysnos)
213 {
214     struct record_index_entry entry;
215     int no_written = 0;
216     char *cptr = p->tmp_buf;
217     zint block_prev = -1, block_free;
218     int dst_type = 0;
219     int i;
220
221     for (i = 1; i<REC_BLOCK_TYPES; i++)
222         if (size >= p->head.block_move[i])
223             dst_type = i;
224     while (no_written < size)
225     {
226         block_free = p->head.block_free[dst_type];
227         if (block_free)
228         {
229             if (bf_read(p->data_BFile[dst_type],
230                          block_free, 0, sizeof(*p->head.block_free),
231                          &p->head.block_free[dst_type]) != 1)
232             {
233                 yaz_log(YLOG_FATAL|YLOG_ERRNO, "read in %s at free block "
234                          ZINT_FORMAT,
235                          p->data_fname[dst_type], block_free);
236                 exit(1);
237             }
238         }
239         else
240             block_free = p->head.block_last[dst_type]++;
241         if (block_prev == -1)
242         {
243             entry.next = block_free*8 + dst_type;
244             entry.size = size;
245             p->head.total_bytes += size;
246             while (*sysnos > 0)
247             {
248                 write_indx(p, *sysnos, &entry, sizeof(entry));
249                 sysnos++;
250             }
251         }
252         else
253         {
254             memcpy(cptr, &block_free, sizeof(block_free));
255             bf_write(p->data_BFile[dst_type], block_prev, 0, 0, cptr);
256             cptr = p->tmp_buf + no_written;
257         }
258         block_prev = block_free;
259         no_written += (int)(p->head.block_size[dst_type]) - sizeof(zint);
260         p->head.block_used[dst_type]++;
261     }
262     assert(block_prev != -1);
263     block_free = 0;
264     memcpy(cptr, &block_free, sizeof(block_free));
265     bf_write(p->data_BFile[dst_type], block_prev, 0,
266               sizeof(block_free) + (p->tmp_buf+size) - cptr, cptr);
267 }
268
269 Records rec_open(BFiles bfs, int rw, int compression_method)
270 {
271     Records p;
272     int i, r;
273     int version;
274
275     p = (Records) xmalloc(sizeof(*p));
276     p->compression_method = compression_method;
277     p->rw = rw;
278     p->tmp_size = 1024;
279     p->tmp_buf = (char *) xmalloc(p->tmp_size);
280     p->index_fname = "reci";
281     p->index_BFile = bf_open(bfs, p->index_fname, RIDX_CHUNK, rw);
282     if (p->index_BFile == NULL)
283     {
284         yaz_log(YLOG_FATAL|YLOG_ERRNO, "open %s", p->index_fname);
285         exit(1);
286     }
287     r = bf_read(p->index_BFile, 0, 0, 0, p->tmp_buf);
288     switch (r)
289     {
290     case 0:
291         memcpy(p->head.magic, REC_HEAD_MAGIC, sizeof(p->head.magic));
292         sprintf(p->head.version, "%3d", REC_VERSION);
293         p->head.index_free = 0;
294         p->head.index_last = 1;
295         p->head.no_records = 0;
296         p->head.total_bytes = 0;
297         for (i = 0; i<REC_BLOCK_TYPES; i++)
298         {
299             p->head.block_free[i] = 0;
300             p->head.block_last[i] = 1;
301             p->head.block_used[i] = 0;
302         }
303         p->head.block_size[0] = 128;
304         p->head.block_move[0] = 0;
305         for (i = 1; i<REC_BLOCK_TYPES; i++)
306         {
307             p->head.block_size[i] = p->head.block_size[i-1] * 4;
308             p->head.block_move[i] = p->head.block_size[i] * 24;
309         }
310         if (rw)
311             rec_write_head(p);
312         break;
313     case 1:
314         memcpy(&p->head, p->tmp_buf, sizeof(p->head));
315         if (memcmp(p->head.magic, REC_HEAD_MAGIC, sizeof(p->head.magic)))
316         {
317             yaz_log(YLOG_FATAL, "file %s has bad format", p->index_fname);
318             exit(1);
319         }
320         version = atoi(p->head.version);
321         if (version != REC_VERSION)
322         {
323             yaz_log(YLOG_FATAL, "file %s is version %d, but version"
324                   " %d is required", p->index_fname, version, REC_VERSION);
325             exit(1);
326         }
327         break;
328     }
329     for (i = 0; i<REC_BLOCK_TYPES; i++)
330     {
331         char str[80];
332         sprintf(str, "recd%c", i + 'A');
333         p->data_fname[i] = (char *) xmalloc(strlen(str)+1);
334         strcpy(p->data_fname[i], str);
335         p->data_BFile[i] = NULL;
336     }
337     for (i = 0; i<REC_BLOCK_TYPES; i++)
338     {
339         if (!(p->data_BFile[i] = bf_open(bfs, p->data_fname[i],
340                                          (int) (p->head.block_size[i]),
341                                          rw)))
342         {
343             yaz_log(YLOG_FATAL|YLOG_ERRNO, "bf_open %s", p->data_fname[i]);
344             exit(1);
345         }
346     }
347     p->cache_max = 400;
348     p->cache_cur = 0;
349     p->record_cache = (struct record_cache_entry *)
350         xmalloc(sizeof(*p->record_cache)*p->cache_max);
351     zebra_mutex_init(&p->mutex);
352     return p;
353 }
354
355 static void rec_encode_unsigned(unsigned n, unsigned char *buf, int *len)
356 {
357     (*len) = 0;
358     while (n > 127)
359     {
360         buf[*len] = 128 + (n & 127);
361         n = n >> 7;
362         (*len)++;
363     }
364     buf[*len] = n;
365     (*len)++;
366 }
367
368 static void rec_decode_unsigned(unsigned *np, unsigned char *buf, int *len)
369 {
370     unsigned n = 0;
371     unsigned w = 1;
372     (*len) = 0;
373
374     while (buf[*len] > 127)
375     {
376         n += w*(buf[*len] & 127);
377         w = w << 7;
378         (*len)++;
379     }
380     n += w * buf[*len];
381     (*len)++;
382     *np = n;
383 }
384
385 static void rec_encode_zint(zint n, unsigned char *buf, int *len)
386 {
387     (*len) = 0;
388     while (n > 127)
389     {
390         buf[*len] = (unsigned) (128 + (n & 127));
391         n = n >> 7;
392         (*len)++;
393     }
394     buf[*len] = (unsigned) n;
395     (*len)++;
396 }
397
398 static void rec_decode_zint(zint *np, unsigned char *buf, int *len)
399 {
400     zint  n = 0;
401     zint w = 1;
402     (*len) = 0;
403
404     while (buf[*len] > 127)
405     {
406         n += w*(buf[*len] & 127);
407         w = w << 7;
408         (*len)++;
409     }
410     n += w * buf[*len];
411     (*len)++;
412     *np = n;
413 }
414
415 static void rec_cache_flush_block1(Records p, Record rec, Record last_rec,
416                                    char **out_buf, int *out_size,
417                                    int *out_offset)
418 {
419     int i;
420     int len;
421
422     for (i = 0; i<REC_NO_INFO; i++)
423     {
424         if (*out_offset + (int) rec->size[i] + 20 > *out_size)
425         {
426             int new_size = *out_offset + rec->size[i] + 65536;
427             char *np = (char *) xmalloc(new_size);
428             if (*out_offset)
429                 memcpy(np, *out_buf, *out_offset);
430             xfree(*out_buf);
431             *out_size = new_size;
432             *out_buf = np;
433         }
434         if (i == 0)
435         {
436             rec_encode_zint(rec_sysno_to_int(rec->sysno), 
437                             (unsigned char *) *out_buf + *out_offset, &len);
438             (*out_offset) += len;
439         }
440         if (rec->size[i] == 0)
441         {
442             rec_encode_unsigned(1, (unsigned char *) *out_buf + *out_offset,
443                                 &len);
444             (*out_offset) += len;
445         }
446         else if (last_rec && rec->size[i] == last_rec->size[i] &&
447                  !memcmp(rec->info[i], last_rec->info[i], rec->size[i]))
448         {
449             rec_encode_unsigned(0, (unsigned char *) *out_buf + *out_offset,
450                                 &len);
451             (*out_offset) += len;
452         }
453         else
454         {
455             rec_encode_unsigned(rec->size[i]+1,
456                                 (unsigned char *) *out_buf + *out_offset,
457                                 &len);
458             (*out_offset) += len;
459             memcpy(*out_buf + *out_offset, rec->info[i], rec->size[i]);
460             (*out_offset) += rec->size[i];
461         }
462     }
463 }
464
465 static void rec_write_multiple(Records p, int saveCount)
466 {
467     int i;
468     short ref_count = 0;
469     char compression_method;
470     Record last_rec = 0;
471     int out_size = 1000;
472     int out_offset = 0;
473     char *out_buf = (char *) xmalloc(out_size);
474     SYSNO *sysnos = (SYSNO *) xmalloc(sizeof(*sysnos) * (p->cache_cur + 1));
475     SYSNO *sysnop = sysnos;
476
477     for (i = 0; i<p->cache_cur - saveCount; i++)
478     {
479         struct record_cache_entry *e = p->record_cache + i;
480         switch (e->flag)
481         {
482         case recordFlagNew:
483             rec_cache_flush_block1(p, e->rec, last_rec, &out_buf,
484                                     &out_size, &out_offset);
485             *sysnop++ = rec_sysno_to_int(e->rec->sysno);
486             ref_count++;
487             e->flag = recordFlagNop;
488             last_rec = e->rec;
489             break;
490         case recordFlagWrite:
491             rec_release_blocks(p, rec_sysno_to_int(e->rec->sysno));
492             rec_cache_flush_block1(p, e->rec, last_rec, &out_buf,
493                                     &out_size, &out_offset);
494             *sysnop++ = rec_sysno_to_int(e->rec->sysno);
495             ref_count++;
496             e->flag = recordFlagNop;
497             last_rec = e->rec;
498             break;
499         case recordFlagDelete:
500             rec_delete_single(p, e->rec);
501             e->flag = recordFlagNop;
502             break;
503         default:
504             break;
505         }
506     }
507
508     *sysnop = -1;
509     if (ref_count)
510     {
511         unsigned int csize = 0;  /* indicate compression "not performed yet" */
512         compression_method = p->compression_method;
513         switch (compression_method)
514         {
515         case REC_COMPRESS_BZIP2:
516 #if HAVE_BZLIB_H        
517             csize = out_offset + (out_offset >> 6) + 620;
518             rec_tmp_expand(p, csize);
519 #ifdef BZ_CONFIG_ERROR
520             i = BZ2_bzBuffToBuffCompress 
521 #else
522             i = bzBuffToBuffCompress 
523 #endif
524                                     (p->tmp_buf+sizeof(zint)+sizeof(short)+
525                                       sizeof(char),
526                                       &csize, out_buf, out_offset, 1, 0, 30);
527             if (i != BZ_OK)
528             {
529                 yaz_log(YLOG_WARN, "bzBuffToBuffCompress error code=%d", i);
530                 csize = 0;
531             }
532             yaz_log(YLOG_LOG, "compress %4d %5d %5d", ref_count, out_offset,
533                   csize);
534 #endif
535             break;
536         case REC_COMPRESS_NONE:
537             break;
538         }
539         if (!csize)  
540         {
541             /* either no compression or compression not supported ... */
542             csize = out_offset;
543             rec_tmp_expand(p, csize);
544             memcpy(p->tmp_buf + sizeof(zint) + sizeof(short) + sizeof(char),
545                     out_buf, out_offset);
546             csize = out_offset;
547             compression_method = REC_COMPRESS_NONE;
548         }
549         memcpy(p->tmp_buf + sizeof(zint), &ref_count, sizeof(ref_count));
550         memcpy(p->tmp_buf + sizeof(zint)+sizeof(short),
551                 &compression_method, sizeof(compression_method));
552                 
553         /* -------- compression */
554         rec_write_tmp_buf(p, csize + sizeof(short) + sizeof(char), sysnos);
555     }
556     xfree(out_buf);
557     xfree(sysnos);
558 }
559
560 static void rec_cache_flush(Records p, int saveCount)
561 {
562     int i, j;
563
564     if (saveCount >= p->cache_cur)
565         saveCount = 0;
566
567     rec_write_multiple(p, saveCount);
568
569     for (i = 0; i<p->cache_cur - saveCount; i++)
570     {
571         struct record_cache_entry *e = p->record_cache + i;
572         rec_rm(&e->rec);
573     } 
574     /* i still being used ... */
575     for (j = 0; j<saveCount; j++, i++)
576         memcpy(p->record_cache+j, p->record_cache+i,
577                 sizeof(*p->record_cache));
578     p->cache_cur = saveCount;
579 }
580
581 static Record *rec_cache_lookup(Records p, SYSNO sysno,
582                                 enum recordCacheFlag flag)
583 {
584     int i;
585     for (i = 0; i<p->cache_cur; i++)
586     {
587         struct record_cache_entry *e = p->record_cache + i;
588         if (e->rec->sysno == sysno)
589         {
590             if (flag != recordFlagNop && e->flag == recordFlagNop)
591                 e->flag = flag;
592             return &e->rec;
593         }
594     }
595     return NULL;
596 }
597
598 static void rec_cache_insert(Records p, Record rec, enum recordCacheFlag flag)
599 {
600     struct record_cache_entry *e;
601
602     if (p->cache_cur == p->cache_max)
603         rec_cache_flush(p, 1);
604     else if (p->cache_cur > 0)
605     {
606         int i, j;
607         int used = 0;
608         for (i = 0; i<p->cache_cur; i++)
609         {
610             Record r = (p->record_cache + i)->rec;
611             for (j = 0; j<REC_NO_INFO; j++)
612                 used += r->size[j];
613         }
614         if (used > 90000)
615             rec_cache_flush(p, 1);
616     }
617     assert(p->cache_cur < p->cache_max);
618
619     e = p->record_cache + (p->cache_cur)++;
620     e->flag = flag;
621     e->rec = rec_cp(rec);
622 }
623
624 void rec_close(Records *pp)
625 {
626     Records p = *pp;
627     int i;
628
629     assert(p);
630
631     zebra_mutex_destroy(&p->mutex);
632     rec_cache_flush(p, 0);
633     xfree(p->record_cache);
634
635     if (p->rw)
636         rec_write_head(p);
637
638     if (p->index_BFile)
639         bf_close(p->index_BFile);
640
641     for (i = 0; i<REC_BLOCK_TYPES; i++)
642     {
643         if (p->data_BFile[i])
644             bf_close(p->data_BFile[i]);
645         xfree(p->data_fname[i]);
646     }
647     xfree(p->tmp_buf);
648     xfree(p);
649     *pp = NULL;
650 }
651
652 static Record rec_get_int(Records p, SYSNO sysno)
653 {
654     int i, in_size, r;
655     Record rec, *recp;
656     struct record_index_entry entry;
657     zint freeblock;
658     int dst_type;
659     char *nptr, *cptr;
660     char *in_buf = 0;
661     char *bz_buf = 0;
662 #if HAVE_BZLIB_H
663     unsigned int bz_size;
664 #endif
665     char compression_method;
666
667     assert(sysno > 0);
668     assert(p);
669
670     if ((recp = rec_cache_lookup(p, sysno, recordFlagNop)))
671         return rec_cp(*recp);
672
673     if (read_indx(p, rec_sysno_to_int(sysno), &entry, sizeof(entry), 1) < 1)
674         return NULL;       /* record is not there! */
675
676     if (!entry.size)
677         return NULL;       /* record is deleted */
678
679     dst_type = (int) (entry.next & 7);
680     assert(dst_type < REC_BLOCK_TYPES);
681     freeblock = entry.next / 8;
682
683     assert(freeblock > 0);
684     
685     rec_tmp_expand(p, entry.size);
686
687     cptr = p->tmp_buf;
688     r = bf_read(p->data_BFile[dst_type], freeblock, 0, 0, cptr);
689     if (r < 0)
690         return 0;
691     memcpy(&freeblock, cptr, sizeof(freeblock));
692
693     while (freeblock)
694     {
695         zint tmp;
696
697         cptr += p->head.block_size[dst_type] - sizeof(freeblock);
698         
699         memcpy(&tmp, cptr, sizeof(tmp));
700         r = bf_read(p->data_BFile[dst_type], freeblock, 0, 0, cptr);
701         if (r < 0)
702             return 0;
703         memcpy(&freeblock, cptr, sizeof(freeblock));
704         memcpy(cptr, &tmp, sizeof(tmp));
705     }
706
707     rec = (Record) xmalloc(sizeof(*rec));
708     rec->sysno = sysno;
709     memcpy(&compression_method, p->tmp_buf + sizeof(zint) + sizeof(short),
710             sizeof(compression_method));
711     in_buf = p->tmp_buf + sizeof(zint) + sizeof(short) + sizeof(char);
712     in_size = entry.size - sizeof(short) - sizeof(char);
713     switch (compression_method)
714     {
715     case REC_COMPRESS_BZIP2:
716 #if HAVE_BZLIB_H
717         bz_size = entry.size * 20 + 100;
718         while (1)
719         {
720             bz_buf = (char *) xmalloc(bz_size);
721 #ifdef BZ_CONFIG_ERROR
722             i = BZ2_bzBuffToBuffDecompress
723 #else
724             i = bzBuffToBuffDecompress
725 #endif
726                 (bz_buf, &bz_size, in_buf, in_size, 0, 0);
727             yaz_log(YLOG_LOG, "decompress %5d %5d", in_size, bz_size);
728             if (i == BZ_OK)
729                 break;
730             yaz_log(YLOG_LOG, "failed");
731             xfree(bz_buf);
732             bz_size *= 2;
733         }
734         in_buf = bz_buf;
735         in_size = bz_size;
736 #else
737         yaz_log(YLOG_FATAL, "cannot decompress record(s) in BZIP2 format");
738         exit(1);
739 #endif
740         break;
741     case REC_COMPRESS_NONE:
742         break;
743     }
744     for (i = 0; i<REC_NO_INFO; i++)
745         rec->info[i] = 0;
746
747     nptr = in_buf;                /* skip ref count */
748     while (nptr < in_buf + in_size)
749     {
750         zint this_sysno;
751         int len;
752         rec_decode_zint(&this_sysno, (unsigned char *) nptr, &len);
753         nptr += len;
754
755         for (i = 0; i < REC_NO_INFO; i++)
756         {
757             unsigned int this_size;
758             rec_decode_unsigned(&this_size, (unsigned char *) nptr, &len);
759             nptr += len;
760
761             if (this_size == 0)
762                 continue;
763             rec->size[i] = this_size-1;
764
765             if (rec->size[i])
766             {
767                 rec->info[i] = nptr;
768                 nptr += rec->size[i];
769             }
770             else
771                 rec->info[i] = NULL;
772         }
773         if (this_sysno == rec_sysno_to_int(sysno))
774             break;
775     }
776     for (i = 0; i<REC_NO_INFO; i++)
777     {
778         if (rec->info[i] && rec->size[i])
779         {
780             char *np = xmalloc(rec->size[i]+1);
781             memcpy(np, rec->info[i], rec->size[i]);
782             np[rec->size[i]] = '\0';
783             rec->info[i] = np;
784         }
785         else
786         {
787             assert(rec->info[i] == 0);
788             assert(rec->size[i] == 0);
789         }
790     }
791     xfree(bz_buf);
792     rec_cache_insert(p, rec, recordFlagNop);
793     return rec;
794 }
795
796 Record rec_get(Records p, SYSNO sysno)
797 {
798     Record rec;
799     zebra_mutex_lock(&p->mutex);
800
801     rec = rec_get_int(p, sysno);
802     zebra_mutex_unlock(&p->mutex);
803     return rec;
804 }
805
806 Record rec_get_root(Records p)
807 {
808     return rec_get(p, rec_sysno_to_ext(1));
809 }
810
811 static Record rec_new_int(Records p)
812 {
813     int i;
814     SYSNO sysno;
815     Record rec;
816
817     assert(p);
818     rec = (Record) xmalloc(sizeof(*rec));
819     if (1 || p->head.index_free == 0)
820         sysno = (p->head.index_last)++;
821     else
822     {
823         struct record_index_entry entry;
824
825         read_indx(p, p->head.index_free, &entry, sizeof(entry), 0);
826         sysno = p->head.index_free;
827         p->head.index_free = entry.next;
828     }
829     (p->head.no_records)++;
830     rec->sysno = rec_sysno_to_ext(sysno);
831     for (i = 0; i < REC_NO_INFO; i++)
832     {
833         rec->info[i] = NULL;
834         rec->size[i] = 0;
835     }
836     rec_cache_insert(p, rec, recordFlagNew);
837     return rec;
838 }
839
840 Record rec_new(Records p)
841 {
842     Record rec;
843     zebra_mutex_lock(&p->mutex);
844
845     rec = rec_new_int(p);
846     zebra_mutex_unlock(&p->mutex);
847     return rec;
848 }
849
850 void rec_del(Records p, Record *recpp)
851 {
852     Record *recp;
853
854     zebra_mutex_lock(&p->mutex);
855     (p->head.no_records)--;
856     if ((recp = rec_cache_lookup(p, (*recpp)->sysno, recordFlagDelete)))
857     {
858         rec_rm(recp);
859         *recp = *recpp;
860     }
861     else
862     {
863         rec_cache_insert(p, *recpp, recordFlagDelete);
864         rec_rm(recpp);
865     }
866     zebra_mutex_unlock(&p->mutex);
867     *recpp = NULL;
868 }
869
870 void rec_put(Records p, Record *recpp)
871 {
872     Record *recp;
873
874     zebra_mutex_lock(&p->mutex);
875     if ((recp = rec_cache_lookup(p, (*recpp)->sysno, recordFlagWrite)))
876     {
877         rec_rm(recp);
878         *recp = *recpp;
879     }
880     else
881     {
882         rec_cache_insert(p, *recpp, recordFlagWrite);
883         rec_rm(recpp);
884     }
885     zebra_mutex_unlock(&p->mutex);
886     *recpp = NULL;
887 }
888
889 void rec_rm(Record *recpp)
890 {
891     int i;
892
893     if (!*recpp)
894         return ;
895     for (i = 0; i < REC_NO_INFO; i++)
896         xfree((*recpp)->info[i]);
897     xfree(*recpp);
898     *recpp = NULL;
899 }
900
901 Record rec_cp(Record rec)
902 {
903     Record n;
904     int i;
905
906     n = (Record) xmalloc(sizeof(*n));
907     n->sysno = rec->sysno;
908     for (i = 0; i < REC_NO_INFO; i++)
909         if (!rec->info[i])
910         {
911             n->info[i] = NULL;
912             n->size[i] = 0;
913         }
914         else
915         {
916             n->size[i] = rec->size[i];
917             n->info[i] = (char *) xmalloc(rec->size[i]);
918             memcpy(n->info[i], rec->info[i], rec->size[i]);
919         }
920     return n;
921 }
922
923
924 char *rec_strdup(const char *s, size_t *len)
925 {
926     char *p;
927
928     if (!s)
929     {
930         *len = 0;
931         return NULL;
932     }
933     *len = strlen(s)+1;
934     p = (char *) xmalloc(*len);
935     strcpy(p, s);
936     return p;
937 }
938