Added the scope parameter to rsets, and using it in all forwards and reads
[idzebra-moved-to-github.git] / rset / rsmultior.c
1 /* $Id: rsmultior.c,v 1.10 2004-09-09 10:08:06 heikki Exp $
2    Copyright (C) 1995,1996,1997,1998,1999,2000,2001,2002
3    Index Data Aps
4
5 This file is part of the Zebra server.
6
7 Zebra is free software; you can redistribute it and/or modify it under
8 the terms of the GNU General Public License as published by the Free
9 Software Foundation; either version 2, or (at your option) any later
10 version.
11
12 Zebra is distributed in the hope that it will be useful, but WITHOUT ANY
13 WARRANTY; without even the implied warranty of MERCHANTABILITY or
14 FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
15 for more details.
16
17 You should have received a copy of the GNU General Public License
18 along with Zebra; see the file LICENSE.zebra.  If not, write to the
19 Free Software Foundation, 59 Temple Place - Suite 330, Boston, MA
20 02111-1307, USA.
21 */
22
23
24
25
26 #include <assert.h>
27 #include <stdio.h>
28 #include <stdlib.h>
29 #include <string.h>
30
31 #include <zebrautl.h>
32 #include <isamc.h>
33 #include <rset.h>
34
35 static RSFD r_open (RSET ct, int flag);
36 static void r_close (RSFD rfd);
37 static void r_delete (RSET ct);
38 static void r_rewind (RSFD rfd);
39 static int r_read (RSFD rfd, void *buf);
40 static int r_write (RSFD rfd, const void *buf);
41 static int r_forward(RSFD rfd, void *buf,
42                      const void *untilbuf);
43 static void r_pos (RSFD rfd, double *current, double *total);
44
45 static const struct rset_control control = 
46 {
47     "multi-or",
48     r_delete,
49     r_open,
50     r_close,
51     r_rewind,
52     r_forward,
53     r_pos,
54     r_read,
55     r_write,
56 };
57
58 const struct rset_control *rset_kind_multior = &control;
59
60 /* The heap structure: 
61  * The rset contains a list or rsets we are ORing together 
62  * The rfd contains a heap of heap-items, which contain
63  * a rfd opened to those rsets, and a buffer for one key.
64  * They also contain a ptr to the rset list in the rset 
65  * itself, for practical reasons. 
66  */
67
68 struct heap_item {
69     RSFD fd;
70     void *buf;
71     RSET rset;
72 };
73
74 struct heap {
75     int heapnum;
76     int heapmax;
77     const struct key_control *kctrl;
78     struct heap_item **heap; /* ptrs to the rfd */
79 };
80 typedef struct heap *HEAP;
81
82
83 struct rset_multior_info {
84     int     no_rsets;
85     RSET    *rsets;
86 };
87
88
89 struct rset_multior_rfd {
90     int flag;
91     struct heap_item *items; /* we alloc and free them here */
92     HEAP h;
93     zint hits; /* returned so far */
94 };
95
96 #if 0
97 static void heap_dump_item( HEAP h, int i, int level) {
98     double cur,tot;
99     if (i>h->heapnum)
100         return;
101     (void)rset_pos(h->heap[i]->rset,h->heap[i]->fd, &cur, &tot);
102     logf(LOG_LOG," %d %*s i=%p buf=%p %0.1f/%0.1f",i, level, "",  
103                     &(h->heap[i]), h->heap[i]->buf, cur,tot );
104     heap_dump_item(h, 2*i, level+1);
105     heap_dump_item(h, 2*i+1, level+1);
106 }
107 static void heap_dump( HEAP h,char *msg) {
108     logf(LOG_LOG, "heap dump: %s num=%d max=%d",msg, h->heapnum, h->heapmax);
109     heap_dump_item(h,1,1);
110 }
111 #endif
112
113 static void heap_swap (HEAP h, int x, int y)
114 {
115     struct heap_item *swap;
116     swap = h->heap[x];
117     h->heap[x]=h->heap[y];
118     h->heap[y]=swap;
119 }
120
121 static int heap_cmp(HEAP h, int x, int y)
122 {
123     return (*h->kctrl->cmp)(h->heap[x]->buf,h->heap[y]->buf);
124 }
125
126 static int heap_empty(HEAP h)
127 {
128     return ( 0==h->heapnum );
129 }
130
131 static void heap_delete (HEAP h)
132 { /* deletes the first item in the heap, and balances the rest */
133     int cur = 1, child = 2;
134     h->heap[1]=0; /* been deleted */
135     heap_swap (h, 1, h->heapnum--);
136     while (child <= h->heapnum) {
137         if (child < h->heapnum && heap_cmp(h,child,1+child)>0 )
138             child++;
139         if (heap_cmp(h,cur,child) > 0)
140         {
141             heap_swap (h, cur, child);
142             cur = child;
143             child = 2*cur;
144         }
145         else
146             break;
147     }
148 }
149
150 static void heap_balance (HEAP h)
151 { /* The heap root element has changed value (to bigger) */
152   /* swap downwards until the heap is ordered again */
153     int cur = 1, child = 2;
154     while (child <= h->heapnum) {
155         if (child < h->heapnum && heap_cmp(h,child,1+child)>0 )
156             child++;
157         if (heap_cmp(h,cur,child) > 0)
158         {
159             heap_swap (h, cur, child);
160             cur = child;
161             child = 2*cur;
162         }
163         else
164             break;
165     }
166 }
167
168
169 static void heap_insert (HEAP h, struct heap_item *hi)
170 {
171     int cur, parent;
172
173     cur = ++(h->heapnum);
174     assert(cur <= h->heapmax);
175     h->heap[cur]=hi;
176     parent = cur/2;
177     while (parent && (heap_cmp(h,parent,cur) > 0))
178     {
179         assert(parent>0);
180         heap_swap (h, cur, parent);
181         cur = parent;
182         parent = cur/2;
183     }
184 }
185
186
187 static
188 HEAP heap_create (NMEM nmem, int size, const struct key_control *kctrl)
189 {
190     HEAP h = (HEAP) nmem_malloc (nmem, sizeof(*h));
191
192     ++size; /* heap array starts at 1 */
193     h->heapnum = 0;
194     h->heapmax = size;
195     h->kctrl=kctrl;
196     h->heap = (struct heap_item**) nmem_malloc(nmem,size*sizeof(*h->heap));
197     h->heap[0]=0; /* not used */
198     return h;
199 }
200
201 static void heap_clear( HEAP h)
202 {
203     assert(h);
204     h->heapnum=0;
205 }
206
207 static void heap_destroy (HEAP h)
208 {
209     /* nothing to delete, all is nmem'd, and will go away in due time */
210 }
211
212
213 RSET rsmultior_create( NMEM nmem, const struct key_control *kcontrol, int scope,
214             int no_rsets, RSET* rsets)
215 {
216     RSET rnew=rset_create_base(&control, nmem,kcontrol, scope);
217     struct rset_multior_info *info;
218     info = (struct rset_multior_info *) nmem_malloc(rnew->nmem,sizeof(*info));
219     info->no_rsets=no_rsets;
220     info->rsets=(RSET*)nmem_malloc(rnew->nmem, no_rsets*sizeof(*rsets));
221     memcpy(info->rsets,rsets,no_rsets*sizeof(*rsets));
222     rnew->priv=info;
223     return rnew;
224 }
225
226 static void r_delete (RSET ct)
227 {
228     struct rset_multior_info *info = (struct rset_multior_info *) ct->priv;
229     int i;
230     for(i=0;i<info->no_rsets;i++)
231         rset_delete(info->rsets[i]);
232 }
233
234 static RSFD r_open (RSET ct, int flag)
235 {
236     RSFD rfd;
237     struct rset_multior_rfd *p;
238     struct rset_multior_info *info = (struct rset_multior_info *) ct->priv;
239     const struct key_control *kctrl = ct->keycontrol;
240     int i;
241
242     if (flag & RSETF_WRITE)
243     {
244         logf (LOG_FATAL, "multior set type is read-only");
245         return NULL;
246     }
247     rfd=rfd_create_base(ct);
248     if (rfd->priv) {
249         p=(struct rset_multior_rfd *)rfd->priv;
250         heap_clear(p->h);
251         assert(p->items);
252         /* all other pointers shouls already be allocated, in right sizes! */
253     }
254     else {
255         p = (struct rset_multior_rfd *) nmem_malloc (ct->nmem,sizeof(*p));
256         rfd->priv=p;
257         p->h = heap_create( ct->nmem, info->no_rsets, kctrl);
258         p->items=(struct heap_item *) nmem_malloc(ct->nmem,
259                               info->no_rsets*sizeof(*p->items));
260         for (i=0; i<info->no_rsets; i++){
261             p->items[i].rset=info->rsets[i];
262             p->items[i].buf=nmem_malloc(ct->nmem,kctrl->key_size);
263         }
264     }
265     p->flag = flag;
266     p->hits=0;
267     for (i=0; i<info->no_rsets; i++){
268         p->items[i].fd=rset_open(info->rsets[i],RSETF_READ);
269         if ( rset_read(p->items[i].fd, p->items[i].buf) )
270             heap_insert(p->h, &(p->items[i]));
271     }
272     return rfd;
273 }
274
275 static void r_close (RSFD rfd)
276 {
277     struct rset_multior_info *info=(struct rset_multior_info *)(rfd->rset->priv);
278     struct rset_multior_rfd *p=(struct rset_multior_rfd *)(rfd->priv);
279     int i;
280
281     heap_destroy (p->h);
282     for (i = 0; i<info->no_rsets; i++) 
283         if (p->items[i].fd)
284             rset_close(p->items[i].fd);
285     rfd_delete_base(rfd);
286 }
287
288
289 static void r_rewind (RSFD rfd)
290 {
291     assert(!"rewind not implemented yet");
292 }
293
294
295 static int r_forward(RSFD rfd, void *buf, const void *untilbuf)
296 {
297     struct rset_multior_rfd *mrfd=rfd->priv;
298     const struct key_control *kctrl=rfd->rset->keycontrol;
299     struct heap_item it;
300     int rdres;
301     if (heap_empty(mrfd->h))
302         return 0;
303     it = *(mrfd->h->heap[1]);
304     memcpy(buf,it.buf, kctrl->key_size); 
305     (mrfd->hits)++;
306     if (untilbuf)
307         rdres=rset_forward(it.fd, it.buf, untilbuf);
308     else
309         rdres=rset_read(it.fd, it.buf);
310     if ( rdres )
311         heap_balance(mrfd->h);
312     else
313         heap_delete(mrfd->h);
314     return 1;
315
316 }
317
318 static int r_read (RSFD rfd, void *buf)
319 {
320     return r_forward(rfd, buf,0);
321 }
322
323 static void r_pos (RSFD rfd, double *current, double *total)
324 {
325     struct rset_multior_info *info=(struct rset_multior_info *)(rfd->rset->priv);
326     struct rset_multior_rfd *mrfd=(struct rset_multior_rfd *)(rfd->priv);
327     double cur, tot;
328     double scur=0.0, stot=0.0;
329     int i;
330     for (i=0; i<info->no_rsets; i++){
331         rset_pos(mrfd->items[i].fd, &cur, &tot);
332         logf(LOG_LOG, "r_pos: %d %0.1f %0.1f", i, cur,tot);
333         scur += cur;
334         stot += tot;
335     }
336     if (stot <1.0) { /* nothing there */
337         *current=0;
338         *total=0;
339         return;
340     }
341     *current=mrfd->hits;
342     *total=*current*stot/scur;
343 }
344
345 static int r_write (RSFD rfd, const void *buf)
346 {
347     logf (LOG_FATAL, "multior set type is read-only");
348     return -1;
349 }