39c293eaea96ab4bae3d7e0219837be40d7791d5
[idzebra-moved-to-github.git] / isamc / merge.c
1 /*
2  * Copyright (c) 1996-1998, Index Data.
3  * See the file LICENSE for details.
4  * Sebastian Hammer, Adam Dickmeiss, Heikki Levanto
5  *
6  */
7
8 #include <stdlib.h>
9 #include <assert.h>
10 #include <string.h>
11 #include <stdio.h>
12
13 #include <log.h>
14 #include "isamc-p.h"
15 #include "isamh-p.h"
16
17 struct isc_merge_block {
18     int offset;       /* offset in r_buf */
19     int block;        /* block number of file (0 if none) */
20     int dirty;        /* block is different from that on file */
21 };
22
23 static void opt_blocks (ISAMC is, struct isc_merge_block *mb, int ptr,
24                         int last)
25 {
26     int i, no_dirty = 0;
27     for (i = 0; i<ptr; i++)
28         if (mb[i].dirty)
29             no_dirty++;
30     if (no_dirty*4 < ptr*3)
31         return;
32     /* bubble-sort it */
33     for (i = 0; i<ptr; i++)
34     {
35         int tmp, j, j_min = -1;
36         for (j = i; j<ptr; j++)
37         {
38             if (j_min < 0 || mb[j_min].block > mb[j].block)
39                 j_min = j;
40         }
41         assert (j_min >= 0);
42         tmp = mb[j_min].block;
43         mb[j_min].block = mb[i].block;
44         mb[i].block = tmp;
45         mb[i].dirty = 1;
46     }
47     if (!last)
48         mb[i].dirty = 1;
49 }
50
51 static void flush_blocks (ISAMC is, struct isc_merge_block *mb, int ptr,
52                           char *r_buf, int *firstpos, int cat, int last,
53                           int *numkeys)
54 {
55     int i;
56
57     for (i = 0; i<ptr; i++)
58     {
59         /* consider this block number */
60         if (!mb[i].block) 
61         {
62             mb[i].block = isc_alloc_block (is, cat);
63             mb[i].dirty = 1;
64         }
65
66         /* consider next block pointer */
67         if (last && i == ptr-1)
68             mb[i+1].block = 0;
69         else if (!mb[i+1].block)       
70         {
71             mb[i+1].block = isc_alloc_block (is, cat);
72             mb[i+1].dirty = 1;
73             mb[i].dirty = 1;
74         }
75     }
76
77     for (i = 0; i<ptr; i++)
78     {
79         char *src;
80         ISAMC_BLOCK_SIZE ssize = mb[i+1].offset - mb[i].offset;
81
82         assert (ssize);
83
84         /* skip rest if not dirty */
85         if (!mb[i].dirty)
86         {
87             assert (mb[i].block);
88             if (!*firstpos)
89                 *firstpos = mb[i].block;
90             if (is->method->debug > 2)
91                 logf (LOG_LOG, "isc: skip ptr=%d size=%d %d %d",
92                      i, ssize, cat, mb[i].block);
93             ++(is->files[cat].no_skip_writes);
94             continue;
95         }
96         /* write block */
97
98         if (!*firstpos)
99         {
100             *firstpos = mb[i].block;
101             src = r_buf + mb[i].offset - ISAMC_BLOCK_OFFSET_1;
102             ssize += ISAMC_BLOCK_OFFSET_1;
103
104             memcpy (src+sizeof(int)+sizeof(ssize), numkeys,
105                     sizeof(*numkeys));
106             if (is->method->debug > 2)
107                 logf (LOG_LOG, "isc: flush ptr=%d numk=%d size=%d nextpos=%d",
108                      i, *numkeys, (int) ssize, mb[i+1].block);
109         }
110         else
111         {
112             src = r_buf + mb[i].offset - ISAMC_BLOCK_OFFSET_N;
113             ssize += ISAMC_BLOCK_OFFSET_N;
114             if (is->method->debug > 2)
115                 logf (LOG_LOG, "isc: flush ptr=%d size=%d nextpos=%d",
116                      i, (int) ssize, mb[i+1].block);
117         }
118         memcpy (src, &mb[i+1].block, sizeof(int));
119         memcpy (src+sizeof(int), &ssize, sizeof(ssize));
120         isc_write_block (is, cat, mb[i].block, src);
121     }
122 }
123
124 static int get_border (ISAMC is, struct isc_merge_block *mb, int ptr,
125                        int cat, int firstpos)
126 {
127    /* Border set to initial fill or block size depending on
128       whether we are creating a new one or updating and old one.
129     */
130     
131     int fill = mb[ptr].block ? is->method->filecat[cat].bsize :
132                                is->method->filecat[cat].ifill;
133     int off = (ptr||firstpos) ? ISAMC_BLOCK_OFFSET_N : ISAMC_BLOCK_OFFSET_1;
134     
135     assert (ptr < 199);
136
137     return mb[ptr].offset + fill - off;
138 }
139
140 ISAMC_P isc_merge (ISAMC is, ISAMC_P ipos, ISAMC_I data)
141 {
142
143     char i_item[128], *i_item_ptr;
144     int i_more, i_mode, i;
145
146     ISAMC_PP pp; 
147     char f_item[128], *f_item_ptr;
148     int f_more;
149     int last_dirty = 0;
150     int debug = is->method->debug;
151  
152     struct isc_merge_block mb[200];
153
154     int firstpos = 0;
155     int cat = 0;
156     char r_item_buf[128]; /* temporary result output */
157     char *r_buf;          /* block with resulting data */
158     int r_offset = 0;     /* current offset in r_buf */
159     int ptr = 0;          /* pointer */
160     void *r_clientData;   /* encode client data */
161     int border;
162     int numKeys = 0;
163
164     r_clientData = (*is->method->code_start)(ISAMC_ENCODE);
165     r_buf = is->merge_buf + 128;
166
167     pp = isc_pp_open (is, ipos);
168     /* read first item from file. make sure f_more indicates no boundary */
169     f_item_ptr = f_item;
170     f_more = isc_read_item (pp, &f_item_ptr);
171     if (f_more > 0)
172         f_more = 1;
173     cat = pp->cat;
174
175     if (debug > 1)
176         logf (LOG_LOG, "isc: isc_merge begin %d %d", cat, pp->pos);
177
178     /* read first item from i */
179     i_item_ptr = i_item;
180     i_more = (*data->read_item)(data->clientData, &i_item_ptr, &i_mode);
181
182     mb[ptr].block = pp->pos;     /* is zero if no block on disk */
183     mb[ptr].dirty = 0;
184     mb[ptr].offset = 0;
185
186     border = get_border (is, mb, ptr, cat, firstpos);
187     while (i_more || f_more)
188     {
189         char *r_item = r_item_buf;
190         int cmp;
191
192         if (f_more > 1)
193         {
194             /* block to block boundary in the original file. */
195             f_more = 1;
196             if (cat == pp->cat) 
197             {
198                 /* the resulting output is of the same category as the
199                    the original 
200                 */
201                 if (r_offset <= mb[ptr].offset +is->method->filecat[cat].mfill)
202                 {
203                     /* the resulting output block is too small/empty. Delete
204                        the original (if any)
205                     */
206                     if (debug > 3)
207                         logf (LOG_LOG, "isc: release A");
208                     if (mb[ptr].block)
209                         isc_release_block (is, pp->cat, mb[ptr].block);
210                     mb[ptr].block = pp->pos;
211                     if (!mb[ptr].dirty)
212                         mb[ptr].dirty = 1;
213                     if (ptr > 0)
214                         mb[ptr-1].dirty = 1;
215                 }
216                 else
217                 {
218                     /* indicate new boundary based on the original file */
219                     mb[++ptr].block = pp->pos;
220                     mb[ptr].dirty = last_dirty;
221                     mb[ptr].offset = r_offset;
222                     if (debug > 3)
223                         logf (LOG_LOG, "isc: bound ptr=%d,offset=%d",
224                             ptr, r_offset);
225                     if (cat==is->max_cat && ptr >= is->method->max_blocks_mem)
226                     {
227                         /* We are dealing with block(s) of max size. Block(s)
228                            except 1 will be flushed.
229                          */
230                         if (debug > 2)
231                             logf (LOG_LOG, "isc: flush A %d sections", ptr);
232                         flush_blocks (is, mb, ptr-1, r_buf, &firstpos, cat,
233                                       0, &pp->numKeys);
234                         mb[0].block = mb[ptr-1].block;
235                         mb[0].dirty = mb[ptr-1].dirty;
236                         memcpy (r_buf, r_buf + mb[ptr-1].offset,
237                                 mb[ptr].offset - mb[ptr-1].offset);
238                         mb[0].offset = 0;
239
240                         mb[1].block = mb[ptr].block;
241                         mb[1].dirty = mb[ptr].dirty;
242                         mb[1].offset = mb[ptr].offset - mb[ptr-1].offset;
243                         ptr = 1;
244                         r_offset = mb[ptr].offset;
245                     }
246                 }
247             }
248             border = get_border (is, mb, ptr, cat, firstpos);
249         }
250         last_dirty = 0;
251         if (!f_more)
252             cmp = -1;
253         else if (!i_more)
254             cmp = 1;
255         else
256             cmp = (*is->method->compare_item)(i_item, f_item);
257         if (cmp == 0)                   /* insert i=f */
258         {
259             if (!i_mode)   /* delete item? */
260             {
261                 /* move i */
262                 i_item_ptr = i_item;
263                 i_more = (*data->read_item)(data->clientData, &i_item_ptr,
264                                            &i_mode);
265                 /* is next input item the same as current except
266                    for the delete flag? */
267                 cmp = (*is->method->compare_item)(i_item, f_item);
268                 if (!cmp && i_mode)   /* delete/insert nop? */
269                 {
270                     /* yes! insert as if it was an insert only */
271                     memcpy (r_item, i_item, i_item_ptr - i_item);
272                     i_item_ptr = i_item;
273                     i_more = (*data->read_item)(data->clientData, &i_item_ptr,
274                                                 &i_mode);
275                 }
276                 else
277                 {
278                     /* no! delete the item */
279                     r_item = NULL;
280                     last_dirty = 1;
281                     mb[ptr].dirty = 2;
282                 }
283             }
284             else
285             {
286                 memcpy (r_item, f_item, f_item_ptr - f_item);
287
288                 /* move i */
289                 i_item_ptr = i_item;
290                 i_more = (*data->read_item)(data->clientData, &i_item_ptr,
291                                            &i_mode);
292             }
293             /* move f */
294             f_item_ptr = f_item;
295             f_more = isc_read_item (pp, &f_item_ptr);
296         }
297         else if (cmp > 0)               /* insert f */
298         {
299             memcpy (r_item, f_item, f_item_ptr - f_item);
300             /* move f */
301             f_item_ptr = f_item;
302             f_more = isc_read_item (pp, &f_item_ptr);
303         }
304         else                            /* insert i */
305         {
306             if (!i_mode)                /* delete item which isn't there? */
307             {
308                 logf (LOG_FATAL, "Inconsistent register at offset %d",
309                                  r_offset);
310                 abort ();
311             }
312             memcpy (r_item, i_item, i_item_ptr - i_item);
313             mb[ptr].dirty = 2;
314             last_dirty = 1;
315             /* move i */
316             i_item_ptr = i_item;
317             i_more = (*data->read_item)(data->clientData, &i_item_ptr,
318                                         &i_mode);
319         }
320         if (r_item)  /* insert resulting item? */
321         {
322             char *r_out_ptr = r_buf + r_offset;
323             int new_offset;
324
325             (*is->method->code_item)(ISAMC_ENCODE, r_clientData,
326                                      &r_out_ptr, &r_item);
327             new_offset = r_out_ptr - r_buf; 
328
329             numKeys++;
330
331             if (border < new_offset && border >= r_offset)
332             {
333                 if (debug > 2)
334                     logf (LOG_LOG, "isc: border %d %d", ptr, border);
335                 /* Max size of current block category reached ...
336                    make new virtual block entry */
337                 mb[++ptr].block = 0;
338                 mb[ptr].dirty = 1;
339                 mb[ptr].offset = r_offset;
340                 if (cat == is->max_cat && ptr >= is->method->max_blocks_mem)
341                 {
342                     /* We are dealing with block(s) of max size. Block(s)
343                        except one will be flushed. Note: the block(s) are
344                        surely not the last one(s).
345                      */
346                     if (debug > 2)
347                         logf (LOG_LOG, "isc: flush B %d sections", ptr-1);
348                     flush_blocks (is, mb, ptr-1, r_buf, &firstpos, cat,
349                                   0, &pp->numKeys);
350                     mb[0].block = mb[ptr-1].block;
351                     mb[0].dirty = mb[ptr-1].dirty;
352                     memcpy (r_buf, r_buf + mb[ptr-1].offset,
353                             mb[ptr].offset - mb[ptr-1].offset);
354                     mb[0].offset = 0;
355
356                     mb[1].block = mb[ptr].block;
357                     mb[1].dirty = mb[0].dirty;
358                     mb[1].offset = mb[ptr].offset - mb[ptr-1].offset;
359                     memcpy (r_buf + mb[1].offset, r_buf + r_offset,
360                             new_offset - r_offset);
361                     new_offset = (new_offset - r_offset) + mb[1].offset;
362                     ptr = 1;
363                 }
364                 border = get_border (is, mb, ptr, cat, firstpos);
365             }
366             r_offset = new_offset;
367         }
368         if (cat < is->max_cat && ptr >= is->method->filecat[cat].mblocks)
369         {
370             /* Max number blocks in current category reached ->
371                must switch to next category (with larger block size) 
372             */
373             int j = 0;
374
375             (is->files[cat].no_remap)++;
376             /* delete all original block(s) read so far */
377             for (i = 0; i < ptr; i++)
378                 if (mb[i].block)
379                     isc_release_block (is, pp->cat, mb[i].block);
380             /* also delete all block to be read in the future */
381             pp->deleteFlag = 1;
382
383             /* remap block offsets */
384             assert (mb[j].offset == 0);
385             cat++;
386             mb[j].dirty = 1;
387             mb[j].block = 0;
388             mb[ptr].offset = r_offset;
389             for (i = 1; i < ptr; i++)
390             {
391                 int border = is->method->filecat[cat].ifill -
392                          ISAMC_BLOCK_OFFSET_1 + mb[j].offset;
393                 if (debug > 3)
394                     logf (LOG_LOG, "isc: remap %d border=%d", i, border);
395                 if (mb[i+1].offset > border && mb[i].offset <= border)
396                 {
397                     if (debug > 3)
398                         logf (LOG_LOG, "isc:  to %d %d", j, mb[i].offset);
399                     mb[++j].dirty = 1;
400                     mb[j].block = 0;
401                     mb[j].offset = mb[i].offset;
402                 }
403             }
404             if (debug > 2)
405                 logf (LOG_LOG, "isc: remap from %d to %d sections to cat %d",
406                       ptr, j, cat);
407             ptr = j;
408             border = get_border (is, mb, ptr, cat, firstpos);
409             if (debug > 3)
410                 logf (LOG_LOG, "isc: border=%d r_offset=%d", border, r_offset);
411         }
412     }
413     if (mb[ptr].offset < r_offset)
414     {   /* make the final boundary offset */
415         mb[++ptr].dirty = 1; 
416         mb[ptr].block = 0; 
417         mb[ptr].offset = r_offset;
418     }
419     else
420     {   /* empty output. Release last block if any */
421         if (cat == pp->cat && mb[ptr].block)
422         {
423             if (debug > 3)
424                 logf (LOG_LOG, "isc: release C");
425             isc_release_block (is, pp->cat, mb[ptr].block);
426             mb[ptr].block = 0;
427             if (ptr > 0)
428                 mb[ptr-1].dirty = 1;
429         }
430     }
431
432     if (debug > 2)
433         logf (LOG_LOG, "isc: flush C, %d sections", ptr);
434
435     if (firstpos)
436     {
437         /* we have to patch initial block with num keys if that
438            has changed */
439         if (numKeys != isc_pp_num (pp))
440         {
441             if (debug > 2)
442                 logf (LOG_LOG, "isc: patch num keys firstpos=%d num=%d",
443                                 firstpos, numKeys);
444             bf_write (is->files[cat].bf, firstpos, ISAMC_BLOCK_OFFSET_N,
445                       sizeof(numKeys), &numKeys);
446         }
447     }
448     else if (ptr > 0)
449     {   /* we haven't flushed initial block yet and there surely are some
450            blocks to flush. Make first block dirty if numKeys differ */
451         if (numKeys != isc_pp_num (pp))
452             mb[0].dirty = 1;
453     }
454     /* flush rest of block(s) in r_buf */
455     flush_blocks (is, mb, ptr, r_buf, &firstpos, cat, 1, &numKeys);
456
457     (*is->method->code_stop)(ISAMC_ENCODE, r_clientData);
458     if (!firstpos)
459         cat = 0;
460     if (debug > 1)
461         logf (LOG_LOG, "isc: isc_merge return %d %d", cat, firstpos);
462     isc_pp_close (pp);
463     return cat + firstpos * 8;
464 }
465
466 ISAMC_P isamh_append (ISAMH is, ISAMH_P ipos, ISAMH_I data)
467 {
468
469     char i_item[128], *i_item_ptr;
470     int i_more, i_mode, i;
471
472     ISAMH_PP pp; 
473     char f_item[128], *f_item_ptr;
474     int f_more;
475     int last_dirty = 0;
476     int debug = is->method->debug;
477  
478     struct isc_merge_block mb[200];
479
480     int firstpos = 0;
481     int cat = 0;
482     char r_item_buf[128]; /* temporary result output */
483     char *r_buf;          /* block with resulting data */
484     int r_offset = 0;     /* current offset in r_buf */
485     int ptr = 0;          /* pointer */
486     void *r_clientData;   /* encode client data */
487     int border;
488     int numKeys = 0;
489
490     r_clientData = (*is->method->code_start)(ISAMH_ENCODE);
491     r_buf = is->merge_buf + 128;
492
493     pp = isamh_pp_open (is, ipos);
494     /* read first item from file. make sure f_more indicates no boundary */
495     f_item_ptr = f_item;
496     f_more = isamh_read_item (pp, &f_item_ptr);
497     if (f_more > 0)
498         f_more = 1;
499     cat = pp->cat;
500
501     if (debug > 1)
502         logf (LOG_LOG, "isc: isamh_append begin %d %d", cat, pp->pos);
503
504     /* read first item from i */
505     i_item_ptr = i_item;
506     i_more = (*data->read_item)(data->clientData, &i_item_ptr, &i_mode);
507
508     mb[ptr].block = pp->pos;     /* is zero if no block on disk */
509     mb[ptr].dirty = 0;
510     mb[ptr].offset = 0;
511
512     border = is->method->filecat[cat].bsize;
513     /* border = get_border (is, mb, ptr, cat, firstpos); */
514     while (i_more || f_more)
515     {
516         char *r_item = r_item_buf;
517         int cmp;
518
519         if (f_more > 1)
520         {
521             /* block to block boundary in the original file. */
522             f_more = 1;
523             if (cat == pp->cat) 
524             {
525                 /* the resulting output is of the same category as the
526                    the original 
527                 */
528                 if (r_offset <= mb[ptr].offset +is->method->filecat[cat].mfill)
529                 {
530                     /* the resulting output block is too small/empty. Delete
531                        the original (if any)
532                     */
533                     if (debug > 3)
534                         logf (LOG_LOG, "isc: release A");
535                     if (mb[ptr].block)
536                         isamh_release_block (is, pp->cat, mb[ptr].block);
537                     mb[ptr].block = pp->pos;
538                     if (!mb[ptr].dirty)
539                         mb[ptr].dirty = 1;
540                     if (ptr > 0)
541                         mb[ptr-1].dirty = 1;
542                 }
543                 else
544                 {
545                     /* indicate new boundary based on the original file */
546                     mb[++ptr].block = pp->pos;
547                     mb[ptr].dirty = last_dirty;
548                     mb[ptr].offset = r_offset;
549                     if (debug > 3)
550                         logf (LOG_LOG, "isc: bound ptr=%d,offset=%d",
551                             ptr, r_offset);
552                     if (cat==is->max_cat && ptr >= is->method->max_blocks_mem)
553                     {
554                         /* We are dealing with block(s) of max size. Block(s)
555                            except 1 will be flushed.
556                          */
557                         if (debug > 2)
558                             logf (LOG_LOG, "isc: flush A %d sections", ptr);
559                         flush_blocks ((ISAMC)is, mb, ptr-1, r_buf, &firstpos, cat,
560                                       0, &pp->numKeys);
561                         mb[0].block = mb[ptr-1].block;
562                         mb[0].dirty = mb[ptr-1].dirty;
563                         memcpy (r_buf, r_buf + mb[ptr-1].offset,
564                                 mb[ptr].offset - mb[ptr-1].offset);
565                         mb[0].offset = 0;
566
567                         mb[1].block = mb[ptr].block;
568                         mb[1].dirty = mb[ptr].dirty;
569                         mb[1].offset = mb[ptr].offset - mb[ptr-1].offset;
570                         ptr = 1;
571                         r_offset = mb[ptr].offset;
572                     }
573                 }
574             }
575             /*border = get_border (is, mb, ptr, cat, firstpos);*/
576             border = is->method->filecat[cat].bsize;
577         }
578         last_dirty = 0;
579         if (!f_more)
580             cmp = -1;
581         else if (!i_more)
582             cmp = 1;
583         else
584             cmp = (*is->method->compare_item)(i_item, f_item);
585         if (cmp == 0)                   /* insert i=f */
586         {
587             if (!i_mode)   /* delete item? */
588             {
589                 /* move i */
590                 i_item_ptr = i_item;
591                 i_more = (*data->read_item)(data->clientData, &i_item_ptr,
592                                            &i_mode);
593                 /* is next input item the same as current except
594                    for the delete flag? */
595                 cmp = (*is->method->compare_item)(i_item, f_item);
596                 if (!cmp && i_mode)   /* delete/insert nop? */
597                 {
598                     /* yes! insert as if it was an insert only */
599                     memcpy (r_item, i_item, i_item_ptr - i_item);
600                     i_item_ptr = i_item;
601                     i_more = (*data->read_item)(data->clientData, &i_item_ptr,
602                                                 &i_mode);
603                 }
604                 else
605                 {
606                     /* no! delete the item */
607                     r_item = NULL;
608                     last_dirty = 1;
609                     mb[ptr].dirty = 2;
610                 }
611             }
612             else
613             {
614                 memcpy (r_item, f_item, f_item_ptr - f_item);
615
616                 /* move i */
617                 i_item_ptr = i_item;
618                 i_more = (*data->read_item)(data->clientData, &i_item_ptr,
619                                            &i_mode);
620             }
621             /* move f */
622             f_item_ptr = f_item;
623             f_more = isamh_read_item (pp, &f_item_ptr);
624         }
625         else if (cmp > 0)               /* insert f */
626         {
627             memcpy (r_item, f_item, f_item_ptr - f_item);
628             /* move f */
629             f_item_ptr = f_item;
630             f_more = isamh_read_item (pp, &f_item_ptr);
631         }
632         else                            /* insert i */
633         {
634             if (!i_mode)                /* delete item which isn't there? */
635             {
636                 logf (LOG_FATAL, "Inconsistent register at offset %d",
637                                  r_offset);
638                 abort ();
639             }
640             memcpy (r_item, i_item, i_item_ptr - i_item);
641             mb[ptr].dirty = 2;
642             last_dirty = 1;
643             /* move i */
644             i_item_ptr = i_item;
645             i_more = (*data->read_item)(data->clientData, &i_item_ptr,
646                                         &i_mode);
647         }
648         if (r_item)  /* insert resulting item? */
649         {
650             char *r_out_ptr = r_buf + r_offset;
651             int new_offset;
652
653             (*is->method->code_item)(ISAMH_ENCODE, r_clientData,
654                                      &r_out_ptr, &r_item);
655             new_offset = r_out_ptr - r_buf; 
656
657             numKeys++;
658
659             if (border < new_offset && border >= r_offset)
660             {
661                 if (debug > 2)
662                     logf (LOG_LOG, "isc: border %d %d", ptr, border);
663                 /* Max size of current block category reached ...
664                    make new virtual block entry */
665                 mb[++ptr].block = 0;
666                 mb[ptr].dirty = 1;
667                 mb[ptr].offset = r_offset;
668                 if (cat == is->max_cat && ptr >= is->method->max_blocks_mem)
669                 {
670                     /* We are dealing with block(s) of max size. Block(s)
671                        except one will be flushed. Note: the block(s) are
672                        surely not the last one(s).
673                      */
674                     if (debug > 2)
675                         logf (LOG_LOG, "isc: flush B %d sections", ptr-1);
676                     flush_blocks ((ISAMC)is, mb, ptr-1, r_buf, &firstpos, cat,
677                                   0, &pp->numKeys);
678                     mb[0].block = mb[ptr-1].block;
679                     mb[0].dirty = mb[ptr-1].dirty;
680                     memcpy (r_buf, r_buf + mb[ptr-1].offset,
681                             mb[ptr].offset - mb[ptr-1].offset);
682                     mb[0].offset = 0;
683
684                     mb[1].block = mb[ptr].block;
685                     mb[1].dirty = mb[0].dirty;
686                     mb[1].offset = mb[ptr].offset - mb[ptr-1].offset;
687                     memcpy (r_buf + mb[1].offset, r_buf + r_offset,
688                             new_offset - r_offset);
689                     new_offset = (new_offset - r_offset) + mb[1].offset;
690                     ptr = 1;
691                 }
692                 border = is->method->filecat[cat].bsize;
693                   /* get_border (is, mb, ptr, cat, firstpos); */
694             }
695             r_offset = new_offset;
696         }
697         if (cat < is->max_cat && ptr >= is->method->filecat[cat].mblocks)
698         {
699             /* Max number blocks in current category reached ->
700                must switch to next category (with larger block size) 
701             */
702             int j = 0;
703
704             (is->files[cat].no_remap)++;
705             /* delete all original block(s) read so far */
706             for (i = 0; i < ptr; i++)
707                 if (mb[i].block)
708                     isamh_release_block (is, pp->cat, mb[i].block);
709             /* also delete all block to be read in the future */
710             pp->deleteFlag = 1;
711
712             /* remap block offsets */
713             assert (mb[j].offset == 0);
714             cat++;
715             mb[j].dirty = 1;
716             mb[j].block = 0;
717             mb[ptr].offset = r_offset;
718             for (i = 1; i < ptr; i++)
719             {
720                 int border = is->method->filecat[cat].ifill -
721                          ISAMH_BLOCK_OFFSET_1 + mb[j].offset;
722                 if (debug > 3)
723                     logf (LOG_LOG, "isc: remap %d border=%d", i, border);
724                 if (mb[i+1].offset > border && mb[i].offset <= border)
725                 {
726                     if (debug > 3)
727                         logf (LOG_LOG, "isc:  to %d %d", j, mb[i].offset);
728                     mb[++j].dirty = 1;
729                     mb[j].block = 0;
730                     mb[j].offset = mb[i].offset;
731                 }
732             }
733             if (debug > 2)
734                 logf (LOG_LOG, "isc: remap from %d to %d sections to cat %d",
735                       ptr, j, cat);
736             ptr = j;
737             border = is->method->filecat[cat].bsize;
738             /*border = get_border (is, mb, ptr, cat, firstpos);*/
739             if (debug > 3)
740                 logf (LOG_LOG, "isc: border=%d r_offset=%d", border, r_offset);
741         }
742     }
743     if (mb[ptr].offset < r_offset)
744     {   /* make the final boundary offset */
745         mb[++ptr].dirty = 1; 
746         mb[ptr].block = 0; 
747         mb[ptr].offset = r_offset;
748     }
749     else
750     {   /* empty output. Release last block if any */
751         if (cat == pp->cat && mb[ptr].block)
752         {
753             if (debug > 3)
754                 logf (LOG_LOG, "isc: release C");
755             isamh_release_block (is, pp->cat, mb[ptr].block);
756             mb[ptr].block = 0;
757             if (ptr > 0)
758                 mb[ptr-1].dirty = 1;
759         }
760     }
761
762     if (debug > 2)
763         logf (LOG_LOG, "isc: flush C, %d sections", ptr);
764
765     if (firstpos)
766     {
767         /* we have to patch initial block with num keys if that
768            has changed */
769         if (numKeys != isamh_pp_num (pp))
770         {
771             if (debug > 2)
772                 logf (LOG_LOG, "isc: patch num keys firstpos=%d num=%d",
773                                 firstpos, numKeys);
774             bf_write (is->files[cat].bf, firstpos, ISAMH_BLOCK_OFFSET_N,
775                       sizeof(numKeys), &numKeys);
776         }
777     }
778     else if (ptr > 0)
779     {   /* we haven't flushed initial block yet and there surely are some
780            blocks to flush. Make first block dirty if numKeys differ */
781         if (numKeys != isamh_pp_num (pp))
782             mb[0].dirty = 1;
783     }
784     /* flush rest of block(s) in r_buf */
785     flush_blocks ((ISAMC)is, mb, ptr, r_buf, &firstpos, cat, 1, &numKeys);
786
787     (*is->method->code_stop)(ISAMH_ENCODE, r_clientData);
788     if (!firstpos)
789         cat = 0;
790     if (debug > 1)
791         logf (LOG_LOG, "isc: isamh_append return %d %d", cat, firstpos);
792     isamh_pp_close (pp);
793     return cat + firstpos * 8;
794 }
795
796
797 /*
798  * $Log: merge.c,v $
799  * Revision 1.12  1999-06-30 15:03:55  heikki
800  * first take on isamh, the append-only isam structure
801  *
802  * Revision 1.11  1999/05/26 07:49:14  adam
803  * C++ compilation.
804  *
805  * Revision 1.10  1998/03/19 12:22:09  adam
806  * Minor change.
807  *
808  * Revision 1.9  1998/03/19 10:04:38  adam
809  * Minor changes.
810  *
811  * Revision 1.8  1998/03/18 09:23:55  adam
812  * Blocks are stored in chunks on free list - up to factor 2 in speed.
813  * Fixed bug that could occur in block category rearrangemen.
814  *
815  * Revision 1.7  1998/03/11 11:18:18  adam
816  * Changed the isc_merge to take into account the mfill (minimum-fill).
817  *
818  * Revision 1.6  1998/03/06 13:54:03  adam
819  * Fixed two nasty bugs in isc_merge.
820  *
821  * Revision 1.5  1997/02/12 20:42:43  adam
822  * Bug fix: during isc_merge operations, some pages weren't marked dirty
823  * even though they should be. At this point the merge operation marks
824  * a page dirty if the previous page changed at all. A better approach is
825  * to mark it dirty if the last key written changed in previous page.
826  *
827  * Revision 1.4  1996/11/08 11:15:31  adam
828  * Number of keys in chain are stored in first block and the function
829  * to retrieve this information, isc_pp_num is implemented.
830  *
831  * Revision 1.3  1996/11/04 14:08:59  adam
832  * Optimized free block usage.
833  *
834  * Revision 1.2  1996/11/01 13:36:46  adam
835  * New element, max_blocks_mem, that control how many blocks of max size
836  * to store in memory during isc_merge.
837  * Function isc_merge now ignores delete/update of identical keys and
838  * the proper blocks are then non-dirty and not written in flush_blocks.
839  *
840  * Revision 1.1  1996/11/01  08:59:15  adam
841  * First version of isc_merge that supports update/delete.
842  *
843  */
844
845