More work on multi-map for sort
[idzebra-moved-to-github.git] / index / sortidx.c
1 /* This file is part of the Zebra server.
2    Copyright (C) 1995-2008 Index Data
3
4 Zebra is free software; you can redistribute it and/or modify it under
5 the terms of the GNU General Public License as published by the Free
6 Software Foundation; either version 2, or (at your option) any later
7 version.
8
9 Zebra is distributed in the hope that it will be useful, but WITHOUT ANY
10 WARRANTY; without even the implied warranty of MERCHANTABILITY or
11 FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
12 for more details.
13
14 You should have received a copy of the GNU General Public License
15 along with this program; if not, write to the Free Software
16 Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA  02110-1301  USA
17
18 */
19
20
21 #include <assert.h> 
22 #include <string.h>
23
24 #include <yaz/log.h>
25 #include <yaz/xmalloc.h>
26 #include <idzebra/isamb.h>
27 #include <idzebra/bfile.h>
28 #include <sortidx.h>
29 #include "recindex.h"
30
31 #define SORT_MAX_TERM 110
32 #define SORT_MAX_MULTI 4096
33
34 #define SORT_IDX_BLOCKSIZE 64
35
36 struct sort_term {
37     zint sysno;
38     zint length;
39     char term[SORT_MAX_MULTI];
40 };
41
42
43 static void sort_term_log_item(int level, const void *b, const char *txt)
44 {
45     struct sort_term a1;
46
47     memcpy(&a1, b, sizeof(a1));
48
49     yaz_log(level, "%s " ZINT_FORMAT " %.*s", txt, a1.sysno, 
50             (int) a1.length, a1.term);
51 }
52
53 static int sort_term_compare(const void *a, const void *b)
54 {
55     struct sort_term a1, b1;
56
57     memcpy(&a1, a, sizeof(a1));
58     memcpy(&b1, b, sizeof(b1));
59
60     if (a1.sysno > b1.sysno)
61         return 1;
62     else if (a1.sysno < b1.sysno)
63         return -1;
64     return 0;
65 }
66
67 static void *sort_term_code_start(void)
68 {
69     return 0;
70 }
71
72 static void sort_term_encode1(void *p, char **dst, const char **src)
73 {
74     struct sort_term a1;
75
76     memcpy(&a1, *src, sizeof(a1));
77     *src += sizeof(a1);
78
79     zebra_zint_encode(dst, a1.sysno); /* encode record id */
80     strcpy(*dst, a1.term); /* then sort term, 0 terminated */
81     *dst += strlen(a1.term) + 1;
82 }
83
84 static void sort_term_encode2(void *p, char **dst, const char **src)
85 {
86     struct sort_term a1;
87
88     memcpy(&a1, *src, sizeof(a1));
89     *src += sizeof(a1);
90
91     zebra_zint_encode(dst, a1.sysno); /* encode record id */
92     zebra_zint_encode(dst, a1.length); /* encode length */
93     memcpy(*dst, a1.term, a1.length);
94     *dst += a1.length;
95 }
96
97 static void sort_term_decode1(void *p, char **dst, const char **src)
98 {
99     struct sort_term a1;
100     size_t slen;
101
102     zebra_zint_decode(src, &a1.sysno);
103
104     strcpy(a1.term, *src);
105     slen = strlen(a1.term);
106     *src += slen + 1;
107     a1.length = slen;
108
109     memcpy(*dst, &a1, sizeof(a1));
110     *dst += sizeof(a1);
111 }
112
113 static void sort_term_decode2(void *p, char **dst, const char **src)
114 {
115     struct sort_term a1;
116
117     zebra_zint_decode(src, &a1.sysno);
118     zebra_zint_decode(src, &a1.length);
119
120     memcpy(a1.term, *src, a1.length);
121     *src += a1.length;
122
123     memcpy(*dst, &a1, sizeof(a1));
124     *dst += sizeof(a1);
125 }
126
127 static void sort_term_code_reset(void *p)
128 {
129 }
130
131 static void sort_term_code_stop(void *p)
132 {
133 }
134
135 struct sort_term_stream {
136     int no;
137     int insert_flag;
138     struct sort_term st;
139 };
140
141 static int sort_term_code_read(void *vp, char **dst, int *insertMode)
142 {
143     struct sort_term_stream *s = (struct sort_term_stream *) vp;
144
145     if (s->no == 0)
146         return 0;
147
148     (s->no)--;
149     
150     *insertMode = s->insert_flag;
151     memcpy(*dst, &s->st, sizeof(s->st));
152     *dst += sizeof(s->st);
153     return 1;
154 }
155
156 struct sortFileHead {
157     zint sysno_max;
158 };
159
160 struct sortFile {
161     int id;
162     union {
163         BFile bf;
164         ISAMB isamb;
165     } u;
166     ISAM_P isam_p;
167     ISAMB_PP isam_pp;
168     struct sortFile *next;
169     struct sortFileHead head;
170     int no_inserted;
171     int no_deleted;
172 };
173
174 struct zebra_sort_index {
175     BFiles bfs;
176     int write_flag;
177     zint sysno;
178     int type;
179     char *entry_buf;
180     struct sortFile *current_file;
181     struct sortFile *files;
182 };
183
184 zebra_sort_index_t zebra_sort_open(BFiles bfs, int write_flag, int type)
185 {
186     zebra_sort_index_t si = (zebra_sort_index_t) xmalloc(sizeof(*si));
187     si->bfs = bfs;
188     si->write_flag = write_flag;
189     si->current_file = NULL;
190     si->files = NULL;
191     si->type = type;
192     si->entry_buf = (char *) xmalloc(SORT_IDX_ENTRYSIZE);
193     return si;
194 }
195
196 void zebra_sort_close(zebra_sort_index_t si)
197 {
198     struct sortFile *sf = si->files;
199     while (sf)
200     {
201         struct sortFile *sf_next = sf->next;
202         switch(si->type)
203         {
204         case ZEBRA_SORT_TYPE_FLAT:
205             bf_close(sf->u.bf);
206             break;
207         case ZEBRA_SORT_TYPE_ISAMB:
208         case ZEBRA_SORT_TYPE_MULTI:
209             if (sf->isam_pp)
210                 isamb_pp_close(sf->isam_pp);
211             isamb_set_root_ptr(sf->u.isamb, sf->isam_p);
212             isamb_close(sf->u.isamb);
213             break;
214         }
215         xfree(sf);
216         sf = sf_next;
217     }
218     xfree(si->entry_buf);
219     xfree(si);
220 }
221
222 int zebra_sort_type(zebra_sort_index_t si, int id)
223 {
224     int isam_block_size = 4096;
225
226     ISAMC_M method;
227     char fname[80];
228     struct sortFile *sf;
229
230     method.compare_item = sort_term_compare;
231     method.log_item = sort_term_log_item;
232     method.codec.reset = sort_term_code_reset;
233     method.codec.start = sort_term_code_start;
234     method.codec.stop = sort_term_code_stop;
235
236     if (si->current_file && si->current_file->id == id)
237         return 0;
238     for (sf = si->files; sf; sf = sf->next)
239         if (sf->id == id)
240         {
241             si->current_file = sf;
242             return 0;
243         }
244     sf = (struct sortFile *) xmalloc(sizeof(*sf));
245     sf->id = id;
246
247     switch(si->type)
248     {
249     case ZEBRA_SORT_TYPE_FLAT:
250         sf->u.bf = NULL;
251         sprintf(fname, "sort%d", id);
252         yaz_log(YLOG_DEBUG, "sort idx %s wr=%d", fname, si->write_flag);
253         sf->u.bf = bf_open(si->bfs, fname, SORT_IDX_BLOCKSIZE, si->write_flag);
254         if (!sf->u.bf)
255         {
256             xfree(sf);
257             return -1;
258         }
259         if (!bf_read(sf->u.bf, 0, 0, sizeof(sf->head), &sf->head))
260         {
261             sf->head.sysno_max = 0;
262             if (!si->write_flag)
263             {
264                 bf_close(sf->u.bf);
265                 xfree(sf);
266                 return -1;
267             }
268         }
269         break;
270     case ZEBRA_SORT_TYPE_ISAMB:
271         method.codec.encode = sort_term_encode1;
272         method.codec.decode = sort_term_decode1;
273         
274         sprintf(fname, "sortb%d", id);
275         sf->u.isamb = isamb_open2(si->bfs, fname, si->write_flag, &method,
276                                   /* cache */ 0,
277                                   /* no_cat */ 1, &isam_block_size,
278                                   /* use_root_ptr */ 1);
279         if (!sf->u.isamb)
280         {
281             xfree(sf);
282             return -1;
283         }
284         else
285         {
286             sf->isam_p = isamb_get_root_ptr(sf->u.isamb);
287         }
288         break;
289     case ZEBRA_SORT_TYPE_MULTI:
290         isam_block_size = 32768;
291         method.codec.encode = sort_term_encode2;
292         method.codec.decode = sort_term_decode2;
293         
294         sprintf(fname, "sortm%d", id);
295         sf->u.isamb = isamb_open2(si->bfs, fname, si->write_flag, &method,
296                                   /* cache */ 0,
297                                   /* no_cat */ 1, &isam_block_size,
298                                   /* use_root_ptr */ 1);
299         if (!sf->u.isamb)
300         {
301             xfree(sf);
302             return -1;
303         }
304         else
305         {
306             sf->isam_p = isamb_get_root_ptr(sf->u.isamb);
307         }
308         break;
309     }
310     sf->isam_pp = 0;
311     sf->no_inserted = 0;
312     sf->no_deleted = 0;
313     sf->next = si->files;
314     si->current_file = si->files = sf;
315     return 0;
316 }
317
318 void zebra_sort_sysno(zebra_sort_index_t si, zint sysno)
319 {
320     struct sortFile *sf = si->current_file;
321     zint new_sysno = rec_sysno_to_int(sysno);
322
323     for (sf = si->files; sf; sf = sf->next)
324     {
325         if (sf->no_inserted || sf->no_deleted)
326         {
327             isamb_pp_close(sf->isam_pp);
328             sf->isam_pp = 0;
329         }
330         else if (sf->isam_pp && new_sysno < si->sysno && sf->isam_pp)
331         {
332             isamb_pp_close(sf->isam_pp);
333             sf->isam_pp = 0;
334         }
335         sf->no_inserted = 0;
336         sf->no_deleted = 0;
337     }
338     si->sysno = new_sysno;
339 }
340
341
342 void zebra_sort_delete(zebra_sort_index_t si)
343 {
344     struct sortFile *sf = si->current_file;
345
346     if (!sf || !sf->u.bf)
347         return;
348     switch(si->type)
349     {
350     case ZEBRA_SORT_TYPE_FLAT:
351         zebra_sort_add(si, "", 0);
352         break;
353     case ZEBRA_SORT_TYPE_ISAMB:
354     case ZEBRA_SORT_TYPE_MULTI:
355         assert(sf->u.isamb);
356         if (sf->no_deleted == 0)
357         {
358             struct sort_term_stream s;
359             ISAMC_I isamc_i;
360
361             s.st.sysno = si->sysno;
362             s.st.length = 0;
363             s.st.term[0] = '\0';
364             
365             s.no = 1;
366             s.insert_flag = 0;
367             isamc_i.clientData = &s;
368             isamc_i.read_item = sort_term_code_read;
369             
370             isamb_merge(sf->u.isamb, &sf->isam_p, &isamc_i);
371             sf->no_deleted++;
372         }
373         break;
374     }
375 }
376
377 void zebra_sort_add_ent(zebra_sort_index_t si, struct zebra_sort_ent *ent)
378 {
379     struct sortFile *sf = si->current_file;
380     int len;
381
382     if (!sf || !sf->u.bf)
383         return;
384     switch(si->type)
385     {
386     case ZEBRA_SORT_TYPE_FLAT:
387         /* take first entry from wrbuf - itself is 0-terminated */
388         len = strlen(wrbuf_buf(ent->wrbuf));
389         if (len > SORT_IDX_ENTRYSIZE)
390             len = SORT_IDX_ENTRYSIZE;
391         
392         memcpy(si->entry_buf, wrbuf_buf(ent->wrbuf), len);
393         if (len < SORT_IDX_ENTRYSIZE-len)
394             memset(si->entry_buf+len, 0, SORT_IDX_ENTRYSIZE-len);
395         bf_write(sf->u.bf, si->sysno+1, 0, 0, si->entry_buf);
396         break;
397     case ZEBRA_SORT_TYPE_ISAMB:
398         assert(sf->u.isamb);
399
400         assert(sf->no_inserted == 0);
401         if (sf->no_inserted == 0)
402         {
403             struct sort_term_stream s;
404             ISAMC_I isamc_i;
405             /* take first entry from wrbuf - itself is 0-terminated */
406             len = strlen(wrbuf_buf(ent->wrbuf)); 
407
408             s.st.sysno = si->sysno;
409             if (len >= SORT_MAX_TERM)
410                 len = SORT_MAX_TERM-1;
411             memcpy(s.st.term, wrbuf_buf(ent->wrbuf), len);
412             s.st.term[len] = '\0';
413             s.st.length = len;
414             s.no = 1;
415             s.insert_flag = 1;
416             isamc_i.clientData = &s;
417             isamc_i.read_item = sort_term_code_read;
418             
419             isamb_merge(sf->u.isamb, &sf->isam_p, &isamc_i);
420             sf->no_inserted++;
421         }
422         break;
423     case ZEBRA_SORT_TYPE_MULTI:
424         assert(sf->u.isamb);
425         if (sf->no_inserted == 0)
426         {
427             struct sort_term_stream s;
428             ISAMC_I isamc_i;
429             len = wrbuf_len(ent->wrbuf);
430
431             s.st.sysno = si->sysno;
432             if (len >= SORT_MAX_MULTI)
433                 len = SORT_MAX_MULTI-1;
434             memcpy(s.st.term, wrbuf_buf(ent->wrbuf), len);
435             s.st.length = len;
436             s.no = 1;
437             s.insert_flag = 1;
438             isamc_i.clientData = &s;
439             isamc_i.read_item = sort_term_code_read;
440             
441             isamb_merge(sf->u.isamb, &sf->isam_p, &isamc_i);
442             sf->no_inserted++;
443         }
444         break;
445     }
446 }
447
448 void zebra_sort_add(zebra_sort_index_t si, const char *buf, int len)
449 {
450     struct sortFile *sf = si->current_file;
451
452     if (!sf || !sf->u.bf)
453         return;
454     switch(si->type)
455     {
456     case ZEBRA_SORT_TYPE_FLAT:
457         if (len > SORT_IDX_ENTRYSIZE)
458         {
459             len = SORT_IDX_ENTRYSIZE;
460             memcpy(si->entry_buf, buf, len);
461         }
462         else
463         {
464             memcpy(si->entry_buf, buf, len);
465             memset(si->entry_buf+len, 0, SORT_IDX_ENTRYSIZE-len);
466         }
467         bf_write(sf->u.bf, si->sysno+1, 0, 0, si->entry_buf);
468         break;
469     case ZEBRA_SORT_TYPE_ISAMB:
470         assert(sf->u.isamb);
471         if (sf->no_inserted == 0)
472         {
473             struct sort_term_stream s;
474             ISAMC_I isamc_i;
475
476             s.st.sysno = si->sysno;
477             if (len >= SORT_MAX_TERM)
478                 len = SORT_MAX_TERM-1;
479             memcpy(s.st.term, buf, len);
480             s.st.term[len] = '\0';
481             s.st.length = len;
482             s.no = 1;
483             s.insert_flag = 1;
484             isamc_i.clientData = &s;
485             isamc_i.read_item = sort_term_code_read;
486             
487             isamb_merge(sf->u.isamb, &sf->isam_p, &isamc_i);
488             sf->no_inserted++;
489         }
490         break;
491     case ZEBRA_SORT_TYPE_MULTI:
492         assert(sf->u.isamb);
493         if (sf->no_inserted == 0)
494         {
495             struct sort_term_stream s;
496             ISAMC_I isamc_i;
497
498             s.st.sysno = si->sysno;
499             if (len >= SORT_MAX_MULTI)
500                 len = SORT_MAX_MULTI-1;
501             memcpy(s.st.term, buf, len);
502             s.st.length = len;
503             s.no = 1;
504             s.insert_flag = 1;
505             isamc_i.clientData = &s;
506             isamc_i.read_item = sort_term_code_read;
507             
508             isamb_merge(sf->u.isamb, &sf->isam_p, &isamc_i);
509             sf->no_inserted++;
510         }
511         break;
512     }
513 }
514
515 int zebra_sort_read(zebra_sort_index_t si, WRBUF w)
516 {
517     int r;
518     struct sortFile *sf = si->current_file;
519     char tbuf[SORT_IDX_ENTRYSIZE];
520
521     assert(sf);
522     assert(sf->u.bf);
523
524     switch(si->type)
525     {
526     case ZEBRA_SORT_TYPE_FLAT:
527         r = bf_read(sf->u.bf, si->sysno+1, 0, 0, tbuf);
528         if (r && *tbuf)
529             wrbuf_puts(w, tbuf);
530         else
531             return 0;
532         break;
533     case ZEBRA_SORT_TYPE_ISAMB:
534     case ZEBRA_SORT_TYPE_MULTI:
535         if (!sf->isam_p)
536             return 0;
537         else
538         {
539             struct sort_term st, st_untilbuf;
540
541             if (!sf->isam_pp)
542                 sf->isam_pp = isamb_pp_open(sf->u.isamb, sf->isam_p, 1);
543             if (!sf->isam_pp)
544                 return 0;
545
546             st_untilbuf.sysno = si->sysno;
547             st_untilbuf.length = 0;
548             st_untilbuf.term[0] = '\0';
549             r = isamb_pp_forward(sf->isam_pp, &st, &st_untilbuf);
550             if (!r)
551                 return 0;
552             if (r)
553             {
554                 if (st.sysno != si->sysno)
555                 {
556                     yaz_log(YLOG_LOG, "Received sysno=" ZINT_FORMAT " looking for "
557                             ZINT_FORMAT, st.sysno, si->sysno);
558                     return 0;
559                 }
560                 wrbuf_write(w, st.term, st.length);
561             }
562         }
563         break;
564     }
565     return 1;
566 }
567 /*
568  * Local variables:
569  * c-basic-offset: 4
570  * indent-tabs-mode: nil
571  * End:
572  * vim: shiftwidth=4 tabstop=8 expandtab
573  */
574