Better allocation strategy in isamh_merge
[idzebra-moved-to-github.git] / isamc / merge.c
1 /*
2  * Copyright (c) 1996-1998, Index Data.
3  * See the file LICENSE for details.
4  * Sebastian Hammer, Adam Dickmeiss, Heikki Levanto
5  *
6  */
7
8 #include <stdlib.h>
9 #include <assert.h>
10 #include <string.h>
11 #include <stdio.h>
12 #include <log.h>
13 #include "isamc-p.h"
14 #include "isamh-p.h"
15
16 struct isc_merge_block {
17     int offset;       /* offset in r_buf */
18     int block;        /* block number of file (0 if none) */
19     int dirty;        /* block is different from that on file */
20 };
21
22 static void opt_blocks (ISAMC is, struct isc_merge_block *mb, int ptr,
23                         int last)
24 {
25     int i, no_dirty = 0;
26     for (i = 0; i<ptr; i++)
27         if (mb[i].dirty)
28             no_dirty++;
29     if (no_dirty*4 < ptr*3)
30         return;
31     /* bubble-sort it */
32     for (i = 0; i<ptr; i++)
33     {
34         int tmp, j, j_min = -1;
35         for (j = i; j<ptr; j++)
36         {
37             if (j_min < 0 || mb[j_min].block > mb[j].block)
38                 j_min = j;
39         }
40         assert (j_min >= 0);
41         tmp = mb[j_min].block;
42         mb[j_min].block = mb[i].block;
43         mb[i].block = tmp;
44         mb[i].dirty = 1;
45     }
46     if (!last)
47         mb[i].dirty = 1;
48 }
49
50 static void flush_blocks (ISAMC is, struct isc_merge_block *mb, int ptr,
51                           char *r_buf, int *firstpos, int cat, int last,
52                           int *numkeys)
53 {
54     int i;
55
56     for (i = 0; i<ptr; i++)
57     {
58         /* consider this block number */
59         if (!mb[i].block) 
60         {
61             mb[i].block = isc_alloc_block (is, cat);
62             mb[i].dirty = 1;
63         }
64
65         /* consider next block pointer */
66         if (last && i == ptr-1)
67             mb[i+1].block = 0;
68         else if (!mb[i+1].block)       
69         {
70             mb[i+1].block = isc_alloc_block (is, cat);
71             mb[i+1].dirty = 1;
72             mb[i].dirty = 1;
73         }
74     }
75
76     for (i = 0; i<ptr; i++)
77     {
78         char *src;
79         ISAMC_BLOCK_SIZE ssize = mb[i+1].offset - mb[i].offset;
80
81         assert (ssize);
82
83         /* skip rest if not dirty */
84         if (!mb[i].dirty)
85         {
86             assert (mb[i].block);
87             if (!*firstpos)
88                 *firstpos = mb[i].block;
89             if (is->method->debug > 2)
90                 logf (LOG_LOG, "isc: skip ptr=%d size=%d %d %d",
91                      i, ssize, cat, mb[i].block);
92             ++(is->files[cat].no_skip_writes);
93             continue;
94         }
95         /* write block */
96
97         if (!*firstpos)
98         {
99             *firstpos = mb[i].block;
100             src = r_buf + mb[i].offset - ISAMC_BLOCK_OFFSET_1;
101             ssize += ISAMC_BLOCK_OFFSET_1;
102
103             memcpy (src+sizeof(int)+sizeof(ssize), numkeys,
104                     sizeof(*numkeys));
105             if (is->method->debug > 2)
106                 logf (LOG_LOG, "isc: flush ptr=%d numk=%d size=%d nextpos=%d",
107                      i, *numkeys, (int) ssize, mb[i+1].block);
108         }
109         else
110         {
111             src = r_buf + mb[i].offset - ISAMC_BLOCK_OFFSET_N;
112             ssize += ISAMC_BLOCK_OFFSET_N;
113             if (is->method->debug > 2)
114                 logf (LOG_LOG, "isc: flush ptr=%d size=%d nextpos=%d",
115                      i, (int) ssize, mb[i+1].block);
116         }
117         memcpy (src, &mb[i+1].block, sizeof(int));
118         memcpy (src+sizeof(int), &ssize, sizeof(ssize));
119         isc_write_block (is, cat, mb[i].block, src);
120     }
121 }
122
123 static int get_border (ISAMC is, struct isc_merge_block *mb, int ptr,
124                        int cat, int firstpos)
125 {
126    /* Border set to initial fill or block size depending on
127       whether we are creating a new one or updating and old one.
128     */
129     
130     int fill = mb[ptr].block ? is->method->filecat[cat].bsize :
131                                is->method->filecat[cat].ifill;
132     int off = (ptr||firstpos) ? ISAMC_BLOCK_OFFSET_N : ISAMC_BLOCK_OFFSET_1;
133     
134     assert (ptr < 199);
135
136     return mb[ptr].offset + fill - off;
137 }
138
139 ISAMC_P isc_merge (ISAMC is, ISAMC_P ipos, ISAMC_I data)
140 {
141
142     char i_item[128], *i_item_ptr;
143     int i_more, i_mode, i;
144
145     ISAMC_PP pp; 
146     char f_item[128], *f_item_ptr;
147     int f_more;
148     int last_dirty = 0;
149     int debug = is->method->debug;
150  
151     struct isc_merge_block mb[200];
152
153     int firstpos = 0;
154     int cat = 0;
155     char r_item_buf[128]; /* temporary result output */
156     char *r_buf;          /* block with resulting data */
157     int r_offset = 0;     /* current offset in r_buf */
158     int ptr = 0;          /* pointer */
159     void *r_clientData;   /* encode client data */
160     int border;
161     int numKeys = 0;
162
163     r_clientData = (*is->method->code_start)(ISAMC_ENCODE);
164     r_buf = is->merge_buf + 128;
165
166     pp = isc_pp_open (is, ipos);
167     /* read first item from file. make sure f_more indicates no boundary */
168     f_item_ptr = f_item;
169     f_more = isc_read_item (pp, &f_item_ptr);
170     if (f_more > 0)
171         f_more = 1;
172     cat = pp->cat;
173
174     if (debug > 1)
175         logf (LOG_LOG, "isc: isc_merge begin %d %d", cat, pp->pos);
176
177     /* read first item from i */
178     i_item_ptr = i_item;
179     i_more = (*data->read_item)(data->clientData, &i_item_ptr, &i_mode);
180
181     mb[ptr].block = pp->pos;     /* is zero if no block on disk */
182     mb[ptr].dirty = 0;
183     mb[ptr].offset = 0;
184
185     border = get_border (is, mb, ptr, cat, firstpos);
186     while (i_more || f_more)
187     {
188         char *r_item = r_item_buf;
189         int cmp;
190
191         if (f_more > 1)
192         {
193             /* block to block boundary in the original file. */
194             f_more = 1;
195             if (cat == pp->cat) 
196             {
197                 /* the resulting output is of the same category as the
198                    the original 
199                 */
200                 if (r_offset <= mb[ptr].offset +is->method->filecat[cat].mfill)
201                 {
202                     /* the resulting output block is too small/empty. Delete
203                        the original (if any)
204                     */
205                     if (debug > 3)
206                         logf (LOG_LOG, "isc: release A");
207                     if (mb[ptr].block)
208                         isc_release_block (is, pp->cat, mb[ptr].block);
209                     mb[ptr].block = pp->pos;
210                     if (!mb[ptr].dirty)
211                         mb[ptr].dirty = 1;
212                     if (ptr > 0)
213                         mb[ptr-1].dirty = 1;
214                 }
215                 else
216                 {
217                     /* indicate new boundary based on the original file */
218                     mb[++ptr].block = pp->pos;
219                     mb[ptr].dirty = last_dirty;
220                     mb[ptr].offset = r_offset;
221                     if (debug > 3)
222                         logf (LOG_LOG, "isc: bound ptr=%d,offset=%d",
223                             ptr, r_offset);
224                     if (cat==is->max_cat && ptr >= is->method->max_blocks_mem)
225                     {
226                         /* We are dealing with block(s) of max size. Block(s)
227                            except 1 will be flushed.
228                          */
229                         if (debug > 2)
230                             logf (LOG_LOG, "isc: flush A %d sections", ptr);
231                         flush_blocks (is, mb, ptr-1, r_buf, &firstpos, cat,
232                                       0, &pp->numKeys);
233                         mb[0].block = mb[ptr-1].block;
234                         mb[0].dirty = mb[ptr-1].dirty;
235                         memcpy (r_buf, r_buf + mb[ptr-1].offset,
236                                 mb[ptr].offset - mb[ptr-1].offset);
237                         mb[0].offset = 0;
238
239                         mb[1].block = mb[ptr].block;
240                         mb[1].dirty = mb[ptr].dirty;
241                         mb[1].offset = mb[ptr].offset - mb[ptr-1].offset;
242                         ptr = 1;
243                         r_offset = mb[ptr].offset;
244                     }
245                 }
246             }
247             border = get_border (is, mb, ptr, cat, firstpos);
248         }
249         last_dirty = 0;
250         if (!f_more)
251             cmp = -1;
252         else if (!i_more)
253             cmp = 1;
254         else
255             cmp = (*is->method->compare_item)(i_item, f_item);
256         if (cmp == 0)                   /* insert i=f */
257         {
258             if (!i_mode)   /* delete item? */
259             {
260                 /* move i */
261                 i_item_ptr = i_item;
262                 i_more = (*data->read_item)(data->clientData, &i_item_ptr,
263                                            &i_mode);
264                 /* is next input item the same as current except
265                    for the delete flag? */
266                 cmp = (*is->method->compare_item)(i_item, f_item);
267                 if (!cmp && i_mode)   /* delete/insert nop? */
268                 {
269                     /* yes! insert as if it was an insert only */
270                     memcpy (r_item, i_item, i_item_ptr - i_item);
271                     i_item_ptr = i_item;
272                     i_more = (*data->read_item)(data->clientData, &i_item_ptr,
273                                                 &i_mode);
274                 }
275                 else
276                 {
277                     /* no! delete the item */
278                     r_item = NULL;
279                     last_dirty = 1;
280                     mb[ptr].dirty = 2;
281                 }
282             }
283             else
284             {
285                 memcpy (r_item, f_item, f_item_ptr - f_item);
286
287                 /* move i */
288                 i_item_ptr = i_item;
289                 i_more = (*data->read_item)(data->clientData, &i_item_ptr,
290                                            &i_mode);
291             }
292             /* move f */
293             f_item_ptr = f_item;
294             f_more = isc_read_item (pp, &f_item_ptr);
295         }
296         else if (cmp > 0)               /* insert f */
297         {
298             memcpy (r_item, f_item, f_item_ptr - f_item);
299             /* move f */
300             f_item_ptr = f_item;
301             f_more = isc_read_item (pp, &f_item_ptr);
302         }
303         else                            /* insert i */
304         {
305             if (!i_mode)                /* delete item which isn't there? */
306             {
307                 logf (LOG_FATAL, "Inconsistent register at offset %d",
308                                  r_offset);
309                 abort ();
310             }
311             memcpy (r_item, i_item, i_item_ptr - i_item);
312             mb[ptr].dirty = 2;
313             last_dirty = 1;
314             /* move i */
315             i_item_ptr = i_item;
316             i_more = (*data->read_item)(data->clientData, &i_item_ptr,
317                                         &i_mode);
318         }
319         if (r_item)  /* insert resulting item? */
320         {
321             char *r_out_ptr = r_buf + r_offset;
322             int new_offset;
323
324             (*is->method->code_item)(ISAMC_ENCODE, r_clientData,
325                                      &r_out_ptr, &r_item);
326             new_offset = r_out_ptr - r_buf; 
327
328             numKeys++;
329
330             if (border < new_offset && border >= r_offset)
331             {
332                 if (debug > 2)
333                     logf (LOG_LOG, "isc: border %d %d", ptr, border);
334                 /* Max size of current block category reached ...
335                    make new virtual block entry */
336                 mb[++ptr].block = 0;
337                 mb[ptr].dirty = 1;
338                 mb[ptr].offset = r_offset;
339                 if (cat == is->max_cat && ptr >= is->method->max_blocks_mem)
340                 {
341                     /* We are dealing with block(s) of max size. Block(s)
342                        except one will be flushed. Note: the block(s) are
343                        surely not the last one(s).
344                      */
345                     if (debug > 2)
346                         logf (LOG_LOG, "isc: flush B %d sections", ptr-1);
347                     flush_blocks (is, mb, ptr-1, r_buf, &firstpos, cat,
348                                   0, &pp->numKeys);
349                     mb[0].block = mb[ptr-1].block;
350                     mb[0].dirty = mb[ptr-1].dirty;
351                     memcpy (r_buf, r_buf + mb[ptr-1].offset,
352                             mb[ptr].offset - mb[ptr-1].offset);
353                     mb[0].offset = 0;
354
355                     mb[1].block = mb[ptr].block;
356                     mb[1].dirty = mb[0].dirty;
357                     mb[1].offset = mb[ptr].offset - mb[ptr-1].offset;
358                     memcpy (r_buf + mb[1].offset, r_buf + r_offset,
359                             new_offset - r_offset);
360                     new_offset = (new_offset - r_offset) + mb[1].offset;
361                     ptr = 1;
362                 }
363                 border = get_border (is, mb, ptr, cat, firstpos);
364             }
365             r_offset = new_offset;
366         }
367         if (cat < is->max_cat && ptr >= is->method->filecat[cat].mblocks)
368         {
369             /* Max number blocks in current category reached ->
370                must switch to next category (with larger block size) 
371             */
372             int j = 0;
373
374             (is->files[cat].no_remap)++;
375             /* delete all original block(s) read so far */
376             for (i = 0; i < ptr; i++)
377                 if (mb[i].block)
378                     isc_release_block (is, pp->cat, mb[i].block);
379             /* also delete all block to be read in the future */
380             pp->deleteFlag = 1;
381
382             /* remap block offsets */
383             assert (mb[j].offset == 0);
384             cat++;
385             mb[j].dirty = 1;
386             mb[j].block = 0;
387             mb[ptr].offset = r_offset;
388             for (i = 1; i < ptr; i++)
389             {
390                 int border = is->method->filecat[cat].ifill -
391                          ISAMC_BLOCK_OFFSET_1 + mb[j].offset;
392                 if (debug > 3)
393                     logf (LOG_LOG, "isc: remap %d border=%d", i, border);
394                 if (mb[i+1].offset > border && mb[i].offset <= border)
395                 {
396                     if (debug > 3)
397                         logf (LOG_LOG, "isc:  to %d %d", j, mb[i].offset);
398                     mb[++j].dirty = 1;
399                     mb[j].block = 0;
400                     mb[j].offset = mb[i].offset;
401                 }
402             }
403             if (debug > 2)
404                 logf (LOG_LOG, "isc: remap from %d to %d sections to cat %d",
405                       ptr, j, cat);
406             ptr = j;
407             border = get_border (is, mb, ptr, cat, firstpos);
408             if (debug > 3)
409                 logf (LOG_LOG, "isc: border=%d r_offset=%d", border, r_offset);
410         }
411     }
412     if (mb[ptr].offset < r_offset)
413     {   /* make the final boundary offset */
414         mb[++ptr].dirty = 1; 
415         mb[ptr].block = 0; 
416         mb[ptr].offset = r_offset;
417     }
418     else
419     {   /* empty output. Release last block if any */
420         if (cat == pp->cat && mb[ptr].block)
421         {
422             if (debug > 3)
423                 logf (LOG_LOG, "isc: release C");
424             isc_release_block (is, pp->cat, mb[ptr].block);
425             mb[ptr].block = 0;
426             if (ptr > 0)
427                 mb[ptr-1].dirty = 1;
428         }
429     }
430
431     if (debug > 2)
432         logf (LOG_LOG, "isc: flush C, %d sections", ptr);
433
434     if (firstpos)
435     {
436         /* we have to patch initial block with num keys if that
437            has changed */
438         if (numKeys != isc_pp_num (pp))
439         {
440             if (debug > 2)
441                 logf (LOG_LOG, "isc: patch num keys firstpos=%d num=%d",
442                                 firstpos, numKeys);
443             bf_write (is->files[cat].bf, firstpos, ISAMC_BLOCK_OFFSET_N,
444                       sizeof(numKeys), &numKeys);
445         }
446     }
447     else if (ptr > 0)
448     {   /* we haven't flushed initial block yet and there surely are some
449            blocks to flush. Make first block dirty if numKeys differ */
450         if (numKeys != isc_pp_num (pp))
451             mb[0].dirty = 1;
452     }
453     /* flush rest of block(s) in r_buf */
454     flush_blocks (is, mb, ptr, r_buf, &firstpos, cat, 1, &numKeys);
455
456     (*is->method->code_stop)(ISAMC_ENCODE, r_clientData);
457     if (!firstpos)
458         cat = 0;
459     if (debug > 1)
460         logf (LOG_LOG, "isc: isc_merge return %d %d", cat, firstpos);
461     isc_pp_close (pp);
462     return cat + firstpos * 8;
463 }
464
465 static char *hexdump(unsigned char *p, int len, char *buff) {
466   static char localbuff[128];
467   char bytebuff[8];
468   if (!buff) buff=localbuff;
469   *buff='\0';
470   while (len--) {
471     sprintf(bytebuff,"%02x",*p);
472     p++;
473     strcat(buff,bytebuff);
474     if (len) strcat(buff,",");
475   }
476   return buff;
477 }
478
479
480 /* isamh - heikki's append-only isam 
481  * Idea: When allocating a new block, allocate memory for a very large block
482  *       (maximal blocksize). When done, see if you can shrink it to some 
483  *       smaller size. First-time indexing will go in optimal blocks, and
484  *       following small additions will go to the end of the last of the 
485  *       maximal ones. Only later, when new blocks need to be allocated, it
486  *       may make sense to reserve some extra space...
487 */
488
489
490 static void isamh_reduceblock(ISAMH is, ISAMH_PP pp, int numKeys)
491 {
492   if (pp->is->method->debug > 2)
493      logf(LOG_LOG,"isamh_reduce: block p=%d c=%d o=%d nk=%d ", 
494        pp->pos, pp->cat, pp->offset, numKeys);
495   if (pp->pos != 0)
496     return; /* already allocated in some size */
497   while ( ( pp->cat > 0 ) &&
498           ( pp->offset < is->method->filecat[pp->cat-1].bsize) &&
499           ( numKeys < is->method->filecat[pp->cat-1].mblocks) )
500     pp->cat--;
501   pp->pos =  isamh_alloc_block(is,pp->cat) ;
502   if (pp->is->method->debug > 2)
503      logf(LOG_LOG,"isamh_reduced block p=%d to c=%d o=%d nk=%d bs=%d", 
504          pp->pos, pp->cat, pp->offset, numKeys, 
505          is->method->filecat[pp->cat].bsize);
506
507   
508 } /* reduceblock */
509
510 ISAMC_P isamh_append (ISAMH is, ISAMH_P ipos, ISAMH_I data)
511 {
512
513     ISAMH_PP pp; 
514
515     char i_item[128];
516     char *i_item_ptr;
517     int i_more=1, i_mode;
518
519     char codebuffer[128];
520     char *codeptr;
521     char *bufptr;
522     int codelen;
523
524     ISAMH_PP firstpp;
525     void *r_clientData;   /* encode client data */
526     int newblock;
527     int newcat;
528     int maxkeys;
529     int maxsize;
530     int retval;
531     int maxcat;
532             
533     pp = firstpp = isamh_pp_open (is, ipos);
534     assert (*is->method->code_reset);
535     
536     maxcat=0; /* find the largest block size for default allocation */
537     for ( newcat=0; is->method->filecat[newcat].mblocks > 0; newcat++)
538       if ( is->method->filecat[newcat].bsize > 
539            is->method->filecat[maxcat].bsize)
540         maxcat = newcat;
541    
542     if ( 0==ipos)
543     {   /* new block */
544       pp->cat=maxcat; /* start large... */
545       pp->pos = 0; /* not allocated yet */ 
546       pp->size= pp->offset = ISAMH_BLOCK_OFFSET_1 ;
547       r_clientData = (*is->method->code_start)(ISAMH_ENCODE);
548       if (pp->is->method->debug > 2)
549         logf(LOG_LOG,"isamh_append: starting with new block %d",pp->pos);
550     }
551     else
552     { /* existing block */
553       if (isamh_block(firstpp->lastblock) == firstpp->pos) 
554       { /* only one block, we have it already */
555         pp->offset=ISAMH_BLOCK_OFFSET_1;
556         if (pp->is->method->debug > 2)
557           logf(LOG_LOG,"isamh_append: starting with one block %d",pp->pos);
558       }
559       else
560       { 
561         if (pp->is->method->debug > 2)
562           logf(LOG_LOG,"isamh_append: starting with multiple blocks %d>%d>%d",
563             firstpp->pos,isamh_block(firstpp->next),isamh_block(firstpp->lastblock));
564         pp=isamh_pp_open(is,firstpp->lastblock);
565           /* dirty, but this can also read a N-block. Just clear extra values*/
566         pp->lastblock=0;
567         pp->offset=ISAMH_BLOCK_OFFSET_N;
568       } /* get last */ 
569       r_clientData = (*is->method->code_start)(ISAMH_ENCODE);
570       if (pp->is->method->debug > 3)
571         logf(LOG_LOG,"isamh_append: scanning to end of block %d %d->%d",
572              pp->pos, pp->offset, pp->size);
573       codeptr=codebuffer;
574       while (pp->offset<pp->size) {
575         codeptr=codebuffer;
576         bufptr=pp->buf + pp->offset;
577         (*is->method->code_item)(ISAMH_DECODE, r_clientData, &codeptr, &bufptr);
578         codelen = bufptr - (pp->buf+pp->offset) ;
579         if (pp->is->method->debug > 3)
580           logf(LOG_LOG,"isamh_append: dec at %d %d/%d:%s",
581             pp->offset, codelen, codeptr-codebuffer,
582             hexdump(codebuffer,codeptr-codebuffer,0) );
583         pp->offset += codelen;
584       }
585     } /* existing block */
586
587     
588     i_item_ptr = i_item;
589     i_more = (*data->read_item)(data->clientData,&i_item_ptr,&i_mode);
590     if (pp->is->method->debug > 3)
591       logf(LOG_LOG,"isamh_append 1: m=%d l=%d %s",
592         i_mode, i_item_ptr-i_item, hexdump(i_item,i_item_ptr-i_item,0));
593
594     maxsize = is->method->filecat[pp->cat].bsize;
595     
596     while(i_more) {
597        if (i_mode) 
598        { /* insert key, ignore all delete keys time being... */
599           codeptr = codebuffer;
600           i_item_ptr=i_item;
601           (*is->method->code_item)(ISAMH_ENCODE, r_clientData, &codeptr, &i_item_ptr);
602           codelen = codeptr-codebuffer;
603    
604           assert( (codelen < 128) && (codelen>0));
605        
606           if (pp->is->method->debug > 3)
607             logf(LOG_LOG,"isamh_append: coded into %d:%s  (nk=%d)", 
608                codelen,hexdump(codebuffer,codelen,0),firstpp->numKeys);
609
610           if ( pp->offset + codelen > maxsize ) 
611           { /* oops, block full, do something */
612             newcat = maxcat; /* start with a large block again */
613             maxkeys = is->method->filecat[pp->cat].mblocks;  /* max keys */
614             if (pp->is->method->debug > 2)
615               logf(LOG_LOG,"isamh_append: need new block: %d > %d (k:%d/%d)", 
616                   pp->offset + codelen, maxsize, firstpp->numKeys,maxkeys );
617
618 #ifdef SKIPTHIS
619             if ( (maxkeys>0) && (firstpp->numKeys > maxkeys) ) 
620             { /* time to increase block size */
621                newcat++;
622                maxsize = is->method->filecat[newcat].bsize;
623                pp->buf=xrealloc(pp->buf,maxsize);
624                if (pp->is->method->debug > 2)
625                  logf(LOG_LOG,"isamh_append: increased to cat %d ",newcat);
626             }
627 #endif
628
629             newblock = 0; /* isamh_alloc_block(is,newcat);*/
630             pp->next = isamh_addr(newblock,newcat);
631             if (firstpp!=pp)
632             {  /* not first block, write to disk already now */
633               isamh_reduceblock(is,pp,firstpp->numKeys);
634               isamh_buildlaterblock(pp);
635               isamh_write_block(is,pp->cat,pp->pos,pp->buf);    
636             }
637             else 
638             {  /* we had only one block, allocate a second buffer */
639               pp = isamh_pp_open(is,0);
640             }
641             pp->cat = newcat; 
642             pp->pos = newblock;
643          
644             pp->size=pp->offset=ISAMH_BLOCK_OFFSET_N ;
645             pp->next=0;
646             pp->lastblock=0;
647             if (pp->is->method->debug > 2)
648               logf(LOG_LOG,"isamh_append: got a new block %d:%d",pp->cat,pp->pos);
649
650             /* reset the encoding, and code again */
651             (*is->method->code_reset)(r_clientData);
652             codeptr = codebuffer;
653             i_item_ptr=i_item;
654             (*is->method->code_item)(ISAMH_ENCODE, r_clientData, &codeptr, &i_item_ptr);
655             codelen = codeptr-codebuffer;
656             if (pp->is->method->debug > 3)
657               logf(LOG_LOG,"isamh_append: coded again %d:%s  (nk=%d)", 
658                    codelen,hexdump(codebuffer,codelen,0),firstpp->numKeys);
659
660           } /* block full */
661
662           /* ok, now we can write it */
663           memcpy(&(pp->buf[pp->offset]), codebuffer, codelen);
664           pp->offset += codelen;
665           pp->size += codelen;
666           firstpp->numKeys++; 
667        } /* not a delete */
668
669        /* try to read the next element */
670        i_item_ptr = i_item;
671        i_more = (*data->read_item)(data->clientData,&i_item_ptr,&i_mode);
672        if (pp->is->method->debug > 3)
673          logf(LOG_LOG,"isamh_append 2: m=%d l=%d %s",
674              i_mode, i_item_ptr-i_item, hexdump(i_item,i_item_ptr-i_item,0));
675     
676     } /* while */
677
678     /* Write the last (partial) block, if needed. */
679     if (pp!=firstpp) 
680     {
681       pp->next=0; /* just to be sure */
682       isamh_reduceblock(is,pp,firstpp->numKeys);
683       isamh_buildlaterblock(pp);
684       isamh_write_block(is,pp->cat,pp->pos,pp->buf);    
685     }
686     
687     /* update first block and write it */
688     firstpp->lastblock = isamh_addr(pp->pos,pp->cat);
689     isamh_reduceblock(is,firstpp,firstpp->numKeys);
690     isamh_buildfirstblock(firstpp);
691     isamh_write_block(is,firstpp->cat,firstpp->pos,firstpp->buf);
692
693     /* release the second block, if we allocated one */
694     if ( firstpp != pp ) 
695       isamh_pp_close(pp); 
696  
697     /* get return value (before it disappears at close! */
698     retval = isamh_addr(firstpp->pos,firstpp->cat);
699
700     isamh_pp_close(firstpp);    
701     
702     return retval;
703     
704 } /*  isamh_append */
705
706
707 /*
708  * $Log: merge.c,v $
709  * Revision 1.17  1999-07-13 14:22:17  heikki
710  * Better allocation strategy in isamh_merge
711  *
712  * Revision 1.16  1999/07/08 14:23:27  heikki
713  * Fixed a bug in isamh_pp_read and cleaned up a bit
714  *
715  * Revision 1.15  1999/07/07 09:36:04  heikki
716  * Fixed an assertion in isamh
717  *
718  * Revision 1.13  1999/07/06 09:37:05  heikki
719  * Working on isamh - not ready yet.
720  *
721  * Revision 1.12  1999/06/30 15:03:55  heikki
722  * first take on isamh, the append-only isam structure
723  *
724  * Revision 1.11  1999/05/26 07:49:14  adam
725  * C++ compilation.
726  *
727  * Revision 1.10  1998/03/19 12:22:09  adam
728  * Minor change.
729  *
730  * Revision 1.9  1998/03/19 10:04:38  adam
731  * Minor changes.
732  *
733  * Revision 1.8  1998/03/18 09:23:55  adam
734  * Blocks are stored in chunks on free list - up to factor 2 in speed.
735  * Fixed bug that could occur in block category rearrangemen.
736  *
737  * Revision 1.7  1998/03/11 11:18:18  adam
738  * Changed the isc_merge to take into account the mfill (minimum-fill).
739  *
740  * Revision 1.6  1998/03/06 13:54:03  adam
741  * Fixed two nasty bugs in isc_merge.
742  *
743  * Revision 1.5  1997/02/12 20:42:43  adam
744  * Bug fix: during isc_merge operations, some pages weren't marked dirty
745  * even though they should be. At this point the merge operation marks
746  * a page dirty if the previous page changed at all. A better approach is
747  * to mark it dirty if the last key written changed in previous page.
748  *
749  * Revision 1.4  1996/11/08 11:15:31  adam
750  * Number of keys in chain are stored in first block and the function
751  * to retrieve this information, isc_pp_num is implemented.
752  *
753  * Revision 1.3  1996/11/04 14:08:59  adam
754  * Optimized free block usage.
755  *
756  * Revision 1.2  1996/11/01 13:36:46  adam
757  * New element, max_blocks_mem, that control how many blocks of max size
758  * to store in memory during isc_merge.
759  * Function isc_merge now ignores delete/update of identical keys and
760  * the proper blocks are then non-dirty and not written in flush_blocks.
761  *
762  * Revision 1.1  1996/11/01  08:59:15  adam
763  * First version of isc_merge that supports update/delete.
764  *
765  */
766
767