6c3145c4243e368fda6aa959e274cd2107a551c4
[idzebra-moved-to-github.git] / rset / rsmultiandor.c
1 /* $Id: rsmultiandor.c,v 1.26 2006-09-08 14:40:55 adam Exp $
2    Copyright (C) 1995-2006
3    Index Data ApS
4
5 This file is part of the Zebra server.
6
7 Zebra is free software; you can redistribute it and/or modify it under
8 the terms of the GNU General Public License as published by the Free
9 Software Foundation; either version 2, or (at your option) any later
10 version.
11
12 Zebra is distributed in the hope that it will be useful, but WITHOUT ANY
13 WARRANTY; without even the implied warranty of MERCHANTABILITY or
14 FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
15 for more details.
16
17 You should have received a copy of the GNU General Public License
18 along with this program; if not, write to the Free Software
19 Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA  02110-1301  USA
20
21 */
22
23
24 /**
25  * \file rsmultiandor.c
26  * \brief This module implements the rsmulti_or and rsmulti_and result sets
27  *
28  * rsmultior is based on a heap, from which we find the next hit.
29  *
30  * rsmultiand is based on a simple array of rsets, and a linear
31  * search to find the record that exists in all of those rsets.
32  * To speed things up, the array is sorted so that the smallest
33  * rsets come first, they are most likely to have the hits furthest
34  * away, and thus forwarding to them makes the most sense.
35  */
36
37
38 #include <assert.h>
39 #include <stdio.h>
40 #include <stdlib.h>
41 #include <string.h>
42
43 #include <idzebra/util.h>
44 #include <idzebra/isamc.h>
45 #include <rset.h>
46
47 static RSFD r_open_and (RSET ct, int flag);
48 static RSFD r_open_or (RSET ct, int flag);
49 static void r_close (RSFD rfd);
50 static void r_delete (RSET ct);
51 static int r_read_and (RSFD rfd, void *buf, TERMID *term);
52 static int r_read_or (RSFD rfd, void *buf, TERMID *term);
53 static int r_write (RSFD rfd, const void *buf);
54 static int r_forward_and(RSFD rfd, void *buf, TERMID *term,
55                      const void *untilbuf);
56 static int r_forward_or(RSFD rfd, void *buf, TERMID *term,
57                      const void *untilbuf);
58 static void r_pos (RSFD rfd, double *current, double *total);
59 static void r_get_terms(RSET ct, TERMID *terms, int maxterms, int *curterm);
60
61 static const struct rset_control control_or = 
62 {
63     "multi-or",
64     r_delete,
65     r_get_terms,
66     r_open_or,
67     r_close,
68     r_forward_or,
69     r_pos,
70     r_read_or,
71     r_write,
72 };
73
74 static const struct rset_control control_and = 
75 {
76     "multi-and",
77     r_delete,
78     r_get_terms,
79     r_open_and,
80     r_close,
81     r_forward_and,
82     r_pos,
83     r_read_and,
84     r_write,
85 };
86
87 /* The heap structure: 
88  * The rset contains a list or rsets we are ORing together 
89  * The rfd contains a heap of heap-items, which contain
90  * a rfd opened to those rsets, and a buffer for one key.
91  * They also contain a ptr to the rset list in the rset 
92  * itself, for practical reasons. 
93  */
94
95 struct heap_item {
96     RSFD fd;
97     void *buf;
98     RSET rset;
99     TERMID term;
100 };
101
102 struct heap {
103     int heapnum;
104     int heapmax;
105     const struct rset_key_control *kctrl;
106     struct heap_item **heap; /* ptrs to the rfd */
107 };
108 typedef struct heap *HEAP;
109
110
111 struct rset_private {
112     int dummy;
113 };
114
115
116 struct rfd_private {
117     int flag;
118     struct heap_item *items; /* we alloc and free them here */
119     HEAP h; /* and move around here */
120     zint hits; /* returned so far */
121     int eof; /* seen the end of it */
122     int tailcount; /* how many items are tailing */
123     zint segment;
124     int skip;
125     char *tailbits;
126 };
127
128 static int log_level = 0;
129 static int log_level_initialized = 0;
130
131
132 /* Heap functions ***********************/
133
134 #if 0
135 static void heap_dump_item( HEAP h, int i, int level)
136 {
137     double cur,tot;
138     if (i>h->heapnum)
139         return;
140     (void)rset_pos(h->heap[i]->rset,h->heap[i]->fd, &cur, &tot);
141     yaz_log(log_level," %d %*s i=%p buf=%p %0.1f/%0.1f",i, level, "",  
142                     &(h->heap[i]), h->heap[i]->buf, cur,tot );
143     heap_dump_item(h, 2*i, level+1);
144     heap_dump_item(h, 2*i+1, level+1);
145 }
146 static void heap_dump( HEAP h,char *msg) {
147     yaz_log(log_level, "heap dump: %s num=%d max=%d",msg, h->heapnum, h->heapmax);
148     heap_dump_item(h,1,1);
149 }
150 #endif
151
152 static void heap_swap (HEAP h, int x, int y)
153 {
154     struct heap_item *swap;
155     swap = h->heap[x];
156     h->heap[x] = h->heap[y];
157     h->heap[y] = swap;
158 }
159
160 static int heap_cmp(HEAP h, int x, int y)
161 {
162     return (*h->kctrl->cmp)(h->heap[x]->buf,h->heap[y]->buf);
163 }
164
165 static int heap_empty(HEAP h)
166 {
167     return ( 0==h->heapnum );
168 }
169
170 /** \brief deletes the first item in the heap, and balances the rest 
171  */
172 static void heap_delete (HEAP h)
173 {
174     int cur = 1, child = 2;
175     h->heap[1] = 0; /* been deleted */
176     heap_swap (h, 1, h->heapnum--);
177     while (child <= h->heapnum) {
178         if (child < h->heapnum && heap_cmp(h,child,1+child)>0 )
179             child++;
180         if (heap_cmp(h,cur,child) > 0)
181         {
182             heap_swap (h, cur, child);
183             cur = child;
184             child = 2*cur;
185         }
186         else
187             break;
188     }
189 }
190
191 /** \brief puts item into heap.
192     The heap root element has changed value (to bigger) 
193     Swap downwards until the heap is ordered again 
194 */
195 static void heap_balance (HEAP h)
196 {
197     int cur = 1, child = 2;
198     while (child <= h->heapnum) {
199         if (child < h->heapnum && heap_cmp(h,child,1+child)>0 )
200             child++;
201         if (heap_cmp(h,cur,child) > 0)
202         {
203             heap_swap (h, cur, child);
204             cur = child;
205             child = 2*cur;
206         }
207         else
208             break;
209     }
210 }
211
212
213 static void heap_insert (HEAP h, struct heap_item *hi)
214 {
215     int cur, parent;
216
217     cur = ++(h->heapnum);
218     assert(cur <= h->heapmax);
219     h->heap[cur] = hi;
220     parent = cur/2;
221     while (parent && (heap_cmp(h,parent,cur) > 0))
222     {
223         assert(parent>0);
224         heap_swap (h, cur, parent);
225         cur = parent;
226         parent = cur/2;
227     }
228 }
229
230
231 static
232 HEAP heap_create (NMEM nmem, int size, const struct rset_key_control *kctrl)
233 {
234     HEAP h = (HEAP) nmem_malloc (nmem, sizeof(*h));
235
236     ++size; /* heap array starts at 1 */
237     h->heapnum = 0;
238     h->heapmax = size;
239     h->kctrl = kctrl;
240     h->heap = (struct heap_item**) nmem_malloc(nmem,size*sizeof(*h->heap));
241     h->heap[0]=0; /* not used */
242     return h;
243 }
244
245 static void heap_clear( HEAP h)
246 {
247     assert(h);
248     h->heapnum = 0;
249 }
250
251 static void heap_destroy (HEAP h)
252 {
253     /* nothing to delete, all is nmem'd, and will go away in due time */
254 }
255
256 /** \brief compare and items for quicksort
257     used in qsort to get the multi-and args in optimal order
258     that is, those with fewest occurrences first
259 */
260 int compare_ands(const void *x, const void *y)
261 {   const struct heap_item *hx = x;
262     const struct heap_item *hy = y;
263     double cur, totx, toty;
264     rset_pos(hx->fd, &cur, &totx);
265     rset_pos(hy->fd, &cur, &toty);
266     if ( totx > toty +0.5 )
267         return 1;
268     if ( totx < toty -0.5 )
269         return -1;
270     return 0;  /* return totx - toty, except for overflows and rounding */
271 }
272
273 static RSET rsmulti_andor_create(NMEM nmem,
274                                  struct rset_key_control *kcontrol, 
275                                  int scope, TERMID termid,
276                                  int no_rsets, RSET* rsets, 
277                                  const struct rset_control *ctrl)
278 {
279     RSET rnew = rset_create_base(ctrl, nmem, kcontrol, scope, termid,
280                                  no_rsets, rsets);
281     struct rset_private *info;
282     if (!log_level_initialized)
283     {
284         log_level = yaz_log_module_level("rsmultiandor");
285         log_level_initialized = 1;
286     }
287     yaz_log(log_level, "rsmultiand_andor_create scope=%d", scope);
288     info = (struct rset_private *) nmem_malloc(rnew->nmem, sizeof(*info));
289     rnew->priv = info;
290     return rnew;
291 }
292
293 RSET rset_create_or(NMEM nmem, struct rset_key_control *kcontrol,
294                     int scope, TERMID termid, int no_rsets, RSET* rsets)
295 {
296     return rsmulti_andor_create(nmem, kcontrol, scope, termid,
297                                 no_rsets, rsets, &control_or);
298 }
299
300 RSET rset_create_and(NMEM nmem, struct rset_key_control *kcontrol,
301                      int scope, int no_rsets, RSET* rsets)
302 {
303     return rsmulti_andor_create(nmem, kcontrol, scope, 0,
304                                 no_rsets, rsets, &control_and);
305 }
306
307 static void r_delete (RSET ct)
308 {
309 }
310
311 static RSFD r_open_andor (RSET ct, int flag, int is_and)
312 {
313     RSFD rfd;
314     struct rfd_private *p;
315     const struct rset_key_control *kctrl = ct->keycontrol;
316     int i;
317
318     if (flag & RSETF_WRITE)
319     {
320         yaz_log (YLOG_FATAL, "multiandor set type is read-only");
321         return NULL;
322     }
323     rfd = rfd_create_base(ct);
324     if (rfd->priv) {
325         p = (struct rfd_private *)rfd->priv;
326         if (!is_and)
327             heap_clear(p->h);
328         assert(p->items);
329         /* all other pointers shouls already be allocated, in right sizes! */
330     }
331     else 
332     {
333         p = (struct rfd_private *) nmem_malloc (ct->nmem,sizeof(*p));
334         rfd->priv = p;
335         p->h = 0;
336         p->tailbits = 0;
337         if (is_and)
338             p->tailbits = nmem_malloc(ct->nmem, ct->no_children*sizeof(char) );
339         else 
340             p->h = heap_create( ct->nmem, ct->no_children, kctrl);
341         p->items = (struct heap_item *) 
342             nmem_malloc(ct->nmem, ct->no_children*sizeof(*p->items));
343         for (i = 0; i<ct->no_children; i++)
344         {
345             p->items[i].rset = ct->children[i];
346             p->items[i].buf = nmem_malloc(ct->nmem, kctrl->key_size);
347         }
348     }
349     p->flag = flag;
350     p->hits = 0;
351     p->eof = 0;
352     p->tailcount = 0;
353     if (is_and)
354     { /* read the array and sort it */
355         for (i = 0; i<ct->no_children; i++){
356             p->items[i].fd = rset_open(ct->children[i], RSETF_READ);
357             if (!rset_read(p->items[i].fd, p->items[i].buf, &p->items[i].term))
358                 p->eof = 1;
359             p->tailbits[i] = 0;
360         }
361         qsort(p->items, ct->no_children, sizeof(p->items[0]), compare_ands);
362     } 
363     else
364     { /* fill the heap for ORing */
365         for (i = 0; i<ct->no_children; i++){
366             p->items[i].fd = rset_open(ct->children[i],RSETF_READ);
367             if ( rset_read(p->items[i].fd, p->items[i].buf, &p->items[i].term))
368                 heap_insert(p->h, &(p->items[i]));
369         }
370     }
371     return rfd;
372 }
373
374 static RSFD r_open_or (RSET ct, int flag)
375 {
376     return r_open_andor(ct, flag, 0);
377 }
378
379 static RSFD r_open_and (RSET ct, int flag)
380 {
381     return r_open_andor(ct, flag, 1);
382 }
383
384
385 static void r_close (RSFD rfd)
386 {
387     struct rfd_private *p=(struct rfd_private *)(rfd->priv);
388     int i;
389
390     if (p->h)
391         heap_destroy (p->h);
392     for (i = 0; i<rfd->rset->no_children; i++) 
393         if (p->items[i].fd)
394             rset_close(p->items[i].fd);
395 }
396
397 static int r_forward_or(RSFD rfd, void *buf, 
398                         TERMID *term, const void *untilbuf)
399 { /* while heap head behind untilbuf, forward it and rebalance heap */
400     struct rfd_private *p = rfd->priv;
401     const struct rset_key_control *kctrl = rfd->rset->keycontrol;
402     if (heap_empty(p->h))
403         return 0;
404     while ( (*kctrl->cmp)(p->h->heap[1]->buf,untilbuf) < -rfd->rset->scope )
405     {
406         if (rset_forward(p->h->heap[1]->fd,p->h->heap[1]->buf,
407                          &p->h->heap[1]->term, untilbuf))
408             heap_balance(p->h);
409         else 
410         {
411             heap_delete(p->h);
412             if (heap_empty(p->h))
413                 return 0;
414         }
415
416     }
417     return r_read_or(rfd, buf, term);
418 }
419
420
421 /** \brief reads one item key from an 'or' set
422     \param rfd set handle
423     \param buf resulting item buffer
424     \param term resulting term
425     \retval 0 EOF
426     \retval 1 item could be read
427 */
428 static int r_read_or (RSFD rfd, void *buf, TERMID *term)
429 {
430     RSET rset = rfd->rset;
431     struct rfd_private *mrfd = rfd->priv;
432     const struct rset_key_control *kctrl = rset->keycontrol;
433     struct heap_item *it;
434     int rdres;
435     if (heap_empty(mrfd->h))
436         return 0;
437     it = mrfd->h->heap[1];
438     memcpy(buf, it->buf, kctrl->key_size);
439     if (term)
440     {
441         if (rset->term)
442             *term = rset->term;
443         else
444             *term = it->term;
445     }
446     (mrfd->hits)++;
447     rdres = rset_read(it->fd, it->buf, &it->term);
448     if ( rdres )
449         heap_balance(mrfd->h);
450     else
451         heap_delete(mrfd->h);
452     return 1;
453
454 }
455
456 /** \brief reads one item key from an 'and' set
457     \param rfd set handle
458     \param buf resulting item buffer
459     \param term resulting term
460     \retval 0 EOF
461     \retval 1 item could be read
462     
463     Has to return all hits where each item points to the
464     same sysno (scope), in order. Keep an extra key (hitkey)
465     as long as all records do not point to hitkey, forward
466     them, and update hitkey to be the highest seen so far.
467     (if any item eof's, mark eof, and return 0 thereafter)
468     Once a hit has been found, scan all items for the smallest
469     value. Mark all as being in the tail. Read next from that
470     item, and if not in the same record, clear its tail bit
471 */
472 static int r_read_and (RSFD rfd, void *buf, TERMID *term)
473 {   struct rfd_private *p = rfd->priv;
474     RSET ct = rfd->rset;
475     const struct rset_key_control *kctrl = ct->keycontrol;
476     int i;
477
478     while (1) {
479         if (p->tailcount) 
480         { /* we are tailing, find lowest tail and return it */
481             int mintail = -1;
482             int cmp;
483                  
484             for (i = 0; i<ct->no_children; i++)
485             {
486                 if (p->tailbits[i])
487                 {
488                     if (mintail >= 0)
489                         cmp = (*kctrl->cmp)
490                             (p->items[i].buf, p->items[mintail].buf);
491                     else
492                         cmp = -1;
493                     if (cmp < 0)
494                         mintail = i;
495
496                     if (kctrl->get_segment)
497                     {   /* segments enabled */
498                         zint segment =  kctrl->get_segment(p->items[i].buf);
499                         /* store segment if not stored already */
500                         if (!p->segment && segment)
501                             p->segment = segment;
502                         
503                         /* skip rest entirely if segments don't match */
504                         if (p->segment && segment && p->segment != segment)
505                             p->skip = 1;
506                     }
507                 }
508             }
509             /* return the lowest tail */
510             memcpy(buf, p->items[mintail].buf, kctrl->key_size); 
511             if (term)
512                 *term = p->items[mintail].term;
513             if (!rset_read(p->items[mintail].fd, p->items[mintail].buf,
514                            &p->items[mintail].term))
515             {
516                 p->eof = 1; /* game over, once tails have been returned */
517                 p->tailbits[mintail] = 0; 
518                 (p->tailcount)--;
519             }
520             else
521             {
522                 /* still a tail? */
523                 cmp = (*kctrl->cmp)(p->items[mintail].buf,buf);
524                 if (cmp >= rfd->rset->scope)
525                 {
526                     p->tailbits[mintail] = 0;
527                     (p->tailcount)--;
528                 }
529             }
530             if (p->skip)
531                 continue;  /* skip again.. eventually tailcount will be 0 */
532             (p->hits)++;
533             return 1;
534         } 
535         /* not tailing, forward until all records match, and set up */
536         /* as tails. the earlier 'if' will then return the hits */
537         if (p->eof)
538             return 0; /* nothing more to see */
539         i = 1; /* assume items[0] is highest up */
540         while (i < ct->no_children) 
541         {
542             int cmp = (*kctrl->cmp)(p->items[0].buf, p->items[i].buf);
543             if (cmp <= -rfd->rset->scope) { /* [0] was behind, forward it */
544                 if (!rset_forward(p->items[0].fd, p->items[0].buf, 
545                                   &p->items[0].term, p->items[i].buf))
546                 {
547                     p->eof = 1; /* game over */
548                     return 0;
549                 }
550                 i = 0; /* start forwarding from scratch */
551             } 
552             else if (cmp>=rfd->rset->scope)
553             { /* [0] was ahead, forward i */
554                 if (!rset_forward(p->items[i].fd, p->items[i].buf, 
555                                   &p->items[i].term, p->items[0].buf))
556                 {
557                     p->eof = 1; /* game over */
558                     return 0;
559                 }
560             } 
561             else
562                 i++;
563         } /* while i */
564         /* if we get this far, all rsets are now within +- scope of [0] */
565         /* ergo, we have a hit. Mark them all as tailing, and let the */
566         /* upper 'if' return the hits in right order */
567         for (i = 0; i < ct->no_children; i++)
568             p->tailbits[i] = 1;
569         p->tailcount = ct->no_children;
570         p->segment = 0;
571         p->skip = 0;
572     } /* while 1 */
573 }
574
575
576 static int r_forward_and(RSFD rfd, void *buf, TERMID *term, 
577                          const void *untilbuf)
578
579     struct rfd_private *p = rfd->priv;
580     RSET ct = rfd->rset;
581     const struct rset_key_control *kctrl = ct->keycontrol;
582     int i;
583     int cmp;
584     int killtail = 0;
585
586     for (i = 0; i<ct->no_children; i++)
587     {
588         cmp = (*kctrl->cmp)(p->items[i].buf,untilbuf);
589         if (cmp <= -rfd->rset->scope)
590         {
591             killtail = 1; /* we are moving to a different hit */
592             if (!rset_forward(p->items[i].fd, p->items[i].buf, 
593                               &p->items[i].term, untilbuf))
594             {
595                 p->eof = 1; /* game over */
596                 p->tailcount = 0;
597                 return 0;
598             }
599         }
600     }
601     if (killtail) 
602     {
603         for (i = 0; i<ct->no_children; i++)
604             p->tailbits[i] = 0;
605         p->tailcount = 0;
606     }
607     return r_read_and(rfd,buf,term);
608 }
609
610 static void r_pos (RSFD rfd, double *current, double *total)
611 {
612     RSET ct = rfd->rset;
613     struct rfd_private *mrfd = 
614         (struct rfd_private *)(rfd->priv);
615     double cur, tot;
616     double scur = 0.0, stot = 0.0;
617     int i;
618     for (i = 0; i<ct->no_children; i++){
619         rset_pos(mrfd->items[i].fd, &cur, &tot);
620         yaz_log(log_level, "r_pos: %d %0.1f %0.1f", i, cur,tot); 
621         scur += cur;
622         stot += tot;
623     }
624     if (stot < 1.0) { /* nothing there */
625         *current = 0;
626         *total = 0;
627         yaz_log(log_level, "r_pos: NULL  %0.1f %0.1f",  *current, *total);
628     }
629     else
630     {
631         *current = (double) (mrfd->hits);
632         *total = *current*stot/scur;
633         yaz_log(log_level, "r_pos: =  %0.1f %0.1f",  *current, *total);
634     }
635 }
636
637 static int r_write (RSFD rfd, const void *buf)
638 {
639     yaz_log (YLOG_FATAL, "multior set type is read-only");
640     return -1;
641 }
642
643 static void r_get_terms(RSET ct, TERMID *terms, int maxterms, int *curterm)
644 {
645     if (ct->term)
646         rset_get_one_term(ct, terms, maxterms, curterm);
647     else
648     {
649         /* Special case: Some multi-ors have all terms pointing to the same 
650            term. We do not want to duplicate those. Other multiors (and ands)
651            have different terms under them. Those we want. 
652         */
653         int firstterm= *curterm;
654         int i;
655
656         for (i = 0; i<ct->no_children; i++)
657         {
658             rset_getterms(ct->children[i], terms, maxterms, curterm);
659             if ( ( *curterm > firstterm+1 ) &&
660                  ( *curterm <= maxterms ) &&
661                  ( terms[(*curterm)-1] == terms[firstterm] ) 
662                 )
663                 (*curterm)--; /* forget the term, seen that before */
664         }
665     }
666 }
667
668
669 /*
670  * Local variables:
671  * c-basic-offset: 4
672  * indent-tabs-mode: nil
673  * End:
674  * vim: shiftwidth=4 tabstop=8 expandtab
675  */
676