Deb: idzebra-2.0-utils includes init.d script
[idzebra-moved-to-github.git] / rset / rsmultiandor.c
1 /* This file is part of the Zebra server.
2    Copyright (C) 2004-2013 Index Data
3
4 Zebra is free software; you can redistribute it and/or modify it under
5 the terms of the GNU General Public License as published by the Free
6 Software Foundation; either version 2, or (at your option) any later
7 version.
8
9 Zebra is distributed in the hope that it will be useful, but WITHOUT ANY
10 WARRANTY; without even the implied warranty of MERCHANTABILITY or
11 FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
12 for more details.
13
14 You should have received a copy of the GNU General Public License
15 along with this program; if not, write to the Free Software
16 Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA  02110-1301  USA
17
18 */
19
20
21 /**
22  * \file rsmultiandor.c
23  * \brief This module implements the rsmulti_or and rsmulti_and result sets
24  *
25  * rsmultior is based on a heap, from which we find the next hit.
26  *
27  * rsmultiand is based on a simple array of rsets, and a linear
28  * search to find the record that exists in all of those rsets.
29  * To speed things up, the array is sorted so that the smallest
30  * rsets come first, they are most likely to have the hits furthest
31  * away, and thus forwarding to them makes the most sense.
32  */
33
34
35 #if HAVE_CONFIG_H
36 #include <config.h>
37 #endif
38 #include <assert.h>
39 #include <stdio.h>
40 #include <stdlib.h>
41 #include <string.h>
42
43 #include <idzebra/util.h>
44 #include <idzebra/isamc.h>
45 #include <rset.h>
46
47 static RSFD r_open_and(RSET ct, int flag);
48 static RSFD r_open_or(RSET ct, int flag);
49 static void r_close(RSFD rfd);
50 static void r_delete(RSET ct);
51 static int r_read_and(RSFD rfd, void *buf, TERMID *term);
52 static int r_read_or(RSFD rfd, void *buf, TERMID *term);
53 static int r_write(RSFD rfd, const void *buf);
54 static int r_forward_and(RSFD rfd, void *buf, TERMID *term,
55                      const void *untilbuf);
56 static int r_forward_or(RSFD rfd, void *buf, TERMID *term,
57                      const void *untilbuf);
58 static void r_pos_and(RSFD rfd, double *current, double *total);
59 static void r_pos_or(RSFD rfd, double *current, double *total);
60 static void r_get_terms(RSET ct, TERMID *terms, int maxterms, int *curterm);
61
62 static const struct rset_control control_or =
63 {
64     "multi-or",
65     r_delete,
66     r_get_terms,
67     r_open_or,
68     r_close,
69     r_forward_or,
70     r_pos_or,
71     r_read_or,
72     r_write,
73 };
74
75 static const struct rset_control control_and =
76 {
77     "multi-and",
78     r_delete,
79     r_get_terms,
80     r_open_and,
81     r_close,
82     r_forward_and,
83     r_pos_and,
84     r_read_and,
85     r_write,
86 };
87
88 /* The heap structure:
89  * The rset contains a list or rsets we are ORing together
90  * The rfd contains a heap of heap-items, which contain
91  * a rfd opened to those rsets, and a buffer for one key.
92  * They also contain a ptr to the rset list in the rset
93  * itself, for practical reasons.
94  */
95
96 struct heap_item {
97     RSFD fd;
98     void *buf;
99     RSET rset;
100     TERMID term;
101 };
102
103 struct heap {
104     int heapnum;
105     int heapmax;
106     const struct rset_key_control *kctrl;
107     struct heap_item **heap; /* ptrs to the rfd */
108 };
109 typedef struct heap *HEAP;
110
111
112 struct rset_private {
113     int dummy;
114 };
115
116 struct rfd_private {
117     int flag;
118     struct heap_item *items; /* we alloc and free them here */
119     HEAP h; /* and move around here */
120     zint hits; /* returned so far */
121     int eof; /* seen the end of it */
122     int tailcount; /* how many items are tailing */
123     zint segment;
124     int skip;
125     char *tailbits;
126 };
127
128 static int log_level = 0;
129 static int log_level_initialized = 0;
130
131 /* Heap functions ***********************/
132
133 static void heap_swap(HEAP h, int x, int y)
134 {
135     struct heap_item *swap;
136     swap = h->heap[x];
137     h->heap[x] = h->heap[y];
138     h->heap[y] = swap;
139 }
140
141 static int heap_cmp(HEAP h, int x, int y)
142 {
143     return (*h->kctrl->cmp)(h->heap[x]->buf, h->heap[y]->buf);
144 }
145
146 static int heap_empty(HEAP h)
147 {
148     return 0 == h->heapnum;
149 }
150
151 /** \brief deletes the first item in the heap, and balances the rest
152  */
153 static void heap_delete(HEAP h)
154 {
155     int cur = 1, child = 2;
156     h->heap[1] = 0; /* been deleted */
157     heap_swap(h, 1, h->heapnum--);
158     while (child <= h->heapnum)
159     {
160         if (child < h->heapnum && heap_cmp(h, child, 1 + child) > 0)
161             child++;
162         if (heap_cmp(h,cur,child) > 0)
163         {
164             heap_swap(h, cur, child);
165             cur = child;
166             child = 2*cur;
167         }
168         else
169             break;
170     }
171 }
172
173 /** \brief puts item into heap.
174     The heap root element has changed value (to bigger)
175     Swap downwards until the heap is ordered again
176 */
177 static void heap_balance(HEAP h)
178 {
179     int cur = 1, child = 2;
180     while (child <= h->heapnum)
181     {
182         if (child < h->heapnum && heap_cmp(h, child, 1 + child) > 0)
183             child++;
184         if (heap_cmp(h,cur,child) > 0)
185         {
186             heap_swap(h, cur, child);
187             cur = child;
188             child = 2*cur;
189         }
190         else
191             break;
192     }
193 }
194
195 static void heap_insert(HEAP h, struct heap_item *hi)
196 {
197     int cur, parent;
198
199     cur = ++(h->heapnum);
200     assert(cur <= h->heapmax);
201     h->heap[cur] = hi;
202     parent = cur/2;
203     while (parent && (heap_cmp(h,parent,cur) > 0))
204     {
205         assert(parent>0);
206         heap_swap(h, cur, parent);
207         cur = parent;
208         parent = cur/2;
209     }
210 }
211
212 static
213 HEAP heap_create(NMEM nmem, int size, const struct rset_key_control *kctrl)
214 {
215     HEAP h = (HEAP) nmem_malloc(nmem, sizeof(*h));
216
217     ++size; /* heap array starts at 1 */
218     h->heapnum = 0;
219     h->heapmax = size;
220     h->kctrl = kctrl;
221     h->heap = (struct heap_item**) nmem_malloc(nmem, size * sizeof(*h->heap));
222     h->heap[0] = 0; /* not used */
223     return h;
224 }
225
226 static void heap_clear( HEAP h)
227 {
228     assert(h);
229     h->heapnum = 0;
230 }
231
232 static void heap_destroy(HEAP h)
233 {
234     /* nothing to delete, all is nmem'd, and will go away in due time */
235 }
236
237 /** \brief compare and items for quicksort
238     used in qsort to get the multi-and args in optimal order
239     that is, those with fewest occurrences first
240 */
241 int compare_ands(const void *x, const void *y)
242 {   const struct heap_item *hx = x;
243     const struct heap_item *hy = y;
244     double cur, totx, toty;
245     rset_pos(hx->fd, &cur, &totx);
246     rset_pos(hy->fd, &cur, &toty);
247     if (totx > toty + 0.5)
248         return 1;
249     if (totx < toty - 0.5)
250         return -1;
251     return 0;  /* return totx - toty, except for overflows and rounding */
252 }
253
254 static RSET rsmulti_andor_create(NMEM nmem,
255                                  struct rset_key_control *kcontrol,
256                                  int scope, TERMID termid,
257                                  int no_rsets, RSET* rsets,
258                                  const struct rset_control *ctrl)
259 {
260     RSET rnew = rset_create_base(ctrl, nmem, kcontrol, scope, termid,
261                                  no_rsets, rsets);
262     struct rset_private *info;
263     if (!log_level_initialized)
264     {
265         log_level = yaz_log_module_level("rsmultiandor");
266         log_level_initialized = 1;
267     }
268     yaz_log(log_level, "rsmultiand_andor_create scope=%d", scope);
269     info = (struct rset_private *) nmem_malloc(rnew->nmem, sizeof(*info));
270     rnew->priv = info;
271     return rnew;
272 }
273
274 RSET rset_create_or(NMEM nmem, struct rset_key_control *kcontrol,
275                     int scope, TERMID termid, int no_rsets, RSET* rsets)
276 {
277     return rsmulti_andor_create(nmem, kcontrol, scope, termid,
278                                 no_rsets, rsets, &control_or);
279 }
280
281 RSET rset_create_and(NMEM nmem, struct rset_key_control *kcontrol,
282                      int scope, int no_rsets, RSET* rsets)
283 {
284     return rsmulti_andor_create(nmem, kcontrol, scope, 0,
285                                 no_rsets, rsets, &control_and);
286 }
287
288 static void r_delete(RSET ct)
289 {
290 }
291
292 static RSFD r_open_andor(RSET ct, int flag, int is_and)
293 {
294     RSFD rfd;
295     struct rfd_private *p;
296     const struct rset_key_control *kctrl = ct->keycontrol;
297     int i;
298
299     if (flag & RSETF_WRITE)
300     {
301         yaz_log(YLOG_FATAL, "multiandor set type is read-only");
302         return NULL;
303     }
304     rfd = rfd_create_base(ct);
305     if (rfd->priv)
306     {
307         p = (struct rfd_private *)rfd->priv;
308         if (!is_and)
309             heap_clear(p->h);
310         assert(p->items);
311         /* all other pointers shouls already be allocated, in right sizes! */
312     }
313     else
314     {
315         p = (struct rfd_private *) nmem_malloc(ct->nmem,sizeof(*p));
316         rfd->priv = p;
317         p->h = 0;
318         p->tailbits = 0;
319         if (is_and)
320             p->tailbits = nmem_malloc(ct->nmem, ct->no_children*sizeof(char) );
321         else
322             p->h = heap_create(ct->nmem, ct->no_children, kctrl);
323         p->items = (struct heap_item *)
324             nmem_malloc(ct->nmem, ct->no_children*sizeof(*p->items));
325         for (i = 0; i < ct->no_children; i++)
326         {
327             p->items[i].rset = ct->children[i];
328             p->items[i].buf = nmem_malloc(ct->nmem, kctrl->key_size);
329         }
330     }
331     p->flag = flag;
332     p->hits = 0;
333     p->eof = 0;
334     p->tailcount = 0;
335     if (is_and)
336     { /* read the array and sort it */
337         for (i = 0; i < ct->no_children; i++)
338         {
339             p->items[i].fd = rset_open(ct->children[i], RSETF_READ);
340             if (!rset_read(p->items[i].fd, p->items[i].buf, &p->items[i].term))
341                 p->eof = 1;
342             p->tailbits[i] = 0;
343         }
344         qsort(p->items, ct->no_children, sizeof(p->items[0]), compare_ands);
345     }
346     else
347     { /* fill the heap for ORing */
348         for (i = 0; i < ct->no_children; i++)
349         {
350             p->items[i].fd = rset_open(ct->children[i],RSETF_READ);
351             if ( rset_read(p->items[i].fd, p->items[i].buf, &p->items[i].term))
352                 heap_insert(p->h, &(p->items[i]));
353         }
354     }
355     return rfd;
356 }
357
358 static RSFD r_open_or(RSET ct, int flag)
359 {
360     return r_open_andor(ct, flag, 0);
361 }
362
363 static RSFD r_open_and(RSET ct, int flag)
364 {
365     return r_open_andor(ct, flag, 1);
366 }
367
368 static void r_close(RSFD rfd)
369 {
370     struct rfd_private *p=(struct rfd_private *)(rfd->priv);
371     int i;
372
373     if (p->h)
374         heap_destroy(p->h);
375     for (i = 0; i < rfd->rset->no_children; i++)
376         if (p->items[i].fd)
377             rset_close(p->items[i].fd);
378 }
379
380 static int r_forward_or(RSFD rfd, void *buf,
381                         TERMID *term, const void *untilbuf)
382 { /* while heap head behind untilbuf, forward it and rebalance heap */
383     struct rfd_private *p = rfd->priv;
384     const struct rset_key_control *kctrl = rfd->rset->keycontrol;
385     if (heap_empty(p->h))
386         return 0;
387     while ((*kctrl->cmp)(p->h->heap[1]->buf,untilbuf) < -rfd->rset->scope )
388     {
389         if (rset_forward(p->h->heap[1]->fd, p->h->heap[1]->buf,
390                          &p->h->heap[1]->term, untilbuf))
391             heap_balance(p->h);
392         else
393         {
394             heap_delete(p->h);
395             if (heap_empty(p->h))
396                 return 0;
397         }
398
399     }
400     return r_read_or(rfd, buf, term);
401 }
402
403 /** \brief reads one item key from an 'or' set
404     \param rfd set handle
405     \param buf resulting item buffer
406     \param term resulting term
407     \retval 0 EOF
408     \retval 1 item could be read
409 */
410 static int r_read_or(RSFD rfd, void *buf, TERMID *term)
411 {
412     RSET rset = rfd->rset;
413     struct rfd_private *mrfd = rfd->priv;
414     const struct rset_key_control *kctrl = rset->keycontrol;
415     struct heap_item *it;
416     int rdres;
417     if (heap_empty(mrfd->h))
418         return 0;
419     it = mrfd->h->heap[1];
420     memcpy(buf, it->buf, kctrl->key_size);
421     if (term)
422     {
423         if (rset->term)
424             *term = rset->term;
425         else
426             *term = it->term;
427     }
428     (mrfd->hits)++;
429     rdres = rset_read(it->fd, it->buf, &it->term);
430     if (rdres)
431         heap_balance(mrfd->h);
432     else
433         heap_delete(mrfd->h);
434     return 1;
435 }
436
437 /** \brief reads one item key from an 'and' set
438     \param rfd set handle
439     \param buf resulting item buffer
440     \param term resulting term
441     \retval 0 EOF
442     \retval 1 item could be read
443
444     Has to return all hits where each item points to the
445     same sysno (scope), in order. Keep an extra key (hitkey)
446     as long as all records do not point to hitkey, forward
447     them, and update hitkey to be the highest seen so far.
448     (if any item eof's, mark eof, and return 0 thereafter)
449     Once a hit has been found, scan all items for the smallest
450     value. Mark all as being in the tail. Read next from that
451     item, and if not in the same record, clear its tail bit
452 */
453 static int r_read_and(RSFD rfd, void *buf, TERMID *term)
454 {
455     struct rfd_private *p = rfd->priv;
456     RSET ct = rfd->rset;
457     const struct rset_key_control *kctrl = ct->keycontrol;
458     int i;
459
460     while (1)
461     {
462         if (p->tailcount)
463         { /* we are tailing, find lowest tail and return it */
464             int mintail = -1;
465             int cmp;
466
467             for (i = 0; i < ct->no_children; i++)
468             {
469                 if (p->tailbits[i])
470                 {
471                     if (mintail >= 0)
472                         cmp = (*kctrl->cmp)
473                             (p->items[i].buf, p->items[mintail].buf);
474                     else
475                         cmp = -1;
476                     if (cmp < 0)
477                         mintail = i;
478
479                     if (kctrl->get_segment)
480                     {   /* segments enabled */
481                         zint segment =  kctrl->get_segment(p->items[i].buf);
482                         /* store segment if not stored already */
483                         if (!p->segment && segment)
484                             p->segment = segment;
485
486                         /* skip rest entirely if segments don't match */
487                         if (p->segment && segment && p->segment != segment)
488                             p->skip = 1;
489                     }
490                 }
491             }
492             /* return the lowest tail */
493             memcpy(buf, p->items[mintail].buf, kctrl->key_size);
494             if (term)
495                 *term = p->items[mintail].term;
496             if (!rset_read(p->items[mintail].fd, p->items[mintail].buf,
497                            &p->items[mintail].term))
498             {
499                 p->eof = 1; /* game over, once tails have been returned */
500                 p->tailbits[mintail] = 0;
501                 (p->tailcount)--;
502             }
503             else
504             {
505                 /* still a tail? */
506                 cmp = (*kctrl->cmp)(p->items[mintail].buf,buf);
507                 if (cmp >= rfd->rset->scope)
508                 {
509                     p->tailbits[mintail] = 0;
510                     (p->tailcount)--;
511                 }
512             }
513             if (p->skip)
514                 continue;  /* skip again.. eventually tailcount will be 0 */
515             if (p->tailcount == 0)
516                 (p->hits)++;
517             return 1;
518         }
519         /* not tailing, forward until all records match, and set up */
520         /* as tails. the earlier 'if' will then return the hits */
521         if (p->eof)
522             return 0; /* nothing more to see */
523         i = 1; /* assume items[0] is highest up */
524         while (i < ct->no_children)
525         {
526             int cmp = (*kctrl->cmp)(p->items[0].buf, p->items[i].buf);
527             if (cmp <= -rfd->rset->scope) { /* [0] was behind, forward it */
528                 if (!rset_forward(p->items[0].fd, p->items[0].buf,
529                                   &p->items[0].term, p->items[i].buf))
530                 {
531                     p->eof = 1; /* game over */
532                     return 0;
533                 }
534                 i = 0; /* start forwarding from scratch */
535             }
536             else if (cmp >= rfd->rset->scope)
537             { /* [0] was ahead, forward i */
538                 if (!rset_forward(p->items[i].fd, p->items[i].buf,
539                                   &p->items[i].term, p->items[0].buf))
540                 {
541                     p->eof = 1; /* game over */
542                     return 0;
543                 }
544             }
545             else
546                 i++;
547         } /* while i */
548         /* if we get this far, all rsets are now within +- scope of [0] */
549         /* ergo, we have a hit. Mark them all as tailing, and let the */
550         /* upper 'if' return the hits in right order */
551         for (i = 0; i < ct->no_children; i++)
552             p->tailbits[i] = 1;
553         p->tailcount = ct->no_children;
554         p->segment = 0;
555         p->skip = 0;
556     } /* while 1 */
557 }
558
559
560 static int r_forward_and(RSFD rfd, void *buf, TERMID *term,
561                          const void *untilbuf)
562 {
563     struct rfd_private *p = rfd->priv;
564     RSET ct = rfd->rset;
565     const struct rset_key_control *kctrl = ct->keycontrol;
566     int i;
567     int cmp;
568     int killtail = 0;
569
570     for (i = 0; i < ct->no_children; i++)
571     {
572         cmp = (*kctrl->cmp)(p->items[i].buf,untilbuf);
573         if (cmp <= -rfd->rset->scope)
574         {
575             killtail = 1; /* we are moving to a different hit */
576             if (!rset_forward(p->items[i].fd, p->items[i].buf,
577                               &p->items[i].term, untilbuf))
578             {
579                 p->eof = 1; /* game over */
580                 p->tailcount = 0;
581                 return 0;
582             }
583         }
584     }
585     if (killtail)
586     {
587         for (i = 0; i < ct->no_children; i++)
588             p->tailbits[i] = 0;
589         p->tailcount = 0;
590     }
591     return r_read_and(rfd,buf,term);
592 }
593
594 static void r_pos_x(RSFD rfd, double *current, double *total, int and_op)
595 {
596     RSET ct = rfd->rset;
597     struct rfd_private *mrfd = (struct rfd_private *)(rfd->priv);
598     double ratio = and_op ? 0.0 : 1.0;
599     int i;
600     double sum_cur = 0.0;
601     double sum_tot = 0.0;
602     for (i = 0; i < ct->no_children; i++)
603     {
604         double cur, tot;
605         rset_pos(mrfd->items[i].fd, &cur, &tot);
606         if (i < 100)
607             yaz_log(log_level, "r_pos: %d %0.1f %0.1f", i, cur,tot);
608         if (and_op)
609         {
610             if (tot > 0.0)
611             {
612                 double nratio = cur / tot;
613                 if (nratio > ratio)
614                     ratio = nratio;
615             }
616         }
617         else
618         {
619             if (cur > 0)
620                 sum_cur += (cur - 1);
621             sum_tot += tot;
622         }
623     }
624     if (!and_op && sum_tot > 0.0)
625     {
626         yaz_log(YLOG_LOG, "or op sum_cur=%0.1f sum_tot=%0.1f hits=%f", sum_cur, sum_tot, (double) mrfd->hits);
627         ratio = sum_cur / sum_tot;
628     }
629     if (ratio == 0.0 || ratio == 1.0)
630     { /* nothing there */
631         *current = 0;
632         *total = 0;
633         yaz_log(log_level, "r_pos: NULL  %0.1f %0.1f",  *current, *total);
634     }
635     else
636     {
637         *current = (double) (mrfd->hits);
638         *total = *current / ratio;
639         yaz_log(log_level, "r_pos: =  %0.1f %0.1f",  *current, *total);
640     }
641 }
642
643 static void r_pos_and(RSFD rfd, double *current, double *total)
644 {
645     r_pos_x(rfd, current, total, 1);
646 }
647
648 static void r_pos_or(RSFD rfd, double *current, double *total)
649 {
650     r_pos_x(rfd, current, total, 0);
651 }
652
653 static int r_write(RSFD rfd, const void *buf)
654 {
655     yaz_log(YLOG_FATAL, "multior set type is read-only");
656     return -1;
657 }
658
659 static void r_get_terms(RSET ct, TERMID *terms, int maxterms, int *curterm)
660 {
661     if (ct->term)
662         rset_get_one_term(ct, terms, maxterms, curterm);
663     else
664     {
665         /* Special case: Some multi-ors have all terms pointing to the same
666            term. We do not want to duplicate those. Other multiors (and ands)
667            have different terms under them. Those we want.
668         */
669         int firstterm = *curterm;
670         int i;
671         for (i = 0; i < ct->no_children; i++)
672         {
673             rset_getterms(ct->children[i], terms, maxterms, curterm);
674             if (*curterm > firstterm + 1 && *curterm <= maxterms &&
675                 terms[(*curterm) - 1] == terms[firstterm])
676                 (*curterm)--; /* forget the term, seen that before */
677         }
678     }
679 }
680
681
682 /*
683  * Local variables:
684  * c-basic-offset: 4
685  * c-file-style: "Stroustrup"
686  * indent-tabs-mode: nil
687  * End:
688  * vim: shiftwidth=4 tabstop=8 expandtab
689  */
690