Working on and operation which deals with segments (matches within fields)
[idzebra-moved-to-github.git] / rset / rsmultiandor.c
1 /* $Id: rsmultiandor.c,v 1.23 2006-07-04 14:10:31 adam Exp $
2    Copyright (C) 1995-2006
3    Index Data ApS
4
5 This file is part of the Zebra server.
6
7 Zebra is free software; you can redistribute it and/or modify it under
8 the terms of the GNU General Public License as published by the Free
9 Software Foundation; either version 2, or (at your option) any later
10 version.
11
12 Zebra is distributed in the hope that it will be useful, but WITHOUT ANY
13 WARRANTY; without even the implied warranty of MERCHANTABILITY or
14 FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
15 for more details.
16
17 You should have received a copy of the GNU General Public License
18 along with Zebra; see the file LICENSE.zebra.  If not, write to the
19 Free Software Foundation, 59 Temple Place - Suite 330, Boston, MA
20 02111-1307, USA.
21 */
22
23
24 /**
25  * \file rsmultiandor.c
26  * \brief This module implements the rsmulti_or and rsmulti_and result sets
27  *
28  * rsmultior is based on a heap, from which we find the next hit.
29  *
30  * rsmultiand is based on a simple array of rsets, and a linear
31  * search to find the record that exists in all of those rsets.
32  * To speed things up, the array is sorted so that the smallest
33  * rsets come first, they are most likely to have the hits furthest
34  * away, and thus forwarding to them makes the most sense.
35  */
36
37
38 #include <assert.h>
39 #include <stdio.h>
40 #include <stdlib.h>
41 #include <string.h>
42
43 #include <idzebra/util.h>
44 #include <idzebra/isamc.h>
45 #include <rset.h>
46
47 static RSFD r_open_and (RSET ct, int flag);
48 static RSFD r_open_or (RSET ct, int flag);
49 static void r_close (RSFD rfd);
50 static void r_delete (RSET ct);
51 static int r_read_and (RSFD rfd, void *buf, TERMID *term);
52 static int r_read_or (RSFD rfd, void *buf, TERMID *term);
53 static int r_write (RSFD rfd, const void *buf);
54 static int r_forward_and(RSFD rfd, void *buf, TERMID *term,
55                      const void *untilbuf);
56 static int r_forward_or(RSFD rfd, void *buf, TERMID *term,
57                      const void *untilbuf);
58 static void r_pos (RSFD rfd, double *current, double *total);
59 static void r_get_terms(RSET ct, TERMID *terms, int maxterms, int *curterm);
60
61 static const struct rset_control control_or = 
62 {
63     "multi-or",
64     r_delete,
65     r_get_terms,
66     r_open_or,
67     r_close,
68     r_forward_or,
69     r_pos,
70     r_read_or,
71     r_write,
72 };
73
74 static const struct rset_control control_and = 
75 {
76     "multi-and",
77     r_delete,
78     r_get_terms,
79     r_open_and,
80     r_close,
81     r_forward_and,
82     r_pos,
83     r_read_and,
84     r_write,
85 };
86
87 /* The heap structure: 
88  * The rset contains a list or rsets we are ORing together 
89  * The rfd contains a heap of heap-items, which contain
90  * a rfd opened to those rsets, and a buffer for one key.
91  * They also contain a ptr to the rset list in the rset 
92  * itself, for practical reasons. 
93  */
94
95 struct heap_item {
96     RSFD fd;
97     void *buf;
98     RSET rset;
99     TERMID term;
100 };
101
102 struct heap {
103     int heapnum;
104     int heapmax;
105     const struct rset_key_control *kctrl;
106     struct heap_item **heap; /* ptrs to the rfd */
107 };
108 typedef struct heap *HEAP;
109
110
111 struct rset_private {
112     int dummy;
113 };
114
115
116 struct rfd_private {
117     int flag;
118     struct heap_item *items; /* we alloc and free them here */
119     HEAP h; /* and move around here */
120     zint hits; /* returned so far */
121     int eof; /* seen the end of it */
122     int tailcount; /* how many items are tailing */
123     zint segment;
124     int skip;
125     char *tailbits;
126 };
127
128 static int log_level = 0;
129 static int log_level_initialized = 0;
130
131
132 /* Heap functions ***********************/
133
134 #if 0
135 static void heap_dump_item( HEAP h, int i, int level)
136 {
137     double cur,tot;
138     if (i>h->heapnum)
139         return;
140     (void)rset_pos(h->heap[i]->rset,h->heap[i]->fd, &cur, &tot);
141     yaz_log(log_level," %d %*s i=%p buf=%p %0.1f/%0.1f",i, level, "",  
142                     &(h->heap[i]), h->heap[i]->buf, cur,tot );
143     heap_dump_item(h, 2*i, level+1);
144     heap_dump_item(h, 2*i+1, level+1);
145 }
146 static void heap_dump( HEAP h,char *msg) {
147     yaz_log(log_level, "heap dump: %s num=%d max=%d",msg, h->heapnum, h->heapmax);
148     heap_dump_item(h,1,1);
149 }
150 #endif
151
152 static void heap_swap (HEAP h, int x, int y)
153 {
154     struct heap_item *swap;
155     swap = h->heap[x];
156     h->heap[x] = h->heap[y];
157     h->heap[y] = swap;
158 }
159
160 static int heap_cmp(HEAP h, int x, int y)
161 {
162     return (*h->kctrl->cmp)(h->heap[x]->buf,h->heap[y]->buf);
163 }
164
165 static int heap_empty(HEAP h)
166 {
167     return ( 0==h->heapnum );
168 }
169
170 /** \brief deletes the first item in the heap, and balances the rest 
171  */
172 static void heap_delete (HEAP h)
173 {
174     int cur = 1, child = 2;
175     h->heap[1] = 0; /* been deleted */
176     heap_swap (h, 1, h->heapnum--);
177     while (child <= h->heapnum) {
178         if (child < h->heapnum && heap_cmp(h,child,1+child)>0 )
179             child++;
180         if (heap_cmp(h,cur,child) > 0)
181         {
182             heap_swap (h, cur, child);
183             cur = child;
184             child = 2*cur;
185         }
186         else
187             break;
188     }
189 }
190
191 /** \brief puts item into heap.
192     The heap root element has changed value (to bigger) 
193     Swap downwards until the heap is ordered again 
194 */
195 static void heap_balance (HEAP h)
196 {
197     int cur = 1, child = 2;
198     while (child <= h->heapnum) {
199         if (child < h->heapnum && heap_cmp(h,child,1+child)>0 )
200             child++;
201         if (heap_cmp(h,cur,child) > 0)
202         {
203             heap_swap (h, cur, child);
204             cur = child;
205             child = 2*cur;
206         }
207         else
208             break;
209     }
210 }
211
212
213 static void heap_insert (HEAP h, struct heap_item *hi)
214 {
215     int cur, parent;
216
217     cur = ++(h->heapnum);
218     assert(cur <= h->heapmax);
219     h->heap[cur] = hi;
220     parent = cur/2;
221     while (parent && (heap_cmp(h,parent,cur) > 0))
222     {
223         assert(parent>0);
224         heap_swap (h, cur, parent);
225         cur = parent;
226         parent = cur/2;
227     }
228 }
229
230
231 static
232 HEAP heap_create (NMEM nmem, int size, const struct rset_key_control *kctrl)
233 {
234     HEAP h = (HEAP) nmem_malloc (nmem, sizeof(*h));
235
236     ++size; /* heap array starts at 1 */
237     h->heapnum = 0;
238     h->heapmax = size;
239     h->kctrl = kctrl;
240     h->heap = (struct heap_item**) nmem_malloc(nmem,size*sizeof(*h->heap));
241     h->heap[0]=0; /* not used */
242     return h;
243 }
244
245 static void heap_clear( HEAP h)
246 {
247     assert(h);
248     h->heapnum = 0;
249 }
250
251 static void heap_destroy (HEAP h)
252 {
253     /* nothing to delete, all is nmem'd, and will go away in due time */
254 }
255
256 /** \brief compare and items for quicksort
257     used in qsort to get the multi-and args in optimal order
258     that is, those with fewest occurrences first
259 */
260 int compare_ands(const void *x, const void *y)
261 {   const struct heap_item *hx = x;
262     const struct heap_item *hy = y;
263     double cur, totx, toty;
264     rset_pos(hx->fd, &cur, &totx);
265     rset_pos(hy->fd, &cur, &toty);
266     if ( totx > toty +0.5 )
267         return 1;
268     if ( totx < toty -0.5 )
269         return -1;
270     return 0;  /* return totx - toty, except for overflows and rounding */
271 }
272
273 static RSET rsmulti_andor_create(NMEM nmem,
274                                  struct rset_key_control *kcontrol, 
275                                  int scope, TERMID termid,
276                                  int no_rsets, RSET* rsets, 
277                                  const struct rset_control *ctrl)
278 {
279     RSET rnew = rset_create_base(ctrl, nmem, kcontrol, scope, termid,
280                                  no_rsets, rsets);
281     struct rset_private *info;
282     if (!log_level_initialized)
283     {
284         log_level = yaz_log_module_level("rsmultiandor");
285         log_level_initialized = 1;
286     }
287     yaz_log(log_level, "rsmultiand_andor_create scope=%d", scope);
288     info = (struct rset_private *) nmem_malloc(rnew->nmem, sizeof(*info));
289     rnew->priv = info;
290     return rnew;
291 }
292
293 RSET rset_create_or(NMEM nmem, struct rset_key_control *kcontrol,
294                     int scope, TERMID termid, int no_rsets, RSET* rsets)
295 {
296     return rsmulti_andor_create(nmem, kcontrol, scope, termid,
297                                 no_rsets, rsets, &control_or);
298 }
299
300 RSET rset_create_and(NMEM nmem, struct rset_key_control *kcontrol,
301                      int scope, int no_rsets, RSET* rsets)
302 {
303     return rsmulti_andor_create(nmem, kcontrol, scope, 0,
304                                 no_rsets, rsets, &control_and);
305 }
306
307 static void r_delete (RSET ct)
308 {
309 }
310
311 static RSFD r_open_andor (RSET ct, int flag, int is_and)
312 {
313     RSFD rfd;
314     struct rfd_private *p;
315     const struct rset_key_control *kctrl = ct->keycontrol;
316     int i;
317
318     if (flag & RSETF_WRITE)
319     {
320         yaz_log (YLOG_FATAL, "multiandor set type is read-only");
321         return NULL;
322     }
323     rfd = rfd_create_base(ct);
324     if (rfd->priv) {
325         p = (struct rfd_private *)rfd->priv;
326         if (!is_and)
327             heap_clear(p->h);
328         assert(p->items);
329         /* all other pointers shouls already be allocated, in right sizes! */
330     }
331     else 
332     {
333         p = (struct rfd_private *) nmem_malloc (ct->nmem,sizeof(*p));
334         rfd->priv = p;
335         p->h = 0;
336         p->tailbits = 0;
337         if (is_and)
338             p->tailbits = nmem_malloc(ct->nmem, ct->no_children*sizeof(char) );
339         else 
340             p->h = heap_create( ct->nmem, ct->no_children, kctrl);
341         p->items = (struct heap_item *) 
342             nmem_malloc(ct->nmem, ct->no_children*sizeof(*p->items));
343         for (i = 0; i<ct->no_children; i++)
344         {
345             p->items[i].rset = ct->children[i];
346             p->items[i].buf = nmem_malloc(ct->nmem, kctrl->key_size);
347         }
348     }
349     p->flag = flag;
350     p->hits = 0;
351     p->eof = 0;
352     p->tailcount = 0;
353     if (is_and)
354     { /* read the array and sort it */
355         for (i = 0; i<ct->no_children; i++){
356             p->items[i].fd = rset_open(ct->children[i], RSETF_READ);
357             if (!rset_read(p->items[i].fd, p->items[i].buf, &p->items[i].term))
358                 p->eof = 1;
359             p->tailbits[i] = 0;
360         }
361         qsort(p->items, ct->no_children, sizeof(p->items[0]), compare_ands);
362     } 
363     else
364     { /* fill the heap for ORing */
365         for (i = 0; i<ct->no_children; i++){
366             p->items[i].fd = rset_open(ct->children[i],RSETF_READ);
367             if ( rset_read(p->items[i].fd, p->items[i].buf, &p->items[i].term))
368                 heap_insert(p->h, &(p->items[i]));
369         }
370     }
371     return rfd;
372 }
373
374 static RSFD r_open_or (RSET ct, int flag)
375 {
376     return r_open_andor(ct, flag, 0);
377 }
378
379 static RSFD r_open_and (RSET ct, int flag)
380 {
381     return r_open_andor(ct, flag, 1);
382 }
383
384
385 static void r_close (RSFD rfd)
386 {
387     struct rfd_private *p=(struct rfd_private *)(rfd->priv);
388     int i;
389
390     if (p->h)
391         heap_destroy (p->h);
392     for (i = 0; i<rfd->rset->no_children; i++) 
393         if (p->items[i].fd)
394             rset_close(p->items[i].fd);
395 }
396
397 static int r_forward_or(RSFD rfd, void *buf, 
398                         TERMID *term, const void *untilbuf)
399 { /* while heap head behind untilbuf, forward it and rebalance heap */
400     struct rfd_private *p = rfd->priv;
401     const struct rset_key_control *kctrl = rfd->rset->keycontrol;
402     if (heap_empty(p->h))
403         return 0;
404     while ( (*kctrl->cmp)(p->h->heap[1]->buf,untilbuf) < -rfd->rset->scope )
405     {
406         if (rset_forward(p->h->heap[1]->fd,p->h->heap[1]->buf,
407                          &p->h->heap[1]->term, untilbuf))
408             heap_balance(p->h);
409         else 
410         {
411             heap_delete(p->h);
412             if (heap_empty(p->h))
413                 return 0;
414         }
415
416     }
417     return r_read_or(rfd, buf, term);
418 }
419
420
421 /** \brief reads one item key from an 'or' set
422     \param rfd set handle
423     \param buf resulting item buffer
424     \param term resulting term
425     \retval 0 EOF
426     \retval 1 item could be read
427 */
428 static int r_read_or (RSFD rfd, void *buf, TERMID *term)
429 {
430     RSET rset = rfd->rset;
431     struct rfd_private *mrfd = rfd->priv;
432     const struct rset_key_control *kctrl = rset->keycontrol;
433     struct heap_item *it;
434     int rdres;
435     if (heap_empty(mrfd->h))
436         return 0;
437     it = mrfd->h->heap[1];
438     memcpy(buf, it->buf, kctrl->key_size);
439     if (term)
440     {
441         if (rset->term)
442             *term = rset->term;
443         else
444             *term = it->term;
445         assert(*term);
446     }
447     (mrfd->hits)++;
448     rdres = rset_read(it->fd, it->buf, &it->term);
449     if ( rdres )
450         heap_balance(mrfd->h);
451     else
452         heap_delete(mrfd->h);
453     return 1;
454
455 }
456
457 /** \brief reads one item key from an 'and' set
458     \param rfd set handle
459     \param buf resulting item buffer
460     \param term resulting term
461     \retval 0 EOF
462     \retval 1 item could be read
463     
464     Has to return all hits where each item points to the
465     same sysno (scope), in order. Keep an extra key (hitkey)
466     as long as all records do not point to hitkey, forward
467     them, and update hitkey to be the highest seen so far.
468     (if any item eof's, mark eof, and return 0 thereafter)
469     Once a hit has been found, scan all items for the smallest
470     value. Mark all as being in the tail. Read next from that
471     item, and if not in the same record, clear its tail bit
472 */
473 static int r_read_and (RSFD rfd, void *buf, TERMID *term)
474 {   struct rfd_private *p = rfd->priv;
475     RSET ct = rfd->rset;
476     const struct rset_key_control *kctrl = ct->keycontrol;
477     int i;
478
479     while (1) {
480         if (p->tailcount) 
481         { /* we are tailing, find lowest tail and return it */
482             int mintail = -1;
483             int cmp;
484                  
485             for (i = 0; i<ct->no_children; i++)
486             {
487                 if (p->tailbits[i])
488                 {
489                     if (mintail >= 0)
490                         cmp = (*kctrl->cmp)
491                             (p->items[i].buf, p->items[mintail].buf);
492                     else
493                         cmp = -1;
494                     if (cmp < 0)
495                         mintail = i;
496
497                     if (kctrl->get_segment)
498                     {
499                         /* segments enabled */
500                         
501                         zint segment =  kctrl->get_segment(p->items[i].buf);
502                         /* store segment if not stored already */
503                         if (!p->segment && segment)
504                             p->segment = segment;
505                         
506                         /* skip rest entirely if segments don't match */
507                         if (p->segment && segment && p->segment != segment)
508                             p->skip = 1;
509                     }
510                 }
511             }
512             /* return the lowest tail */
513             memcpy(buf, p->items[mintail].buf, kctrl->key_size); 
514             if (term)
515                 *term = p->items[mintail].term;
516             if (!rset_read(p->items[mintail].fd, p->items[mintail].buf,
517                            &p->items[mintail].term))
518             {
519                 p->eof = 1; /* game over, once tails have been returned */
520                 p->tailbits[mintail] = 0; 
521                 (p->tailcount)--;
522             }
523             else
524             {
525                 /* still a tail? */
526                 cmp = (*kctrl->cmp)(p->items[mintail].buf,buf);
527                 if (cmp >= rfd->rset->scope)
528                 {
529                     p->tailbits[mintail] = 0;
530                     (p->tailcount)--;
531                 }
532             }
533             if (p->skip)
534                 continue;  /* skip again.. eventually tailcount will be 0 */
535             (p->hits)++;
536             return 1;
537         } 
538         /* not tailing, forward until all reocrds match, and set up */
539         /* as tails. the earlier 'if' will then return the hits */
540         if (p->eof)
541             return 0; /* nothing more to see */
542         i = 1; /* assume items[0] is highest up */
543         while (i < ct->no_children) 
544         {
545             int cmp = (*kctrl->cmp)(p->items[0].buf, p->items[i].buf);
546             if (cmp <= -rfd->rset->scope) { /* [0] was behind, forward it */
547                 if (!rset_forward(p->items[0].fd, p->items[0].buf, 
548                                   &p->items[0].term, p->items[i].buf))
549                 {
550                     p->eof = 1; /* game over */
551                     return 0;
552                 }
553                 i = 0; /* start forwarding from scratch */
554             } 
555             else if (cmp>=rfd->rset->scope)
556             { /* [0] was ahead, forward i */
557                 if (!rset_forward(p->items[i].fd, p->items[i].buf, 
558                                   &p->items[i].term, p->items[0].buf))
559                 {
560                     p->eof = 1; /* game over */
561                     return 0;
562                 }
563             } 
564             else
565                 i++;
566         } /* while i */
567         /* if we get this far, all rsets are now within +- scope of [0] */
568         /* ergo, we have a hit. Mark them all as tailing, and let the */
569         /* upper 'if' return the hits in right order */
570         for (i = 0; i < ct->no_children; i++)
571             p->tailbits[i] = 1;
572         p->tailcount = ct->no_children;
573         p->segment = 0;
574         p->skip = 0;
575     } /* while 1 */
576 }
577
578
579 static int r_forward_and(RSFD rfd, void *buf, TERMID *term, 
580                          const void *untilbuf)
581
582     struct rfd_private *p = rfd->priv;
583     RSET ct = rfd->rset;
584     const struct rset_key_control *kctrl = ct->keycontrol;
585     int i;
586     int cmp;
587     int killtail = 0;
588
589     for (i = 0; i<ct->no_children; i++)
590     {
591         cmp = (*kctrl->cmp)(p->items[i].buf,untilbuf);
592         if (cmp <= -rfd->rset->scope)
593         {
594             killtail = 1; /* we are moving to a different hit */
595             if (!rset_forward(p->items[i].fd, p->items[i].buf, 
596                               &p->items[i].term, untilbuf))
597             {
598                 p->eof = 1; /* game over */
599                 p->tailcount = 0;
600                 return 0;
601             }
602         }
603     }
604     if (killtail) 
605     {
606         for (i = 0; i<ct->no_children; i++)
607             p->tailbits[i] = 0;
608         p->tailcount = 0;
609     }
610     return r_read_and(rfd,buf,term);
611 }
612
613 static void r_pos (RSFD rfd, double *current, double *total)
614 {
615     RSET ct = rfd->rset;
616     struct rfd_private *mrfd = 
617         (struct rfd_private *)(rfd->priv);
618     double cur, tot;
619     double scur = 0.0, stot = 0.0;
620     int i;
621     for (i = 0; i<ct->no_children; i++){
622         rset_pos(mrfd->items[i].fd, &cur, &tot);
623         yaz_log(log_level, "r_pos: %d %0.1f %0.1f", i, cur,tot); 
624         scur += cur;
625         stot += tot;
626     }
627     if (stot < 1.0) { /* nothing there */
628         *current = 0;
629         *total = 0;
630         yaz_log(log_level, "r_pos: NULL  %0.1f %0.1f",  *current, *total);
631     }
632     else
633     {
634         *current = (double) (mrfd->hits);
635         *total = *current*stot/scur;
636         yaz_log(log_level, "r_pos: =  %0.1f %0.1f",  *current, *total);
637     }
638 }
639
640 static int r_write (RSFD rfd, const void *buf)
641 {
642     yaz_log (YLOG_FATAL, "multior set type is read-only");
643     return -1;
644 }
645
646 static void r_get_terms(RSET ct, TERMID *terms, int maxterms, int *curterm)
647 {
648     if (ct->term)
649         rset_get_one_term(ct, terms, maxterms, curterm);
650     else
651     {
652         /* Special case: Some multi-ors have all terms pointing to the same 
653            term. We do not want to duplicate those. Other multiors (and ands)
654            have different terms under them. Those we want. 
655         */
656         int firstterm= *curterm;
657         int i;
658
659         for (i = 0; i<ct->no_children; i++)
660         {
661             rset_getterms(ct->children[i], terms, maxterms, curterm);
662             if ( ( *curterm > firstterm+1 ) &&
663                  ( *curterm <= maxterms ) &&
664                  ( terms[(*curterm)-1] == terms[firstterm] ) 
665                 )
666                 (*curterm)--; /* forget the term, seen that before */
667         }
668     }
669 }
670
671
672 /*
673  * Local variables:
674  * c-basic-offset: 4
675  * indent-tabs-mode: nil
676  * End:
677  * vim: shiftwidth=4 tabstop=8 expandtab
678  */
679