Changed assert in isamb, since we have larger keys
[idzebra-moved-to-github.git] / index / sortidx.c
1 /* $Id: sortidx.c,v 1.21 2006-12-19 00:25:41 adam Exp $
2    Copyright (C) 1995-2006
3    Index Data ApS
4
5 This file is part of the Zebra server.
6
7 Zebra is free software; you can redistribute it and/or modify it under
8 the terms of the GNU General Public License as published by the Free
9 Software Foundation; either version 2, or (at your option) any later
10 version.
11
12 Zebra is distributed in the hope that it will be useful, but WITHOUT ANY
13 WARRANTY; without even the implied warranty of MERCHANTABILITY or
14 FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
15 for more details.
16
17 You should have received a copy of the GNU General Public License
18 along with this program; if not, write to the Free Software
19 Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA  02110-1301  USA
20
21 */
22
23
24 #include <assert.h> 
25 #include <string.h>
26
27 #include <yaz/log.h>
28 #include <yaz/xmalloc.h>
29 #include <idzebra/isamb.h>
30 #include <idzebra/bfile.h>
31 #include <sortidx.h>
32 #include "recindex.h"
33
34 #define SORT_MAX_TERM 110
35
36 #define SORT_IDX_BLOCKSIZE 64
37
38 struct sort_term {
39     zint sysno;
40     char term[SORT_MAX_TERM];
41 };
42
43
44 static void sort_term_log_item(int level, const void *b, const char *txt)
45 {
46     struct sort_term a1;
47
48     memcpy(&a1, b, sizeof(a1));
49
50     yaz_log(level, "%s " ZINT_FORMAT " %s", txt, a1.sysno, a1.term);
51 }
52
53 int sort_term_compare(const void *a, const void *b)
54 {
55     struct sort_term a1, b1;
56
57     memcpy(&a1, a, sizeof(a1));
58     memcpy(&b1, b, sizeof(b1));
59
60     if (a1.sysno > b1.sysno)
61         return 1;
62     else if (a1.sysno < b1.sysno)
63         return -1;
64     return 0;
65 }
66
67 void *sort_term_code_start(void)
68 {
69     return 0;
70 }
71
72 void sort_term_encode(void *p, char **dst, const char **src)
73 {
74     struct sort_term a1;
75
76     memcpy(&a1, *src, sizeof(a1));
77     *src += sizeof(a1);
78
79     zebra_zint_encode(dst, a1.sysno); /* encode record id */
80     strcpy(*dst, a1.term); /* then sort term, 0 terminated */
81     *dst += strlen(a1.term) + 1;
82 }
83
84 void sort_term_decode(void *p, char **dst, const char **src)
85 {
86     struct sort_term a1;
87
88     zebra_zint_decode(src, &a1.sysno);
89
90     strcpy(a1.term, *src);
91     *src += strlen(a1.term) + 1;
92
93     memcpy(*dst, &a1, sizeof(a1));
94     *dst += sizeof(a1);
95 }
96
97 void sort_term_code_reset(void *p)
98 {
99 }
100
101 void sort_term_code_stop(void *p)
102 {
103 }
104
105
106 struct sort_term_stream {
107     int no;
108     int insert_flag;
109     struct sort_term st;
110 };
111
112 int sort_term_code_read(void *vp, char **dst, int *insertMode)
113 {
114     struct sort_term_stream *s = (struct sort_term_stream *) vp;
115
116     if (s->no == 0)
117         return 0;
118
119     (s->no)--;
120     
121     *insertMode = s->insert_flag;
122     memcpy(*dst, &s->st, sizeof(s->st));
123     *dst += sizeof(s->st);
124     return 1;
125 }
126
127         
128 struct sortFileHead {
129     zint sysno_max;
130 };
131
132 struct sortFile {
133     int id;
134     union {
135         BFile bf;
136         ISAMB isamb;
137     } u;
138     ISAM_P isam_p;
139     ISAMB_PP isam_pp;
140     struct sortFile *next;
141     struct sortFileHead head;
142     int no_inserted;
143     int no_deleted;
144 };
145
146 struct zebra_sort_index {
147     BFiles bfs;
148     int write_flag;
149     zint sysno;
150     int type;
151     char *entry_buf;
152     struct sortFile *current_file;
153     struct sortFile *files;
154 };
155
156 zebra_sort_index_t zebra_sort_open(BFiles bfs, int write_flag, int type)
157 {
158     zebra_sort_index_t si = (zebra_sort_index_t) xmalloc(sizeof(*si));
159     si->bfs = bfs;
160     si->write_flag = write_flag;
161     si->current_file = NULL;
162     si->files = NULL;
163     si->type = type;
164     si->entry_buf = (char *) xmalloc(SORT_IDX_ENTRYSIZE);
165     return si;
166 }
167
168 void zebra_sort_close(zebra_sort_index_t si)
169 {
170     struct sortFile *sf = si->files;
171     while (sf)
172     {
173         struct sortFile *sf_next = sf->next;
174         switch(si->type)
175         {
176         case ZEBRA_SORT_TYPE_FLAT:
177             bf_close(sf->u.bf);
178             break;
179         case ZEBRA_SORT_TYPE_ISAMB:
180             if (sf->isam_pp)
181                 isamb_pp_close(sf->isam_pp);
182             isamb_set_root_ptr(sf->u.isamb, sf->isam_p);
183             isamb_close(sf->u.isamb);
184             break;
185         }
186         xfree(sf);
187         sf = sf_next;
188     }
189     xfree(si->entry_buf);
190     xfree(si);
191 }
192
193 int zebra_sort_type(zebra_sort_index_t si, int id)
194 {
195     int isam_block_size = 4096;
196     ISAMC_M method;
197     char fname[80];
198     struct sortFile *sf;
199     if (si->current_file && si->current_file->id == id)
200         return 0;
201     for (sf = si->files; sf; sf = sf->next)
202         if (sf->id == id)
203         {
204             si->current_file = sf;
205             return 0;
206         }
207     sf = (struct sortFile *) xmalloc(sizeof(*sf));
208     sf->id = id;
209
210     method.compare_item = sort_term_compare;
211     method.log_item = sort_term_log_item;
212     method.codec.start = sort_term_code_start;
213     method.codec.encode = sort_term_encode;
214     method.codec.decode = sort_term_decode;
215     method.codec.reset = sort_term_code_reset;
216     method.codec.stop = sort_term_code_stop;
217    
218     switch(si->type)
219     {
220     case ZEBRA_SORT_TYPE_FLAT:
221         sf->u.bf = NULL;
222         sprintf(fname, "sort%d", id);
223         yaz_log(YLOG_DEBUG, "sort idx %s wr=%d", fname, si->write_flag);
224         sf->u.bf = bf_open(si->bfs, fname, SORT_IDX_BLOCKSIZE, si->write_flag);
225         if (!sf->u.bf)
226         {
227             xfree(sf);
228             return -1;
229         }
230         if (!bf_read(sf->u.bf, 0, 0, sizeof(sf->head), &sf->head))
231         {
232             sf->head.sysno_max = 0;
233             if (!si->write_flag)
234             {
235                 bf_close(sf->u.bf);
236                 xfree(sf);
237                 return -1;
238             }
239         }
240         break;
241     case ZEBRA_SORT_TYPE_ISAMB:
242         sprintf(fname, "sortb%d", id);
243         
244         sf->u.isamb = isamb_open2(si->bfs, fname, si->write_flag, &method,
245                                   /* cache */ 0,
246                                   /* no_cat */ 1, &isam_block_size,
247                                   /* use_root_ptr */ 1);
248         if (!sf->u.isamb)
249         {
250             xfree(sf);
251             return -1;
252         }
253         else
254         {
255             sf->isam_p = isamb_get_root_ptr(sf->u.isamb);
256             sf->isam_pp = 0;
257         }
258         break;
259     }
260     sf->no_inserted = 0;
261     sf->no_deleted = 0;
262     sf->next = si->files;
263     si->current_file = si->files = sf;
264     return 0;
265 }
266
267 void zebra_sort_sysno(zebra_sort_index_t si, zint sysno)
268 {
269     struct sortFile *sf = si->current_file;
270     zint new_sysno = rec_sysno_to_int(sysno);
271
272     for (sf = si->files; sf; sf = sf->next)
273     {
274         sf->no_inserted = 0;
275         sf->no_deleted = 0;
276         if (new_sysno < si->sysno && sf->isam_pp)
277         {
278             isamb_pp_close(sf->isam_pp);
279             sf->isam_pp = 0;
280         }
281     }
282     si->sysno = new_sysno;
283 }
284
285
286 void zebra_sort_delete(zebra_sort_index_t si)
287 {
288     struct sortFile *sf = si->current_file;
289
290     if (!sf || !sf->u.bf)
291         return;
292     switch(si->type)
293     {
294     case ZEBRA_SORT_TYPE_FLAT:
295         zebra_sort_add(si, "", 0);
296         break;
297     case ZEBRA_SORT_TYPE_ISAMB:
298         assert(sf->u.isamb);
299         if (sf->no_deleted == 0)
300         {
301             struct sort_term_stream s;
302             ISAMC_I isamc_i;
303
304             s.st.sysno = si->sysno;
305             s.st.term[0] = '\0';
306             
307             s.no = 1;
308             s.insert_flag = 0;
309             isamc_i.clientData = &s;
310             isamc_i.read_item = sort_term_code_read;
311             
312             isamb_merge(sf->u.isamb, &sf->isam_p, &isamc_i);
313             sf->no_deleted++;
314         }
315         break;
316     }
317 }
318
319 void zebra_sort_add(zebra_sort_index_t si, const char *buf, int len)
320 {
321     struct sortFile *sf = si->current_file;
322
323     if (!sf || !sf->u.bf)
324         return;
325     switch(si->type)
326     {
327     case ZEBRA_SORT_TYPE_FLAT:
328         if (len > SORT_IDX_ENTRYSIZE)
329         {
330             len = SORT_IDX_ENTRYSIZE;
331             memcpy(si->entry_buf, buf, len);
332         }
333         else
334         {
335             memcpy(si->entry_buf, buf, len);
336             memset(si->entry_buf+len, 0, SORT_IDX_ENTRYSIZE-len);
337         }
338         bf_write(sf->u.bf, si->sysno+1, 0, 0, si->entry_buf);
339         break;
340     case ZEBRA_SORT_TYPE_ISAMB:
341         assert(sf->u.isamb);
342         if (sf->no_inserted == 0)
343         {
344             struct sort_term_stream s;
345             ISAMC_I isamc_i;
346
347             s.st.sysno = si->sysno;
348             if (len >= SORT_MAX_TERM)
349                 len = SORT_MAX_TERM-1;
350             memcpy(s.st.term, buf, len);
351             s.st.term[len] = '\0';
352             s.no = 1;
353             s.insert_flag = 1;
354             isamc_i.clientData = &s;
355             isamc_i.read_item = sort_term_code_read;
356             
357             isamb_merge(sf->u.isamb, &sf->isam_p, &isamc_i);
358             sf->no_inserted++;
359         }
360         break;
361     }
362 }
363
364 void zebra_sort_read(zebra_sort_index_t si, char *buf)
365 {
366     int r;
367     struct sortFile *sf = si->current_file;
368
369     assert(sf);
370
371     switch(si->type)
372     {
373     case ZEBRA_SORT_TYPE_FLAT:
374         r = bf_read(sf->u.bf, si->sysno+1, 0, 0, buf);
375         if (!r)
376             memset(buf, 0, SORT_IDX_ENTRYSIZE);
377         break;
378     case ZEBRA_SORT_TYPE_ISAMB:
379         memset(buf, 0, SORT_IDX_ENTRYSIZE);
380         assert(sf->u.bf);
381         if (sf->u.bf)
382         {
383             struct sort_term st, st_untilbuf;
384
385             st.sysno = 99999;
386             if (!sf->isam_pp)
387             {
388                 yaz_log(YLOG_LOG, "isamb_pp_open " ZINT_FORMAT, sf->isam_p);
389                 sf->isam_pp = isamb_pp_open(sf->u.isamb, sf->isam_p, 1);
390             }
391             if (!sf->isam_pp)
392                 return;
393
394 #if 0
395             while (1)
396             {
397                 r = isamb_pp_read(sf->isam_pp, &st);
398                 if (!r)
399                     break;
400                 if (st.sysno == si->sysno)
401                     break;
402                 yaz_log(YLOG_LOG, "Received sysno=" ZINT_FORMAT " looking for "
403                         ZINT_FORMAT, st.sysno, si->sysno);
404             }
405 #else
406             st_untilbuf.sysno = si->sysno;
407             st_untilbuf.term[0] = '\0';
408             r = isamb_pp_forward(sf->isam_pp, &st, &st_untilbuf);
409             if (!r)
410                 return;
411 #endif
412             if (r)
413             {
414                 if (st.sysno != si->sysno)
415                 {
416                     yaz_log(YLOG_LOG, "Received sysno=" ZINT_FORMAT " looking for "
417                             ZINT_FORMAT, st.sysno, si->sysno);
418                     return;
419                 }
420                 if (strlen(st.term) < SORT_IDX_ENTRYSIZE)
421                     strcpy(buf, st.term);
422                 else
423                     memcpy(buf, st.term, SORT_IDX_ENTRYSIZE);
424             }
425         }
426         break;
427     }
428 }
429 /*
430  * Local variables:
431  * c-basic-offset: 4
432  * indent-tabs-mode: nil
433  * End:
434  * vim: shiftwidth=4 tabstop=8 expandtab
435  */
436