Updated footer comment
[idzebra-moved-to-github.git] / isamc / merge.c
1 /* This file is part of the Zebra server.
2    Copyright (C) 1994-2009 Index Data
3
4 Zebra is free software; you can redistribute it and/or modify it under
5 the terms of the GNU General Public License as published by the Free
6 Software Foundation; either version 2, or (at your option) any later
7 version.
8
9 Zebra is distributed in the hope that it will be useful, but WITHOUT ANY
10 WARRANTY; without even the implied warranty of MERCHANTABILITY or
11 FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
12 for more details.
13
14 You should have received a copy of the GNU General Public License
15 along with this program; if not, write to the Free Software
16 Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA  02110-1301  USA
17
18 */
19
20 #include <stdlib.h>
21 #include <assert.h>
22 #include <string.h>
23 #include <stdio.h>
24 #include <yaz/log.h>
25 #include "isamc-p.h"
26
27 struct isamc_merge_block {
28     int offset;   /* offset in r_buf */
29     zint block;   /* block number of file (0 if none) */
30     int dirty;    /* block is different from that on file */
31 };
32
33 #if 0
34 static void opt_blocks (ISAMC is, struct isamc_merge_block *mb, int ptr,
35                         int last)
36 {
37     int i, no_dirty = 0;
38     for (i = 0; i<ptr; i++)
39         if (mb[i].dirty)
40             no_dirty++;
41     if (no_dirty*4 < ptr*3)
42         return;
43     /* bubble-sort it */
44     for (i = 0; i<ptr; i++)
45     {
46         int tmp, j, j_min = -1;
47         for (j = i; j<ptr; j++)
48         {
49             if (j_min < 0 || mb[j_min].block > mb[j].block)
50                 j_min = j;
51         }
52         assert (j_min >= 0);
53         tmp = mb[j_min].block;
54         mb[j_min].block = mb[i].block;
55         mb[i].block = tmp;
56         mb[i].dirty = 1;
57     }
58     if (!last)
59         mb[i].dirty = 1;
60 }
61 #endif
62
63 static void flush_blocks (ISAMC is, struct isamc_merge_block *mb, int ptr,
64                           char *r_buf, zint *firstpos, int cat, int last,
65                           zint *numkeys)
66 {
67     int i;
68
69     for (i = 0; i<ptr; i++)
70     {
71         /* consider this block number */
72         if (!mb[i].block) 
73         {
74             mb[i].block = isamc_alloc_block (is, cat);
75             mb[i].dirty = 1;
76         }
77
78         /* consider next block pointer */
79         if (last && i == ptr-1)
80             mb[i+1].block = 0;
81         else if (!mb[i+1].block)       
82         {
83             mb[i+1].block = isamc_alloc_block (is, cat);
84             mb[i+1].dirty = 1;
85             mb[i].dirty = 1;
86         }
87     }
88
89     for (i = 0; i<ptr; i++)
90     {
91         char *src;
92         ISAMC_BLOCK_SIZE ssize = mb[i+1].offset - mb[i].offset;
93
94         assert (ssize);
95
96         /* skip rest if not dirty */
97         if (!mb[i].dirty)
98         {
99             assert (mb[i].block);
100             if (!*firstpos)
101                 *firstpos = mb[i].block;
102             if (is->method->debug > 2)
103                 yaz_log (YLOG_LOG, "isc: skip ptr=%d size=%d %d " ZINT_FORMAT,
104                      i, ssize, cat, mb[i].block);
105             ++(is->files[cat].no_skip_writes);
106             continue;
107         }
108         /* write block */
109
110         if (!*firstpos)
111         {
112             *firstpos = mb[i].block;
113             src = r_buf + mb[i].offset - ISAMC_BLOCK_OFFSET_1;
114             ssize += ISAMC_BLOCK_OFFSET_1;
115
116             memcpy (src+sizeof(zint)+sizeof(ssize), numkeys, sizeof(*numkeys));
117             if (is->method->debug > 2)
118                 yaz_log (YLOG_LOG, "isc: flush ptr=%d numk=" ZINT_FORMAT " size=%d nextpos="
119                       ZINT_FORMAT, i, *numkeys, (int) ssize, mb[i+1].block);
120         }
121         else
122         {
123             src = r_buf + mb[i].offset - ISAMC_BLOCK_OFFSET_N;
124             ssize += ISAMC_BLOCK_OFFSET_N;
125             if (is->method->debug > 2)
126                 yaz_log (YLOG_LOG, "isc: flush ptr=%d size=%d nextpos=" ZINT_FORMAT,
127                      i, (int) ssize, mb[i+1].block);
128         }
129         memcpy (src, &mb[i+1].block, sizeof(zint));
130         memcpy (src+sizeof(zint), &ssize, sizeof(ssize));
131         isamc_write_block (is, cat, mb[i].block, src);
132     }
133 }
134
135 static int get_border (ISAMC is, struct isamc_merge_block *mb, zint ptr,
136                        int cat, zint firstpos)
137 {
138    /* Border set to initial fill or block size depending on
139       whether we are creating a new one or updating and old one.
140     */
141     
142     int fill = mb[ptr].block ? is->method->filecat[cat].bsize :
143                                is->method->filecat[cat].ifill;
144     int off = (ptr||firstpos) ? ISAMC_BLOCK_OFFSET_N : ISAMC_BLOCK_OFFSET_1;
145     
146     assert (ptr < 199);
147
148     return mb[ptr].offset + fill - off;
149 }
150
151 void isamc_merge (ISAMC is, ISAM_P *ipos, ISAMC_I *data)
152 {
153
154     char i_item[128], *i_item_ptr;
155     int i_more, i_mode, i;
156
157     ISAMC_PP pp; 
158     char f_item[128], *f_item_ptr;
159     int f_more;
160     int last_dirty = 0;
161     int debug = is->method->debug;
162  
163     struct isamc_merge_block mb[200];
164
165     zint firstpos = 0;
166     int cat = 0;
167     char r_item_buf[128]; /* temporary result output */
168     char *r_buf;          /* block with resulting data */
169     int r_offset = 0;     /* current offset in r_buf */
170     int ptr = 0;          /* pointer */
171     void *r_clientData;   /* encode client data */
172     zint border;
173     zint numKeys = 0;
174
175     r_clientData = (*is->method->codec.start)();
176     r_buf = is->merge_buf + 128;
177
178     pp = isamc_pp_open (is, *ipos);
179     /* read first item from file. make sure f_more indicates no boundary */
180     f_item_ptr = f_item;
181     f_more = isamc_read_item (pp, &f_item_ptr);
182     if (f_more > 0)
183         f_more = 1;
184     cat = pp->cat;
185
186     if (debug > 1)
187         yaz_log (YLOG_LOG, "isc: isamc_merge begin %d " ZINT_FORMAT, cat, pp->pos);
188
189     /* read first item from i */
190     i_item_ptr = i_item;
191     i_more = (*data->read_item)(data->clientData, &i_item_ptr, &i_mode);
192
193     mb[ptr].block = pp->pos;     /* is zero if no block on disk */
194     mb[ptr].dirty = 0;
195     mb[ptr].offset = 0;
196
197     border = get_border (is, mb, ptr, cat, firstpos);
198     while (i_more || f_more)
199     {
200         char *r_item = r_item_buf;
201         int cmp;
202
203         if (f_more > 1)
204         {
205             /* block to block boundary in the original file. */
206             f_more = 1;
207             if (cat == pp->cat) 
208             {
209                 /* the resulting output is of the same category as the
210                    the original 
211                 */
212                 if (r_offset <= mb[ptr].offset +is->method->filecat[cat].mfill)
213                 {
214                     /* the resulting output block is too small/empty. Delete
215                        the original (if any)
216                     */
217                     if (debug > 3)
218                         yaz_log (YLOG_LOG, "isc: release A");
219                     if (mb[ptr].block)
220                         isamc_release_block (is, pp->cat, mb[ptr].block);
221                     mb[ptr].block = pp->pos;
222                     if (!mb[ptr].dirty)
223                         mb[ptr].dirty = 1;
224                     if (ptr > 0)
225                         mb[ptr-1].dirty = 1;
226                 }
227                 else
228                 {
229                     /* indicate new boundary based on the original file */
230                     mb[++ptr].block = pp->pos;
231                     mb[ptr].dirty = last_dirty;
232                     mb[ptr].offset = r_offset;
233                     if (debug > 3)
234                         yaz_log (YLOG_LOG, "isc: bound ptr=%d,offset=%d",
235                             ptr, r_offset);
236                     if (cat==is->max_cat && ptr >= is->method->max_blocks_mem)
237                     {
238                         /* We are dealing with block(s) of max size. Block(s)
239                            except 1 will be flushed.
240                          */
241                         if (debug > 2)
242                             yaz_log (YLOG_LOG, "isc: flush A %d sections", ptr);
243                         flush_blocks (is, mb, ptr-1, r_buf, &firstpos, cat,
244                                       0, &pp->numKeys);
245                         mb[0].block = mb[ptr-1].block;
246                         mb[0].dirty = mb[ptr-1].dirty;
247                         memcpy (r_buf, r_buf + mb[ptr-1].offset,
248                                 mb[ptr].offset - mb[ptr-1].offset);
249                         mb[0].offset = 0;
250
251                         mb[1].block = mb[ptr].block;
252                         mb[1].dirty = mb[ptr].dirty;
253                         mb[1].offset = mb[ptr].offset - mb[ptr-1].offset;
254                         ptr = 1;
255                         r_offset = mb[ptr].offset;
256                     }
257                 }
258             }
259             border = get_border (is, mb, ptr, cat, firstpos);
260         }
261         last_dirty = 0;
262         if (!f_more)
263             cmp = -1;
264         else if (!i_more)
265             cmp = 1;
266         else
267             cmp = (*is->method->compare_item)(i_item, f_item);
268         if (cmp == 0)                   /* insert i=f */
269         {
270             if (!i_mode)   /* delete item? */
271             {
272                 /* move i */
273                 i_item_ptr = i_item;
274                 i_more = (*data->read_item)(data->clientData, &i_item_ptr,
275                                            &i_mode);
276                 /* is next input item the same as current except
277                    for the delete flag? */
278                 cmp = (*is->method->compare_item)(i_item, f_item);
279                 if (!cmp && i_mode)   /* delete/insert nop? */
280                 {
281                     /* yes! insert as if it was an insert only */
282                     memcpy (r_item, i_item, i_item_ptr - i_item);
283                     i_item_ptr = i_item;
284                     i_more = (*data->read_item)(data->clientData, &i_item_ptr,
285                                                 &i_mode);
286                 }
287                 else
288                 {
289                     /* no! delete the item */
290                     r_item = NULL;
291                     last_dirty = 1;
292                     mb[ptr].dirty = 2;
293                 }
294             }
295             else
296             {
297                 memcpy (r_item, f_item, f_item_ptr - f_item);
298
299                 /* move i */
300                 i_item_ptr = i_item;
301                 i_more = (*data->read_item)(data->clientData, &i_item_ptr,
302                                            &i_mode);
303             }
304             /* move f */
305             f_item_ptr = f_item;
306             f_more = isamc_read_item (pp, &f_item_ptr);
307         }
308         else if (cmp > 0)               /* insert f */
309         {
310             memcpy (r_item, f_item, f_item_ptr - f_item);
311             /* move f */
312             f_item_ptr = f_item;
313             f_more = isamc_read_item (pp, &f_item_ptr);
314         }
315         else                            /* insert i */
316         {
317             if (!i_mode)                /* delete item which isn't there? */
318             {
319                 yaz_log (YLOG_FATAL, "Inconsistent register at offset %d",
320                                  r_offset);
321                 abort ();
322             }
323             memcpy (r_item, i_item, i_item_ptr - i_item);
324             mb[ptr].dirty = 2;
325             last_dirty = 1;
326             /* move i */
327             i_item_ptr = i_item;
328             i_more = (*data->read_item)(data->clientData, &i_item_ptr,
329                                         &i_mode);
330         }
331         if (r_item)  /* insert resulting item? */
332         {
333             char *r_out_ptr = r_buf + r_offset;
334             const char *src = r_item;
335             int new_offset;
336
337             (*is->method->codec.encode)(r_clientData, &r_out_ptr, &src);
338             new_offset = r_out_ptr - r_buf; 
339
340             numKeys++;
341
342             if (border < new_offset && border >= r_offset)
343             {
344                 if (debug > 2)
345                     yaz_log (YLOG_LOG, "isc: border %d " ZINT_FORMAT,
346                           ptr, border);
347                 /* Max size of current block category reached ...
348                    make new virtual block entry */
349                 mb[++ptr].block = 0;
350                 mb[ptr].dirty = 1;
351                 mb[ptr].offset = r_offset;
352                 if (cat == is->max_cat && ptr >= is->method->max_blocks_mem)
353                 {
354                     /* We are dealing with block(s) of max size. Block(s)
355                        except one will be flushed. Note: the block(s) are
356                        surely not the last one(s).
357                      */
358                     if (debug > 2)
359                         yaz_log (YLOG_LOG, "isc: flush B %d sections", ptr-1);
360                     flush_blocks (is, mb, ptr-1, r_buf, &firstpos, cat,
361                                   0, &pp->numKeys);
362                     mb[0].block = mb[ptr-1].block;
363                     mb[0].dirty = mb[ptr-1].dirty;
364                     memcpy (r_buf, r_buf + mb[ptr-1].offset,
365                             mb[ptr].offset - mb[ptr-1].offset);
366                     mb[0].offset = 0;
367
368                     mb[1].block = mb[ptr].block;
369                     mb[1].dirty = mb[0].dirty;
370                     mb[1].offset = mb[ptr].offset - mb[ptr-1].offset;
371                     memcpy (r_buf + mb[1].offset, r_buf + r_offset,
372                             new_offset - r_offset);
373                     new_offset = (new_offset - r_offset) + mb[1].offset;
374                     ptr = 1;
375                 }
376                 border = get_border (is, mb, ptr, cat, firstpos);
377             }
378             r_offset = new_offset;
379         }
380         if (cat < is->max_cat && ptr >= is->method->filecat[cat].mblocks)
381         {
382             /* Max number blocks in current category reached ->
383                must switch to next category (with larger block size) 
384             */
385             int j = 0;
386
387             (is->files[cat].no_remap)++;
388             /* delete all original block(s) read so far */
389             for (i = 0; i < ptr; i++)
390                 if (mb[i].block)
391                     isamc_release_block (is, pp->cat, mb[i].block);
392             /* also delete all block to be read in the future */
393             pp->deleteFlag = 1;
394
395             /* remap block offsets */
396             assert (mb[j].offset == 0);
397             cat++;
398             mb[j].dirty = 1;
399             mb[j].block = 0;
400             mb[ptr].offset = r_offset;
401             for (i = 1; i < ptr; i++)
402             {
403                 int border = is->method->filecat[cat].ifill -
404                          ISAMC_BLOCK_OFFSET_1 + mb[j].offset;
405                 if (debug > 3)
406                     yaz_log (YLOG_LOG, "isc: remap %d border=%d", i, border);
407                 if (mb[i+1].offset > border && mb[i].offset <= border)
408                 {
409                     if (debug > 3)
410                         yaz_log (YLOG_LOG, "isc:  to %d %d", j, mb[i].offset);
411                     mb[++j].dirty = 1;
412                     mb[j].block = 0;
413                     mb[j].offset = mb[i].offset;
414                 }
415             }
416             if (debug > 2)
417                 yaz_log (YLOG_LOG, "isc: remap from %d to %d sections to cat %d",
418                       ptr, j, cat);
419             ptr = j;
420             border = get_border (is, mb, ptr, cat, firstpos);
421             if (debug > 3)
422                 yaz_log (YLOG_LOG, "isc: border=" ZINT_FORMAT " r_offset=%d",
423                       border, r_offset);
424         }
425     }
426     if (mb[ptr].offset < r_offset)
427     {   /* make the final boundary offset */
428         mb[++ptr].dirty = 1; 
429         mb[ptr].block = 0; 
430         mb[ptr].offset = r_offset;
431     }
432     else
433     {   /* empty output. Release last block if any */
434         if (cat == pp->cat && mb[ptr].block)
435         {
436             if (debug > 3)
437                 yaz_log (YLOG_LOG, "isc: release C");
438             isamc_release_block (is, pp->cat, mb[ptr].block);
439             mb[ptr].block = 0;
440             if (ptr > 0)
441                 mb[ptr-1].dirty = 1;
442         }
443     }
444
445     if (debug > 2)
446         yaz_log (YLOG_LOG, "isc: flush C, %d sections", ptr);
447
448     if (firstpos)
449     {
450         /* we have to patch initial block with num keys if that
451            has changed */
452         if (numKeys != isamc_pp_num (pp))
453         {
454             if (debug > 2)
455                 yaz_log (YLOG_LOG, "isc: patch num keys firstpos=" ZINT_FORMAT " num=" ZINT_FORMAT,
456                                 firstpos, numKeys);
457             bf_write (is->files[cat].bf, firstpos, ISAMC_BLOCK_OFFSET_N,
458                       sizeof(numKeys), &numKeys);
459         }
460     }
461     else if (ptr > 0)
462     {   /* we haven't flushed initial block yet and there surely are some
463            blocks to flush. Make first block dirty if numKeys differ */
464         if (numKeys != isamc_pp_num (pp))
465             mb[0].dirty = 1;
466     }
467     /* flush rest of block(s) in r_buf */
468     flush_blocks (is, mb, ptr, r_buf, &firstpos, cat, 1, &numKeys);
469
470     (*is->method->codec.stop)(r_clientData);
471     if (!firstpos)
472         cat = 0;
473     if (debug > 1)
474         yaz_log (YLOG_LOG, "isc: isamc_merge return %d " ZINT_FORMAT, cat, firstpos);
475     isamc_pp_close (pp);
476     *ipos = cat + firstpos * 8;
477 }
478
479 /*
480  * Local variables:
481  * c-basic-offset: 4
482  * c-file-style: "Stroustrup"
483  * indent-tabs-mode: nil
484  * End:
485  * vim: shiftwidth=4 tabstop=8 expandtab
486  */
487