c3f560ec9d4bb33a2da0adbfe5b1078631652132
[idzebra-moved-to-github.git] / index / recindex.c
1 /* $Id: recindex.c,v 1.50 2006-06-07 10:14:41 adam Exp $
2    Copyright (C) 1995-2006
3    Index Data ApS
4
5 This file is part of the Zebra server.
6
7 Zebra is free software; you can redistribute it and/or modify it under
8 the terms of the GNU General Public License as published by the Free
9 Software Foundation; either version 2, or (at your option) any later
10 version.
11
12 Zebra is distributed in the hope that it will be useful, but WITHOUT ANY
13 WARRANTY; without even the implied warranty of MERCHANTABILITY or
14 FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
15 for more details.
16
17 You should have received a copy of the GNU General Public License
18 along with Zebra; see the file LICENSE.zebra.  If not, write to the
19 Free Software Foundation, 59 Temple Place - Suite 330, Boston, MA
20 02111-1307, USA.
21 */
22
23 #define RIDX_CHUNK 128
24
25 /*
26  *  Format of first block
27  *      next       (8 bytes)
28  *      ref_count  (2 bytes)
29  *      block      (500 bytes)
30  *
31  *  Format of subsequent blocks 
32  *      next  (8 bytes)
33  *      block (502 bytes)
34  *
35  *  Format of each record
36  *      sysno
37  *      (length, data) - pairs
38  *      length = 0 if same as previous
39  */
40 #include <stdio.h>
41 #include <stdlib.h>
42 #include <assert.h>
43 #include <string.h>
44
45 #include <yaz/yaz-util.h>
46 #include "recindxp.h"
47
48 #if HAVE_BZLIB_H
49 #include <bzlib.h>
50 #endif
51
52 /* Modify argument to if below: 1=normal, 0=sysno testing */
53 #if 1
54 /* If this is used sysno are not converted (no testing) */
55 #define FAKE_OFFSET 0
56 #define USUAL_RANGE 6000000000LL
57
58 #else
59 /* Use a fake > 2^32 offset so we can test for proper 64-bit handling */
60 #define FAKE_OFFSET 6000000000LL
61 #define USUAL_RANGE 2000000000LL
62 #endif
63
64 static SYSNO rec_sysno_to_ext(SYSNO sysno)
65 {
66     assert(sysno >= 0 && sysno <= USUAL_RANGE);
67     return sysno + FAKE_OFFSET;
68 }
69
70 SYSNO rec_sysno_to_int(SYSNO sysno)
71 {
72     assert(sysno >= FAKE_OFFSET && sysno <= FAKE_OFFSET + USUAL_RANGE);
73     return sysno - FAKE_OFFSET;
74 }
75
76 static ZEBRA_RES rec_write_head(Records p)
77 {
78     int r;
79
80     assert(p);
81     assert(p->index_BFile);
82
83     r = bf_write(p->index_BFile, 0, 0, sizeof(p->head), &p->head);    
84     if (r)
85     {
86         yaz_log(YLOG_FATAL|YLOG_ERRNO, "write head of %s", p->index_fname);
87         return ZEBRA_FAIL;
88     }
89     return ZEBRA_OK;
90 }
91
92 static void rec_tmp_expand(Records p, int size)
93 {
94     if (p->tmp_size < size + 2048 ||
95         p->tmp_size < p->head.block_size[REC_BLOCK_TYPES-1]*2)
96     {
97         xfree(p->tmp_buf);
98         p->tmp_size = size + (int)
99                         (p->head.block_size[REC_BLOCK_TYPES-1])*2 + 2048;
100         p->tmp_buf = (char *) xmalloc(p->tmp_size);
101     }
102 }
103
104 static int read_indx(Records p, SYSNO sysno, void *buf, int itemsize, 
105                       int ignoreError)
106 {
107     int r;
108     zint pos = (sysno-1)*itemsize;
109     int off = CAST_ZINT_TO_INT(pos%RIDX_CHUNK);
110     int sz1 = RIDX_CHUNK - off;    /* sz1 is size of buffer to read.. */
111
112     if (sz1 > itemsize)
113         sz1 = itemsize;  /* no more than itemsize bytes */
114
115     r = bf_read(p->index_BFile, 1+pos/RIDX_CHUNK, off, sz1, buf);
116     if (r == 1 && sz1 < itemsize) /* boundary? - must read second part */
117         r = bf_read(p->index_BFile, 2+pos/RIDX_CHUNK, 0, itemsize - sz1,
118                         (char*) buf + sz1);
119     if (r != 1 && !ignoreError)
120     {
121         yaz_log(YLOG_FATAL|YLOG_ERRNO, "read in %s at pos %ld",
122                 p->index_fname, (long) pos);
123     }
124     return r;
125 }
126
127 static void write_indx(Records p, SYSNO sysno, void *buf, int itemsize)
128 {
129     zint pos = (sysno-1)*itemsize;
130     int off = CAST_ZINT_TO_INT(pos%RIDX_CHUNK);
131     int sz1 = RIDX_CHUNK - off;    /* sz1 is size of buffer to read.. */
132
133     if (sz1 > itemsize)
134         sz1 = itemsize;  /* no more than itemsize bytes */
135
136     bf_write(p->index_BFile, 1+pos/RIDX_CHUNK, off, sz1, buf);
137     if (sz1 < itemsize)   /* boundary? must write second part */
138         bf_write(p->index_BFile, 2+pos/RIDX_CHUNK, 0, itemsize - sz1,
139                 (char*) buf + sz1);
140 }
141
142 static ZEBRA_RES rec_release_blocks(Records p, SYSNO sysno)
143 {
144     struct record_index_entry entry;
145     zint freeblock;
146     char block_and_ref[sizeof(zint) + sizeof(short)];
147     int dst_type;
148     int first = 1;
149
150     if (read_indx(p, sysno, &entry, sizeof(entry), 1) != 1)
151         return ZEBRA_FAIL;
152
153     freeblock = entry.next;
154     assert(freeblock > 0);
155     dst_type = CAST_ZINT_TO_INT(freeblock & 7);
156     assert(dst_type < REC_BLOCK_TYPES);
157     freeblock = freeblock / 8;
158     while (freeblock)
159     {
160         if (bf_read(p->data_BFile[dst_type], freeblock, 0,
161                      first ? sizeof(block_and_ref) : sizeof(zint),
162                      block_and_ref) != 1)
163         {
164             yaz_log(YLOG_FATAL|YLOG_ERRNO, "read in rec_del_single");
165             return ZEBRA_FAIL;
166         }
167         if (first)
168         {
169             short ref;
170             memcpy(&ref, block_and_ref + sizeof(freeblock), sizeof(ref));
171             --ref;
172             memcpy(block_and_ref + sizeof(freeblock), &ref, sizeof(ref));
173             if (ref)
174             {
175                 if (bf_write(p->data_BFile[dst_type], freeblock, 0,
176                               sizeof(block_and_ref), block_and_ref))
177                 {
178                     yaz_log(YLOG_FATAL|YLOG_ERRNO, "write in rec_del_single");
179                     return ZEBRA_FAIL;
180                 }
181                 return ZEBRA_OK;
182             }
183             first = 0;
184         }
185         
186         if (bf_write(p->data_BFile[dst_type], freeblock, 0, sizeof(freeblock),
187                       &p->head.block_free[dst_type]))
188         {
189             yaz_log(YLOG_FATAL|YLOG_ERRNO, "write in rec_del_single");
190             return ZEBRA_FAIL;
191         }
192         p->head.block_free[dst_type] = freeblock;
193         memcpy(&freeblock, block_and_ref, sizeof(freeblock));
194
195         p->head.block_used[dst_type]--;
196     }
197     p->head.total_bytes -= entry.size;
198     return ZEBRA_OK;
199 }
200
201 static ZEBRA_RES rec_delete_single(Records p, Record rec)
202 {
203     struct record_index_entry entry;
204
205     if (rec_release_blocks(p, rec_sysno_to_int(rec->sysno)) != ZEBRA_OK)
206         return ZEBRA_FAIL;
207
208     entry.next = p->head.index_free;
209     entry.size = 0;
210     p->head.index_free = rec_sysno_to_int(rec->sysno);
211     write_indx(p, rec_sysno_to_int(rec->sysno), &entry, sizeof(entry));
212     return ZEBRA_OK;
213 }
214
215 static ZEBRA_RES rec_write_tmp_buf(Records p, int size, SYSNO *sysnos)
216 {
217     struct record_index_entry entry;
218     int no_written = 0;
219     char *cptr = p->tmp_buf;
220     zint block_prev = -1, block_free;
221     int dst_type = 0;
222     int i;
223
224     for (i = 1; i<REC_BLOCK_TYPES; i++)
225         if (size >= p->head.block_move[i])
226             dst_type = i;
227     while (no_written < size)
228     {
229         block_free = p->head.block_free[dst_type];
230         if (block_free)
231         {
232             if (bf_read(p->data_BFile[dst_type],
233                          block_free, 0, sizeof(*p->head.block_free),
234                          &p->head.block_free[dst_type]) != 1)
235             {
236                 yaz_log(YLOG_FATAL|YLOG_ERRNO, "read in %s at free block "
237                          ZINT_FORMAT,
238                          p->data_fname[dst_type], block_free);
239                 return ZEBRA_FAIL;
240             }
241         }
242         else
243             block_free = p->head.block_last[dst_type]++;
244         if (block_prev == -1)
245         {
246             entry.next = block_free*8 + dst_type;
247             entry.size = size;
248             p->head.total_bytes += size;
249             while (*sysnos > 0)
250             {
251                 write_indx(p, *sysnos, &entry, sizeof(entry));
252                 sysnos++;
253             }
254         }
255         else
256         {
257             memcpy(cptr, &block_free, sizeof(block_free));
258             bf_write(p->data_BFile[dst_type], block_prev, 0, 0, cptr);
259             cptr = p->tmp_buf + no_written;
260         }
261         block_prev = block_free;
262         no_written += CAST_ZINT_TO_INT(p->head.block_size[dst_type]) 
263             - sizeof(zint);
264         p->head.block_used[dst_type]++;
265     }
266     assert(block_prev != -1);
267     block_free = 0;
268     memcpy(cptr, &block_free, sizeof(block_free));
269     bf_write(p->data_BFile[dst_type], block_prev, 0,
270               sizeof(block_free) + (p->tmp_buf+size) - cptr, cptr);
271     return ZEBRA_OK;
272 }
273
274 Records rec_open(BFiles bfs, int rw, int compression_method)
275 {
276     Records p;
277     int i, r;
278     int version;
279     ZEBRA_RES ret = ZEBRA_OK;
280
281     p = (Records) xmalloc(sizeof(*p));
282     p->compression_method = compression_method;
283     p->rw = rw;
284     p->tmp_size = 1024;
285     p->index_fname = "reci";
286     p->index_BFile = bf_open(bfs, p->index_fname, RIDX_CHUNK, rw);
287     if (p->index_BFile == NULL)
288     {
289         yaz_log(YLOG_FATAL|YLOG_ERRNO, "open %s", p->index_fname);
290         xfree(p);
291         return 0;
292     }
293     p->tmp_buf = (char *) xmalloc(p->tmp_size);
294     r = bf_read(p->index_BFile, 0, 0, 0, p->tmp_buf);
295     switch (r)
296     {
297     case 0:
298         memcpy(p->head.magic, REC_HEAD_MAGIC, sizeof(p->head.magic));
299         sprintf(p->head.version, "%3d", REC_VERSION);
300         p->head.index_free = 0;
301         p->head.index_last = 1;
302         p->head.no_records = 0;
303         p->head.total_bytes = 0;
304         for (i = 0; i<REC_BLOCK_TYPES; i++)
305         {
306             p->head.block_free[i] = 0;
307             p->head.block_last[i] = 1;
308             p->head.block_used[i] = 0;
309         }
310         p->head.block_size[0] = 128;
311         p->head.block_move[0] = 0;
312         for (i = 1; i<REC_BLOCK_TYPES; i++)
313         {
314             p->head.block_size[i] = p->head.block_size[i-1] * 4;
315             p->head.block_move[i] = p->head.block_size[i] * 24;
316         }
317         if (rw)
318         {
319             if (rec_write_head(p) != ZEBRA_OK)
320                 ret = ZEBRA_FAIL;
321         }
322         break;
323     case 1:
324         memcpy(&p->head, p->tmp_buf, sizeof(p->head));
325         if (memcmp(p->head.magic, REC_HEAD_MAGIC, sizeof(p->head.magic)))
326         {
327             yaz_log(YLOG_FATAL, "file %s has bad format", p->index_fname);
328             ret = ZEBRA_FAIL;
329         }
330         version = atoi(p->head.version);
331         if (version != REC_VERSION)
332         {
333             yaz_log(YLOG_FATAL, "file %s is version %d, but version"
334                   " %d is required", p->index_fname, version, REC_VERSION);
335             ret = ZEBRA_FAIL;
336         }
337         break;
338     }
339     for (i = 0; i<REC_BLOCK_TYPES; i++)
340     {
341         char str[80];
342         sprintf(str, "recd%c", i + 'A');
343         p->data_fname[i] = (char *) xmalloc(strlen(str)+1);
344         strcpy(p->data_fname[i], str);
345         p->data_BFile[i] = NULL;
346     }
347     for (i = 0; i<REC_BLOCK_TYPES; i++)
348     {
349         if (!(p->data_BFile[i] =
350               bf_open(bfs, p->data_fname[i],
351                       CAST_ZINT_TO_INT(p->head.block_size[i]), rw)))
352         {
353             yaz_log(YLOG_FATAL|YLOG_ERRNO, "bf_open %s", p->data_fname[i]);
354             ret = ZEBRA_FAIL;
355         }
356     }
357     p->cache_max = 400;
358     p->cache_cur = 0;
359     p->record_cache = (struct record_cache_entry *)
360         xmalloc(sizeof(*p->record_cache)*p->cache_max);
361     zebra_mutex_init(&p->mutex);
362     if (ret == ZEBRA_FAIL)
363         rec_close(&p);
364     return p;
365 }
366
367 static void rec_encode_unsigned(unsigned n, unsigned char *buf, int *len)
368 {
369     (*len) = 0;
370     while (n > 127)
371     {
372         buf[*len] = 128 + (n & 127);
373         n = n >> 7;
374         (*len)++;
375     }
376     buf[*len] = n;
377     (*len)++;
378 }
379
380 static void rec_decode_unsigned(unsigned *np, unsigned char *buf, int *len)
381 {
382     unsigned n = 0;
383     unsigned w = 1;
384     (*len) = 0;
385
386     while (buf[*len] > 127)
387     {
388         n += w*(buf[*len] & 127);
389         w = w << 7;
390         (*len)++;
391     }
392     n += w * buf[*len];
393     (*len)++;
394     *np = n;
395 }
396
397 static void rec_encode_zint(zint n, unsigned char *buf, int *len)
398 {
399     (*len) = 0;
400     while (n > 127)
401     {
402         buf[*len] = (unsigned) (128 + (n & 127));
403         n = n >> 7;
404         (*len)++;
405     }
406     buf[*len] = (unsigned) n;
407     (*len)++;
408 }
409
410 static void rec_decode_zint(zint *np, unsigned char *buf, int *len)
411 {
412     zint  n = 0;
413     zint w = 1;
414     (*len) = 0;
415
416     while (buf[*len] > 127)
417     {
418         n += w*(buf[*len] & 127);
419         w = w << 7;
420         (*len)++;
421     }
422     n += w * buf[*len];
423     (*len)++;
424     *np = n;
425 }
426
427 static void rec_cache_flush_block1(Records p, Record rec, Record last_rec,
428                                    char **out_buf, int *out_size,
429                                    int *out_offset)
430 {
431     int i;
432     int len;
433
434     for (i = 0; i<REC_NO_INFO; i++)
435     {
436         if (*out_offset + CAST_ZINT_TO_INT(rec->size[i]) + 20 > *out_size)
437         {
438             int new_size = *out_offset + rec->size[i] + 65536;
439             char *np = (char *) xmalloc(new_size);
440             if (*out_offset)
441                 memcpy(np, *out_buf, *out_offset);
442             xfree(*out_buf);
443             *out_size = new_size;
444             *out_buf = np;
445         }
446         if (i == 0)
447         {
448             rec_encode_zint(rec_sysno_to_int(rec->sysno), 
449                             (unsigned char *) *out_buf + *out_offset, &len);
450             (*out_offset) += len;
451         }
452         if (rec->size[i] == 0)
453         {
454             rec_encode_unsigned(1, (unsigned char *) *out_buf + *out_offset,
455                                 &len);
456             (*out_offset) += len;
457         }
458         else if (last_rec && rec->size[i] == last_rec->size[i] &&
459                  !memcmp(rec->info[i], last_rec->info[i], rec->size[i]))
460         {
461             rec_encode_unsigned(0, (unsigned char *) *out_buf + *out_offset,
462                                 &len);
463             (*out_offset) += len;
464         }
465         else
466         {
467             rec_encode_unsigned(rec->size[i]+1,
468                                 (unsigned char *) *out_buf + *out_offset,
469                                 &len);
470             (*out_offset) += len;
471             memcpy(*out_buf + *out_offset, rec->info[i], rec->size[i]);
472             (*out_offset) += rec->size[i];
473         }
474     }
475 }
476
477 static ZEBRA_RES rec_write_multiple(Records p, int saveCount)
478 {
479     int i;
480     short ref_count = 0;
481     char compression_method;
482     Record last_rec = 0;
483     int out_size = 1000;
484     int out_offset = 0;
485     char *out_buf = (char *) xmalloc(out_size);
486     SYSNO *sysnos = (SYSNO *) xmalloc(sizeof(*sysnos) * (p->cache_cur + 1));
487     SYSNO *sysnop = sysnos;
488     ZEBRA_RES ret = ZEBRA_OK;
489
490     for (i = 0; i<p->cache_cur - saveCount; i++)
491     {
492         struct record_cache_entry *e = p->record_cache + i;
493         switch (e->flag)
494         {
495         case recordFlagNew:
496             rec_cache_flush_block1(p, e->rec, last_rec, &out_buf,
497                                     &out_size, &out_offset);
498             *sysnop++ = rec_sysno_to_int(e->rec->sysno);
499             ref_count++;
500             e->flag = recordFlagNop;
501             last_rec = e->rec;
502             break;
503         case recordFlagWrite:
504             if (rec_release_blocks(p, rec_sysno_to_int(e->rec->sysno))
505                 != ZEBRA_OK)
506                 ret = ZEBRA_FAIL;
507
508             rec_cache_flush_block1(p, e->rec, last_rec, &out_buf,
509                                     &out_size, &out_offset);
510             *sysnop++ = rec_sysno_to_int(e->rec->sysno);
511             ref_count++;
512             e->flag = recordFlagNop;
513             last_rec = e->rec;
514             break;
515         case recordFlagDelete:
516             if (rec_delete_single(p, e->rec) != ZEBRA_OK)
517                 ret = ZEBRA_FAIL;
518
519             e->flag = recordFlagNop;
520             break;
521         default:
522             break;
523         }
524     }
525
526     *sysnop = -1;
527     if (ref_count)
528     {
529         unsigned int csize = 0;  /* indicate compression "not performed yet" */
530         compression_method = p->compression_method;
531         switch (compression_method)
532         {
533         case REC_COMPRESS_BZIP2:
534 #if HAVE_BZLIB_H        
535             csize = out_offset + (out_offset >> 6) + 620;
536             rec_tmp_expand(p, csize);
537 #ifdef BZ_CONFIG_ERROR
538             i = BZ2_bzBuffToBuffCompress 
539 #else
540             i = bzBuffToBuffCompress 
541 #endif
542                                     (p->tmp_buf+sizeof(zint)+sizeof(short)+
543                                       sizeof(char),
544                                       &csize, out_buf, out_offset, 1, 0, 30);
545             if (i != BZ_OK)
546             {
547                 yaz_log(YLOG_WARN, "bzBuffToBuffCompress error code=%d", i);
548                 csize = 0;
549             }
550             yaz_log(YLOG_LOG, "compress %4d %5d %5d", ref_count, out_offset,
551                   csize);
552 #endif
553             break;
554         case REC_COMPRESS_NONE:
555             break;
556         }
557         if (!csize)  
558         {
559             /* either no compression or compression not supported ... */
560             csize = out_offset;
561             rec_tmp_expand(p, csize);
562             memcpy(p->tmp_buf + sizeof(zint) + sizeof(short) + sizeof(char),
563                     out_buf, out_offset);
564             csize = out_offset;
565             compression_method = REC_COMPRESS_NONE;
566         }
567         memcpy(p->tmp_buf + sizeof(zint), &ref_count, sizeof(ref_count));
568         memcpy(p->tmp_buf + sizeof(zint)+sizeof(short),
569                 &compression_method, sizeof(compression_method));
570                 
571         /* -------- compression */
572         if (rec_write_tmp_buf(p, csize + sizeof(short) + sizeof(char), sysnos)
573             != ZEBRA_OK)
574             ret = ZEBRA_FAIL;
575     }
576     xfree(out_buf);
577     xfree(sysnos);
578     return ret;
579 }
580
581 static ZEBRA_RES rec_cache_flush(Records p, int saveCount)
582 {
583     int i, j;
584     ZEBRA_RES ret;
585
586     if (saveCount >= p->cache_cur)
587         saveCount = 0;
588
589     ret = rec_write_multiple(p, saveCount);
590
591     for (i = 0; i<p->cache_cur - saveCount; i++)
592     {
593         struct record_cache_entry *e = p->record_cache + i;
594         rec_rm(&e->rec);
595     } 
596     /* i still being used ... */
597     for (j = 0; j<saveCount; j++, i++)
598         memcpy(p->record_cache+j, p->record_cache+i,
599                 sizeof(*p->record_cache));
600     p->cache_cur = saveCount;
601     return ret;
602 }
603
604 static Record *rec_cache_lookup(Records p, SYSNO sysno,
605                                 enum recordCacheFlag flag)
606 {
607     int i;
608     for (i = 0; i<p->cache_cur; i++)
609     {
610         struct record_cache_entry *e = p->record_cache + i;
611         if (e->rec->sysno == sysno)
612         {
613             if (flag != recordFlagNop && e->flag == recordFlagNop)
614                 e->flag = flag;
615             return &e->rec;
616         }
617     }
618     return NULL;
619 }
620
621 static ZEBRA_RES rec_cache_insert(Records p, Record rec, enum recordCacheFlag flag)
622 {
623     struct record_cache_entry *e;
624     ZEBRA_RES ret = ZEBRA_OK;
625
626     if (p->cache_cur == p->cache_max)
627         ret = rec_cache_flush(p, 1);
628     else if (p->cache_cur > 0)
629     {
630         int i, j;
631         int used = 0;
632         for (i = 0; i<p->cache_cur; i++)
633         {
634             Record r = (p->record_cache + i)->rec;
635             for (j = 0; j<REC_NO_INFO; j++)
636                 used += r->size[j];
637         }
638         if (used > 90000)
639             ret = rec_cache_flush(p, 1);
640     }
641     assert(p->cache_cur < p->cache_max);
642
643     e = p->record_cache + (p->cache_cur)++;
644     e->flag = flag;
645     e->rec = rec_cp(rec);
646     return ret;
647 }
648
649 ZEBRA_RES rec_close(Records *pp)
650 {
651     Records p = *pp;
652     int i;
653     ZEBRA_RES ret = ZEBRA_OK;
654
655     if (!p)
656         return ret;
657
658     zebra_mutex_destroy(&p->mutex);
659     if (rec_cache_flush(p, 0) != ZEBRA_OK)
660         ret = ZEBRA_FAIL;
661
662     xfree(p->record_cache);
663
664     if (p->rw)
665     {
666         if (rec_write_head(p) != ZEBRA_OK)
667             ret = ZEBRA_FAIL;
668     }
669
670     if (p->index_BFile)
671         bf_close(p->index_BFile);
672
673     for (i = 0; i<REC_BLOCK_TYPES; i++)
674     {
675         if (p->data_BFile[i])
676             bf_close(p->data_BFile[i]);
677         xfree(p->data_fname[i]);
678     }
679     xfree(p->tmp_buf);
680     xfree(p);
681     *pp = NULL;
682     return ret;
683 }
684
685 static Record rec_get_int(Records p, SYSNO sysno)
686 {
687     int i, in_size, r;
688     Record rec, *recp;
689     struct record_index_entry entry;
690     zint freeblock;
691     int dst_type;
692     char *nptr, *cptr;
693     char *in_buf = 0;
694     char *bz_buf = 0;
695 #if HAVE_BZLIB_H
696     unsigned int bz_size;
697 #endif
698     char compression_method;
699
700     assert(sysno > 0);
701     assert(p);
702
703     if ((recp = rec_cache_lookup(p, sysno, recordFlagNop)))
704         return rec_cp(*recp);
705
706     if (read_indx(p, rec_sysno_to_int(sysno), &entry, sizeof(entry), 1) < 1)
707         return NULL;       /* record is not there! */
708
709     if (!entry.size)
710         return NULL;       /* record is deleted */
711
712     dst_type = (int) (entry.next & 7);
713     assert(dst_type < REC_BLOCK_TYPES);
714     freeblock = entry.next / 8;
715
716     assert(freeblock > 0);
717     
718     rec_tmp_expand(p, entry.size);
719
720     cptr = p->tmp_buf;
721     r = bf_read(p->data_BFile[dst_type], freeblock, 0, 0, cptr);
722     if (r < 0)
723         return 0;
724     memcpy(&freeblock, cptr, sizeof(freeblock));
725
726     while (freeblock)
727     {
728         zint tmp;
729
730         cptr += p->head.block_size[dst_type] - sizeof(freeblock);
731         
732         memcpy(&tmp, cptr, sizeof(tmp));
733         r = bf_read(p->data_BFile[dst_type], freeblock, 0, 0, cptr);
734         if (r < 0)
735             return 0;
736         memcpy(&freeblock, cptr, sizeof(freeblock));
737         memcpy(cptr, &tmp, sizeof(tmp));
738     }
739
740     rec = (Record) xmalloc(sizeof(*rec));
741     rec->sysno = sysno;
742     memcpy(&compression_method, p->tmp_buf + sizeof(zint) + sizeof(short),
743             sizeof(compression_method));
744     in_buf = p->tmp_buf + sizeof(zint) + sizeof(short) + sizeof(char);
745     in_size = entry.size - sizeof(short) - sizeof(char);
746     switch (compression_method)
747     {
748     case REC_COMPRESS_BZIP2:
749 #if HAVE_BZLIB_H
750         bz_size = entry.size * 20 + 100;
751         while (1)
752         {
753             bz_buf = (char *) xmalloc(bz_size);
754 #ifdef BZ_CONFIG_ERROR
755             i = BZ2_bzBuffToBuffDecompress
756 #else
757             i = bzBuffToBuffDecompress
758 #endif
759                 (bz_buf, &bz_size, in_buf, in_size, 0, 0);
760             yaz_log(YLOG_LOG, "decompress %5d %5d", in_size, bz_size);
761             if (i == BZ_OK)
762                 break;
763             yaz_log(YLOG_LOG, "failed");
764             xfree(bz_buf);
765             bz_size *= 2;
766         }
767         in_buf = bz_buf;
768         in_size = bz_size;
769 #else
770         yaz_log(YLOG_FATAL, "cannot decompress record(s) in BZIP2 format");
771         return 0;
772 #endif
773         break;
774     case REC_COMPRESS_NONE:
775         break;
776     }
777     for (i = 0; i<REC_NO_INFO; i++)
778         rec->info[i] = 0;
779
780     nptr = in_buf;                /* skip ref count */
781     while (nptr < in_buf + in_size)
782     {
783         zint this_sysno;
784         int len;
785         rec_decode_zint(&this_sysno, (unsigned char *) nptr, &len);
786         nptr += len;
787
788         for (i = 0; i < REC_NO_INFO; i++)
789         {
790             unsigned int this_size;
791             rec_decode_unsigned(&this_size, (unsigned char *) nptr, &len);
792             nptr += len;
793
794             if (this_size == 0)
795                 continue;
796             rec->size[i] = this_size-1;
797
798             if (rec->size[i])
799             {
800                 rec->info[i] = nptr;
801                 nptr += rec->size[i];
802             }
803             else
804                 rec->info[i] = NULL;
805         }
806         if (this_sysno == rec_sysno_to_int(sysno))
807             break;
808     }
809     for (i = 0; i<REC_NO_INFO; i++)
810     {
811         if (rec->info[i] && rec->size[i])
812         {
813             char *np = xmalloc(rec->size[i]+1);
814             memcpy(np, rec->info[i], rec->size[i]);
815             np[rec->size[i]] = '\0';
816             rec->info[i] = np;
817         }
818         else
819         {
820             assert(rec->info[i] == 0);
821             assert(rec->size[i] == 0);
822         }
823     }
824     xfree(bz_buf);
825     if (rec_cache_insert(p, rec, recordFlagNop) != ZEBRA_OK)
826         return 0;
827     return rec;
828 }
829
830 Record rec_get(Records p, SYSNO sysno)
831 {
832     Record rec;
833     zebra_mutex_lock(&p->mutex);
834
835     rec = rec_get_int(p, sysno);
836     zebra_mutex_unlock(&p->mutex);
837     return rec;
838 }
839
840 Record rec_get_root(Records p)
841 {
842     return rec_get(p, rec_sysno_to_ext(1));
843 }
844
845 static Record rec_new_int(Records p)
846 {
847     int i;
848     SYSNO sysno;
849     Record rec;
850
851     assert(p);
852     rec = (Record) xmalloc(sizeof(*rec));
853     if (1 || p->head.index_free == 0)
854         sysno = (p->head.index_last)++;
855     else
856     {
857         struct record_index_entry entry;
858
859         if (read_indx(p, p->head.index_free, &entry, sizeof(entry), 0) < 1)
860         {
861             xfree(rec);
862             return 0;
863         }
864         sysno = p->head.index_free;
865         p->head.index_free = entry.next;
866     }
867     (p->head.no_records)++;
868     rec->sysno = rec_sysno_to_ext(sysno);
869     for (i = 0; i < REC_NO_INFO; i++)
870     {
871         rec->info[i] = NULL;
872         rec->size[i] = 0;
873     }
874     rec_cache_insert(p, rec, recordFlagNew);
875     return rec;
876 }
877
878 Record rec_new(Records p)
879 {
880     Record rec;
881     zebra_mutex_lock(&p->mutex);
882
883     rec = rec_new_int(p);
884     zebra_mutex_unlock(&p->mutex);
885     return rec;
886 }
887
888 ZEBRA_RES rec_del(Records p, Record *recpp)
889 {
890     Record *recp;
891     ZEBRA_RES ret = ZEBRA_OK;
892
893     zebra_mutex_lock(&p->mutex);
894     (p->head.no_records)--;
895     if ((recp = rec_cache_lookup(p, (*recpp)->sysno, recordFlagDelete)))
896     {
897         rec_rm(recp);
898         *recp = *recpp;
899     }
900     else
901     {
902         ret = rec_cache_insert(p, *recpp, recordFlagDelete);
903         rec_rm(recpp);
904     }
905     zebra_mutex_unlock(&p->mutex);
906     *recpp = NULL;
907     return ret;
908 }
909
910 ZEBRA_RES rec_put(Records p, Record *recpp)
911 {
912     Record *recp;
913     ZEBRA_RES ret = ZEBRA_OK;
914
915     zebra_mutex_lock(&p->mutex);
916     if ((recp = rec_cache_lookup(p, (*recpp)->sysno, recordFlagWrite)))
917     {
918         rec_rm(recp);
919         *recp = *recpp;
920     }
921     else
922     {
923         ret = rec_cache_insert(p, *recpp, recordFlagWrite);
924         rec_rm(recpp);
925     }
926     zebra_mutex_unlock(&p->mutex);
927     *recpp = NULL;
928     return ret;
929 }
930
931 void rec_rm(Record *recpp)
932 {
933     int i;
934
935     if (!*recpp)
936         return ;
937     for (i = 0; i < REC_NO_INFO; i++)
938         xfree((*recpp)->info[i]);
939     xfree(*recpp);
940     *recpp = NULL;
941 }
942
943 Record rec_cp(Record rec)
944 {
945     Record n;
946     int i;
947
948     n = (Record) xmalloc(sizeof(*n));
949     n->sysno = rec->sysno;
950     for (i = 0; i < REC_NO_INFO; i++)
951         if (!rec->info[i])
952         {
953             n->info[i] = NULL;
954             n->size[i] = 0;
955         }
956         else
957         {
958             n->size[i] = rec->size[i];
959             n->info[i] = (char *) xmalloc(rec->size[i]+1);
960             memcpy(n->info[i], rec->info[i], rec->size[i]);
961             n->info[i][rec->size[i]] = '\0';
962         }
963     return n;
964 }
965
966
967 char *rec_strdup(const char *s, size_t *len)
968 {
969     char *p;
970
971     if (!s)
972     {
973         *len = 0;
974         return NULL;
975     }
976     *len = strlen(s)+1;
977     p = (char *) xmalloc(*len);
978     strcpy(p, s);
979     return p;
980 }
981
982 /*
983  * Local variables:
984  * c-basic-offset: 4
985  * indent-tabs-mode: nil
986  * End:
987  * vim: shiftwidth=4 tabstop=8 expandtab
988  */
989