Large-block isam-h (may not work too well... Abandoning for isam-d)
[idzebra-moved-to-github.git] / isamc / merge.c
1 /*
2  * Copyright (c) 1996-1998, Index Data.
3  * See the file LICENSE for details.
4  * Sebastian Hammer, Adam Dickmeiss, Heikki Levanto
5  *
6  */
7
8 #include <stdlib.h>
9 #include <assert.h>
10 #include <string.h>
11 #include <stdio.h>
12 #include <log.h>
13 #include "isamc-p.h"
14 #include "isamh-p.h"
15
16 struct isc_merge_block {
17     int offset;       /* offset in r_buf */
18     int block;        /* block number of file (0 if none) */
19     int dirty;        /* block is different from that on file */
20 };
21
22 static void opt_blocks (ISAMC is, struct isc_merge_block *mb, int ptr,
23                         int last)
24 {
25     int i, no_dirty = 0;
26     for (i = 0; i<ptr; i++)
27         if (mb[i].dirty)
28             no_dirty++;
29     if (no_dirty*4 < ptr*3)
30         return;
31     /* bubble-sort it */
32     for (i = 0; i<ptr; i++)
33     {
34         int tmp, j, j_min = -1;
35         for (j = i; j<ptr; j++)
36         {
37             if (j_min < 0 || mb[j_min].block > mb[j].block)
38                 j_min = j;
39         }
40         assert (j_min >= 0);
41         tmp = mb[j_min].block;
42         mb[j_min].block = mb[i].block;
43         mb[i].block = tmp;
44         mb[i].dirty = 1;
45     }
46     if (!last)
47         mb[i].dirty = 1;
48 }
49
50 static void flush_blocks (ISAMC is, struct isc_merge_block *mb, int ptr,
51                           char *r_buf, int *firstpos, int cat, int last,
52                           int *numkeys)
53 {
54     int i;
55
56     for (i = 0; i<ptr; i++)
57     {
58         /* consider this block number */
59         if (!mb[i].block) 
60         {
61             mb[i].block = isc_alloc_block (is, cat);
62             mb[i].dirty = 1;
63         }
64
65         /* consider next block pointer */
66         if (last && i == ptr-1)
67             mb[i+1].block = 0;
68         else if (!mb[i+1].block)       
69         {
70             mb[i+1].block = isc_alloc_block (is, cat);
71             mb[i+1].dirty = 1;
72             mb[i].dirty = 1;
73         }
74     }
75
76     for (i = 0; i<ptr; i++)
77     {
78         char *src;
79         ISAMC_BLOCK_SIZE ssize = mb[i+1].offset - mb[i].offset;
80
81         assert (ssize);
82
83         /* skip rest if not dirty */
84         if (!mb[i].dirty)
85         {
86             assert (mb[i].block);
87             if (!*firstpos)
88                 *firstpos = mb[i].block;
89             if (is->method->debug > 2)
90                 logf (LOG_LOG, "isc: skip ptr=%d size=%d %d %d",
91                      i, ssize, cat, mb[i].block);
92             ++(is->files[cat].no_skip_writes);
93             continue;
94         }
95         /* write block */
96
97         if (!*firstpos)
98         {
99             *firstpos = mb[i].block;
100             src = r_buf + mb[i].offset - ISAMC_BLOCK_OFFSET_1;
101             ssize += ISAMC_BLOCK_OFFSET_1;
102
103             memcpy (src+sizeof(int)+sizeof(ssize), numkeys,
104                     sizeof(*numkeys));
105             if (is->method->debug > 2)
106                 logf (LOG_LOG, "isc: flush ptr=%d numk=%d size=%d nextpos=%d",
107                      i, *numkeys, (int) ssize, mb[i+1].block);
108         }
109         else
110         {
111             src = r_buf + mb[i].offset - ISAMC_BLOCK_OFFSET_N;
112             ssize += ISAMC_BLOCK_OFFSET_N;
113             if (is->method->debug > 2)
114                 logf (LOG_LOG, "isc: flush ptr=%d size=%d nextpos=%d",
115                      i, (int) ssize, mb[i+1].block);
116         }
117         memcpy (src, &mb[i+1].block, sizeof(int));
118         memcpy (src+sizeof(int), &ssize, sizeof(ssize));
119         isc_write_block (is, cat, mb[i].block, src);
120     }
121 }
122
123 static int get_border (ISAMC is, struct isc_merge_block *mb, int ptr,
124                        int cat, int firstpos)
125 {
126    /* Border set to initial fill or block size depending on
127       whether we are creating a new one or updating and old one.
128     */
129     
130     int fill = mb[ptr].block ? is->method->filecat[cat].bsize :
131                                is->method->filecat[cat].ifill;
132     int off = (ptr||firstpos) ? ISAMC_BLOCK_OFFSET_N : ISAMC_BLOCK_OFFSET_1;
133     
134     assert (ptr < 199);
135
136     return mb[ptr].offset + fill - off;
137 }
138
139 ISAMC_P isc_merge (ISAMC is, ISAMC_P ipos, ISAMC_I data)
140 {
141
142     char i_item[128], *i_item_ptr;
143     int i_more, i_mode, i;
144
145     ISAMC_PP pp; 
146     char f_item[128], *f_item_ptr;
147     int f_more;
148     int last_dirty = 0;
149     int debug = is->method->debug;
150  
151     struct isc_merge_block mb[200];
152
153     int firstpos = 0;
154     int cat = 0;
155     char r_item_buf[128]; /* temporary result output */
156     char *r_buf;          /* block with resulting data */
157     int r_offset = 0;     /* current offset in r_buf */
158     int ptr = 0;          /* pointer */
159     void *r_clientData;   /* encode client data */
160     int border;
161     int numKeys = 0;
162
163     r_clientData = (*is->method->code_start)(ISAMC_ENCODE);
164     r_buf = is->merge_buf + 128;
165
166     pp = isc_pp_open (is, ipos);
167     /* read first item from file. make sure f_more indicates no boundary */
168     f_item_ptr = f_item;
169     f_more = isc_read_item (pp, &f_item_ptr);
170     if (f_more > 0)
171         f_more = 1;
172     cat = pp->cat;
173
174     if (debug > 1)
175         logf (LOG_LOG, "isc: isc_merge begin %d %d", cat, pp->pos);
176
177     /* read first item from i */
178     i_item_ptr = i_item;
179     i_more = (*data->read_item)(data->clientData, &i_item_ptr, &i_mode);
180
181     mb[ptr].block = pp->pos;     /* is zero if no block on disk */
182     mb[ptr].dirty = 0;
183     mb[ptr].offset = 0;
184
185     border = get_border (is, mb, ptr, cat, firstpos);
186     while (i_more || f_more)
187     {
188         char *r_item = r_item_buf;
189         int cmp;
190
191         if (f_more > 1)
192         {
193             /* block to block boundary in the original file. */
194             f_more = 1;
195             if (cat == pp->cat) 
196             {
197                 /* the resulting output is of the same category as the
198                    the original 
199                 */
200                 if (r_offset <= mb[ptr].offset +is->method->filecat[cat].mfill)
201                 {
202                     /* the resulting output block is too small/empty. Delete
203                        the original (if any)
204                     */
205                     if (debug > 3)
206                         logf (LOG_LOG, "isc: release A");
207                     if (mb[ptr].block)
208                         isc_release_block (is, pp->cat, mb[ptr].block);
209                     mb[ptr].block = pp->pos;
210                     if (!mb[ptr].dirty)
211                         mb[ptr].dirty = 1;
212                     if (ptr > 0)
213                         mb[ptr-1].dirty = 1;
214                 }
215                 else
216                 {
217                     /* indicate new boundary based on the original file */
218                     mb[++ptr].block = pp->pos;
219                     mb[ptr].dirty = last_dirty;
220                     mb[ptr].offset = r_offset;
221                     if (debug > 3)
222                         logf (LOG_LOG, "isc: bound ptr=%d,offset=%d",
223                             ptr, r_offset);
224                     if (cat==is->max_cat && ptr >= is->method->max_blocks_mem)
225                     {
226                         /* We are dealing with block(s) of max size. Block(s)
227                            except 1 will be flushed.
228                          */
229                         if (debug > 2)
230                             logf (LOG_LOG, "isc: flush A %d sections", ptr);
231                         flush_blocks (is, mb, ptr-1, r_buf, &firstpos, cat,
232                                       0, &pp->numKeys);
233                         mb[0].block = mb[ptr-1].block;
234                         mb[0].dirty = mb[ptr-1].dirty;
235                         memcpy (r_buf, r_buf + mb[ptr-1].offset,
236                                 mb[ptr].offset - mb[ptr-1].offset);
237                         mb[0].offset = 0;
238
239                         mb[1].block = mb[ptr].block;
240                         mb[1].dirty = mb[ptr].dirty;
241                         mb[1].offset = mb[ptr].offset - mb[ptr-1].offset;
242                         ptr = 1;
243                         r_offset = mb[ptr].offset;
244                     }
245                 }
246             }
247             border = get_border (is, mb, ptr, cat, firstpos);
248         }
249         last_dirty = 0;
250         if (!f_more)
251             cmp = -1;
252         else if (!i_more)
253             cmp = 1;
254         else
255             cmp = (*is->method->compare_item)(i_item, f_item);
256         if (cmp == 0)                   /* insert i=f */
257         {
258             if (!i_mode)   /* delete item? */
259             {
260                 /* move i */
261                 i_item_ptr = i_item;
262                 i_more = (*data->read_item)(data->clientData, &i_item_ptr,
263                                            &i_mode);
264                 /* is next input item the same as current except
265                    for the delete flag? */
266                 cmp = (*is->method->compare_item)(i_item, f_item);
267                 if (!cmp && i_mode)   /* delete/insert nop? */
268                 {
269                     /* yes! insert as if it was an insert only */
270                     memcpy (r_item, i_item, i_item_ptr - i_item);
271                     i_item_ptr = i_item;
272                     i_more = (*data->read_item)(data->clientData, &i_item_ptr,
273                                                 &i_mode);
274                 }
275                 else
276                 {
277                     /* no! delete the item */
278                     r_item = NULL;
279                     last_dirty = 1;
280                     mb[ptr].dirty = 2;
281                 }
282             }
283             else
284             {
285                 memcpy (r_item, f_item, f_item_ptr - f_item);
286
287                 /* move i */
288                 i_item_ptr = i_item;
289                 i_more = (*data->read_item)(data->clientData, &i_item_ptr,
290                                            &i_mode);
291             }
292             /* move f */
293             f_item_ptr = f_item;
294             f_more = isc_read_item (pp, &f_item_ptr);
295         }
296         else if (cmp > 0)               /* insert f */
297         {
298             memcpy (r_item, f_item, f_item_ptr - f_item);
299             /* move f */
300             f_item_ptr = f_item;
301             f_more = isc_read_item (pp, &f_item_ptr);
302         }
303         else                            /* insert i */
304         {
305             if (!i_mode)                /* delete item which isn't there? */
306             {
307                 logf (LOG_FATAL, "Inconsistent register at offset %d",
308                                  r_offset);
309                 abort ();
310             }
311             memcpy (r_item, i_item, i_item_ptr - i_item);
312             mb[ptr].dirty = 2;
313             last_dirty = 1;
314             /* move i */
315             i_item_ptr = i_item;
316             i_more = (*data->read_item)(data->clientData, &i_item_ptr,
317                                         &i_mode);
318         }
319         if (r_item)  /* insert resulting item? */
320         {
321             char *r_out_ptr = r_buf + r_offset;
322             int new_offset;
323
324             (*is->method->code_item)(ISAMC_ENCODE, r_clientData,
325                                      &r_out_ptr, &r_item);
326             new_offset = r_out_ptr - r_buf; 
327
328             numKeys++;
329
330             if (border < new_offset && border >= r_offset)
331             {
332                 if (debug > 2)
333                     logf (LOG_LOG, "isc: border %d %d", ptr, border);
334                 /* Max size of current block category reached ...
335                    make new virtual block entry */
336                 mb[++ptr].block = 0;
337                 mb[ptr].dirty = 1;
338                 mb[ptr].offset = r_offset;
339                 if (cat == is->max_cat && ptr >= is->method->max_blocks_mem)
340                 {
341                     /* We are dealing with block(s) of max size. Block(s)
342                        except one will be flushed. Note: the block(s) are
343                        surely not the last one(s).
344                      */
345                     if (debug > 2)
346                         logf (LOG_LOG, "isc: flush B %d sections", ptr-1);
347                     flush_blocks (is, mb, ptr-1, r_buf, &firstpos, cat,
348                                   0, &pp->numKeys);
349                     mb[0].block = mb[ptr-1].block;
350                     mb[0].dirty = mb[ptr-1].dirty;
351                     memcpy (r_buf, r_buf + mb[ptr-1].offset,
352                             mb[ptr].offset - mb[ptr-1].offset);
353                     mb[0].offset = 0;
354
355                     mb[1].block = mb[ptr].block;
356                     mb[1].dirty = mb[0].dirty;
357                     mb[1].offset = mb[ptr].offset - mb[ptr-1].offset;
358                     memcpy (r_buf + mb[1].offset, r_buf + r_offset,
359                             new_offset - r_offset);
360                     new_offset = (new_offset - r_offset) + mb[1].offset;
361                     ptr = 1;
362                 }
363                 border = get_border (is, mb, ptr, cat, firstpos);
364             }
365             r_offset = new_offset;
366         }
367         if (cat < is->max_cat && ptr >= is->method->filecat[cat].mblocks)
368         {
369             /* Max number blocks in current category reached ->
370                must switch to next category (with larger block size) 
371             */
372             int j = 0;
373
374             (is->files[cat].no_remap)++;
375             /* delete all original block(s) read so far */
376             for (i = 0; i < ptr; i++)
377                 if (mb[i].block)
378                     isc_release_block (is, pp->cat, mb[i].block);
379             /* also delete all block to be read in the future */
380             pp->deleteFlag = 1;
381
382             /* remap block offsets */
383             assert (mb[j].offset == 0);
384             cat++;
385             mb[j].dirty = 1;
386             mb[j].block = 0;
387             mb[ptr].offset = r_offset;
388             for (i = 1; i < ptr; i++)
389             {
390                 int border = is->method->filecat[cat].ifill -
391                          ISAMC_BLOCK_OFFSET_1 + mb[j].offset;
392                 if (debug > 3)
393                     logf (LOG_LOG, "isc: remap %d border=%d", i, border);
394                 if (mb[i+1].offset > border && mb[i].offset <= border)
395                 {
396                     if (debug > 3)
397                         logf (LOG_LOG, "isc:  to %d %d", j, mb[i].offset);
398                     mb[++j].dirty = 1;
399                     mb[j].block = 0;
400                     mb[j].offset = mb[i].offset;
401                 }
402             }
403             if (debug > 2)
404                 logf (LOG_LOG, "isc: remap from %d to %d sections to cat %d",
405                       ptr, j, cat);
406             ptr = j;
407             border = get_border (is, mb, ptr, cat, firstpos);
408             if (debug > 3)
409                 logf (LOG_LOG, "isc: border=%d r_offset=%d", border, r_offset);
410         }
411     }
412     if (mb[ptr].offset < r_offset)
413     {   /* make the final boundary offset */
414         mb[++ptr].dirty = 1; 
415         mb[ptr].block = 0; 
416         mb[ptr].offset = r_offset;
417     }
418     else
419     {   /* empty output. Release last block if any */
420         if (cat == pp->cat && mb[ptr].block)
421         {
422             if (debug > 3)
423                 logf (LOG_LOG, "isc: release C");
424             isc_release_block (is, pp->cat, mb[ptr].block);
425             mb[ptr].block = 0;
426             if (ptr > 0)
427                 mb[ptr-1].dirty = 1;
428         }
429     }
430
431     if (debug > 2)
432         logf (LOG_LOG, "isc: flush C, %d sections", ptr);
433
434     if (firstpos)
435     {
436         /* we have to patch initial block with num keys if that
437            has changed */
438         if (numKeys != isc_pp_num (pp))
439         {
440             if (debug > 2)
441                 logf (LOG_LOG, "isc: patch num keys firstpos=%d num=%d",
442                                 firstpos, numKeys);
443             bf_write (is->files[cat].bf, firstpos, ISAMC_BLOCK_OFFSET_N,
444                       sizeof(numKeys), &numKeys);
445         }
446     }
447     else if (ptr > 0)
448     {   /* we haven't flushed initial block yet and there surely are some
449            blocks to flush. Make first block dirty if numKeys differ */
450         if (numKeys != isc_pp_num (pp))
451             mb[0].dirty = 1;
452     }
453     /* flush rest of block(s) in r_buf */
454     flush_blocks (is, mb, ptr, r_buf, &firstpos, cat, 1, &numKeys);
455
456     (*is->method->code_stop)(ISAMC_ENCODE, r_clientData);
457     if (!firstpos)
458         cat = 0;
459     if (debug > 1)
460         logf (LOG_LOG, "isc: isc_merge return %d %d", cat, firstpos);
461     isc_pp_close (pp);
462     return cat + firstpos * 8;
463 }
464
465 static char *hexdump(unsigned char *p, int len, char *buff) {
466   static char localbuff[128];
467   char bytebuff[8];
468   if (!buff) buff=localbuff;
469   *buff='\0';
470   while (len--) {
471     sprintf(bytebuff,"%02x",*p);
472     p++;
473     strcat(buff,bytebuff);
474     if (len) strcat(buff,",");
475   }
476   return buff;
477 }
478
479
480 /* isamh - heikki's append-only isam 
481  * Idea: When allocating a new block, allocate memory for a very large block
482  *       (maximal blocksize). When done, see if you can shrink it to some 
483  *       smaller size. First-time indexing will go in optimal blocks, and
484  *       following small additions will go to the end of the last of the 
485  *       maximal ones. Only later, when new blocks need to be allocated, it
486  *       may make sense to reserve some extra space...
487 */
488
489
490 static void isamh_reduceblock(ISAMH is, ISAMH_PP pp, int numKeys)
491 {
492   if (pp->is->method->debug > 2)
493      logf(LOG_LOG,"isamh_reduce: block p=%d c=%d o=%d nk=%d ", 
494        pp->pos, pp->cat, pp->offset, numKeys);
495   if (pp->pos != 0)
496     return; /* already allocated in some size */
497   while ( ( pp->cat > 0 ) &&
498           ( pp->offset < is->method->filecat[pp->cat-1].bsize) &&
499           ( numKeys < is->method->filecat[pp->cat-1].mblocks) )
500     pp->cat--;
501   pp->pos =  isamh_alloc_block(is,pp->cat) ;
502   if (pp->is->method->debug > 2)
503      logf(LOG_LOG,"isamh_reduced block p=%d to c=%d o=%d nk=%d bs=%d", 
504          pp->pos, pp->cat, pp->offset, numKeys, 
505          is->method->filecat[pp->cat].bsize);
506 } /* reduceblock */
507
508
509 ISAMC_P isamh_append (ISAMH is, ISAMH_P ipos, ISAMH_I data)
510 {
511
512     ISAMH_PP pp;      /* always the last pp in the chain */ 
513     ISAMH_PP firstpp; /* always the first one in the chain, may ==pp */
514     ISAMH_PP prevpp;  /* the one that points to pp, may be null or ==firstpp */
515                       /* needed to postpone the writing of its next field until */
516                       /* pp itself has been categorized */
517     char i_item[128];
518     char *i_item_ptr;
519     int i_more=1, i_mode;
520
521     char codebuffer[128];
522     char *codeptr;
523     char *bufptr;
524     int codelen;
525
526     void *r_clientData;   /* encode client data */
527     int newblock;
528     int newcat;
529     int maxkeys;
530     int maxsize;
531     int retval;
532     int maxcat;
533             
534     pp = firstpp = isamh_pp_open (is, ipos);
535     prevpp =0; /* only used when new blocks allocated */
536     
537     assert (*is->method->code_reset);
538     
539     maxcat=0; /* find the largest block size for default allocation */
540     for ( newcat=0; is->method->filecat[newcat].mblocks > 0; newcat++)
541       if ( is->method->filecat[newcat].bsize > 
542            is->method->filecat[maxcat].bsize)
543         maxcat = newcat;
544    
545     if ( 0==ipos)
546     {   /* new block */
547       pp->cat=maxcat; /* start large... */
548       pp->pos = 0; /* not allocated yet */ 
549       pp->size= pp->offset = ISAMH_BLOCK_OFFSET_1 ;
550       pp->buf=xrealloc(pp->buf,is->method->filecat[maxcat].bsize);
551       r_clientData = (*is->method->code_start)(ISAMH_ENCODE);
552       if (pp->is->method->debug > 2)
553         logf(LOG_LOG,"isamh_append: starting with new block");
554     }
555     else
556     { /* existing block */
557       if (isamh_block(firstpp->lastblock) == firstpp->pos) 
558         /*!!! TODO: BUG: Compare whole addresses !!! (later) */
559       { /* only one block, we have it already */
560         pp->offset=ISAMH_BLOCK_OFFSET_1;
561         if (pp->is->method->debug > 2)
562           logf(LOG_LOG,"isamh_append: starting with one block %d",pp->pos);
563       }
564       else
565       { 
566         if (pp->is->method->debug > 2)
567           logf(LOG_LOG,"isamh_append: starting with multiple blocks %d>%d>%d",
568             firstpp->pos,isamh_block(firstpp->next),isamh_block(firstpp->lastblock));
569         pp=isamh_pp_open(is,firstpp->lastblock);
570           /* dirty, but this can also read a N-block. Just clear extra values*/
571         pp->lastblock=0;
572         pp->offset=ISAMH_BLOCK_OFFSET_N;
573       } /* get last */ 
574       r_clientData = (*is->method->code_start)(ISAMH_ENCODE);
575       if (pp->is->method->debug > 3)
576         logf(LOG_LOG,"isamh_append: scanning to end of block %d %d->%d",
577              pp->pos, pp->offset, pp->size);
578       codeptr=codebuffer;
579       while (pp->offset<pp->size) {
580         codeptr=codebuffer;
581         bufptr=pp->buf + pp->offset;
582         (*is->method->code_item)(ISAMH_DECODE, r_clientData, &codeptr, &bufptr);
583         codelen = bufptr - (pp->buf+pp->offset) ;
584         if (pp->is->method->debug > 3)
585           logf(LOG_LOG,"isamh_append: dec at %d %d/%d:%s",
586             pp->offset, codelen, codeptr-codebuffer,
587             hexdump(codebuffer,codeptr-codebuffer,0) );
588         pp->offset += codelen;
589       }
590     } /* existing block */
591
592     
593     i_item_ptr = i_item;
594     i_more = (*data->read_item)(data->clientData,&i_item_ptr,&i_mode);
595     if (pp->is->method->debug > 3)
596       logf(LOG_LOG,"isamh_append 1: m=%d l=%d %s",
597         i_mode, i_item_ptr-i_item, hexdump(i_item,i_item_ptr-i_item,0));
598
599     maxsize = is->method->filecat[pp->cat].bsize;
600     
601     while(i_more) {
602        if (i_mode) 
603        { /* insert key, ignore all delete keys time being... */
604           codeptr = codebuffer;
605           i_item_ptr=i_item;
606           (*is->method->code_item)(ISAMH_ENCODE, r_clientData, &codeptr, &i_item_ptr);
607           codelen = codeptr-codebuffer;
608    
609           assert( (codelen < 128) && (codelen>0));
610        
611           if (pp->is->method->debug > 3)
612             logf(LOG_LOG,"isamh_append: coded into %d:%s  (nk=%d)", 
613                codelen,hexdump(codebuffer,codelen,0),firstpp->numKeys);
614
615           if ( pp->offset + codelen > maxsize ) 
616           { /* oops, block full, do something */
617             newcat = maxcat; /* start with a large block again */
618             maxkeys = is->method->filecat[pp->cat].mblocks;  /* max keys */
619             if (pp->is->method->debug > 2)
620               logf(LOG_LOG,"isamh_append: need new block: %d > %d (k:%d/%d)", 
621                   pp->offset + codelen, maxsize, firstpp->numKeys,maxkeys );
622
623             newblock = 0; 
624             pp->next = 0; 
625             
626             /* Four possibilities: */
627             if (prevpp!=0) 
628             {  /* 1: we have a prevpp that can go on the disk */
629                /*    reduce pp, set next ptr, and store block */
630                /*    set pp up as new last block, and remember the just */
631                /*    filled as (pp) the new prev */
632               assert(pp!=firstpp); 
633               isamh_reduceblock(is,pp,firstpp->numKeys);
634               prevpp->next=isamh_addr(pp->pos,pp->cat);
635               isamh_buildlaterblock(prevpp);
636               isamh_write_block(is,prevpp->cat,prevpp->pos,prevpp->buf);
637               if (pp->is->method->debug > 2)
638                 logf(LOG_LOG,"isamh_append N1: Wrote prevpp (%d:%d) -> %d:%d",
639                       prevpp->pos, prevpp->cat, pp->pos, pp->cat);
640               isamh_pp_close(prevpp); /* it's done its job */
641               prevpp = pp;
642               pp=isamh_pp_open(is,isamh_addr(0,maxcat)); /* start a large one */ 
643               pp->size=pp->offset=ISAMH_BLOCK_OFFSET_N ;
644               pp->next=0;
645               pp->lastblock=0;
646             }
647             else if ( (firstpp!=pp) && (firstpp->next != 0))
648             { /* 2: we are working at end of list, but have no prevpp */
649               /*    some block (already on disk) points to pp */ 
650               /*    set the newly filled block as prev, don't save yet */
651               /*    allocate new pp */
652               if (pp->is->method->debug > 2)
653                 logf(LOG_LOG,"isamh_append N2: set up a new prevpp (%d:%d)",
654                       pp->pos, pp->cat);
655               prevpp = pp;
656               pp=isamh_pp_open(is,isamh_addr(0,maxcat)); /* start a large one */ 
657               pp->size=pp->offset=ISAMH_BLOCK_OFFSET_N ;
658               pp->next=0;
659               pp->lastblock=0;              
660             }
661             else if ( (firstpp!=pp) && (firstpp->next==0))
662             { /* 3: we have earlier allocated pp as the second block */
663               /*    reduce it to get its address. Set first->next to it */
664               /*    move it to prevpp, and create a new pp */
665               isamh_reduceblock(is,pp,firstpp->numKeys);
666               firstpp->next=isamh_addr(pp->pos,pp->cat);
667               if (pp->is->method->debug > 2)
668                 logf(LOG_LOG,"isamh_append N3: set up a new firstpp->next (%d:%d)",
669                       pp->pos, pp->cat);
670               prevpp=pp;
671               pp=isamh_pp_open(is,isamh_addr(0,maxcat)); /* start a large one */ 
672               pp->size=pp->offset=ISAMH_BLOCK_OFFSET_N ;
673               pp->next=0;
674               pp->lastblock=0;                            
675             }
676             else 
677             { /* 4: firstpp itself has got full. */
678               /* allocate a new pp for it, store nothing */
679               assert(firstpp==pp);
680               assert(firstpp->next==0);
681               if (pp->is->method->debug > 2)
682                 logf(LOG_LOG,"isamh_append N4: allocated new pp for full first (%d:%d)",
683                       firstpp->pos, firstpp->cat);
684               pp=isamh_pp_open(is,isamh_addr(0,maxcat)); /* start a large one */ 
685               pp->size=pp->offset=ISAMH_BLOCK_OFFSET_N ;
686               pp->next=0;
687               pp->lastblock=0;       
688             }
689
690             maxsize = is->method->filecat[pp->cat].bsize;
691             pp->size=pp->offset=ISAMH_BLOCK_OFFSET_N ;
692             pp->next=0;
693             pp->lastblock=0;
694             if (pp->is->method->debug > 2)
695               logf(LOG_LOG,"isamh_append: got a new block c=%d p=%d",pp->cat,pp->pos);
696
697
698             /* reset the encoding, and code again */
699             (*is->method->code_reset)(r_clientData);
700             codeptr = codebuffer;
701             i_item_ptr=i_item;
702             (*is->method->code_item)(ISAMH_ENCODE, r_clientData, &codeptr, &i_item_ptr);
703             codelen = codeptr-codebuffer;
704             if (pp->is->method->debug > 3)
705               logf(LOG_LOG,"isamh_append: coded again %d:%s  (nk=%d)", 
706                    codelen,hexdump(codebuffer,codelen,0),firstpp->numKeys);
707
708           } /* block full */
709
710           /* ok, now we can write it */
711           memcpy(&(pp->buf[pp->offset]), codebuffer, codelen);
712           pp->offset += codelen;
713           pp->size += codelen;
714           firstpp->numKeys++; 
715        } /* not a delete */
716
717        /* try to read the next element */
718        i_item_ptr = i_item;
719        i_more = (*data->read_item)(data->clientData,&i_item_ptr,&i_mode);
720        if (pp->is->method->debug > 3)
721          logf(LOG_LOG,"isamh_append 2: m=%d l=%d %s",
722              i_mode, i_item_ptr-i_item, hexdump(i_item,i_item_ptr-i_item,0));
723     
724     } /* while */
725
726     isamh_reduceblock(is,pp,firstpp->numKeys);
727       /* may be the same as firstpp! */
728
729     if (prevpp)
730     { /* Write the prev block we have been holding off */
731       assert(prevpp->pos);
732       prevpp->next = isamh_addr(pp->pos, pp->cat);
733       isamh_buildlaterblock(prevpp);
734       isamh_write_block(is,prevpp->cat,prevpp->pos,prevpp->buf);
735       if (firstpp->next==0) /* can happen if extending many blocks */
736         firstpp->next=isamh_addr(prevpp->pos,prevpp->cat);
737       isamh_pp_close(prevpp);
738     }
739     /* Write the last (partial) block, if needed. */
740     if (pp!=firstpp) 
741     {
742       pp->next=0; /* just to be sure */
743       if (firstpp->next==0)
744         firstpp->next=isamh_addr(pp->pos,pp->cat);
745       isamh_buildlaterblock(pp);
746       isamh_write_block(is,pp->cat,pp->pos,pp->buf);    
747     }
748     
749     /* update first block and write it */
750     firstpp->lastblock = isamh_addr(pp->pos,pp->cat);
751     isamh_reduceblock(is,firstpp,firstpp->numKeys);
752     isamh_buildfirstblock(firstpp);
753     isamh_write_block(is,firstpp->cat,firstpp->pos,firstpp->buf);
754
755     /* release the second block, if we allocated one */
756     if ( firstpp != pp ) 
757       isamh_pp_close(pp); 
758  
759     /* get return value (before it disappears at close! */
760     retval = isamh_addr(firstpp->pos,firstpp->cat);
761
762     isamh_pp_close(firstpp);
763     
764     return retval;
765     
766 } /*  isamh_append */
767
768
769 /*
770  * $Log: merge.c,v $
771  * Revision 1.19  1999-07-14 12:12:07  heikki
772  * Large-block isam-h  (may not work too well... Abandoning for isam-d)
773  *
774  * Revision 1.17  1999/07/13 14:22:17  heikki
775  * Better allocation strategy in isamh_merge
776  *
777  * Revision 1.16  1999/07/08 14:23:27  heikki
778  * Fixed a bug in isamh_pp_read and cleaned up a bit
779  *
780  * Revision 1.15  1999/07/07 09:36:04  heikki
781  * Fixed an assertion in isamh
782  *
783  * Revision 1.13  1999/07/06 09:37:05  heikki
784  * Working on isamh - not ready yet.
785  *
786  * Revision 1.12  1999/06/30 15:03:55  heikki
787  * first take on isamh, the append-only isam structure
788  *
789  * Revision 1.11  1999/05/26 07:49:14  adam
790  * C++ compilation.
791  *
792  * Revision 1.10  1998/03/19 12:22:09  adam
793  * Minor change.
794  *
795  * Revision 1.9  1998/03/19 10:04:38  adam
796  * Minor changes.
797  *
798  * Revision 1.8  1998/03/18 09:23:55  adam
799  * Blocks are stored in chunks on free list - up to factor 2 in speed.
800  * Fixed bug that could occur in block category rearrangemen.
801  *
802  * Revision 1.7  1998/03/11 11:18:18  adam
803  * Changed the isc_merge to take into account the mfill (minimum-fill).
804  *
805  * Revision 1.6  1998/03/06 13:54:03  adam
806  * Fixed two nasty bugs in isc_merge.
807  *
808  * Revision 1.5  1997/02/12 20:42:43  adam
809  * Bug fix: during isc_merge operations, some pages weren't marked dirty
810  * even though they should be. At this point the merge operation marks
811  * a page dirty if the previous page changed at all. A better approach is
812  * to mark it dirty if the last key written changed in previous page.
813  *
814  * Revision 1.4  1996/11/08 11:15:31  adam
815  * Number of keys in chain are stored in first block and the function
816  * to retrieve this information, isc_pp_num is implemented.
817  *
818  * Revision 1.3  1996/11/04 14:08:59  adam
819  * Optimized free block usage.
820  *
821  * Revision 1.2  1996/11/01 13:36:46  adam
822  * New element, max_blocks_mem, that control how many blocks of max size
823  * to store in memory during isc_merge.
824  * Function isc_merge now ignores delete/update of identical keys and
825  * the proper blocks are then non-dirty and not written in flush_blocks.
826  *
827  * Revision 1.1  1996/11/01  08:59:15  adam
828  * First version of isc_merge that supports update/delete.
829  *
830  */
831
832