Removed the one-block append, it had a serious flaw.
[idzebra-moved-to-github.git] / isamc / merge.c
1 /*
2  * Copyright (c) 1996-1998, Index Data.
3  * See the file LICENSE for details.
4  * Sebastian Hammer, Adam Dickmeiss, Heikki Levanto
5  *
6  */
7
8 #include <stdlib.h>
9 #include <assert.h>
10 #include <string.h>
11 #include <stdio.h>
12 #include <log.h>
13 #include "isamc-p.h"
14 #include "isamh-p.h"
15
16 struct isc_merge_block {
17     int offset;       /* offset in r_buf */
18     int block;        /* block number of file (0 if none) */
19     int dirty;        /* block is different from that on file */
20 };
21
22 static void opt_blocks (ISAMC is, struct isc_merge_block *mb, int ptr,
23                         int last)
24 {
25     int i, no_dirty = 0;
26     for (i = 0; i<ptr; i++)
27         if (mb[i].dirty)
28             no_dirty++;
29     if (no_dirty*4 < ptr*3)
30         return;
31     /* bubble-sort it */
32     for (i = 0; i<ptr; i++)
33     {
34         int tmp, j, j_min = -1;
35         for (j = i; j<ptr; j++)
36         {
37             if (j_min < 0 || mb[j_min].block > mb[j].block)
38                 j_min = j;
39         }
40         assert (j_min >= 0);
41         tmp = mb[j_min].block;
42         mb[j_min].block = mb[i].block;
43         mb[i].block = tmp;
44         mb[i].dirty = 1;
45     }
46     if (!last)
47         mb[i].dirty = 1;
48 }
49
50 static void flush_blocks (ISAMC is, struct isc_merge_block *mb, int ptr,
51                           char *r_buf, int *firstpos, int cat, int last,
52                           int *numkeys)
53 {
54     int i;
55
56     for (i = 0; i<ptr; i++)
57     {
58         /* consider this block number */
59         if (!mb[i].block) 
60         {
61             mb[i].block = isc_alloc_block (is, cat);
62             mb[i].dirty = 1;
63         }
64
65         /* consider next block pointer */
66         if (last && i == ptr-1)
67             mb[i+1].block = 0;
68         else if (!mb[i+1].block)       
69         {
70             mb[i+1].block = isc_alloc_block (is, cat);
71             mb[i+1].dirty = 1;
72             mb[i].dirty = 1;
73         }
74     }
75
76     for (i = 0; i<ptr; i++)
77     {
78         char *src;
79         ISAMC_BLOCK_SIZE ssize = mb[i+1].offset - mb[i].offset;
80
81         assert (ssize);
82
83         /* skip rest if not dirty */
84         if (!mb[i].dirty)
85         {
86             assert (mb[i].block);
87             if (!*firstpos)
88                 *firstpos = mb[i].block;
89             if (is->method->debug > 2)
90                 logf (LOG_LOG, "isc: skip ptr=%d size=%d %d %d",
91                      i, ssize, cat, mb[i].block);
92             ++(is->files[cat].no_skip_writes);
93             continue;
94         }
95         /* write block */
96
97         if (!*firstpos)
98         {
99             *firstpos = mb[i].block;
100             src = r_buf + mb[i].offset - ISAMC_BLOCK_OFFSET_1;
101             ssize += ISAMC_BLOCK_OFFSET_1;
102
103             memcpy (src+sizeof(int)+sizeof(ssize), numkeys,
104                     sizeof(*numkeys));
105             if (is->method->debug > 2)
106                 logf (LOG_LOG, "isc: flush ptr=%d numk=%d size=%d nextpos=%d",
107                      i, *numkeys, (int) ssize, mb[i+1].block);
108         }
109         else
110         {
111             src = r_buf + mb[i].offset - ISAMC_BLOCK_OFFSET_N;
112             ssize += ISAMC_BLOCK_OFFSET_N;
113             if (is->method->debug > 2)
114                 logf (LOG_LOG, "isc: flush ptr=%d size=%d nextpos=%d",
115                      i, (int) ssize, mb[i+1].block);
116         }
117         memcpy (src, &mb[i+1].block, sizeof(int));
118         memcpy (src+sizeof(int), &ssize, sizeof(ssize));
119         isc_write_block (is, cat, mb[i].block, src);
120     }
121 }
122
123 static int get_border (ISAMC is, struct isc_merge_block *mb, int ptr,
124                        int cat, int firstpos)
125 {
126    /* Border set to initial fill or block size depending on
127       whether we are creating a new one or updating and old one.
128     */
129     
130     int fill = mb[ptr].block ? is->method->filecat[cat].bsize :
131                                is->method->filecat[cat].ifill;
132     int off = (ptr||firstpos) ? ISAMC_BLOCK_OFFSET_N : ISAMC_BLOCK_OFFSET_1;
133     
134     assert (ptr < 199);
135
136     return mb[ptr].offset + fill - off;
137 }
138
139 ISAMC_P isc_merge (ISAMC is, ISAMC_P ipos, ISAMC_I data)
140 {
141
142     char i_item[128], *i_item_ptr;
143     int i_more, i_mode, i;
144
145     ISAMC_PP pp; 
146     char f_item[128], *f_item_ptr;
147     int f_more;
148     int last_dirty = 0;
149     int debug = is->method->debug;
150  
151     struct isc_merge_block mb[200];
152
153     int firstpos = 0;
154     int cat = 0;
155     char r_item_buf[128]; /* temporary result output */
156     char *r_buf;          /* block with resulting data */
157     int r_offset = 0;     /* current offset in r_buf */
158     int ptr = 0;          /* pointer */
159     void *r_clientData;   /* encode client data */
160     int border;
161     int numKeys = 0;
162
163     r_clientData = (*is->method->code_start)(ISAMC_ENCODE);
164     r_buf = is->merge_buf + 128;
165
166     pp = isc_pp_open (is, ipos);
167     /* read first item from file. make sure f_more indicates no boundary */
168     f_item_ptr = f_item;
169     f_more = isc_read_item (pp, &f_item_ptr);
170     if (f_more > 0)
171         f_more = 1;
172     cat = pp->cat;
173
174     if (debug > 1)
175         logf (LOG_LOG, "isc: isc_merge begin %d %d", cat, pp->pos);
176
177     /* read first item from i */
178     i_item_ptr = i_item;
179     i_more = (*data->read_item)(data->clientData, &i_item_ptr, &i_mode);
180
181     mb[ptr].block = pp->pos;     /* is zero if no block on disk */
182     mb[ptr].dirty = 0;
183     mb[ptr].offset = 0;
184
185     border = get_border (is, mb, ptr, cat, firstpos);
186     while (i_more || f_more)
187     {
188         char *r_item = r_item_buf;
189         int cmp;
190
191         if (f_more > 1)
192         {
193             /* block to block boundary in the original file. */
194             f_more = 1;
195             if (cat == pp->cat) 
196             {
197                 /* the resulting output is of the same category as the
198                    the original 
199                 */
200                 if (r_offset <= mb[ptr].offset +is->method->filecat[cat].mfill)
201                 {
202                     /* the resulting output block is too small/empty. Delete
203                        the original (if any)
204                     */
205                     if (debug > 3)
206                         logf (LOG_LOG, "isc: release A");
207                     if (mb[ptr].block)
208                         isc_release_block (is, pp->cat, mb[ptr].block);
209                     mb[ptr].block = pp->pos;
210                     if (!mb[ptr].dirty)
211                         mb[ptr].dirty = 1;
212                     if (ptr > 0)
213                         mb[ptr-1].dirty = 1;
214                 }
215                 else
216                 {
217                     /* indicate new boundary based on the original file */
218                     mb[++ptr].block = pp->pos;
219                     mb[ptr].dirty = last_dirty;
220                     mb[ptr].offset = r_offset;
221                     if (debug > 3)
222                         logf (LOG_LOG, "isc: bound ptr=%d,offset=%d",
223                             ptr, r_offset);
224                     if (cat==is->max_cat && ptr >= is->method->max_blocks_mem)
225                     {
226                         /* We are dealing with block(s) of max size. Block(s)
227                            except 1 will be flushed.
228                          */
229                         if (debug > 2)
230                             logf (LOG_LOG, "isc: flush A %d sections", ptr);
231                         flush_blocks (is, mb, ptr-1, r_buf, &firstpos, cat,
232                                       0, &pp->numKeys);
233                         mb[0].block = mb[ptr-1].block;
234                         mb[0].dirty = mb[ptr-1].dirty;
235                         memcpy (r_buf, r_buf + mb[ptr-1].offset,
236                                 mb[ptr].offset - mb[ptr-1].offset);
237                         mb[0].offset = 0;
238
239                         mb[1].block = mb[ptr].block;
240                         mb[1].dirty = mb[ptr].dirty;
241                         mb[1].offset = mb[ptr].offset - mb[ptr-1].offset;
242                         ptr = 1;
243                         r_offset = mb[ptr].offset;
244                     }
245                 }
246             }
247             border = get_border (is, mb, ptr, cat, firstpos);
248         }
249         last_dirty = 0;
250         if (!f_more)
251             cmp = -1;
252         else if (!i_more)
253             cmp = 1;
254         else
255             cmp = (*is->method->compare_item)(i_item, f_item);
256         if (cmp == 0)                   /* insert i=f */
257         {
258             if (!i_mode)   /* delete item? */
259             {
260                 /* move i */
261                 i_item_ptr = i_item;
262                 i_more = (*data->read_item)(data->clientData, &i_item_ptr,
263                                            &i_mode);
264                 /* is next input item the same as current except
265                    for the delete flag? */
266                 cmp = (*is->method->compare_item)(i_item, f_item);
267                 if (!cmp && i_mode)   /* delete/insert nop? */
268                 {
269                     /* yes! insert as if it was an insert only */
270                     memcpy (r_item, i_item, i_item_ptr - i_item);
271                     i_item_ptr = i_item;
272                     i_more = (*data->read_item)(data->clientData, &i_item_ptr,
273                                                 &i_mode);
274                 }
275                 else
276                 {
277                     /* no! delete the item */
278                     r_item = NULL;
279                     last_dirty = 1;
280                     mb[ptr].dirty = 2;
281                 }
282             }
283             else
284             {
285                 memcpy (r_item, f_item, f_item_ptr - f_item);
286
287                 /* move i */
288                 i_item_ptr = i_item;
289                 i_more = (*data->read_item)(data->clientData, &i_item_ptr,
290                                            &i_mode);
291             }
292             /* move f */
293             f_item_ptr = f_item;
294             f_more = isc_read_item (pp, &f_item_ptr);
295         }
296         else if (cmp > 0)               /* insert f */
297         {
298             memcpy (r_item, f_item, f_item_ptr - f_item);
299             /* move f */
300             f_item_ptr = f_item;
301             f_more = isc_read_item (pp, &f_item_ptr);
302         }
303         else                            /* insert i */
304         {
305             if (!i_mode)                /* delete item which isn't there? */
306             {
307                 logf (LOG_FATAL, "Inconsistent register at offset %d",
308                                  r_offset);
309                 abort ();
310             }
311             memcpy (r_item, i_item, i_item_ptr - i_item);
312             mb[ptr].dirty = 2;
313             last_dirty = 1;
314             /* move i */
315             i_item_ptr = i_item;
316             i_more = (*data->read_item)(data->clientData, &i_item_ptr,
317                                         &i_mode);
318         }
319         if (r_item)  /* insert resulting item? */
320         {
321             char *r_out_ptr = r_buf + r_offset;
322             int new_offset;
323
324             (*is->method->code_item)(ISAMC_ENCODE, r_clientData,
325                                      &r_out_ptr, &r_item);
326             new_offset = r_out_ptr - r_buf; 
327
328             numKeys++;
329
330             if (border < new_offset && border >= r_offset)
331             {
332                 if (debug > 2)
333                     logf (LOG_LOG, "isc: border %d %d", ptr, border);
334                 /* Max size of current block category reached ...
335                    make new virtual block entry */
336                 mb[++ptr].block = 0;
337                 mb[ptr].dirty = 1;
338                 mb[ptr].offset = r_offset;
339                 if (cat == is->max_cat && ptr >= is->method->max_blocks_mem)
340                 {
341                     /* We are dealing with block(s) of max size. Block(s)
342                        except one will be flushed. Note: the block(s) are
343                        surely not the last one(s).
344                      */
345                     if (debug > 2)
346                         logf (LOG_LOG, "isc: flush B %d sections", ptr-1);
347                     flush_blocks (is, mb, ptr-1, r_buf, &firstpos, cat,
348                                   0, &pp->numKeys);
349                     mb[0].block = mb[ptr-1].block;
350                     mb[0].dirty = mb[ptr-1].dirty;
351                     memcpy (r_buf, r_buf + mb[ptr-1].offset,
352                             mb[ptr].offset - mb[ptr-1].offset);
353                     mb[0].offset = 0;
354
355                     mb[1].block = mb[ptr].block;
356                     mb[1].dirty = mb[0].dirty;
357                     mb[1].offset = mb[ptr].offset - mb[ptr-1].offset;
358                     memcpy (r_buf + mb[1].offset, r_buf + r_offset,
359                             new_offset - r_offset);
360                     new_offset = (new_offset - r_offset) + mb[1].offset;
361                     ptr = 1;
362                 }
363                 border = get_border (is, mb, ptr, cat, firstpos);
364             }
365             r_offset = new_offset;
366         }
367         if (cat < is->max_cat && ptr >= is->method->filecat[cat].mblocks)
368         {
369             /* Max number blocks in current category reached ->
370                must switch to next category (with larger block size) 
371             */
372             int j = 0;
373
374             (is->files[cat].no_remap)++;
375             /* delete all original block(s) read so far */
376             for (i = 0; i < ptr; i++)
377                 if (mb[i].block)
378                     isc_release_block (is, pp->cat, mb[i].block);
379             /* also delete all block to be read in the future */
380             pp->deleteFlag = 1;
381
382             /* remap block offsets */
383             assert (mb[j].offset == 0);
384             cat++;
385             mb[j].dirty = 1;
386             mb[j].block = 0;
387             mb[ptr].offset = r_offset;
388             for (i = 1; i < ptr; i++)
389             {
390                 int border = is->method->filecat[cat].ifill -
391                          ISAMC_BLOCK_OFFSET_1 + mb[j].offset;
392                 if (debug > 3)
393                     logf (LOG_LOG, "isc: remap %d border=%d", i, border);
394                 if (mb[i+1].offset > border && mb[i].offset <= border)
395                 {
396                     if (debug > 3)
397                         logf (LOG_LOG, "isc:  to %d %d", j, mb[i].offset);
398                     mb[++j].dirty = 1;
399                     mb[j].block = 0;
400                     mb[j].offset = mb[i].offset;
401                 }
402             }
403             if (debug > 2)
404                 logf (LOG_LOG, "isc: remap from %d to %d sections to cat %d",
405                       ptr, j, cat);
406             ptr = j;
407             border = get_border (is, mb, ptr, cat, firstpos);
408             if (debug > 3)
409                 logf (LOG_LOG, "isc: border=%d r_offset=%d", border, r_offset);
410         }
411     }
412     if (mb[ptr].offset < r_offset)
413     {   /* make the final boundary offset */
414         mb[++ptr].dirty = 1; 
415         mb[ptr].block = 0; 
416         mb[ptr].offset = r_offset;
417     }
418     else
419     {   /* empty output. Release last block if any */
420         if (cat == pp->cat && mb[ptr].block)
421         {
422             if (debug > 3)
423                 logf (LOG_LOG, "isc: release C");
424             isc_release_block (is, pp->cat, mb[ptr].block);
425             mb[ptr].block = 0;
426             if (ptr > 0)
427                 mb[ptr-1].dirty = 1;
428         }
429     }
430
431     if (debug > 2)
432         logf (LOG_LOG, "isc: flush C, %d sections", ptr);
433
434     if (firstpos)
435     {
436         /* we have to patch initial block with num keys if that
437            has changed */
438         if (numKeys != isc_pp_num (pp))
439         {
440             if (debug > 2)
441                 logf (LOG_LOG, "isc: patch num keys firstpos=%d num=%d",
442                                 firstpos, numKeys);
443             bf_write (is->files[cat].bf, firstpos, ISAMC_BLOCK_OFFSET_N,
444                       sizeof(numKeys), &numKeys);
445         }
446     }
447     else if (ptr > 0)
448     {   /* we haven't flushed initial block yet and there surely are some
449            blocks to flush. Make first block dirty if numKeys differ */
450         if (numKeys != isc_pp_num (pp))
451             mb[0].dirty = 1;
452     }
453     /* flush rest of block(s) in r_buf */
454     flush_blocks (is, mb, ptr, r_buf, &firstpos, cat, 1, &numKeys);
455
456     (*is->method->code_stop)(ISAMC_ENCODE, r_clientData);
457     if (!firstpos)
458         cat = 0;
459     if (debug > 1)
460         logf (LOG_LOG, "isc: isc_merge return %d %d", cat, firstpos);
461     isc_pp_close (pp);
462     return cat + firstpos * 8;
463 }
464
465 static char *hexdump(unsigned char *p, int len, char *buff) {
466   static char localbuff[128];
467   char bytebuff[8];
468   if (!buff) buff=localbuff;
469   *buff='\0';
470   while (len--) {
471     sprintf(bytebuff,"%02x",*p);
472     p++;
473     strcat(buff,bytebuff);
474     if (len) strcat(buff,",");
475   }
476   return buff;
477 }
478
479
480 /* isamh - heikki's append-only isam 
481  * Idea: When allocating a new block, allocate memory for a very large block
482  *       (maximal blocksize). When done, see if you can shrink it to some 
483  *       smaller size. First-time indexing will go in optimal blocks, and
484  *       following small additions will go to the end of the last of the 
485  *       maximal ones. Only later, when new blocks need to be allocated, it
486  *       may make sense to reserve some extra space...
487 */
488
489 ISAMC_P isamh_append (ISAMH is, ISAMH_P ipos, ISAMH_I data)
490 {
491
492     ISAMH_PP pp; 
493
494     char i_item[128];
495     char *i_item_ptr;
496     int i_more=1, i_mode;
497
498     char codebuffer[128];
499     char *codeptr;
500     char *bufptr;
501     int codelen;
502
503     ISAMH_PP firstpp;
504     void *r_clientData;   /* encode client data */
505     int newblock;
506     int newcat;
507     int maxkeys;
508     int maxsize;
509     int retval;
510             
511     pp = firstpp = isamh_pp_open (is, ipos);
512     assert (*is->method->code_reset);
513     
514     if ( 0==ipos)
515     {   /* new block */
516       pp->cat=0; /* start small... */
517       pp->pos = isamh_alloc_block(is,pp->cat);
518       pp->size= pp->offset = ISAMH_BLOCK_OFFSET_1 ;
519       r_clientData = (*is->method->code_start)(ISAMH_ENCODE);
520       if (pp->is->method->debug > 2)
521         logf(LOG_LOG,"isamh_append: starting with new block %d",pp->pos);
522     }
523     else
524     { /* existing block */
525       if (isamh_block(firstpp->lastblock) == firstpp->pos) 
526       { /* only one block, we have it already */
527         pp->offset=ISAMH_BLOCK_OFFSET_1;
528         if (pp->is->method->debug > 2)
529           logf(LOG_LOG,"isamh_append: starting with one block %d",pp->pos);
530       }
531       else
532       { 
533         if (pp->is->method->debug > 2)
534           logf(LOG_LOG,"isamh_append: starting with multiple blocks %d>%d>%d",
535             firstpp->pos,isamh_block(firstpp->next),isamh_block(firstpp->lastblock));
536         pp=isamh_pp_open(is,firstpp->lastblock);
537           /* dirty, but this can also read a N-block. Just clear extra values*/
538         pp->lastblock=0;
539         pp->offset=ISAMH_BLOCK_OFFSET_N;
540       } /* get last */ 
541       r_clientData = (*is->method->code_start)(ISAMH_ENCODE);
542       if (pp->is->method->debug > 3)
543         logf(LOG_LOG,"isamh_append: scanning to end of block %d %d->%d",
544              pp->pos, pp->offset, pp->size);
545       codeptr=codebuffer;
546       while (pp->offset<pp->size) {
547         codeptr=codebuffer;
548         bufptr=pp->buf + pp->offset;
549         (*is->method->code_item)(ISAMH_DECODE, r_clientData, &codeptr, &bufptr);
550         codelen = bufptr - (pp->buf+pp->offset) ;
551         if (pp->is->method->debug > 3)
552           logf(LOG_LOG,"isamh_append: dec at %d %d/%d:%s",
553             pp->offset, codelen, codeptr-codebuffer,
554             hexdump(codebuffer,codeptr-codebuffer,0) );
555         pp->offset += codelen;
556       }
557     } /* existing block */
558
559     
560     i_item_ptr = i_item;
561     i_more = (*data->read_item)(data->clientData,&i_item_ptr,&i_mode);
562     if (pp->is->method->debug > 3)
563       logf(LOG_LOG,"isamh_append 1: m=%d l=%d %s",
564         i_mode, i_item_ptr-i_item, hexdump(i_item,i_item_ptr-i_item,0));
565
566     maxsize = is->method->filecat[pp->cat].bsize;
567     
568     while(i_more) {
569        if (i_mode) 
570        { /* insert key, ignore all delete keys time being... */
571           codeptr = codebuffer;
572           i_item_ptr=i_item;
573           (*is->method->code_item)(ISAMH_ENCODE, r_clientData, &codeptr, &i_item_ptr);
574           codelen = codeptr-codebuffer;
575    
576           assert( (codelen < 128) && (codelen>0));
577        
578           if (pp->is->method->debug > 3)
579             logf(LOG_LOG,"isamh_append: coded into %d:%s  (nk=%d)", 
580                codelen,hexdump(codebuffer,codelen,0),firstpp->numKeys);
581
582           if ( pp->offset + codelen > maxsize ) 
583           { /* oops, block full, do something */
584             newcat = pp->cat;
585             maxkeys = is->method->filecat[pp->cat].mblocks;  /* max keys */
586             if (pp->is->method->debug > 2)
587               logf(LOG_LOG,"isamh_append: need new block: %d > %d (k:%d/%d)", 
588                   pp->offset + codelen, maxsize, firstpp->numKeys,maxkeys );
589             if ( (maxkeys>0) && (firstpp->numKeys > maxkeys) ) 
590             { /* time to increase block size */
591                newcat++;
592                maxsize = is->method->filecat[newcat].bsize;
593                pp->buf=xrealloc(pp->buf,maxsize);
594                if (pp->is->method->debug > 2)
595                  logf(LOG_LOG,"isamh_append: increased to cat %d ",newcat);
596             }
597
598             newblock = isamh_alloc_block(is,newcat);
599             pp->next = isamh_addr(newblock,newcat);
600             if (firstpp!=pp)
601             {  /* not first block, write to disk already now */
602               isamh_buildlaterblock(pp);
603               isamh_write_block(is,pp->cat,pp->pos,pp->buf);    
604             }
605             else 
606             {  /* we had only one block, allocate a second buffer */
607               pp = isamh_pp_open(is,0);
608             }
609             pp->cat = newcat; 
610             pp->pos = newblock;
611          
612             pp->size=pp->offset=ISAMH_BLOCK_OFFSET_N ;
613             pp->next=0;
614             pp->lastblock=0;
615             if (pp->is->method->debug > 2)
616               logf(LOG_LOG,"isamh_append: got a new block %d:%d",pp->cat,pp->pos);
617
618             /* reset the encoding, and code again */
619             (*is->method->code_reset)(r_clientData);
620             codeptr = codebuffer;
621             i_item_ptr=i_item;
622             (*is->method->code_item)(ISAMH_ENCODE, r_clientData, &codeptr, &i_item_ptr);
623             codelen = codeptr-codebuffer;
624             if (pp->is->method->debug > 3)
625               logf(LOG_LOG,"isamh_append: coded again %d:%s  (nk=%d)", 
626                    codelen,hexdump(codebuffer,codelen,0),firstpp->numKeys);
627
628           } /* block full */
629
630           /* ok, now we can write it */
631           memcpy(&(pp->buf[pp->offset]), codebuffer, codelen);
632           pp->offset += codelen;
633           pp->size += codelen;
634           firstpp->numKeys++; 
635        } /* not a delete */
636
637        /* try to read the next element */
638        i_item_ptr = i_item;
639        i_more = (*data->read_item)(data->clientData,&i_item_ptr,&i_mode);
640        if (pp->is->method->debug > 3)
641          logf(LOG_LOG,"isamh_append 2: m=%d l=%d %s",
642              i_mode, i_item_ptr-i_item, hexdump(i_item,i_item_ptr-i_item,0));
643     
644     } /* while */
645
646     /* Write the last (partial) block, if needed. */
647     if (pp!=firstpp) 
648     {
649       pp->next=0; /* just to be sure */
650       isamh_buildlaterblock(pp);
651       isamh_write_block(is,pp->cat,pp->pos,pp->buf);    
652     }
653     
654     /* update first block and write it */
655     firstpp->lastblock = isamh_addr(pp->pos,pp->cat);
656     isamh_buildfirstblock(firstpp);
657     isamh_write_block(is,firstpp->cat,firstpp->pos,firstpp->buf);
658
659     /* release the second block, if we allocated one */
660     if ( firstpp != pp ) 
661       isamh_pp_close(pp); 
662  
663     /* get return value (before it disappears at close! */
664     retval = isamh_addr(firstpp->pos,firstpp->cat);
665
666     isamh_pp_close(firstpp);    
667     
668     return retval;
669     
670 } /*  isamh_append */
671
672
673 /*
674  * $Log: merge.c,v $
675  * Revision 1.18  1999-07-13 15:24:50  heikki
676  * Removed the one-block append, it had a serious flaw.
677  *
678  * Revision 1.16  1999/07/08 14:23:27  heikki
679  * Fixed a bug in isamh_pp_read and cleaned up a bit
680  *
681  * Revision 1.15  1999/07/07 09:36:04  heikki
682  * Fixed an assertion in isamh
683  *
684  * Revision 1.13  1999/07/06 09:37:05  heikki
685  * Working on isamh - not ready yet.
686  *
687  * Revision 1.12  1999/06/30 15:03:55  heikki
688  * first take on isamh, the append-only isam structure
689  *
690  * Revision 1.11  1999/05/26 07:49:14  adam
691  * C++ compilation.
692  *
693  * Revision 1.10  1998/03/19 12:22:09  adam
694  * Minor change.
695  *
696  * Revision 1.9  1998/03/19 10:04:38  adam
697  * Minor changes.
698  *
699  * Revision 1.8  1998/03/18 09:23:55  adam
700  * Blocks are stored in chunks on free list - up to factor 2 in speed.
701  * Fixed bug that could occur in block category rearrangemen.
702  *
703  * Revision 1.7  1998/03/11 11:18:18  adam
704  * Changed the isc_merge to take into account the mfill (minimum-fill).
705  *
706  * Revision 1.6  1998/03/06 13:54:03  adam
707  * Fixed two nasty bugs in isc_merge.
708  *
709  * Revision 1.5  1997/02/12 20:42:43  adam
710  * Bug fix: during isc_merge operations, some pages weren't marked dirty
711  * even though they should be. At this point the merge operation marks
712  * a page dirty if the previous page changed at all. A better approach is
713  * to mark it dirty if the last key written changed in previous page.
714  *
715  * Revision 1.4  1996/11/08 11:15:31  adam
716  * Number of keys in chain are stored in first block and the function
717  * to retrieve this information, isc_pp_num is implemented.
718  *
719  * Revision 1.3  1996/11/04 14:08:59  adam
720  * Optimized free block usage.
721  *
722  * Revision 1.2  1996/11/01 13:36:46  adam
723  * New element, max_blocks_mem, that control how many blocks of max size
724  * to store in memory during isc_merge.
725  * Function isc_merge now ignores delete/update of identical keys and
726  * the proper blocks are then non-dirty and not written in flush_blocks.
727  *
728  * Revision 1.1  1996/11/01  08:59:15  adam
729  * First version of isc_merge that supports update/delete.
730  *
731  */
732
733