First version of isc_merge that supports update/delete.
[idzebra-moved-to-github.git] / isamc / merge.c
1 /*
2  * Copyright (c) 1996, Index Data.
3  * See the file LICENSE for details.
4  * Sebastian Hammer, Adam Dickmeiss
5  *
6  * $Log: merge.c,v $
7  * Revision 1.1  1996-11-01 08:59:15  adam
8  * First version of isc_merge that supports update/delete.
9  *
10  */
11
12 #include <stdlib.h>
13 #include <assert.h>
14 #include <string.h>
15 #include <stdio.h>
16
17 #include <log.h>
18 #include "isamc-p.h"
19
20 struct isc_merge_block {
21     int offset;       /* offset in r_buf */
22     int block;        /* block number of file (0 if none) */
23     int dirty;        /* block is different from that on file */
24 };
25
26 static void flush_blocks (ISAMC is, struct isc_merge_block *mb, int ptr,
27                           char *r_buf, int *firstpos, int cat, int last)
28 {
29     int i;
30
31     for (i = 1; i<ptr; i++)
32     {
33         /* skip rest if not dirty */
34         if (!mb[i-1].dirty)
35         {
36             assert (mb[i-1].block);
37             if (!*firstpos)
38                 *firstpos = mb[i-1].block;
39             if (is->method->debug > 2)
40                 logf (LOG_LOG, "isc: skip block %d %d", cat, mb[i-1].block);
41             ++(is->files[cat].no_skip_writes);
42             continue;
43         }
44         /* consider this block number */
45         if (!mb[i-1].block) 
46             mb[i-1].block = isc_alloc_block (is, cat);
47         if (!*firstpos)
48             *firstpos = mb[i-1].block;
49
50         /* consider next block pointer */
51         if (last && i == ptr-1)
52             mb[i].block = 0;
53         else if (!mb[i].block)       
54             mb[i].block = isc_alloc_block (is, cat);
55
56         /* write block */
57         assert (mb[i].offset > mb[i-1].offset);
58         isc_write_dblock (is, cat, mb[i-1].block, r_buf + mb[i-1].offset,
59                           mb[i].block, mb[i].offset - mb[i-1].offset);
60     }
61 }
62
63 ISAMC_P isc_merge (ISAMC is, ISAMC_P ipos, ISAMC_I data)
64 {
65
66     char i_item[128], *i_item_ptr;
67     int i_more, i_mode, i;
68
69     ISAMC_PP pp; 
70     char f_item[128], *f_item_ptr;
71     int f_more;
72  
73     struct isc_merge_block mb[100];
74     int firstpos = 0;
75     int cat = 0;
76     char r_item_buf[128]; /* temporary result output */
77     char *r_buf;          /* block with resulting data */
78     int r_offset = 0;     /* current offset in r_buf */
79     int ptr = 0;          /* pointer */
80     void *r_clientData;   /* encode client data */
81
82     r_clientData = (*is->method->code_start)(ISAMC_ENCODE);
83     r_buf = is->merge_buf + ISAMC_BLOCK_OFFSET;
84
85     pp = isc_pp_open (is, ipos);
86     /* read first item from file. make sure f_more indicates no boundary */
87     f_item_ptr = f_item;
88     f_more = isc_read_item (pp, &f_item_ptr);
89     if (f_more > 0)
90         f_more = 1;
91     cat = pp->cat;
92
93     if (is->method->debug > 1)
94         logf (LOG_LOG, "isc: isc_merge begin %d %d", cat, pp->pos);
95
96     /* read first item from i */
97     i_item_ptr = i_item;
98     i_more = (*data->read_item)(data->clientData, &i_item_ptr, &i_mode);
99
100     mb[ptr].block = pp->pos;     /* is zero if no block on disk */
101     mb[ptr].dirty = 0;
102     mb[ptr++].offset = 0;
103
104     while (i_more || f_more)
105     {
106         char *r_item = r_item_buf;
107         int cmp;
108
109         if (!f_more)
110             cmp = -1;
111         else if (!i_more)
112             cmp = 1;
113         else
114             cmp = (*is->method->compare_item)(i_item, f_item);
115         if (cmp == 0)                   /* insert i=f */
116         {
117             if (!i_mode)   /* delete item? */
118             {
119                 /* move i */
120                 i_item_ptr = i_item;
121                 i_more = (*data->read_item)(data->clientData, &i_item_ptr,
122                                            &i_mode);
123                 /* it next input item the same as current except
124                    for the delete flag? */
125                 cmp = (*is->method->compare_item)(i_item, f_item);
126                 if (0 && !cmp && i_mode)
127                 {
128                     /* yes! insert as if it was an insert only */
129                     memcpy (r_item, i_item, i_item_ptr - i_item);
130                     i_item_ptr = i_item;
131                     i_more = (*data->read_item)(data->clientData, &i_item_ptr,
132                                                 &i_mode);
133                 }
134                 else
135                 {
136                     /* no! delete the item */
137                     r_item = NULL;
138                     mb[ptr-1].dirty = 1;
139                 }
140             }
141             else
142             {
143                 memcpy (r_item, f_item, f_item_ptr - f_item);
144
145                 /* move i */
146                 i_item_ptr = i_item;
147                 i_more = (*data->read_item)(data->clientData, &i_item_ptr,
148                                            &i_mode);
149             }
150             /* move f */
151             f_item_ptr = f_item;
152             f_more = isc_read_item (pp, &f_item_ptr);
153         }
154         else if (cmp > 0)               /* insert f */
155         {
156             memcpy (r_item, f_item, f_item_ptr - f_item);
157             /* move f */
158             f_item_ptr = f_item;
159             f_more = isc_read_item (pp, &f_item_ptr);
160         }
161         else                            /* insert i */
162         {
163             if (!i_mode)                /* delete item which isn't there? */
164             {
165                 logf (LOG_FATAL, "Inconsistent register at offset %d",
166                                  r_offset);
167                 abort ();
168             }
169             memcpy (r_item, i_item, i_item_ptr - i_item);
170             mb[ptr-1].dirty = 1;
171             /* move i */
172             i_item_ptr = i_item;
173             i_more = (*data->read_item)(data->clientData, &i_item_ptr,
174                                         &i_mode);
175         }
176         if (f_more > 1)
177         {
178             /* block to block boundary in the original file. */
179             f_more = 1;
180             if (cat == pp->cat) 
181             {
182                 /* the resulting output is of the same category as the
183                    the original 
184                  */
185                 if (mb[ptr-1].offset == r_offset)
186                 {
187                     /* the resulting output block is empty. Delete
188                        the original (if any)
189                      */
190                     if (mb[ptr-1].block)
191                         isc_release_block (is, pp->cat, mb[ptr-1].block);
192                     mb[ptr-1].block = pp->pos;
193                     mb[ptr-1].dirty = 1;
194                 }
195                 else
196                 {
197                     /* indicate new boundary based on the original file */
198                     mb[ptr].block = pp->pos;
199                     mb[ptr].dirty = 0;
200                     mb[ptr++].offset = r_offset;
201 #if 0
202                     if (r_item && cat == is->max_cat)
203                     {
204                         /* We are dealing with block(s) of max size. Block(s)
205                            will be flushed. Note: the block(s) are surely not
206                            the last one(s).
207                          */
208                         if (is->method->debug > 2)
209                             logf (LOG_LOG, "isc: flush A %d sections", ptr-1);
210                         flush_blocks (is, mb, ptr-1, r_buf, &firstpos, cat, 0);
211                         ptr = 0;
212                         mb[ptr].block = pp->pos;
213                         mb[ptr].dirty = 0;
214                         mb[ptr++].offset = 0;
215                     }
216 #endif
217                 }
218             }
219         }
220         if (r_item)  /* insert resulting item? */
221         {
222             char *r_out_ptr = r_buf + r_offset;
223             int new_offset;
224             int border;
225
226             /* border set to initial fill or block size depending on
227                whether we are creating a new one or updating and old one
228              */
229             if (mb[ptr-1].block)
230                 border = mb[ptr-1].offset + is->method->filecat[cat].bsize
231                          -ISAMC_BLOCK_OFFSET;
232             else
233                 border = mb[ptr-1].offset + is->method->filecat[cat].ifill
234                          -ISAMC_BLOCK_OFFSET;
235
236             (*is->method->code_item)(ISAMC_ENCODE, r_clientData,
237                                      &r_out_ptr, &r_item);
238             new_offset = r_out_ptr - r_buf; 
239
240             if (border >= r_offset && border < new_offset)
241             {
242                 if (is->method->debug > 2)
243                     logf (LOG_LOG, "isc: border %d %d", ptr, border);
244                 /* Max size of current block category reached ...
245                    make new virtual block entry */
246                 mb[ptr].block = 0;
247                 mb[ptr].dirty = 1;
248                 mb[ptr++].offset = r_offset;
249                 if (cat == is->max_cat)
250                 {
251                     /* We are dealing with block(s) of max size. Block(s)
252                        will be flushed. Note: the block(s) are surely not
253                        the last one(s).
254                      */
255                     if (is->method->debug > 2)
256                         logf (LOG_LOG, "isc: flush B %d sections", ptr-1);
257                     flush_blocks (is, mb, ptr, r_buf, &firstpos, cat, 0);
258                     mb[0].block = mb[ptr-1].block;
259                     ptr = 0;
260                     mb[ptr].dirty = 1;
261                     mb[ptr++].offset = 0;
262                     memcpy (r_buf, r_buf + r_offset, new_offset - r_offset);
263                     new_offset = (new_offset - r_offset);
264                 }
265             }
266             r_offset = new_offset;
267         }
268         if (cat < is->max_cat && ptr > is->method->filecat[cat].mblocks)
269         {
270             /* Max number blocks in current category reached ->
271                must switch to next category (with larger block size) 
272             */
273             int j = 0;
274
275             (is->files[cat].no_remap)++;
276             /* delete all original block(s) read so far */
277             for (i = 1; i < ptr; i++)
278                 if (mb[i-1].block)
279                     isc_release_block (is, pp->cat, mb[i-1].block);
280             /* also delete all block to be read in the future */
281             pp->deleteFlag = 1;
282
283             /* remap block offsets */
284             assert (mb[j].offset == 0);
285             cat++;
286             mb[j].dirty = 1;
287             mb[j++].block = 0;
288             for (i = 2; i < ptr; i++)
289             {
290                 int border = is->method->filecat[cat].ifill -
291                          ISAMC_BLOCK_OFFSET + mb[j-1].offset;
292                 if (is->method->debug > 3)
293                     logf (LOG_LOG, "isc: remap %d border=%d", i-1, border);
294                 if (mb[i].offset > border && mb[i-1].offset <= border)
295                 {
296                     if (is->method->debug > 3)
297                         logf (LOG_LOG, "isc:  to %d %d", j, mb[i-1].offset);
298                     mb[j].dirty = 1;
299                     mb[j].block = 0;
300                     mb[j++].offset = mb[i-1].offset;
301                     assert (j <= i);
302                 }
303             }
304             if (is->method->debug > 2)
305                 logf (LOG_LOG, "isc: remap from %d to %d sections to cat %d",
306                       ptr, j, cat);
307             ptr = j;
308         }
309     }
310     if (mb[ptr-1].offset < r_offset)
311     {   /* make the final boundary offset */
312         mb[ptr].dirty = 1;         /* ignored by flush_blocks */
313         mb[ptr].block = 0;         /* ignored by flush_blocks */
314         mb[ptr++].offset = r_offset;
315     }
316     else
317     {   /* empty output. Release last block if any */
318         if (cat == pp->cat && mb[ptr-1].block)
319         {
320             isc_release_block (is, pp->cat, mb[ptr-1].block);
321             mb[ptr-1].block = 0;
322             mb[ptr-1].dirty = 1;
323         }
324     }
325
326     if (is->method->debug > 2)
327         logf (LOG_LOG, "isc: flush C, %d sections", ptr-1);
328
329     /* flush rest of block(s) in r_buf */
330     flush_blocks (is, mb, ptr, r_buf, &firstpos, cat, 1);
331
332     (*is->method->code_stop)(ISAMC_ENCODE, r_clientData);
333     if (!firstpos)
334         cat = 0;
335     if (is->method->debug > 1)
336         logf (LOG_LOG, "isc: isc_merge return %d %d", cat, firstpos);
337     return cat + firstpos * 8;
338 }
339