Using nmem for all rsets, and keeping a freelist for freed rfds, so
[idzebra-moved-to-github.git] / rset / rsmultior.c
1 /* $Id: rsmultior.c,v 1.7 2004-08-26 11:11:59 heikki Exp $
2    Copyright (C) 1995,1996,1997,1998,1999,2000,2001,2002
3    Index Data Aps
4
5 This file is part of the Zebra server.
6
7 Zebra is free software; you can redistribute it and/or modify it under
8 the terms of the GNU General Public License as published by the Free
9 Software Foundation; either version 2, or (at your option) any later
10 version.
11
12 Zebra is distributed in the hope that it will be useful, but WITHOUT ANY
13 WARRANTY; without even the implied warranty of MERCHANTABILITY or
14 FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
15 for more details.
16
17 You should have received a copy of the GNU General Public License
18 along with Zebra; see the file LICENSE.zebra.  If not, write to the
19 Free Software Foundation, 59 Temple Place - Suite 330, Boston, MA
20 02111-1307, USA.
21 */
22
23
24
25
26 #include <assert.h>
27 #include <stdio.h>
28 #include <stdlib.h>
29 #include <string.h>
30
31 #include <zebrautl.h>
32 #include <isamc.h>
33 #include <rsmultior.h>
34
35 static RSFD r_open (RSET ct, int flag);
36 static void r_close (RSFD rfd);
37 static void r_delete (RSET ct);
38 static void r_rewind (RSFD rfd);
39 static int r_read (RSFD rfd, void *buf);
40 static int r_write (RSFD rfd, const void *buf);
41 static int r_forward(RSET ct, RSFD rfd, void *buf,
42                      int (*cmpfunc)(const void *p1, const void *p2),
43                      const void *untilbuf);
44 static void r_pos (RSFD rfd, double *current, double *total);
45
46 static const struct rset_control control = 
47 {
48     "multi-or",
49     r_open,
50     r_close,
51     r_delete,
52     r_rewind,
53     r_forward,
54     r_pos,
55     r_read,
56     r_write,
57 };
58
59 const struct rset_control *rset_kind_multior = &control;
60
61 /* The heap structure: 
62  * The rset contains a list or rsets we are ORing together 
63  * The rfd contains a heap of heap-items, which contain
64  * a rfd opened to those rsets, and a buffer for one key.
65  * They also contain a ptr to the rset list in the rset 
66  * itself, for practical reasons. 
67  */
68
69 struct heap_item {
70     RSFD fd;
71     void *buf;
72     RSET rset;
73 };
74
75 struct heap {
76     int heapnum;
77     int heapmax;
78     int keysize;
79     int     (*cmp)(const void *p1, const void *p2);
80     struct heap_item **heap; /* ptrs to the rfd */
81 };
82 typedef struct heap *HEAP;
83
84
85 struct rset_multior_info {
86     int     key_size;
87     int     no_rec;
88     int     (*cmp)(const void *p1, const void *p2);
89     int     no_rsets;
90     RSET    *rsets;
91     struct rset_multior_rfd *rfd_list; /* rfds in use */
92     struct rset_multior_rfd *free_list; /* rfds free */
93         /* rfds are allocated when first opened, if nothing in freelist */
94         /* when allocated, all their pointers are allocated as well, and */
95         /* those are kept intact when in the freelist, ready to be reused */
96 };
97
98
99 struct rset_multior_rfd {
100     int flag;
101     struct heap_item *items; /* we alloc and free them here */
102     HEAP h;
103     struct rset_multior_rfd *next;
104     struct rset_multior_info *info;
105     zint hits; /* returned so far */
106     char *prevvalue; /* to see if we are in another record */
107       /* FIXME - is this really needed? */
108 };
109
110 #if 0
111 static void heap_dump_item( HEAP h, int i, int level) {
112     double cur,tot;
113     if (i>h->heapnum)
114         return;
115     (void)rset_pos(h->heap[i]->rset,h->heap[i]->fd, &cur, &tot);
116     logf(LOG_LOG," %d %*s i=%p buf=%p %0.1f/%0.1f",i, level, "",  
117                     &(h->heap[i]), h->heap[i]->buf, cur,tot );
118     heap_dump_item(h, 2*i, level+1);
119     heap_dump_item(h, 2*i+1, level+1);
120 }
121 static void heap_dump( HEAP h,char *msg) {
122     logf(LOG_LOG, "heap dump: %s num=%d max=%d",msg, h->heapnum, h->heapmax);
123     heap_dump_item(h,1,1);
124 }
125 #endif
126
127 static void heap_swap (HEAP h, int x, int y)
128 {
129     struct heap_item *swap;
130     swap = h->heap[x];
131     h->heap[x]=h->heap[y];
132     h->heap[y]=swap;
133 }
134
135 static int heap_cmp(HEAP h, int x, int y)
136 {
137     return (*h->cmp)(h->heap[x]->buf,h->heap[y]->buf);
138 }
139
140 static int heap_empty(HEAP h)
141 {
142     return ( 0==h->heapnum );
143 }
144
145 static void heap_delete (HEAP h)
146 { /* deletes the first item in the heap, and balances the rest */
147     int cur = 1, child = 2;
148     h->heap[1]=0; /* been deleted */
149     heap_swap (h, 1, h->heapnum--);
150     while (child <= h->heapnum) {
151         if (child < h->heapnum && heap_cmp(h,child,1+child)>0 )
152             child++;
153         if (heap_cmp(h,cur,child) > 0)
154         {
155             heap_swap (h, cur, child);
156             cur = child;
157             child = 2*cur;
158         }
159         else
160             break;
161     }
162 }
163
164 static void heap_balance (HEAP h)
165 { /* The heap root element has changed value (to bigger) */
166   /* swap downwards until the heap is ordered again */
167     int cur = 1, child = 2;
168     while (child <= h->heapnum) {
169         if (child < h->heapnum && heap_cmp(h,child,1+child)>0 )
170             child++;
171         if (heap_cmp(h,cur,child) > 0)
172         {
173             heap_swap (h, cur, child);
174             cur = child;
175             child = 2*cur;
176         }
177         else
178             break;
179     }
180 }
181
182
183 static void heap_insert (HEAP h, struct heap_item *hi)
184 {
185     int cur, parent;
186
187     cur = ++(h->heapnum);
188     assert(cur <= h->heapmax);
189     h->heap[cur]=hi;
190     parent = cur/2;
191     while (parent && (heap_cmp(h,parent,cur) > 0))
192     {
193         assert(parent>0);
194         heap_swap (h, cur, parent);
195         cur = parent;
196         parent = cur/2;
197     }
198 }
199
200
201 static
202 HEAP heap_create (NMEM nmem, int size, int key_size,
203       int (*cmp)(const void *p1, const void *p2))
204 {
205     HEAP h = (HEAP) nmem_malloc (nmem, sizeof(*h));
206
207     ++size; /* heap array starts at 1 */
208     h->heapnum = 0;
209     h->heapmax = size;
210     h->keysize = key_size;
211     h->cmp = cmp;
212     h->heap = (struct heap_item**) nmem_malloc(nmem,(size)*sizeof(*h->heap));
213     h->heap[0]=0; /* not used */
214     return h;
215 }
216
217 static void heap_clear( HEAP h)
218 {
219     assert(h);
220     h->heapnum=0;
221 }
222
223 static void heap_destroy (HEAP h)
224 {
225     /* nothing to delete, all is nmem'd, and will go away in due time */
226 }
227
228
229 RSET rsmultior_create( NMEM nmem, int key_size, 
230             int (*cmp)(const void *p1, const void *p2),
231             int no_rsets, RSET* rsets)
232 {
233     RSET rnew=rset_create_base(&control, nmem);
234     struct rset_multior_info *info;
235     info = (struct rset_multior_info *) nmem_malloc(rnew->nmem,sizeof(*info));
236     info->key_size = key_size;
237     info->cmp = cmp;
238     info->no_rsets=no_rsets;
239     info->rsets=(RSET*)nmem_malloc(rnew->nmem, no_rsets*sizeof(*rsets));
240     memcpy(info->rsets,rsets,no_rsets*sizeof(*rsets));
241     info->rfd_list = NULL;
242     info->free_list = NULL;
243     rnew->priv=info;
244     return rnew;
245 }
246
247 static void r_delete (RSET ct)
248 {
249     struct rset_multior_info *info = (struct rset_multior_info *) ct->priv;
250     int i;
251
252     assert (info->rfd_list == NULL);
253     for(i=0;i<info->no_rsets;i++)
254         rset_delete(info->rsets[i]);
255 }
256 #if 0
257 static void *r_create (RSET ct, const struct rset_control *sel, void *parms)
258 {
259     rset_multior_parms *r_parms = (rset_multior_parms *) parms;
260     struct rset_multior_info *info;
261     info = (struct rset_multior_info *) xmalloc (sizeof(*info));
262     info->key_size = r_parms->key_size;
263     assert (info->key_size > 1);
264     info->cmp = r_parms->cmp;
265     info->no_rsets= r_parms->no_rsets;
266     info->rsets=r_parms->rsets; /* now we own it! */
267     info->rfd_list=0;
268     return info;
269 }
270 #endif
271
272 static RSFD r_open (RSET ct, int flag)
273 {
274     struct rset_multior_rfd *rfd;
275     struct rset_multior_info *info = (struct rset_multior_info *) ct->priv;
276     int i;
277
278     if (flag & RSETF_WRITE)
279     {
280         logf (LOG_FATAL, "multior set type is read-only");
281         return NULL;
282     }
283     rfd=info->free_list;
284     if (rfd) {
285         info->free_list=rfd->next;
286         heap_clear(rfd->h);
287         assert(rfd->items);
288         /* all other pointers shouls already be allocated, in right sizes! */
289     }
290     else {
291         rfd = (struct rset_multior_rfd *) nmem_malloc (ct->nmem,sizeof(*rfd));
292         rfd->h = heap_create( ct->nmem, info->no_rsets, 
293                               info->key_size, info->cmp);
294         rfd->items=(struct heap_item *) nmem_malloc(ct->nmem,
295                               info->no_rsets*sizeof(*rfd->items));
296         for (i=0; i<info->no_rsets; i++){
297             rfd->items[i].rset=info->rsets[i];
298             rfd->items[i].buf=nmem_malloc(ct->nmem,info->key_size);
299         }
300     }
301     rfd->flag = flag;
302     rfd->next = info->rfd_list;
303     rfd->info = info;
304     info->rfd_list = rfd;
305     rfd->prevvalue=0;
306     rfd->hits=0;
307     for (i=0; i<info->no_rsets; i++){
308         rfd->items[i].fd=rset_open(info->rsets[i],RSETF_READ);
309         if ( rset_read(rfd->items[i].rset, rfd->items[i].fd, 
310                       rfd->items[i].buf) )
311             heap_insert(rfd->h, &(rfd->items[i]));
312     }
313     return rfd;
314 }
315
316 static void r_close (RSFD rfd)
317 {
318     struct rset_multior_rfd *mrfd = (struct rset_multior_rfd *) rfd;
319     struct rset_multior_info *info = mrfd->info;
320     struct rset_multior_rfd **rfdp;
321     int i;
322     
323     for (rfdp = &info->rfd_list; *rfdp; rfdp = &(*rfdp)->next)
324         if (*rfdp == rfd)
325         {
326             *rfdp = (*rfdp)->next;
327         
328             heap_destroy (mrfd->h);
329             for (i = 0; i<info->no_rsets; i++) {
330                 if (mrfd->items[i].fd)
331                     rset_close(info->rsets[i],mrfd->items[i].fd);
332             }
333             mrfd->next=info->free_list;
334             info->free_list=mrfd;
335             return;
336         }
337     logf (LOG_FATAL, "r_close but no rfd match!");
338     assert (0);
339 }
340
341
342 static void r_rewind (RSFD rfd)
343 {
344     assert(!"rewind not implemented yet");
345 }
346
347
348 static int r_forward(RSET ct, RSFD rfd, void *buf,
349                      int (*cmpfunc)(const void *p1, const void *p2),
350                      const void *untilbuf)
351 {
352     struct rset_multior_rfd *mrfd = (struct rset_multior_rfd *) rfd;
353     struct rset_multior_info *info = mrfd->info;
354     struct heap_item it;
355     int rdres;
356     if (heap_empty(mrfd->h))
357         return 0;
358     if (cmpfunc)
359         assert(cmpfunc==mrfd->info->cmp);
360     it = *(mrfd->h->heap[1]);
361     memcpy(buf,it.buf, info->key_size); 
362     (mrfd->hits)++;
363     if (untilbuf)
364         rdres=rset_forward(it.rset, it.fd, it.buf, cmpfunc,untilbuf);
365     else
366         rdres=rset_read(it.rset, it.fd, it.buf);
367     if ( rdres )
368         heap_balance(mrfd->h);
369     else
370         heap_delete(mrfd->h);
371     return 1;
372
373 }
374
375 static int r_read (RSFD rfd, void *buf)
376 {
377     return r_forward(0,rfd, buf,0,0);
378 }
379
380 static void r_pos (RSFD rfd, double *current, double *total)
381 {
382     struct rset_multior_rfd *mrfd = (struct rset_multior_rfd *) rfd;
383     struct rset_multior_info *info = mrfd->info;
384     double cur, tot;
385     double scur=0.0, stot=0.0;
386     int i;
387     for (i=0; i<info->no_rsets; i++){
388         rset_pos(mrfd->items[i].rset, mrfd->items[i].fd, &cur, &tot);
389         logf(LOG_LOG, "r_pos: %d %0.1f %0.1f", i, cur,tot);
390         scur += cur;
391         stot += tot;
392     }
393     if (stot <1.0) { /* nothing there */
394         *current=0;
395         *total=0;
396         return;
397     }
398     *current=mrfd->hits;
399     *total=*current*stot/scur;
400 }
401
402 static int r_write (RSFD rfd, const void *buf)
403 {
404     logf (LOG_FATAL, "multior set type is read-only");
405     return -1;
406 }