d1439797c3e4bbb9ae41565895ce4068e17c6747
[idzebra-moved-to-github.git] / rset / rsmultiandor.c
1 /* This file is part of the Zebra server.
2    Copyright (C) 1994-2009 Index Data
3
4 Zebra is free software; you can redistribute it and/or modify it under
5 the terms of the GNU General Public License as published by the Free
6 Software Foundation; either version 2, or (at your option) any later
7 version.
8
9 Zebra is distributed in the hope that it will be useful, but WITHOUT ANY
10 WARRANTY; without even the implied warranty of MERCHANTABILITY or
11 FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
12 for more details.
13
14 You should have received a copy of the GNU General Public License
15 along with this program; if not, write to the Free Software
16 Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA  02110-1301  USA
17
18 */
19
20
21 /**
22  * \file rsmultiandor.c
23  * \brief This module implements the rsmulti_or and rsmulti_and result sets
24  *
25  * rsmultior is based on a heap, from which we find the next hit.
26  *
27  * rsmultiand is based on a simple array of rsets, and a linear
28  * search to find the record that exists in all of those rsets.
29  * To speed things up, the array is sorted so that the smallest
30  * rsets come first, they are most likely to have the hits furthest
31  * away, and thus forwarding to them makes the most sense.
32  */
33
34
35 #include <assert.h>
36 #include <stdio.h>
37 #include <stdlib.h>
38 #include <string.h>
39
40 #include <idzebra/util.h>
41 #include <idzebra/isamc.h>
42 #include <rset.h>
43
44 static RSFD r_open_and (RSET ct, int flag);
45 static RSFD r_open_or (RSET ct, int flag);
46 static void r_close (RSFD rfd);
47 static void r_delete (RSET ct);
48 static int r_read_and (RSFD rfd, void *buf, TERMID *term);
49 static int r_read_or (RSFD rfd, void *buf, TERMID *term);
50 static int r_write (RSFD rfd, const void *buf);
51 static int r_forward_and(RSFD rfd, void *buf, TERMID *term,
52                      const void *untilbuf);
53 static int r_forward_or(RSFD rfd, void *buf, TERMID *term,
54                      const void *untilbuf);
55 static void r_pos (RSFD rfd, double *current, double *total);
56 static void r_get_terms(RSET ct, TERMID *terms, int maxterms, int *curterm);
57
58 static const struct rset_control control_or = 
59 {
60     "multi-or",
61     r_delete,
62     r_get_terms,
63     r_open_or,
64     r_close,
65     r_forward_or,
66     r_pos,
67     r_read_or,
68     r_write,
69 };
70
71 static const struct rset_control control_and = 
72 {
73     "multi-and",
74     r_delete,
75     r_get_terms,
76     r_open_and,
77     r_close,
78     r_forward_and,
79     r_pos,
80     r_read_and,
81     r_write,
82 };
83
84 /* The heap structure: 
85  * The rset contains a list or rsets we are ORing together 
86  * The rfd contains a heap of heap-items, which contain
87  * a rfd opened to those rsets, and a buffer for one key.
88  * They also contain a ptr to the rset list in the rset 
89  * itself, for practical reasons. 
90  */
91
92 struct heap_item {
93     RSFD fd;
94     void *buf;
95     RSET rset;
96     TERMID term;
97 };
98
99 struct heap {
100     int heapnum;
101     int heapmax;
102     const struct rset_key_control *kctrl;
103     struct heap_item **heap; /* ptrs to the rfd */
104 };
105 typedef struct heap *HEAP;
106
107
108 struct rset_private {
109     int dummy;
110 };
111
112
113 struct rfd_private {
114     int flag;
115     struct heap_item *items; /* we alloc and free them here */
116     HEAP h; /* and move around here */
117     zint hits; /* returned so far */
118     int eof; /* seen the end of it */
119     int tailcount; /* how many items are tailing */
120     zint segment;
121     int skip;
122     char *tailbits;
123 };
124
125 static int log_level = 0;
126 static int log_level_initialized = 0;
127
128
129 /* Heap functions ***********************/
130
131 #if 0
132 static void heap_dump_item( HEAP h, int i, int level)
133 {
134     double cur,tot;
135     if (i>h->heapnum)
136         return;
137     (void)rset_pos(h->heap[i]->rset,h->heap[i]->fd, &cur, &tot);
138     yaz_log(log_level," %d %*s i=%p buf=%p %0.1f/%0.1f",i, level, "",  
139                     &(h->heap[i]), h->heap[i]->buf, cur,tot );
140     heap_dump_item(h, 2*i, level+1);
141     heap_dump_item(h, 2*i+1, level+1);
142 }
143 static void heap_dump( HEAP h,char *msg) {
144     yaz_log(log_level, "heap dump: %s num=%d max=%d",msg, h->heapnum, h->heapmax);
145     heap_dump_item(h,1,1);
146 }
147 #endif
148
149 static void heap_swap (HEAP h, int x, int y)
150 {
151     struct heap_item *swap;
152     swap = h->heap[x];
153     h->heap[x] = h->heap[y];
154     h->heap[y] = swap;
155 }
156
157 static int heap_cmp(HEAP h, int x, int y)
158 {
159     return (*h->kctrl->cmp)(h->heap[x]->buf,h->heap[y]->buf);
160 }
161
162 static int heap_empty(HEAP h)
163 {
164     return ( 0==h->heapnum );
165 }
166
167 /** \brief deletes the first item in the heap, and balances the rest 
168  */
169 static void heap_delete (HEAP h)
170 {
171     int cur = 1, child = 2;
172     h->heap[1] = 0; /* been deleted */
173     heap_swap (h, 1, h->heapnum--);
174     while (child <= h->heapnum) {
175         if (child < h->heapnum && heap_cmp(h,child,1+child)>0 )
176             child++;
177         if (heap_cmp(h,cur,child) > 0)
178         {
179             heap_swap (h, cur, child);
180             cur = child;
181             child = 2*cur;
182         }
183         else
184             break;
185     }
186 }
187
188 /** \brief puts item into heap.
189     The heap root element has changed value (to bigger) 
190     Swap downwards until the heap is ordered again 
191 */
192 static void heap_balance (HEAP h)
193 {
194     int cur = 1, child = 2;
195     while (child <= h->heapnum) {
196         if (child < h->heapnum && heap_cmp(h,child,1+child)>0 )
197             child++;
198         if (heap_cmp(h,cur,child) > 0)
199         {
200             heap_swap (h, cur, child);
201             cur = child;
202             child = 2*cur;
203         }
204         else
205             break;
206     }
207 }
208
209
210 static void heap_insert (HEAP h, struct heap_item *hi)
211 {
212     int cur, parent;
213
214     cur = ++(h->heapnum);
215     assert(cur <= h->heapmax);
216     h->heap[cur] = hi;
217     parent = cur/2;
218     while (parent && (heap_cmp(h,parent,cur) > 0))
219     {
220         assert(parent>0);
221         heap_swap (h, cur, parent);
222         cur = parent;
223         parent = cur/2;
224     }
225 }
226
227
228 static
229 HEAP heap_create (NMEM nmem, int size, const struct rset_key_control *kctrl)
230 {
231     HEAP h = (HEAP) nmem_malloc (nmem, sizeof(*h));
232
233     ++size; /* heap array starts at 1 */
234     h->heapnum = 0;
235     h->heapmax = size;
236     h->kctrl = kctrl;
237     h->heap = (struct heap_item**) nmem_malloc(nmem,size*sizeof(*h->heap));
238     h->heap[0]=0; /* not used */
239     return h;
240 }
241
242 static void heap_clear( HEAP h)
243 {
244     assert(h);
245     h->heapnum = 0;
246 }
247
248 static void heap_destroy (HEAP h)
249 {
250     /* nothing to delete, all is nmem'd, and will go away in due time */
251 }
252
253 /** \brief compare and items for quicksort
254     used in qsort to get the multi-and args in optimal order
255     that is, those with fewest occurrences first
256 */
257 int compare_ands(const void *x, const void *y)
258 {   const struct heap_item *hx = x;
259     const struct heap_item *hy = y;
260     double cur, totx, toty;
261     rset_pos(hx->fd, &cur, &totx);
262     rset_pos(hy->fd, &cur, &toty);
263     if ( totx > toty +0.5 )
264         return 1;
265     if ( totx < toty -0.5 )
266         return -1;
267     return 0;  /* return totx - toty, except for overflows and rounding */
268 }
269
270 static RSET rsmulti_andor_create(NMEM nmem,
271                                  struct rset_key_control *kcontrol, 
272                                  int scope, TERMID termid,
273                                  int no_rsets, RSET* rsets, 
274                                  const struct rset_control *ctrl)
275 {
276     RSET rnew = rset_create_base(ctrl, nmem, kcontrol, scope, termid,
277                                  no_rsets, rsets);
278     struct rset_private *info;
279     if (!log_level_initialized)
280     {
281         log_level = yaz_log_module_level("rsmultiandor");
282         log_level_initialized = 1;
283     }
284     yaz_log(log_level, "rsmultiand_andor_create scope=%d", scope);
285     info = (struct rset_private *) nmem_malloc(rnew->nmem, sizeof(*info));
286     rnew->priv = info;
287     return rnew;
288 }
289
290 RSET rset_create_or(NMEM nmem, struct rset_key_control *kcontrol,
291                     int scope, TERMID termid, int no_rsets, RSET* rsets)
292 {
293     return rsmulti_andor_create(nmem, kcontrol, scope, termid,
294                                 no_rsets, rsets, &control_or);
295 }
296
297 RSET rset_create_and(NMEM nmem, struct rset_key_control *kcontrol,
298                      int scope, int no_rsets, RSET* rsets)
299 {
300     return rsmulti_andor_create(nmem, kcontrol, scope, 0,
301                                 no_rsets, rsets, &control_and);
302 }
303
304 static void r_delete (RSET ct)
305 {
306 }
307
308 static RSFD r_open_andor (RSET ct, int flag, int is_and)
309 {
310     RSFD rfd;
311     struct rfd_private *p;
312     const struct rset_key_control *kctrl = ct->keycontrol;
313     int i;
314
315     if (flag & RSETF_WRITE)
316     {
317         yaz_log (YLOG_FATAL, "multiandor set type is read-only");
318         return NULL;
319     }
320     rfd = rfd_create_base(ct);
321     if (rfd->priv) {
322         p = (struct rfd_private *)rfd->priv;
323         if (!is_and)
324             heap_clear(p->h);
325         assert(p->items);
326         /* all other pointers shouls already be allocated, in right sizes! */
327     }
328     else 
329     {
330         p = (struct rfd_private *) nmem_malloc (ct->nmem,sizeof(*p));
331         rfd->priv = p;
332         p->h = 0;
333         p->tailbits = 0;
334         if (is_and)
335             p->tailbits = nmem_malloc(ct->nmem, ct->no_children*sizeof(char) );
336         else 
337             p->h = heap_create( ct->nmem, ct->no_children, kctrl);
338         p->items = (struct heap_item *) 
339             nmem_malloc(ct->nmem, ct->no_children*sizeof(*p->items));
340         for (i = 0; i<ct->no_children; i++)
341         {
342             p->items[i].rset = ct->children[i];
343             p->items[i].buf = nmem_malloc(ct->nmem, kctrl->key_size);
344         }
345     }
346     p->flag = flag;
347     p->hits = 0;
348     p->eof = 0;
349     p->tailcount = 0;
350     if (is_and)
351     { /* read the array and sort it */
352         for (i = 0; i<ct->no_children; i++){
353             p->items[i].fd = rset_open(ct->children[i], RSETF_READ);
354             if (!rset_read(p->items[i].fd, p->items[i].buf, &p->items[i].term))
355                 p->eof = 1;
356             p->tailbits[i] = 0;
357         }
358         qsort(p->items, ct->no_children, sizeof(p->items[0]), compare_ands);
359     } 
360     else
361     { /* fill the heap for ORing */
362         for (i = 0; i<ct->no_children; i++){
363             p->items[i].fd = rset_open(ct->children[i],RSETF_READ);
364             if ( rset_read(p->items[i].fd, p->items[i].buf, &p->items[i].term))
365                 heap_insert(p->h, &(p->items[i]));
366         }
367     }
368     return rfd;
369 }
370
371 static RSFD r_open_or (RSET ct, int flag)
372 {
373     return r_open_andor(ct, flag, 0);
374 }
375
376 static RSFD r_open_and (RSET ct, int flag)
377 {
378     return r_open_andor(ct, flag, 1);
379 }
380
381
382 static void r_close (RSFD rfd)
383 {
384     struct rfd_private *p=(struct rfd_private *)(rfd->priv);
385     int i;
386
387     if (p->h)
388         heap_destroy (p->h);
389     for (i = 0; i<rfd->rset->no_children; i++) 
390         if (p->items[i].fd)
391             rset_close(p->items[i].fd);
392 }
393
394 static int r_forward_or(RSFD rfd, void *buf, 
395                         TERMID *term, const void *untilbuf)
396 { /* while heap head behind untilbuf, forward it and rebalance heap */
397     struct rfd_private *p = rfd->priv;
398     const struct rset_key_control *kctrl = rfd->rset->keycontrol;
399     if (heap_empty(p->h))
400         return 0;
401     while ( (*kctrl->cmp)(p->h->heap[1]->buf,untilbuf) < -rfd->rset->scope )
402     {
403         if (rset_forward(p->h->heap[1]->fd,p->h->heap[1]->buf,
404                          &p->h->heap[1]->term, untilbuf))
405             heap_balance(p->h);
406         else 
407         {
408             heap_delete(p->h);
409             if (heap_empty(p->h))
410                 return 0;
411         }
412
413     }
414     return r_read_or(rfd, buf, term);
415 }
416
417
418 /** \brief reads one item key from an 'or' set
419     \param rfd set handle
420     \param buf resulting item buffer
421     \param term resulting term
422     \retval 0 EOF
423     \retval 1 item could be read
424 */
425 static int r_read_or (RSFD rfd, void *buf, TERMID *term)
426 {
427     RSET rset = rfd->rset;
428     struct rfd_private *mrfd = rfd->priv;
429     const struct rset_key_control *kctrl = rset->keycontrol;
430     struct heap_item *it;
431     int rdres;
432     if (heap_empty(mrfd->h))
433         return 0;
434     it = mrfd->h->heap[1];
435     memcpy(buf, it->buf, kctrl->key_size);
436     if (term)
437     {
438         if (rset->term)
439             *term = rset->term;
440         else
441             *term = it->term;
442     }
443     (mrfd->hits)++;
444     rdres = rset_read(it->fd, it->buf, &it->term);
445     if ( rdres )
446         heap_balance(mrfd->h);
447     else
448         heap_delete(mrfd->h);
449     return 1;
450
451 }
452
453 /** \brief reads one item key from an 'and' set
454     \param rfd set handle
455     \param buf resulting item buffer
456     \param term resulting term
457     \retval 0 EOF
458     \retval 1 item could be read
459     
460     Has to return all hits where each item points to the
461     same sysno (scope), in order. Keep an extra key (hitkey)
462     as long as all records do not point to hitkey, forward
463     them, and update hitkey to be the highest seen so far.
464     (if any item eof's, mark eof, and return 0 thereafter)
465     Once a hit has been found, scan all items for the smallest
466     value. Mark all as being in the tail. Read next from that
467     item, and if not in the same record, clear its tail bit
468 */
469 static int r_read_and (RSFD rfd, void *buf, TERMID *term)
470 {   struct rfd_private *p = rfd->priv;
471     RSET ct = rfd->rset;
472     const struct rset_key_control *kctrl = ct->keycontrol;
473     int i;
474
475     while (1) {
476         if (p->tailcount) 
477         { /* we are tailing, find lowest tail and return it */
478             int mintail = -1;
479             int cmp;
480                  
481             for (i = 0; i<ct->no_children; i++)
482             {
483                 if (p->tailbits[i])
484                 {
485                     if (mintail >= 0)
486                         cmp = (*kctrl->cmp)
487                             (p->items[i].buf, p->items[mintail].buf);
488                     else
489                         cmp = -1;
490                     if (cmp < 0)
491                         mintail = i;
492
493                     if (kctrl->get_segment)
494                     {   /* segments enabled */
495                         zint segment =  kctrl->get_segment(p->items[i].buf);
496                         /* store segment if not stored already */
497                         if (!p->segment && segment)
498                             p->segment = segment;
499                         
500                         /* skip rest entirely if segments don't match */
501                         if (p->segment && segment && p->segment != segment)
502                             p->skip = 1;
503                     }
504                 }
505             }
506             /* return the lowest tail */
507             memcpy(buf, p->items[mintail].buf, kctrl->key_size); 
508             if (term)
509                 *term = p->items[mintail].term;
510             if (!rset_read(p->items[mintail].fd, p->items[mintail].buf,
511                            &p->items[mintail].term))
512             {
513                 p->eof = 1; /* game over, once tails have been returned */
514                 p->tailbits[mintail] = 0; 
515                 (p->tailcount)--;
516             }
517             else
518             {
519                 /* still a tail? */
520                 cmp = (*kctrl->cmp)(p->items[mintail].buf,buf);
521                 if (cmp >= rfd->rset->scope)
522                 {
523                     p->tailbits[mintail] = 0;
524                     (p->tailcount)--;
525                 }
526             }
527             if (p->skip)
528                 continue;  /* skip again.. eventually tailcount will be 0 */
529             (p->hits)++;
530             return 1;
531         } 
532         /* not tailing, forward until all records match, and set up */
533         /* as tails. the earlier 'if' will then return the hits */
534         if (p->eof)
535             return 0; /* nothing more to see */
536         i = 1; /* assume items[0] is highest up */
537         while (i < ct->no_children) 
538         {
539             int cmp = (*kctrl->cmp)(p->items[0].buf, p->items[i].buf);
540             if (cmp <= -rfd->rset->scope) { /* [0] was behind, forward it */
541                 if (!rset_forward(p->items[0].fd, p->items[0].buf, 
542                                   &p->items[0].term, p->items[i].buf))
543                 {
544                     p->eof = 1; /* game over */
545                     return 0;
546                 }
547                 i = 0; /* start forwarding from scratch */
548             } 
549             else if (cmp>=rfd->rset->scope)
550             { /* [0] was ahead, forward i */
551                 if (!rset_forward(p->items[i].fd, p->items[i].buf, 
552                                   &p->items[i].term, p->items[0].buf))
553                 {
554                     p->eof = 1; /* game over */
555                     return 0;
556                 }
557             } 
558             else
559                 i++;
560         } /* while i */
561         /* if we get this far, all rsets are now within +- scope of [0] */
562         /* ergo, we have a hit. Mark them all as tailing, and let the */
563         /* upper 'if' return the hits in right order */
564         for (i = 0; i < ct->no_children; i++)
565             p->tailbits[i] = 1;
566         p->tailcount = ct->no_children;
567         p->segment = 0;
568         p->skip = 0;
569     } /* while 1 */
570 }
571
572
573 static int r_forward_and(RSFD rfd, void *buf, TERMID *term, 
574                          const void *untilbuf)
575
576     struct rfd_private *p = rfd->priv;
577     RSET ct = rfd->rset;
578     const struct rset_key_control *kctrl = ct->keycontrol;
579     int i;
580     int cmp;
581     int killtail = 0;
582
583     for (i = 0; i<ct->no_children; i++)
584     {
585         cmp = (*kctrl->cmp)(p->items[i].buf,untilbuf);
586         if (cmp <= -rfd->rset->scope)
587         {
588             killtail = 1; /* we are moving to a different hit */
589             if (!rset_forward(p->items[i].fd, p->items[i].buf, 
590                               &p->items[i].term, untilbuf))
591             {
592                 p->eof = 1; /* game over */
593                 p->tailcount = 0;
594                 return 0;
595             }
596         }
597     }
598     if (killtail) 
599     {
600         for (i = 0; i<ct->no_children; i++)
601             p->tailbits[i] = 0;
602         p->tailcount = 0;
603     }
604     return r_read_and(rfd,buf,term);
605 }
606
607 static void r_pos (RSFD rfd, double *current, double *total)
608 {
609     RSET ct = rfd->rset;
610     struct rfd_private *mrfd = 
611         (struct rfd_private *)(rfd->priv);
612     double cur, tot;
613     double scur = 0.0, stot = 0.0;
614     int i;
615     for (i = 0; i<ct->no_children; i++){
616         rset_pos(mrfd->items[i].fd, &cur, &tot);
617         yaz_log(log_level, "r_pos: %d %0.1f %0.1f", i, cur,tot); 
618         scur += cur;
619         stot += tot;
620     }
621     if (stot < 1.0) { /* nothing there */
622         *current = 0;
623         *total = 0;
624         yaz_log(log_level, "r_pos: NULL  %0.1f %0.1f",  *current, *total);
625     }
626     else
627     {
628         *current = (double) (mrfd->hits);
629         *total = *current*stot/scur;
630         yaz_log(log_level, "r_pos: =  %0.1f %0.1f",  *current, *total);
631     }
632 }
633
634 static int r_write (RSFD rfd, const void *buf)
635 {
636     yaz_log (YLOG_FATAL, "multior set type is read-only");
637     return -1;
638 }
639
640 static void r_get_terms(RSET ct, TERMID *terms, int maxterms, int *curterm)
641 {
642     if (ct->term)
643         rset_get_one_term(ct, terms, maxterms, curterm);
644     else
645     {
646         /* Special case: Some multi-ors have all terms pointing to the same 
647            term. We do not want to duplicate those. Other multiors (and ands)
648            have different terms under them. Those we want. 
649         */
650         int firstterm= *curterm;
651         int i;
652
653         for (i = 0; i<ct->no_children; i++)
654         {
655             rset_getterms(ct->children[i], terms, maxterms, curterm);
656             if ( ( *curterm > firstterm+1 ) &&
657                  ( *curterm <= maxterms ) &&
658                  ( terms[(*curterm)-1] == terms[firstterm] ) 
659                 )
660                 (*curterm)--; /* forget the term, seen that before */
661         }
662     }
663 }
664
665
666 /*
667  * Local variables:
668  * c-basic-offset: 4
669  * indent-tabs-mode: nil
670  * End:
671  * vim: shiftwidth=4 tabstop=8 expandtab
672  */
673