First version of ISAMS.
[idzebra-moved-to-github.git] / isamc / merge.c
1 /*
2  * Copyright (c) 1996-1998, Index Data.
3  * See the file LICENSE for details.
4  * Sebastian Hammer, Adam Dickmeiss
5  *
6  * $Log: merge.c,v $
7  * Revision 1.10  1998-03-19 12:22:09  adam
8  * Minor change.
9  *
10  * Revision 1.9  1998/03/19 10:04:38  adam
11  * Minor changes.
12  *
13  * Revision 1.8  1998/03/18 09:23:55  adam
14  * Blocks are stored in chunks on free list - up to factor 2 in speed.
15  * Fixed bug that could occur in block category rearrangemen.
16  *
17  * Revision 1.7  1998/03/11 11:18:18  adam
18  * Changed the isc_merge to take into account the mfill (minimum-fill).
19  *
20  * Revision 1.6  1998/03/06 13:54:03  adam
21  * Fixed two nasty bugs in isc_merge.
22  *
23  * Revision 1.5  1997/02/12 20:42:43  adam
24  * Bug fix: during isc_merge operations, some pages weren't marked dirty
25  * even though they should be. At this point the merge operation marks
26  * a page dirty if the previous page changed at all. A better approach is
27  * to mark it dirty if the last key written changed in previous page.
28  *
29  * Revision 1.4  1996/11/08 11:15:31  adam
30  * Number of keys in chain are stored in first block and the function
31  * to retrieve this information, isc_pp_num is implemented.
32  *
33  * Revision 1.3  1996/11/04 14:08:59  adam
34  * Optimized free block usage.
35  *
36  * Revision 1.2  1996/11/01 13:36:46  adam
37  * New element, max_blocks_mem, that control how many blocks of max size
38  * to store in memory during isc_merge.
39  * Function isc_merge now ignores delete/update of identical keys and
40  * the proper blocks are then non-dirty and not written in flush_blocks.
41  *
42  * Revision 1.1  1996/11/01  08:59:15  adam
43  * First version of isc_merge that supports update/delete.
44  *
45  */
46
47 #include <stdlib.h>
48 #include <assert.h>
49 #include <string.h>
50 #include <stdio.h>
51
52 #include <log.h>
53 #include "isamc-p.h"
54
55 struct isc_merge_block {
56     int offset;       /* offset in r_buf */
57     int block;        /* block number of file (0 if none) */
58     int dirty;        /* block is different from that on file */
59 };
60
61 static void opt_blocks (ISAMC is, struct isc_merge_block *mb, int ptr, int last)
62 {
63     int i, no_dirty = 0;
64     for (i = 0; i<ptr; i++)
65         if (mb[i].dirty)
66             no_dirty++;
67     if (no_dirty*4 < ptr*3)
68         return;
69     /* bubble-sort it */
70     for (i = 0; i<ptr; i++)
71     {
72         int tmp, j, j_min = -1;
73         for (j = i; j<ptr; j++)
74         {
75             if (j_min < 0 || mb[j_min].block > mb[j].block)
76                 j_min = j;
77         }
78         assert (j_min >= 0);
79         tmp = mb[j_min].block;
80         mb[j_min].block = mb[i].block;
81         mb[i].block = tmp;
82         mb[i].dirty = 1;
83     }
84     if (!last)
85         mb[i].dirty = 1;
86 }
87
88 static void flush_blocks (ISAMC is, struct isc_merge_block *mb, int ptr,
89                           char *r_buf, int *firstpos, int cat, int last,
90                           int *numkeys)
91 {
92     int i;
93
94     for (i = 0; i<ptr; i++)
95     {
96         /* consider this block number */
97         if (!mb[i].block) 
98         {
99             mb[i].block = isc_alloc_block (is, cat);
100             mb[i].dirty = 1;
101         }
102
103         /* consider next block pointer */
104         if (last && i == ptr-1)
105             mb[i+1].block = 0;
106         else if (!mb[i+1].block)       
107         {
108             mb[i+1].block = isc_alloc_block (is, cat);
109             mb[i+1].dirty = 1;
110             mb[i].dirty = 1;
111         }
112     }
113
114     for (i = 0; i<ptr; i++)
115     {
116         char *src;
117         ISAMC_BLOCK_SIZE ssize = mb[i+1].offset - mb[i].offset;
118
119         assert (ssize);
120
121         /* skip rest if not dirty */
122         if (!mb[i].dirty)
123         {
124             assert (mb[i].block);
125             if (!*firstpos)
126                 *firstpos = mb[i].block;
127             if (is->method->debug > 2)
128                 logf (LOG_LOG, "isc: skip ptr=%d size=%d %d %d",
129                      i, ssize, cat, mb[i].block);
130             ++(is->files[cat].no_skip_writes);
131             continue;
132         }
133         /* write block */
134
135         if (!*firstpos)
136         {
137             *firstpos = mb[i].block;
138             src = r_buf + mb[i].offset - ISAMC_BLOCK_OFFSET_1;
139             ssize += ISAMC_BLOCK_OFFSET_1;
140
141             memcpy (src+sizeof(int)+sizeof(ssize), numkeys,
142                     sizeof(*numkeys));
143             if (is->method->debug > 2)
144                 logf (LOG_LOG, "isc: flush ptr=%d numk=%d size=%d nextpos=%d",
145                      i, *numkeys, (int) ssize, mb[i+1].block);
146         }
147         else
148         {
149             src = r_buf + mb[i].offset - ISAMC_BLOCK_OFFSET_N;
150             ssize += ISAMC_BLOCK_OFFSET_N;
151             if (is->method->debug > 2)
152                 logf (LOG_LOG, "isc: flush ptr=%d size=%d nextpos=%d",
153                      i, (int) ssize, mb[i+1].block);
154         }
155         memcpy (src, &mb[i+1].block, sizeof(int));
156         memcpy (src+sizeof(int), &ssize, sizeof(ssize));
157         isc_write_block (is, cat, mb[i].block, src);
158     }
159 }
160
161 static int get_border (ISAMC is, struct isc_merge_block *mb, int ptr,
162                        int cat, int firstpos)
163 {
164    /* Border set to initial fill or block size depending on
165       whether we are creating a new one or updating and old one.
166     */
167     
168     int fill = mb[ptr].block ? is->method->filecat[cat].bsize :
169                                is->method->filecat[cat].ifill;
170     int off = (ptr||firstpos) ? ISAMC_BLOCK_OFFSET_N : ISAMC_BLOCK_OFFSET_1;
171     
172     assert (ptr < 199);
173
174     return mb[ptr].offset + fill - off;
175 }
176
177 ISAMC_P isc_merge (ISAMC is, ISAMC_P ipos, ISAMC_I data)
178 {
179
180     char i_item[128], *i_item_ptr;
181     int i_more, i_mode, i;
182
183     ISAMC_PP pp; 
184     char f_item[128], *f_item_ptr;
185     int f_more;
186     int last_dirty = 0;
187     int debug = is->method->debug;
188  
189     struct isc_merge_block mb[200];
190
191     int firstpos = 0;
192     int cat = 0;
193     char r_item_buf[128]; /* temporary result output */
194     char *r_buf;          /* block with resulting data */
195     int r_offset = 0;     /* current offset in r_buf */
196     int ptr = 0;          /* pointer */
197     void *r_clientData;   /* encode client data */
198     int border;
199     int numKeys = 0;
200
201     r_clientData = (*is->method->code_start)(ISAMC_ENCODE);
202     r_buf = is->merge_buf + 128;
203
204     pp = isc_pp_open (is, ipos);
205     /* read first item from file. make sure f_more indicates no boundary */
206     f_item_ptr = f_item;
207     f_more = isc_read_item (pp, &f_item_ptr);
208     if (f_more > 0)
209         f_more = 1;
210     cat = pp->cat;
211
212     if (debug > 1)
213         logf (LOG_LOG, "isc: isc_merge begin %d %d", cat, pp->pos);
214
215     /* read first item from i */
216     i_item_ptr = i_item;
217     i_more = (*data->read_item)(data->clientData, &i_item_ptr, &i_mode);
218
219     mb[ptr].block = pp->pos;     /* is zero if no block on disk */
220     mb[ptr].dirty = 0;
221     mb[ptr].offset = 0;
222
223     border = get_border (is, mb, ptr, cat, firstpos);
224     while (i_more || f_more)
225     {
226         char *r_item = r_item_buf;
227         int cmp;
228
229         if (f_more > 1)
230         {
231             /* block to block boundary in the original file. */
232             f_more = 1;
233             if (cat == pp->cat) 
234             {
235                 /* the resulting output is of the same category as the
236                    the original 
237                 */
238                 if (r_offset <= mb[ptr].offset +is->method->filecat[cat].mfill)
239                 {
240                     /* the resulting output block is too small/empty. Delete
241                        the original (if any)
242                     */
243                     if (debug > 3)
244                         logf (LOG_LOG, "isc: release A");
245                     if (mb[ptr].block)
246                         isc_release_block (is, pp->cat, mb[ptr].block);
247                     mb[ptr].block = pp->pos;
248                     if (!mb[ptr].dirty)
249                         mb[ptr].dirty = 1;
250                     if (ptr > 0)
251                         mb[ptr-1].dirty = 1;
252                 }
253                 else
254                 {
255                     /* indicate new boundary based on the original file */
256                     mb[++ptr].block = pp->pos;
257                     mb[ptr].dirty = last_dirty;
258                     mb[ptr].offset = r_offset;
259                     if (debug > 3)
260                         logf (LOG_LOG, "isc: bound ptr=%d,offset=%d",
261                             ptr, r_offset);
262                     if (cat==is->max_cat && ptr >= is->method->max_blocks_mem)
263                     {
264                         /* We are dealing with block(s) of max size. Block(s)
265                            except 1 will be flushed.
266                          */
267                         if (debug > 2)
268                             logf (LOG_LOG, "isc: flush A %d sections", ptr);
269                         flush_blocks (is, mb, ptr-1, r_buf, &firstpos, cat,
270                                       0, &pp->numKeys);
271                         mb[0].block = mb[ptr-1].block;
272                         mb[0].dirty = mb[ptr-1].dirty;
273                         memcpy (r_buf, r_buf + mb[ptr-1].offset,
274                                 mb[ptr].offset - mb[ptr-1].offset);
275                         mb[0].offset = 0;
276
277                         mb[1].block = mb[ptr].block;
278                         mb[1].dirty = mb[ptr].dirty;
279                         mb[1].offset = mb[ptr].offset - mb[ptr-1].offset;
280                         ptr = 1;
281                         r_offset = mb[ptr].offset;
282                     }
283                 }
284             }
285             border = get_border (is, mb, ptr, cat, firstpos);
286         }
287         last_dirty = 0;
288         if (!f_more)
289             cmp = -1;
290         else if (!i_more)
291             cmp = 1;
292         else
293             cmp = (*is->method->compare_item)(i_item, f_item);
294         if (cmp == 0)                   /* insert i=f */
295         {
296             if (!i_mode)   /* delete item? */
297             {
298                 /* move i */
299                 i_item_ptr = i_item;
300                 i_more = (*data->read_item)(data->clientData, &i_item_ptr,
301                                            &i_mode);
302                 /* is next input item the same as current except
303                    for the delete flag? */
304                 cmp = (*is->method->compare_item)(i_item, f_item);
305                 if (!cmp && i_mode)   /* delete/insert nop? */
306                 {
307                     /* yes! insert as if it was an insert only */
308                     memcpy (r_item, i_item, i_item_ptr - i_item);
309                     i_item_ptr = i_item;
310                     i_more = (*data->read_item)(data->clientData, &i_item_ptr,
311                                                 &i_mode);
312                 }
313                 else
314                 {
315                     /* no! delete the item */
316                     r_item = NULL;
317                     last_dirty = 1;
318                     mb[ptr].dirty = 2;
319                 }
320             }
321             else
322             {
323                 memcpy (r_item, f_item, f_item_ptr - f_item);
324
325                 /* move i */
326                 i_item_ptr = i_item;
327                 i_more = (*data->read_item)(data->clientData, &i_item_ptr,
328                                            &i_mode);
329             }
330             /* move f */
331             f_item_ptr = f_item;
332             f_more = isc_read_item (pp, &f_item_ptr);
333         }
334         else if (cmp > 0)               /* insert f */
335         {
336             memcpy (r_item, f_item, f_item_ptr - f_item);
337             /* move f */
338             f_item_ptr = f_item;
339             f_more = isc_read_item (pp, &f_item_ptr);
340         }
341         else                            /* insert i */
342         {
343             if (!i_mode)                /* delete item which isn't there? */
344             {
345                 logf (LOG_FATAL, "Inconsistent register at offset %d",
346                                  r_offset);
347                 abort ();
348             }
349             memcpy (r_item, i_item, i_item_ptr - i_item);
350             mb[ptr].dirty = 2;
351             last_dirty = 1;
352             /* move i */
353             i_item_ptr = i_item;
354             i_more = (*data->read_item)(data->clientData, &i_item_ptr,
355                                         &i_mode);
356         }
357         if (r_item)  /* insert resulting item? */
358         {
359             char *r_out_ptr = r_buf + r_offset;
360             int new_offset;
361
362             (*is->method->code_item)(ISAMC_ENCODE, r_clientData,
363                                      &r_out_ptr, &r_item);
364             new_offset = r_out_ptr - r_buf; 
365
366             numKeys++;
367
368             if (border < new_offset && border >= r_offset)
369             {
370                 if (debug > 2)
371                     logf (LOG_LOG, "isc: border %d %d", ptr, border);
372                 /* Max size of current block category reached ...
373                    make new virtual block entry */
374                 mb[++ptr].block = 0;
375                 mb[ptr].dirty = 1;
376                 mb[ptr].offset = r_offset;
377                 if (cat == is->max_cat && ptr >= is->method->max_blocks_mem)
378                 {
379                     /* We are dealing with block(s) of max size. Block(s)
380                        except one will be flushed. Note: the block(s) are
381                        surely not the last one(s).
382                      */
383                     if (debug > 2)
384                         logf (LOG_LOG, "isc: flush B %d sections", ptr-1);
385                     flush_blocks (is, mb, ptr-1, r_buf, &firstpos, cat,
386                                   0, &pp->numKeys);
387                     mb[0].block = mb[ptr-1].block;
388                     mb[0].dirty = mb[ptr-1].dirty;
389                     memcpy (r_buf, r_buf + mb[ptr-1].offset,
390                             mb[ptr].offset - mb[ptr-1].offset);
391                     mb[0].offset = 0;
392
393                     mb[1].block = mb[ptr].block;
394                     mb[1].dirty = mb[0].dirty;
395                     mb[1].offset = mb[ptr].offset - mb[ptr-1].offset;
396                     memcpy (r_buf + mb[1].offset, r_buf + r_offset,
397                             new_offset - r_offset);
398                     new_offset = (new_offset - r_offset) + mb[1].offset;
399                     ptr = 1;
400                 }
401                 border = get_border (is, mb, ptr, cat, firstpos);
402             }
403             r_offset = new_offset;
404         }
405         if (cat < is->max_cat && ptr >= is->method->filecat[cat].mblocks)
406         {
407             /* Max number blocks in current category reached ->
408                must switch to next category (with larger block size) 
409             */
410             int j = 0;
411
412             (is->files[cat].no_remap)++;
413             /* delete all original block(s) read so far */
414             for (i = 0; i < ptr; i++)
415                 if (mb[i].block)
416                     isc_release_block (is, pp->cat, mb[i].block);
417             /* also delete all block to be read in the future */
418             pp->deleteFlag = 1;
419
420             /* remap block offsets */
421             assert (mb[j].offset == 0);
422             cat++;
423             mb[j].dirty = 1;
424             mb[j].block = 0;
425             mb[ptr].offset = r_offset;
426             for (i = 1; i < ptr; i++)
427             {
428                 int border = is->method->filecat[cat].ifill -
429                          ISAMC_BLOCK_OFFSET_1 + mb[j].offset;
430                 if (debug > 3)
431                     logf (LOG_LOG, "isc: remap %d border=%d", i, border);
432                 if (mb[i+1].offset > border && mb[i].offset <= border)
433                 {
434                     if (debug > 3)
435                         logf (LOG_LOG, "isc:  to %d %d", j, mb[i].offset);
436                     mb[++j].dirty = 1;
437                     mb[j].block = 0;
438                     mb[j].offset = mb[i].offset;
439                 }
440             }
441             if (debug > 2)
442                 logf (LOG_LOG, "isc: remap from %d to %d sections to cat %d",
443                       ptr, j, cat);
444             ptr = j;
445             border = get_border (is, mb, ptr, cat, firstpos);
446             if (debug > 3)
447                 logf (LOG_LOG, "isc: border=%d r_offset=%d", border, r_offset);
448         }
449     }
450     if (mb[ptr].offset < r_offset)
451     {   /* make the final boundary offset */
452         mb[++ptr].dirty = 1; 
453         mb[ptr].block = 0; 
454         mb[ptr].offset = r_offset;
455     }
456     else
457     {   /* empty output. Release last block if any */
458         if (cat == pp->cat && mb[ptr].block)
459         {
460             if (debug > 3)
461                 logf (LOG_LOG, "isc: release C");
462             isc_release_block (is, pp->cat, mb[ptr].block);
463             mb[ptr].block = 0;
464             if (ptr > 0)
465                 mb[ptr-1].dirty = 1;
466         }
467     }
468
469     if (debug > 2)
470         logf (LOG_LOG, "isc: flush C, %d sections", ptr);
471
472     if (firstpos)
473     {
474         /* we have to patch initial block with num keys if that
475            has changed */
476         if (numKeys != isc_pp_num (pp))
477         {
478             if (debug > 2)
479                 logf (LOG_LOG, "isc: patch num keys firstpos=%d num=%d",
480                                 firstpos, numKeys);
481             bf_write (is->files[cat].bf, firstpos, ISAMC_BLOCK_OFFSET_N,
482                       sizeof(numKeys), &numKeys);
483         }
484     }
485     else if (ptr > 0)
486     {   /* we haven't flushed initial block yet and there surely are some
487            blocks to flush. Make first block dirty if numKeys differ */
488         if (numKeys != isc_pp_num (pp))
489             mb[0].dirty = 1;
490     }
491     /* flush rest of block(s) in r_buf */
492     flush_blocks (is, mb, ptr, r_buf, &firstpos, cat, 1, &numKeys);
493
494     (*is->method->code_stop)(ISAMC_ENCODE, r_clientData);
495     if (!firstpos)
496         cat = 0;
497     if (debug > 1)
498         logf (LOG_LOG, "isc: isc_merge return %d %d", cat, firstpos);
499     isc_pp_close (pp);
500     return cat + firstpos * 8;
501 }
502