Functional multi-value sort + tests
[idzebra-moved-to-github.git] / index / sortidx.c
1 /* This file is part of the Zebra server.
2    Copyright (C) 1995-2008 Index Data
3
4 Zebra is free software; you can redistribute it and/or modify it under
5 the terms of the GNU General Public License as published by the Free
6 Software Foundation; either version 2, or (at your option) any later
7 version.
8
9 Zebra is distributed in the hope that it will be useful, but WITHOUT ANY
10 WARRANTY; without even the implied warranty of MERCHANTABILITY or
11 FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
12 for more details.
13
14 You should have received a copy of the GNU General Public License
15 along with this program; if not, write to the Free Software
16 Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA  02110-1301  USA
17
18 */
19
20
21 #include <assert.h> 
22 #include <string.h>
23
24 #include <yaz/log.h>
25 #include <yaz/xmalloc.h>
26 #include <idzebra/isamb.h>
27 #include <idzebra/bfile.h>
28 #include <sortidx.h>
29 #include "recindex.h"
30
31 #define SORT_MAX_TERM 110
32 #define SORT_MAX_MULTI 4096
33
34 #define SORT_IDX_BLOCKSIZE 64
35
36 struct sort_term {
37     zint sysno;
38     zint length;
39     char term[SORT_MAX_MULTI];
40 };
41
42
43 static void sort_term_log_item(int level, const void *b, const char *txt)
44 {
45     struct sort_term a1;
46
47     memcpy(&a1, b, sizeof(a1));
48
49     yaz_log(level, "%s " ZINT_FORMAT " %.*s", txt, a1.sysno, 
50             (int) a1.length-1, a1.term);
51 }
52
53 static int sort_term_compare(const void *a, const void *b)
54 {
55     struct sort_term a1, b1;
56
57     memcpy(&a1, a, sizeof(a1));
58     memcpy(&b1, b, sizeof(b1));
59
60     if (a1.sysno > b1.sysno)
61         return 1;
62     else if (a1.sysno < b1.sysno)
63         return -1;
64     return 0;
65 }
66
67 static void *sort_term_code_start(void)
68 {
69     return 0;
70 }
71
72 static void sort_term_encode1(void *p, char **dst, const char **src)
73 {
74     struct sort_term a1;
75
76     memcpy(&a1, *src, sizeof(a1));
77     *src += sizeof(a1);
78
79     zebra_zint_encode(dst, a1.sysno); /* encode record id */
80     strcpy(*dst, a1.term); /* then sort term, 0 terminated */
81     *dst += strlen(a1.term) + 1;
82 }
83
84 static void sort_term_encode2(void *p, char **dst, const char **src)
85 {
86     struct sort_term a1;
87
88     memcpy(&a1, *src, sizeof(a1));
89     *src += sizeof(a1);
90
91     zebra_zint_encode(dst, a1.sysno); /* encode record id */
92     zebra_zint_encode(dst, a1.length); /* encode length */
93     memcpy(*dst, a1.term, a1.length);
94     *dst += a1.length;
95 }
96
97 static void sort_term_decode1(void *p, char **dst, const char **src)
98 {
99     struct sort_term a1;
100     size_t slen;
101
102     zebra_zint_decode(src, &a1.sysno);
103
104     strcpy(a1.term, *src);
105     slen = 1 + strlen(a1.term);
106     *src += slen;
107     a1.length = slen;
108
109     memcpy(*dst, &a1, sizeof(a1));
110     *dst += sizeof(a1);
111 }
112
113 static void sort_term_decode2(void *p, char **dst, const char **src)
114 {
115     struct sort_term a1;
116
117     zebra_zint_decode(src, &a1.sysno);
118     zebra_zint_decode(src, &a1.length);
119
120     memcpy(a1.term, *src, a1.length);
121     *src += a1.length;
122
123     memcpy(*dst, &a1, sizeof(a1));
124     *dst += sizeof(a1);
125 }
126
127 static void sort_term_code_reset(void *p)
128 {
129 }
130
131 static void sort_term_code_stop(void *p)
132 {
133 }
134
135 struct sort_term_stream {
136     int no;
137     int insert_flag;
138     struct sort_term st;
139 };
140
141 static int sort_term_code_read(void *vp, char **dst, int *insertMode)
142 {
143     struct sort_term_stream *s = (struct sort_term_stream *) vp;
144
145     if (s->no == 0)
146         return 0;
147
148     (s->no)--;
149     
150     *insertMode = s->insert_flag;
151     memcpy(*dst, &s->st, sizeof(s->st));
152     *dst += sizeof(s->st);
153     return 1;
154 }
155
156 struct sortFileHead {
157     zint sysno_max;
158 };
159
160 struct sortFile {
161     int id;
162     union {
163         BFile bf;
164         ISAMB isamb;
165     } u;
166     ISAM_P isam_p;
167     ISAMB_PP isam_pp;
168     struct sortFile *next;
169     struct sortFileHead head;
170     int no_inserted;
171     int no_deleted;
172 };
173
174 struct zebra_sort_index {
175     BFiles bfs;
176     int write_flag;
177     zint sysno;
178     int type;
179     char *entry_buf;
180     struct sortFile *current_file;
181     struct sortFile *files;
182 };
183
184 zebra_sort_index_t zebra_sort_open(BFiles bfs, int write_flag, int type)
185 {
186     zebra_sort_index_t si = (zebra_sort_index_t) xmalloc(sizeof(*si));
187     si->bfs = bfs;
188     si->write_flag = write_flag;
189     si->current_file = NULL;
190     si->files = NULL;
191     si->type = type;
192     si->entry_buf = (char *) xmalloc(SORT_IDX_ENTRYSIZE);
193     return si;
194 }
195
196 void zebra_sort_close(zebra_sort_index_t si)
197 {
198     struct sortFile *sf = si->files;
199     while (sf)
200     {
201         struct sortFile *sf_next = sf->next;
202         switch(si->type)
203         {
204         case ZEBRA_SORT_TYPE_FLAT:
205             bf_close(sf->u.bf);
206             break;
207         case ZEBRA_SORT_TYPE_ISAMB:
208         case ZEBRA_SORT_TYPE_MULTI:
209             if (sf->isam_pp)
210                 isamb_pp_close(sf->isam_pp);
211             isamb_set_root_ptr(sf->u.isamb, sf->isam_p);
212             isamb_close(sf->u.isamb);
213             break;
214         }
215         xfree(sf);
216         sf = sf_next;
217     }
218     xfree(si->entry_buf);
219     xfree(si);
220 }
221
222 int zebra_sort_type(zebra_sort_index_t si, int id)
223 {
224     int isam_block_size = 4096;
225
226     ISAMC_M method;
227     char fname[80];
228     struct sortFile *sf;
229
230     method.compare_item = sort_term_compare;
231     method.log_item = sort_term_log_item;
232     method.codec.reset = sort_term_code_reset;
233     method.codec.start = sort_term_code_start;
234     method.codec.stop = sort_term_code_stop;
235
236     if (si->current_file && si->current_file->id == id)
237         return 0;
238     for (sf = si->files; sf; sf = sf->next)
239         if (sf->id == id)
240         {
241             si->current_file = sf;
242             return 0;
243         }
244     sf = (struct sortFile *) xmalloc(sizeof(*sf));
245     sf->id = id;
246
247     switch(si->type)
248     {
249     case ZEBRA_SORT_TYPE_FLAT:
250         sf->u.bf = NULL;
251         sprintf(fname, "sort%d", id);
252         yaz_log(YLOG_DEBUG, "sort idx %s wr=%d", fname, si->write_flag);
253         sf->u.bf = bf_open(si->bfs, fname, SORT_IDX_BLOCKSIZE, si->write_flag);
254         if (!sf->u.bf)
255         {
256             xfree(sf);
257             return -1;
258         }
259         if (!bf_read(sf->u.bf, 0, 0, sizeof(sf->head), &sf->head))
260         {
261             sf->head.sysno_max = 0;
262             if (!si->write_flag)
263             {
264                 bf_close(sf->u.bf);
265                 xfree(sf);
266                 return -1;
267             }
268         }
269         break;
270     case ZEBRA_SORT_TYPE_ISAMB:
271         method.codec.encode = sort_term_encode1;
272         method.codec.decode = sort_term_decode1;
273         
274         sprintf(fname, "sortb%d", id);
275         sf->u.isamb = isamb_open2(si->bfs, fname, si->write_flag, &method,
276                                   /* cache */ 0,
277                                   /* no_cat */ 1, &isam_block_size,
278                                   /* use_root_ptr */ 1);
279         if (!sf->u.isamb)
280         {
281             xfree(sf);
282             return -1;
283         }
284         else
285         {
286             sf->isam_p = isamb_get_root_ptr(sf->u.isamb);
287         }
288         break;
289     case ZEBRA_SORT_TYPE_MULTI:
290         isam_block_size = 32768;
291         method.codec.encode = sort_term_encode2;
292         method.codec.decode = sort_term_decode2;
293         
294         sprintf(fname, "sortm%d", id);
295         sf->u.isamb = isamb_open2(si->bfs, fname, si->write_flag, &method,
296                                   /* cache */ 0,
297                                   /* no_cat */ 1, &isam_block_size,
298                                   /* use_root_ptr */ 1);
299         if (!sf->u.isamb)
300         {
301             xfree(sf);
302             return -1;
303         }
304         else
305         {
306             sf->isam_p = isamb_get_root_ptr(sf->u.isamb);
307         }
308         break;
309     }
310     sf->isam_pp = 0;
311     sf->no_inserted = 0;
312     sf->no_deleted = 0;
313     sf->next = si->files;
314     si->current_file = si->files = sf;
315     return 0;
316 }
317
318 void zebra_sort_sysno(zebra_sort_index_t si, zint sysno)
319 {
320     struct sortFile *sf = si->current_file;
321     zint new_sysno = rec_sysno_to_int(sysno);
322
323     for (sf = si->files; sf; sf = sf->next)
324     {
325         if (sf->no_inserted || sf->no_deleted)
326         {
327             isamb_pp_close(sf->isam_pp);
328             sf->isam_pp = 0;
329         }
330         else if (sf->isam_pp && new_sysno < si->sysno && sf->isam_pp)
331         {
332             isamb_pp_close(sf->isam_pp);
333             sf->isam_pp = 0;
334         }
335         sf->no_inserted = 0;
336         sf->no_deleted = 0;
337     }
338     si->sysno = new_sysno;
339 }
340
341
342 void zebra_sort_delete(zebra_sort_index_t si)
343 {
344     struct sortFile *sf = si->current_file;
345
346     if (!sf || !sf->u.bf)
347         return;
348     switch(si->type)
349     {
350     case ZEBRA_SORT_TYPE_FLAT:
351         memset(si->entry_buf, 0, SORT_IDX_ENTRYSIZE);
352         bf_write(sf->u.bf, si->sysno+1, 0, 0, si->entry_buf);
353         break;
354     case ZEBRA_SORT_TYPE_ISAMB:
355     case ZEBRA_SORT_TYPE_MULTI:
356         assert(sf->u.isamb);
357         if (sf->no_deleted == 0)
358         {
359             struct sort_term_stream s;
360             ISAMC_I isamc_i;
361
362             s.st.sysno = si->sysno;
363             s.st.length = 0;
364             s.st.term[0] = '\0';
365             
366             s.no = 1;
367             s.insert_flag = 0;
368             isamc_i.clientData = &s;
369             isamc_i.read_item = sort_term_code_read;
370             
371             isamb_merge(sf->u.isamb, &sf->isam_p, &isamc_i);
372             sf->no_deleted++;
373         }
374         break;
375     }
376 }
377
378 void zebra_sort_add(zebra_sort_index_t si, WRBUF wrbuf)
379 {
380     struct sortFile *sf = si->current_file;
381     int len;
382
383     if (!sf || !sf->u.bf)
384         return;
385     switch(si->type)
386     {
387     case ZEBRA_SORT_TYPE_FLAT:
388         /* take first entry from wrbuf - itself is 0-terminated */
389         len = strlen(wrbuf_buf(wrbuf));
390         if (len > SORT_IDX_ENTRYSIZE)
391             len = SORT_IDX_ENTRYSIZE;
392         
393         memcpy(si->entry_buf, wrbuf_buf(wrbuf), len);
394         if (len < SORT_IDX_ENTRYSIZE-len)
395             memset(si->entry_buf+len, 0, SORT_IDX_ENTRYSIZE-len);
396         bf_write(sf->u.bf, si->sysno+1, 0, 0, si->entry_buf);
397         break;
398     case ZEBRA_SORT_TYPE_ISAMB:
399         assert(sf->u.isamb);
400
401         if (sf->no_inserted == 0)
402         {
403             struct sort_term_stream s;
404             ISAMC_I isamc_i;
405             /* take first entry from wrbuf - itself is 0-terminated */
406
407             len = wrbuf_len(wrbuf);
408             if (len > SORT_MAX_TERM)
409             {
410                 len = SORT_MAX_TERM;
411                 wrbuf_buf(wrbuf)[len-1] = '\0';
412             }
413             memcpy(s.st.term, wrbuf_buf(wrbuf), len);
414             s.st.length = len;
415             s.st.sysno = si->sysno;
416             s.no = 1;
417             s.insert_flag = 1;
418             isamc_i.clientData = &s;
419             isamc_i.read_item = sort_term_code_read;
420             
421             isamb_merge(sf->u.isamb, &sf->isam_p, &isamc_i);
422             sf->no_inserted++;
423         }
424         break;
425     case ZEBRA_SORT_TYPE_MULTI:
426         assert(sf->u.isamb);
427         if (sf->no_inserted == 0)
428         {
429             struct sort_term_stream s;
430             ISAMC_I isamc_i;
431             len = wrbuf_len(wrbuf);
432             if (len > SORT_MAX_MULTI)
433             {
434                 len = SORT_MAX_MULTI;
435                 wrbuf_buf(wrbuf)[len-1] = '\0';
436             }
437             memcpy(s.st.term, wrbuf_buf(wrbuf), len);
438             s.st.length = len;
439             s.st.sysno = si->sysno;
440             s.no = 1;
441             s.insert_flag = 1;
442             isamc_i.clientData = &s;
443             isamc_i.read_item = sort_term_code_read;
444             
445             isamb_merge(sf->u.isamb, &sf->isam_p, &isamc_i);
446             sf->no_inserted++;
447         }
448         break;
449     }
450 }
451
452
453 int zebra_sort_read(zebra_sort_index_t si, WRBUF w)
454 {
455     int r;
456     struct sortFile *sf = si->current_file;
457     char tbuf[SORT_IDX_ENTRYSIZE];
458
459     assert(sf);
460     assert(sf->u.bf);
461
462     switch(si->type)
463     {
464     case ZEBRA_SORT_TYPE_FLAT:
465         r = bf_read(sf->u.bf, si->sysno+1, 0, 0, tbuf);
466         if (r && *tbuf)
467         {
468             wrbuf_puts(w, tbuf);
469             wrbuf_putc(w, '\0');
470             return 1;
471         }
472         break;
473     case ZEBRA_SORT_TYPE_ISAMB:
474     case ZEBRA_SORT_TYPE_MULTI:
475         if (sf->isam_p)
476         {
477
478             if (!sf->isam_pp)
479                 sf->isam_pp = isamb_pp_open(sf->u.isamb, sf->isam_p, 1);
480             if (sf->isam_pp)
481             {
482                 struct sort_term st, st_untilbuf;
483
484                 st_untilbuf.sysno = si->sysno;
485                 st_untilbuf.length = 0;
486                 st_untilbuf.term[0] = '\0';
487                 r = isamb_pp_forward(sf->isam_pp, &st, &st_untilbuf);
488                 if (r && st.sysno == si->sysno)
489                 {
490                     wrbuf_write(w, st.term, st.length);
491                     return 1;
492                 }
493             }
494         }
495         break;
496     }
497     return 0;
498 }
499 /*
500  * Local variables:
501  * c-basic-offset: 4
502  * indent-tabs-mode: nil
503  * End:
504  * vim: shiftwidth=4 tabstop=8 expandtab
505  */
506