WS updates
[idzebra-moved-to-github.git] / index / recindex.c
1 /* $Id: recindex.c,v 1.44 2005-05-11 12:36:45 adam Exp $
2    Copyright (C) 1995-2005
3    Index Data ApS
4
5 This file is part of the Zebra server.
6
7 Zebra is free software; you can redistribute it and/or modify it under
8 the terms of the GNU General Public License as published by the Free
9 Software Foundation; either version 2, or (at your option) any later
10 version.
11
12 Zebra is distributed in the hope that it will be useful, but WITHOUT ANY
13 WARRANTY; without even the implied warranty of MERCHANTABILITY or
14 FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
15 for more details.
16
17 You should have received a copy of the GNU General Public License
18 along with Zebra; see the file LICENSE.zebra.  If not, write to the
19 Free Software Foundation, 59 Temple Place - Suite 330, Boston, MA
20 02111-1307, USA.
21 */
22
23 #define RIDX_CHUNK 128
24
25 /*
26  *  Format of first block
27  *      next       (8 bytes)
28  *      ref_count  (2 bytes)
29  *      block      (500 bytes)
30  *
31  *  Format of subsequent blocks 
32  *      next  (8 bytes)
33  *      block (502 bytes)
34  *
35  *  Format of each record
36  *      sysno
37  *      (length, data) - pairs
38  *      length = 0 if same as previous
39  */
40 #include <stdio.h>
41 #include <stdlib.h>
42 #include <assert.h>
43 #include <string.h>
44
45 #include <yaz/yaz-util.h>
46 #include "recindxp.h"
47
48 #if HAVE_BZLIB_H
49 #include <bzlib.h>
50 #endif
51 static void rec_write_head(Records p)
52 {
53     int r;
54
55     assert(p);
56     assert(p->index_BFile);
57
58     r = bf_write(p->index_BFile, 0, 0, sizeof(p->head), &p->head);    
59     if (r)
60     {
61         yaz_log(YLOG_FATAL|YLOG_ERRNO, "write head of %s", p->index_fname);
62         exit(1);
63     }
64 }
65
66 static void rec_tmp_expand(Records p, int size)
67 {
68     if (p->tmp_size < size + 2048 ||
69         p->tmp_size < p->head.block_size[REC_BLOCK_TYPES-1]*2)
70     {
71         xfree(p->tmp_buf);
72         p->tmp_size = size + (int)
73                         (p->head.block_size[REC_BLOCK_TYPES-1])*2 + 2048;
74         p->tmp_buf = (char *) xmalloc(p->tmp_size);
75     }
76 }
77
78 static int read_indx(Records p, SYSNO sysno, void *buf, int itemsize, 
79                       int ignoreError)
80 {
81     int r;
82     zint pos = (sysno-1)*itemsize;
83     int off = (int) (pos%RIDX_CHUNK);
84     int sz1 = RIDX_CHUNK - off;    /* sz1 is size of buffer to read.. */
85
86     if (sz1 > itemsize)
87         sz1 = itemsize;  /* no more than itemsize bytes */
88
89     r = bf_read(p->index_BFile, 1+pos/RIDX_CHUNK, off, sz1, buf);
90     if (r == 1 && sz1 < itemsize) /* boundary? - must read second part */
91         r = bf_read(p->index_BFile, 2+pos/RIDX_CHUNK, 0, itemsize - sz1,
92                         (char*) buf + sz1);
93     if (r != 1 && !ignoreError)
94     {
95         yaz_log(YLOG_FATAL|YLOG_ERRNO, "read in %s at pos %ld",
96               p->index_fname, (long) pos);
97         exit(1);
98     }
99     return r;
100 }
101
102 static void write_indx(Records p, SYSNO sysno, void *buf, int itemsize)
103 {
104     zint pos = (sysno-1)*itemsize;
105     int off = (int) (pos%RIDX_CHUNK);
106     int sz1 = RIDX_CHUNK - off;    /* sz1 is size of buffer to read.. */
107
108     if (sz1 > itemsize)
109         sz1 = itemsize;  /* no more than itemsize bytes */
110
111     bf_write(p->index_BFile, 1+pos/RIDX_CHUNK, off, sz1, buf);
112     if (sz1 < itemsize)   /* boundary? must write second part */
113         bf_write(p->index_BFile, 2+pos/RIDX_CHUNK, 0, itemsize - sz1,
114                 (char*) buf + sz1);
115 }
116
117 static void rec_release_blocks(Records p, SYSNO sysno)
118 {
119     struct record_index_entry entry;
120     zint freeblock;
121     char block_and_ref[sizeof(zint) + sizeof(short)];
122     int dst_type;
123     int first = 1;
124
125     if (read_indx(p, sysno, &entry, sizeof(entry), 1) != 1)
126         return ;
127
128     freeblock = entry.next;
129     assert(freeblock > 0);
130     dst_type = (int) (freeblock & 7);
131     assert(dst_type < REC_BLOCK_TYPES);
132     freeblock = freeblock / 8;
133     while (freeblock)
134     {
135         if (bf_read(p->data_BFile[dst_type], freeblock, 0,
136                      first ? sizeof(block_and_ref) : sizeof(zint),
137                      block_and_ref) != 1)
138         {
139             yaz_log(YLOG_FATAL|YLOG_ERRNO, "read in rec_del_single");
140             exit (1);
141         }
142         if (first)
143         {
144             short ref;
145             memcpy(&ref, block_and_ref + sizeof(freeblock), sizeof(ref));
146             --ref;
147             memcpy(block_and_ref + sizeof(freeblock), &ref, sizeof(ref));
148             if (ref)
149             {
150                 if (bf_write(p->data_BFile[dst_type], freeblock, 0,
151                               sizeof(block_and_ref), block_and_ref))
152                 {
153                     yaz_log(YLOG_FATAL|YLOG_ERRNO, "write in rec_del_single");
154                     exit (1);
155                 }
156                 return;
157             }
158             first = 0;
159         }
160         
161         if (bf_write(p->data_BFile[dst_type], freeblock, 0, sizeof(freeblock),
162                       &p->head.block_free[dst_type]))
163         {
164             yaz_log(YLOG_FATAL|YLOG_ERRNO, "write in rec_del_single");
165             exit(1);
166         }
167         p->head.block_free[dst_type] = freeblock;
168         memcpy(&freeblock, block_and_ref, sizeof(freeblock));
169
170         p->head.block_used[dst_type]--;
171     }
172     p->head.total_bytes -= entry.size;
173 }
174
175 static void rec_delete_single(Records p, Record rec)
176 {
177     struct record_index_entry entry;
178
179     rec_release_blocks(p, rec->sysno);
180
181     entry.next = p->head.index_free;
182     entry.size = 0;
183     p->head.index_free = rec->sysno;
184     write_indx(p, rec->sysno, &entry, sizeof(entry));
185 }
186
187 static void rec_write_tmp_buf(Records p, int size, SYSNO *sysnos)
188 {
189     struct record_index_entry entry;
190     int no_written = 0;
191     char *cptr = p->tmp_buf;
192     zint block_prev = -1, block_free;
193     int dst_type = 0;
194     int i;
195
196     for (i = 1; i<REC_BLOCK_TYPES; i++)
197         if (size >= p->head.block_move[i])
198             dst_type = i;
199     while (no_written < size)
200     {
201         block_free = p->head.block_free[dst_type];
202         if (block_free)
203         {
204             if (bf_read(p->data_BFile[dst_type],
205                          block_free, 0, sizeof(*p->head.block_free),
206                          &p->head.block_free[dst_type]) != 1)
207             {
208                 yaz_log(YLOG_FATAL|YLOG_ERRNO, "read in %s at free block "
209                          ZINT_FORMAT,
210                          p->data_fname[dst_type], block_free);
211                 exit (1);
212             }
213         }
214         else
215             block_free = p->head.block_last[dst_type]++;
216         if (block_prev == -1)
217         {
218             entry.next = block_free*8 + dst_type;
219             entry.size = size;
220             p->head.total_bytes += size;
221             while (*sysnos > 0)
222             {
223                 write_indx (p, *sysnos, &entry, sizeof(entry));
224                 sysnos++;
225             }
226         }
227         else
228         {
229             memcpy(cptr, &block_free, sizeof(block_free));
230             bf_write(p->data_BFile[dst_type], block_prev, 0, 0, cptr);
231             cptr = p->tmp_buf + no_written;
232         }
233         block_prev = block_free;
234         no_written += (int)(p->head.block_size[dst_type]) - sizeof(zint);
235         p->head.block_used[dst_type]++;
236     }
237     assert(block_prev != -1);
238     block_free = 0;
239     memcpy(cptr, &block_free, sizeof(block_free));
240     bf_write(p->data_BFile[dst_type], block_prev, 0,
241               sizeof(block_free) + (p->tmp_buf+size) - cptr, cptr);
242 }
243
244 Records rec_open(BFiles bfs, int rw, int compression_method)
245 {
246     Records p;
247     int i, r;
248     int version;
249
250     p = (Records) xmalloc (sizeof(*p));
251     p->compression_method = compression_method;
252     p->rw = rw;
253     p->tmp_size = 1024;
254     p->tmp_buf = (char *) xmalloc (p->tmp_size);
255     p->index_fname = "reci";
256     p->index_BFile = bf_open (bfs, p->index_fname, RIDX_CHUNK, rw);
257     if (p->index_BFile == NULL)
258     {
259         yaz_log(YLOG_FATAL|YLOG_ERRNO, "open %s", p->index_fname);
260         exit (1);
261     }
262     r = bf_read(p->index_BFile, 0, 0, 0, p->tmp_buf);
263     switch (r)
264     {
265     case 0:
266         memcpy(p->head.magic, REC_HEAD_MAGIC, sizeof(p->head.magic));
267         sprintf (p->head.version, "%3d", REC_VERSION);
268         p->head.index_free = 0;
269         p->head.index_last = 1;
270         p->head.no_records = 0;
271         p->head.total_bytes = 0;
272         for (i = 0; i<REC_BLOCK_TYPES; i++)
273         {
274             p->head.block_free[i] = 0;
275             p->head.block_last[i] = 1;
276             p->head.block_used[i] = 0;
277         }
278         p->head.block_size[0] = 128;
279         p->head.block_move[0] = 0;
280         for (i = 1; i<REC_BLOCK_TYPES; i++)
281         {
282             p->head.block_size[i] = p->head.block_size[i-1] * 4;
283             p->head.block_move[i] = p->head.block_size[i] * 24;
284         }
285         if (rw)
286             rec_write_head(p);
287         break;
288     case 1:
289         memcpy(&p->head, p->tmp_buf, sizeof(p->head));
290         if (memcmp (p->head.magic, REC_HEAD_MAGIC, sizeof(p->head.magic)))
291         {
292             yaz_log(YLOG_FATAL, "file %s has bad format", p->index_fname);
293             exit (1);
294         }
295         version = atoi (p->head.version);
296         if (version != REC_VERSION)
297         {
298             yaz_log(YLOG_FATAL, "file %s is version %d, but version"
299                   " %d is required", p->index_fname, version, REC_VERSION);
300             exit (1);
301         }
302         break;
303     }
304     for (i = 0; i<REC_BLOCK_TYPES; i++)
305     {
306         char str[80];
307         sprintf (str, "recd%c", i + 'A');
308         p->data_fname[i] = (char *) xmalloc (strlen(str)+1);
309         strcpy(p->data_fname[i], str);
310         p->data_BFile[i] = NULL;
311     }
312     for (i = 0; i<REC_BLOCK_TYPES; i++)
313     {
314         if (!(p->data_BFile[i] = bf_open (bfs, p->data_fname[i],
315                                           (int) (p->head.block_size[i]),
316                                           rw)))
317         {
318             yaz_log(YLOG_FATAL|YLOG_ERRNO, "bf_open %s", p->data_fname[i]);
319             exit (1);
320         }
321     }
322     p->cache_max = 400;
323     p->cache_cur = 0;
324     p->record_cache = (struct record_cache_entry *)
325         xmalloc (sizeof(*p->record_cache)*p->cache_max);
326     zebra_mutex_init (&p->mutex);
327     return p;
328 }
329
330 static void rec_encode_unsigned(unsigned n, unsigned char *buf, int *len)
331 {
332     (*len) = 0;
333     while (n > 127)
334     {
335         buf[*len] = 128 + (n & 127);
336         n = n >> 7;
337         (*len)++;
338     }
339     buf[*len] = n;
340     (*len)++;
341 }
342
343 static void rec_decode_unsigned(unsigned *np, unsigned char *buf, int *len)
344 {
345     unsigned n = 0;
346     unsigned w = 1;
347     (*len) = 0;
348
349     while (buf[*len] > 127)
350     {
351         n += w*(buf[*len] & 127);
352         w = w << 7;
353         (*len)++;
354     }
355     n += w * buf[*len];
356     (*len)++;
357     *np = n;
358 }
359
360 static void rec_encode_zint(zint n, unsigned char *buf, int *len)
361 {
362     (*len) = 0;
363     while (n > 127)
364     {
365         buf[*len] = (unsigned) (128 + (n & 127));
366         n = n >> 7;
367         (*len)++;
368     }
369     buf[*len] = (unsigned) n;
370     (*len)++;
371 }
372
373 static void rec_decode_zint(zint *np, unsigned char *buf, int *len)
374 {
375     zint  n = 0;
376     zint w = 1;
377     (*len) = 0;
378
379     while (buf[*len] > 127)
380     {
381         n += w*(buf[*len] & 127);
382         w = w << 7;
383         (*len)++;
384     }
385     n += w * buf[*len];
386     (*len)++;
387     *np = n;
388 }
389
390 static void rec_cache_flush_block1(Records p, Record rec, Record last_rec,
391                                    char **out_buf, int *out_size,
392                                    int *out_offset)
393 {
394     int i;
395     int len;
396
397     for (i = 0; i<REC_NO_INFO; i++)
398     {
399         if (*out_offset + (int) rec->size[i] + 20 > *out_size)
400         {
401             int new_size = *out_offset + rec->size[i] + 65536;
402             char *np = (char *) xmalloc (new_size);
403             if (*out_offset)
404                 memcpy(np, *out_buf, *out_offset);
405             xfree (*out_buf);
406             *out_size = new_size;
407             *out_buf = np;
408         }
409         if (i == 0)
410         {
411             rec_encode_zint (rec->sysno, *out_buf + *out_offset, &len);
412             (*out_offset) += len;
413         }
414         if (rec->size[i] == 0)
415         {
416             rec_encode_unsigned (1, *out_buf + *out_offset, &len);
417             (*out_offset) += len;
418         }
419         else if (last_rec && rec->size[i] == last_rec->size[i] &&
420                  !memcmp (rec->info[i], last_rec->info[i], rec->size[i]))
421         {
422             rec_encode_unsigned (0, *out_buf + *out_offset, &len);
423             (*out_offset) += len;
424         }
425         else
426         {
427             rec_encode_unsigned (rec->size[i]+1, *out_buf + *out_offset, &len);
428             (*out_offset) += len;
429             memcpy(*out_buf + *out_offset, rec->info[i], rec->size[i]);
430             (*out_offset) += rec->size[i];
431         }
432     }
433 }
434
435 static void rec_write_multiple(Records p, int saveCount)
436 {
437     int i;
438     short ref_count = 0;
439     char compression_method;
440     Record last_rec = 0;
441     int out_size = 1000;
442     int out_offset = 0;
443     char *out_buf = (char *) xmalloc (out_size);
444     SYSNO *sysnos = (SYSNO *) xmalloc (sizeof(*sysnos) * (p->cache_cur + 1));
445     SYSNO *sysnop = sysnos;
446
447     for (i = 0; i<p->cache_cur - saveCount; i++)
448     {
449         struct record_cache_entry *e = p->record_cache + i;
450         switch (e->flag)
451         {
452         case recordFlagNew:
453             rec_cache_flush_block1 (p, e->rec, last_rec, &out_buf,
454                                     &out_size, &out_offset);
455             *sysnop++ = e->rec->sysno;
456             ref_count++;
457             e->flag = recordFlagNop;
458             last_rec = e->rec;
459             break;
460         case recordFlagWrite:
461             rec_release_blocks (p, e->rec->sysno);
462             rec_cache_flush_block1 (p, e->rec, last_rec, &out_buf,
463                                     &out_size, &out_offset);
464             *sysnop++ = e->rec->sysno;
465             ref_count++;
466             e->flag = recordFlagNop;
467             last_rec = e->rec;
468             break;
469         case recordFlagDelete:
470             rec_delete_single (p, e->rec);
471             e->flag = recordFlagNop;
472             break;
473         default:
474             break;
475         }
476     }
477
478     *sysnop = -1;
479     if (ref_count)
480     {
481         int csize = 0;  /* indicate compression "not performed yet" */
482         compression_method = p->compression_method;
483         switch (compression_method)
484         {
485         case REC_COMPRESS_BZIP2:
486 #if HAVE_BZLIB_H        
487             csize = out_offset + (out_offset >> 6) + 620;
488             rec_tmp_expand (p, csize);
489 #ifdef BZ_CONFIG_ERROR
490             i = BZ2_bzBuffToBuffCompress 
491 #else
492             i = bzBuffToBuffCompress 
493 #endif
494                                      (p->tmp_buf+sizeof(zint)+sizeof(short)+
495                                       sizeof(char),
496                                       &csize, out_buf, out_offset, 1, 0, 30);
497             if (i != BZ_OK)
498             {
499                 yaz_log(YLOG_WARN, "bzBuffToBuffCompress error code=%d", i);
500                 csize = 0;
501             }
502             yaz_log(YLOG_LOG, "compress %4d %5d %5d", ref_count, out_offset,
503                   csize);
504 #endif
505             break;
506         case REC_COMPRESS_NONE:
507             break;
508         }
509         if (!csize)  
510         {
511             /* either no compression or compression not supported ... */
512             csize = out_offset;
513             rec_tmp_expand (p, csize);
514             memcpy(p->tmp_buf + sizeof(zint) + sizeof(short) + sizeof(char),
515                     out_buf, out_offset);
516             csize = out_offset;
517             compression_method = REC_COMPRESS_NONE;
518         }
519         memcpy(p->tmp_buf + sizeof(zint), &ref_count, sizeof(ref_count));
520         memcpy(p->tmp_buf + sizeof(zint)+sizeof(short),
521                 &compression_method, sizeof(compression_method));
522                 
523         /* -------- compression */
524         rec_write_tmp_buf (p, csize + sizeof(short) + sizeof(char), sysnos);
525     }
526     xfree (out_buf);
527     xfree (sysnos);
528 }
529
530 static void rec_cache_flush(Records p, int saveCount)
531 {
532     int i, j;
533
534     if (saveCount >= p->cache_cur)
535         saveCount = 0;
536
537     rec_write_multiple (p, saveCount);
538
539     for (i = 0; i<p->cache_cur - saveCount; i++)
540     {
541         struct record_cache_entry *e = p->record_cache + i;
542         rec_rm(&e->rec);
543     } 
544     /* i still being used ... */
545     for (j = 0; j<saveCount; j++, i++)
546         memcpy(p->record_cache+j, p->record_cache+i,
547                 sizeof(*p->record_cache));
548     p->cache_cur = saveCount;
549 }
550
551 static Record *rec_cache_lookup(Records p, SYSNO sysno,
552                                 enum recordCacheFlag flag)
553 {
554     int i;
555     for (i = 0; i<p->cache_cur; i++)
556     {
557         struct record_cache_entry *e = p->record_cache + i;
558         if (e->rec->sysno == sysno)
559         {
560             if (flag != recordFlagNop && e->flag == recordFlagNop)
561                 e->flag = flag;
562             return &e->rec;
563         }
564     }
565     return NULL;
566 }
567
568 static void rec_cache_insert(Records p, Record rec, enum recordCacheFlag flag)
569 {
570     struct record_cache_entry *e;
571
572     if (p->cache_cur == p->cache_max)
573         rec_cache_flush (p, 1);
574     else if (p->cache_cur > 0)
575     {
576         int i, j;
577         int used = 0;
578         for (i = 0; i<p->cache_cur; i++)
579         {
580             Record r = (p->record_cache + i)->rec;
581             for (j = 0; j<REC_NO_INFO; j++)
582                 used += r->size[j];
583         }
584         if (used > 90000)
585             rec_cache_flush (p, 1);
586     }
587     assert(p->cache_cur < p->cache_max);
588
589     e = p->record_cache + (p->cache_cur)++;
590     e->flag = flag;
591     e->rec = rec_cp (rec);
592 }
593
594 void rec_close(Records *pp)
595 {
596     Records p = *pp;
597     int i;
598
599     assert(p);
600
601     zebra_mutex_destroy(&p->mutex);
602     rec_cache_flush (p, 0);
603     xfree (p->record_cache);
604
605     if (p->rw)
606         rec_write_head(p);
607
608     if (p->index_BFile)
609         bf_close (p->index_BFile);
610
611     for (i = 0; i<REC_BLOCK_TYPES; i++)
612     {
613         if (p->data_BFile[i])
614             bf_close (p->data_BFile[i]);
615         xfree (p->data_fname[i]);
616     }
617     xfree (p->tmp_buf);
618     xfree (p);
619     *pp = NULL;
620 }
621
622 static Record rec_get_int(Records p, SYSNO sysno)
623 {
624     int i, in_size, r;
625     Record rec, *recp;
626     struct record_index_entry entry;
627     zint freeblock;
628     int dst_type;
629     char *nptr, *cptr;
630     char *in_buf = 0;
631     char *bz_buf = 0;
632 #if HAVE_BZLIB_H
633     int bz_size;
634 #endif
635     char compression_method;
636
637     assert(sysno > 0);
638     assert(p);
639
640     if ((recp = rec_cache_lookup (p, sysno, recordFlagNop)))
641         return rec_cp (*recp);
642
643     if (read_indx (p, sysno, &entry, sizeof(entry), 1) < 1)
644         return NULL;       /* record is not there! */
645
646     if (!entry.size)
647         return NULL;       /* record is deleted */
648
649     dst_type = (int) (entry.next & 7);
650     assert(dst_type < REC_BLOCK_TYPES);
651     freeblock = entry.next / 8;
652
653     assert(freeblock > 0);
654     
655     rec_tmp_expand (p, entry.size);
656
657     cptr = p->tmp_buf;
658     r = bf_read(p->data_BFile[dst_type], freeblock, 0, 0, cptr);
659     if (r < 0)
660         return 0;
661     memcpy(&freeblock, cptr, sizeof(freeblock));
662
663     while (freeblock)
664     {
665         zint tmp;
666
667         cptr += p->head.block_size[dst_type] - sizeof(freeblock);
668         
669         memcpy(&tmp, cptr, sizeof(tmp));
670         r = bf_read(p->data_BFile[dst_type], freeblock, 0, 0, cptr);
671         if (r < 0)
672             return 0;
673         memcpy(&freeblock, cptr, sizeof(freeblock));
674         memcpy(cptr, &tmp, sizeof(tmp));
675     }
676
677     rec = (Record) xmalloc (sizeof(*rec));
678     rec->sysno = sysno;
679     memcpy(&compression_method, p->tmp_buf + sizeof(zint) + sizeof(short),
680             sizeof(compression_method));
681     in_buf = p->tmp_buf + sizeof(zint) + sizeof(short) + sizeof(char);
682     in_size = entry.size - sizeof(short) - sizeof(char);
683     switch (compression_method)
684     {
685     case REC_COMPRESS_BZIP2:
686 #if HAVE_BZLIB_H
687         bz_size = entry.size * 20 + 100;
688         while (1)
689         {
690             bz_buf = (char *) xmalloc (bz_size);
691 #ifdef BZ_CONFIG_ERROR
692             i = BZ2_bzBuffToBuffDecompress
693 #else
694             i = bzBuffToBuffDecompress
695 #endif
696                  (bz_buf, &bz_size, in_buf, in_size, 0, 0);
697             yaz_log(YLOG_LOG, "decompress %5d %5d", in_size, bz_size);
698             if (i == BZ_OK)
699                 break;
700             yaz_log(YLOG_LOG, "failed");
701             xfree (bz_buf);
702             bz_size *= 2;
703         }
704         in_buf = bz_buf;
705         in_size = bz_size;
706 #else
707         yaz_log(YLOG_FATAL, "cannot decompress record(s) in BZIP2 format");
708         exit (1);
709 #endif
710         break;
711     case REC_COMPRESS_NONE:
712         break;
713     }
714     for (i = 0; i<REC_NO_INFO; i++)
715         rec->info[i] = 0;
716
717     nptr = in_buf;                /* skip ref count */
718     while (nptr < in_buf + in_size)
719     {
720         zint this_sysno;
721         int len;
722         rec_decode_zint (&this_sysno, nptr, &len);
723         nptr += len;
724
725         for (i = 0; i < REC_NO_INFO; i++)
726         {
727             int this_size;
728             rec_decode_unsigned (&this_size, nptr, &len);
729             nptr += len;
730
731             if (this_size == 0)
732                 continue;
733             rec->size[i] = this_size-1;
734
735             if (rec->size[i])
736             {
737                 rec->info[i] = nptr;
738                 nptr += rec->size[i];
739             }
740             else
741                 rec->info[i] = NULL;
742         }
743         if (this_sysno == sysno)
744             break;
745     }
746     for (i = 0; i<REC_NO_INFO; i++)
747     {
748         if (rec->info[i] && rec->size[i])
749         {
750             char *np = xmalloc (rec->size[i]+1);
751             memcpy(np, rec->info[i], rec->size[i]);
752             np[rec->size[i]] = '\0';
753             rec->info[i] = np;
754         }
755         else
756         {
757             assert(rec->info[i] == 0);
758             assert(rec->size[i] == 0);
759         }
760     }
761     xfree (bz_buf);
762     rec_cache_insert(p, rec, recordFlagNop);
763     return rec;
764 }
765
766 Record rec_get(Records p, SYSNO sysno)
767 {
768     Record rec;
769     zebra_mutex_lock (&p->mutex);
770
771     rec = rec_get_int (p, sysno);
772     zebra_mutex_unlock (&p->mutex);
773     return rec;
774 }
775
776 static Record rec_new_int(Records p)
777 {
778     int i;
779     SYSNO sysno;
780     Record rec;
781
782     assert(p);
783     rec = (Record) xmalloc (sizeof(*rec));
784     if (1 || p->head.index_free == 0)
785         sysno = (p->head.index_last)++;
786     else
787     {
788         struct record_index_entry entry;
789
790         read_indx (p, p->head.index_free, &entry, sizeof(entry), 0);
791         sysno = p->head.index_free;
792         p->head.index_free = entry.next;
793     }
794     (p->head.no_records)++;
795     rec->sysno = sysno;
796     for (i = 0; i < REC_NO_INFO; i++)
797     {
798         rec->info[i] = NULL;
799         rec->size[i] = 0;
800     }
801     rec_cache_insert(p, rec, recordFlagNew);
802     return rec;
803 }
804
805 Record rec_new(Records p)
806 {
807     Record rec;
808     zebra_mutex_lock (&p->mutex);
809
810     rec = rec_new_int (p);
811     zebra_mutex_unlock (&p->mutex);
812     return rec;
813 }
814
815 void rec_del(Records p, Record *recpp)
816 {
817     Record *recp;
818
819     zebra_mutex_lock (&p->mutex);
820     (p->head.no_records)--;
821     if ((recp = rec_cache_lookup (p, (*recpp)->sysno, recordFlagDelete)))
822     {
823         rec_rm(recp);
824         *recp = *recpp;
825     }
826     else
827     {
828         rec_cache_insert(p, *recpp, recordFlagDelete);
829         rec_rm(recpp);
830     }
831     zebra_mutex_unlock (&p->mutex);
832     *recpp = NULL;
833 }
834
835 void rec_put(Records p, Record *recpp)
836 {
837     Record *recp;
838
839     zebra_mutex_lock (&p->mutex);
840     if ((recp = rec_cache_lookup (p, (*recpp)->sysno, recordFlagWrite)))
841     {
842         rec_rm(recp);
843         *recp = *recpp;
844     }
845     else
846     {
847         rec_cache_insert(p, *recpp, recordFlagWrite);
848         rec_rm(recpp);
849     }
850     zebra_mutex_unlock (&p->mutex);
851     *recpp = NULL;
852 }
853
854 void rec_rm(Record *recpp)
855 {
856     int i;
857
858     if (!*recpp)
859         return ;
860     for (i = 0; i < REC_NO_INFO; i++)
861         xfree ((*recpp)->info[i]);
862     xfree (*recpp);
863     *recpp = NULL;
864 }
865
866 Record rec_cp(Record rec)
867 {
868     Record n;
869     int i;
870
871     n = (Record) xmalloc (sizeof(*n));
872     n->sysno = rec->sysno;
873     for (i = 0; i < REC_NO_INFO; i++)
874         if (!rec->info[i])
875         {
876             n->info[i] = NULL;
877             n->size[i] = 0;
878         }
879         else
880         {
881             n->size[i] = rec->size[i];
882             n->info[i] = (char *) xmalloc (rec->size[i]);
883             memcpy(n->info[i], rec->info[i], rec->size[i]);
884         }
885     return n;
886 }
887
888
889 char *rec_strdup(const char *s, size_t *len)
890 {
891     char *p;
892
893     if (!s)
894     {
895         *len = 0;
896         return NULL;
897     }
898     *len = strlen(s)+1;
899     p = (char *) xmalloc (*len);
900     strcpy(p, s);
901     return p;
902 }
903