b6eaca284c5311045baedf305cef0c40626bd6df
[idzebra-moved-to-github.git] / index / kinput.c
1 /*
2  * Copyright (C) 1994-2002, Index Data
3  * All rights reserved.
4  * Sebastian Hammer, Adam Dickmeiss, Heikki Levanto
5  *
6  * $Id: kinput.c,v 1.47 2002-04-04 14:14:13 adam Exp $
7  *
8  * Bugs
9  *  - Allocates a lot of memory for the merge process, but never releases it.
10  *    Doesn't matter, as the program terminates soon after.  
11  
12  */
13  
14 #include <fcntl.h>
15 #ifdef WIN32
16 #include <io.h>
17 #else
18 #include <unistd.h>
19 #endif
20 #include <stdlib.h>
21 #include <string.h>
22 #include <stdio.h>
23 #include <assert.h>
24
25 #include "index.h"
26
27 #define KEY_SIZE (1+sizeof(struct it_key))
28 #define INP_NAME_MAX 768
29 #define INP_BUF_START 60000
30 #define INP_BUF_ADD  400000
31
32 static int no_diffs   = 0;
33 static int no_updates = 0;
34 static int no_deletions = 0;
35 static int no_insertions = 0;
36 static int no_iterations = 0;
37
38 struct key_file {
39     int   no;            /* file no */
40     off_t offset;        /* file offset */
41     unsigned char *buf;  /* buffer block */
42     size_t buf_size;     /* number of read bytes in block */
43     size_t chunk;        /* number of bytes allocated */
44     size_t buf_ptr;      /* current position in buffer */
45     char *prev_name;     /* last word read */
46     int   sysno;         /* last sysno */
47     int   seqno;         /* last seqno */
48     off_t length;        /* length of file */
49                          /* handler invoked in each read */
50     void (*readHandler)(struct key_file *keyp, void *rinfo);
51     void *readInfo;
52     Res res;
53 };
54
55 void getFnameTmp (Res res, char *fname, int no)
56 {
57     const char *pre;
58     
59     pre = res_get_def (res, "keyTmpDir", ".");
60     sprintf (fname, "%s/key%d.tmp", pre, no);
61 }
62
63 void extract_get_fname_tmp (ZebraHandle zh, char *fname, int no)
64 {
65     const char *pre;
66     
67     pre = res_get_def (zh->res, "keyTmpDir", ".");
68     sprintf (fname, "%s/key%d.tmp", pre, no);
69 }
70
71 void key_file_chunk_read (struct key_file *f)
72 {
73     int nr = 0, r = 0, fd;
74     char fname[1024];
75     getFnameTmp (f->res, fname, f->no);
76     fd = open (fname, O_BINARY|O_RDONLY);
77
78     f->buf_ptr = 0;
79     f->buf_size = 0;
80     if (fd == -1)
81     {
82         logf (LOG_WARN|LOG_ERRNO, "cannot open %s", fname);
83         return ;
84     }
85     if (!f->length)
86     {
87         if ((f->length = lseek (fd, 0L, SEEK_END)) == (off_t) -1)
88         {
89             logf (LOG_WARN|LOG_ERRNO, "cannot seek %s", fname);
90             close (fd);
91             return ;
92         }
93     }
94     if (lseek (fd, f->offset, SEEK_SET) == -1)
95     {
96         logf (LOG_WARN|LOG_ERRNO, "cannot seek %s", fname);
97         close(fd);
98         return ;
99     }
100     while (f->chunk - nr > 0)
101     {
102         r = read (fd, f->buf + nr, f->chunk - nr);
103         if (r <= 0)
104             break;
105         nr += r;
106     }
107     if (r == -1)
108     {
109         logf (LOG_WARN|LOG_ERRNO, "read of %s", fname);
110         close (fd);
111         return;
112     }
113     f->buf_size = nr;
114     if (f->readHandler)
115         (*f->readHandler)(f, f->readInfo);
116     close (fd);
117 }
118
119 void key_file_destroy (struct key_file *f)
120 {
121     xfree (f->buf);
122     xfree (f->prev_name);
123     xfree (f);
124 }
125
126 struct key_file *key_file_init (int no, int chunk, Res res)
127 {
128     struct key_file *f;
129
130     f = (struct key_file *) xmalloc (sizeof(*f));
131     f->res = res;
132     f->sysno = 0;
133     f->seqno = 0;
134     f->no = no;
135     f->chunk = chunk;
136     f->offset = 0;
137     f->length = 0;
138     f->readHandler = NULL;
139     f->buf = (unsigned char *) xmalloc (f->chunk);
140     f->prev_name = (char *) xmalloc (INP_NAME_MAX);
141     *f->prev_name = '\0';
142     key_file_chunk_read (f);
143     return f;
144 }
145
146 int key_file_getc (struct key_file *f)
147 {
148     if (f->buf_ptr < f->buf_size)
149         return f->buf[(f->buf_ptr)++];
150     if (f->buf_size < f->chunk)
151         return EOF;
152     f->offset += f->buf_size;
153     key_file_chunk_read (f);
154     if (f->buf_ptr < f->buf_size)
155         return f->buf[(f->buf_ptr)++];
156     else
157         return EOF;
158 }
159
160 int key_file_decode (struct key_file *f)
161 {
162     int c, d;
163
164     c = key_file_getc (f);
165     switch (c & 192) 
166     {
167     case 0:
168         d = c;
169         break;
170     case 64:
171         d = ((c&63) << 8) + (key_file_getc (f) & 0xff);
172         break;
173     case 128:
174         d = ((c&63) << 8) + (key_file_getc (f) & 0xff);
175         d = (d << 8) + (key_file_getc (f) & 0xff);
176         break;
177     case 192:
178         d = ((c&63) << 8) + (key_file_getc (f) & 0xff);
179         d = (d << 8) + (key_file_getc (f) & 0xff);
180         d = (d << 8) + (key_file_getc (f) & 0xff);
181         break;
182     }
183     return d;
184 }
185
186 int key_file_read (struct key_file *f, char *key)
187 {
188     int i, d, c;
189     struct it_key itkey;
190
191     c = key_file_getc (f);
192     if (c == 0)
193     {
194         strcpy (key, f->prev_name);
195         i = 1+strlen (key);
196     }
197     else if (c == EOF)
198         return 0;
199     else
200     {
201         i = 0;
202         key[i++] = c;
203         while ((key[i++] = key_file_getc (f)))
204             ;
205         strcpy (f->prev_name, key);
206         f->sysno = 0;
207     }
208     d = key_file_decode (f);
209     key[i++] = d & 1;
210     d = d >> 1;
211     itkey.sysno = d + f->sysno;
212     if (d) 
213     {
214         f->sysno = itkey.sysno;
215         f->seqno = 0;
216     }
217     d = key_file_decode (f);
218     itkey.seqno = d + f->seqno;
219     f->seqno = itkey.seqno;
220     memcpy (key + i, &itkey, sizeof(struct it_key));
221     return i + sizeof (struct it_key);
222 }
223
224 struct heap_info {
225     struct {
226         struct key_file **file;
227         char   **buf;
228     } info;
229     int    heapnum;
230     int    *ptr;
231     int    (*cmp)(const void *p1, const void *p2);
232     Dict dict;
233     ISAMS isams;
234 #if ZMBOL
235     ISAM isam;
236     ISAMC isamc;
237     ISAMD isamd;
238 #endif
239 };
240
241 struct heap_info *key_heap_init (int nkeys,
242                                  int (*cmp)(const void *p1, const void *p2))
243 {
244     struct heap_info *hi;
245     int i;
246
247     hi = (struct heap_info *) xmalloc (sizeof(*hi));
248     hi->info.file = (struct key_file **)
249         xmalloc (sizeof(*hi->info.file) * (1+nkeys));
250     hi->info.buf = (char **) xmalloc (sizeof(*hi->info.buf) * (1+nkeys));
251     hi->heapnum = 0;
252     hi->ptr = (int *) xmalloc (sizeof(*hi->ptr) * (1+nkeys));
253     hi->cmp = cmp;
254     for (i = 0; i<= nkeys; i++)
255     {
256         hi->ptr[i] = i;
257         hi->info.buf[i] = (char *) xmalloc (INP_NAME_MAX);
258     }
259     return hi;
260 }
261
262 void key_heap_destroy (struct heap_info *hi, int nkeys)
263 {
264     int i;
265     yaz_log (LOG_LOG, "key_heap_destroy");
266     for (i = 0; i<=nkeys; i++)
267         xfree (hi->info.buf[i]);
268     
269     xfree (hi->info.buf);
270     xfree (hi->ptr);
271     xfree (hi->info.file);
272     xfree (hi);
273 }
274
275 static void key_heap_swap (struct heap_info *hi, int i1, int i2)
276 {
277     int swap;
278
279     swap = hi->ptr[i1];
280     hi->ptr[i1] = hi->ptr[i2];
281     hi->ptr[i2] = swap;
282 }
283
284
285 static void key_heap_delete (struct heap_info *hi)
286 {
287     int cur = 1, child = 2;
288
289     assert (hi->heapnum > 0);
290
291     key_heap_swap (hi, 1, hi->heapnum);
292     hi->heapnum--;
293     while (child <= hi->heapnum) {
294         if (child < hi->heapnum &&
295             (*hi->cmp)(&hi->info.buf[hi->ptr[child]],
296                        &hi->info.buf[hi->ptr[child+1]]) > 0)
297             child++;
298         if ((*hi->cmp)(&hi->info.buf[hi->ptr[cur]],
299                        &hi->info.buf[hi->ptr[child]]) > 0)
300         {            
301             key_heap_swap (hi, cur, child);
302             cur = child;
303             child = 2*cur;
304         }
305         else
306             break;
307     }
308 }
309
310 static void key_heap_insert (struct heap_info *hi, const char *buf, int nbytes,
311                              struct key_file *kf)
312 {
313     int cur, parent;
314
315     cur = ++(hi->heapnum);
316     memcpy (hi->info.buf[hi->ptr[cur]], buf, nbytes);
317     hi->info.file[hi->ptr[cur]] = kf;
318
319     parent = cur/2;
320     while (parent && (*hi->cmp)(&hi->info.buf[hi->ptr[parent]],
321                                 &hi->info.buf[hi->ptr[cur]]) > 0)
322     {
323         key_heap_swap (hi, cur, parent);
324         cur = parent;
325         parent = cur/2;
326     }
327 }
328
329 static int heap_read_one (struct heap_info *hi, char *name, char *key)
330 {
331     int n, r;
332     char rbuf[INP_NAME_MAX];
333     struct key_file *kf;
334
335     if (!hi->heapnum)
336         return 0;
337     n = hi->ptr[1];
338     strcpy (name, hi->info.buf[n]);
339     kf = hi->info.file[n];
340     r = strlen(name);
341     memcpy (key, hi->info.buf[n] + r+1, KEY_SIZE);
342     key_heap_delete (hi);
343     if ((r = key_file_read (kf, rbuf)))
344         key_heap_insert (hi, rbuf, r, kf);
345     no_iterations++;
346     return 1;
347 }
348
349 struct heap_cread_info {
350     char prev_name[INP_NAME_MAX];
351     char cur_name[INP_NAME_MAX];
352     char *key;
353     struct heap_info *hi;
354     int mode;
355     int more;
356 };
357       
358 int heap_cread_item (void *vp, char **dst, int *insertMode)
359 {
360     struct heap_cread_info *p = (struct heap_cread_info *) vp;
361     struct heap_info *hi = p->hi;
362
363     if (p->mode == 1)
364     {
365         *insertMode = p->key[0];
366         memcpy (*dst, p->key+1, sizeof(struct it_key));
367         (*dst) += sizeof(struct it_key);
368         p->mode = 2;
369         return 1;
370     }
371     strcpy (p->prev_name, p->cur_name);
372     if (!(p->more = heap_read_one (hi, p->cur_name, p->key)))
373         return 0;
374     if (*p->cur_name && strcmp (p->cur_name, p->prev_name))
375     {
376         p->mode = 1;
377         return 0;
378     }
379     *insertMode = p->key[0];
380     memcpy (*dst, p->key+1, sizeof(struct it_key));
381     (*dst) += sizeof(struct it_key);
382     return 1;
383 }
384
385 #if ZMBOL
386 int heap_inpc (struct heap_info *hi)
387 {
388     struct heap_cread_info hci;
389     ISAMC_I isamc_i = (ISAMC_I) xmalloc (sizeof(*isamc_i));
390
391     hci.key = (char *) xmalloc (KEY_SIZE);
392     hci.mode = 1;
393     hci.hi = hi;
394     hci.more = heap_read_one (hi, hci.cur_name, hci.key);
395
396     isamc_i->clientData = &hci;
397     isamc_i->read_item = heap_cread_item;
398
399     while (hci.more)
400     {
401         char this_name[INP_NAME_MAX];
402         ISAMC_P isamc_p, isamc_p2;
403         char *dict_info;
404
405         strcpy (this_name, hci.cur_name);
406         assert (hci.cur_name[1]);
407         no_diffs++;
408         if ((dict_info = dict_lookup (hi->dict, hci.cur_name)))
409         {
410             memcpy (&isamc_p, dict_info+1, sizeof(ISAMC_P));
411             isamc_p2 = isc_merge (hi->isamc, isamc_p, isamc_i);
412             if (!isamc_p2)
413             {
414                 no_deletions++;
415                 if (!dict_delete (hi->dict, this_name))
416                     abort();
417             }
418             else 
419             {
420                 no_updates++;
421                 if (isamc_p2 != isamc_p)
422                     dict_insert (hi->dict, this_name,
423                                  sizeof(ISAMC_P), &isamc_p2);
424             }
425         } 
426         else
427         {
428             isamc_p = isc_merge (hi->isamc, 0, isamc_i);
429             no_insertions++;
430             dict_insert (hi->dict, this_name, sizeof(ISAMC_P), &isamc_p);
431         }
432     }
433     xfree (isamc_i);
434     xfree (hci.key);
435     return 0;
436
437
438 int heap_inpd (struct heap_info *hi)
439 {
440     struct heap_cread_info hci;
441     ISAMD_I isamd_i = (ISAMD_I) xmalloc (sizeof(*isamd_i));
442
443     hci.key = (char *) xmalloc (KEY_SIZE);
444     hci.mode = 1;
445     hci.hi = hi;
446     hci.more = heap_read_one (hi, hci.cur_name, hci.key);
447
448     isamd_i->clientData = &hci;
449     isamd_i->read_item = heap_cread_item;
450
451     while (hci.more)
452     {
453         char this_name[INP_NAME_MAX];
454         ISAMD_P isamd_p, isamd_p2;
455         char *dict_info;
456
457         strcpy (this_name, hci.cur_name);
458         assert (hci.cur_name[1]);
459         no_diffs++;
460         if ((dict_info = dict_lookup (hi->dict, hci.cur_name)))
461         {
462             memcpy (&isamd_p, dict_info+1, sizeof(ISAMD_P));
463             isamd_p2 = isamd_append (hi->isamd, isamd_p, isamd_i);
464             if (!isamd_p2)
465             {
466                 no_deletions++;
467                 if (!dict_delete (hi->dict, this_name))
468                     abort();
469             }
470             else 
471             {
472                 no_updates++;
473                 if (isamd_p2 != isamd_p)
474                     dict_insert (hi->dict, this_name,
475                                  sizeof(ISAMD_P), &isamd_p2);
476             }
477         } 
478         else
479         {
480             isamd_p = isamd_append (hi->isamd, 0, isamd_i);
481             no_insertions++;
482             dict_insert (hi->dict, this_name, sizeof(ISAMD_P), &isamd_p);
483         }
484     }
485     xfree (isamd_i);
486     return 0;
487
488
489 int heap_inp (struct heap_info *hi)
490 {
491     char *info;
492     char next_name[INP_NAME_MAX];
493     char cur_name[INP_NAME_MAX];
494     int key_buf_size = INP_BUF_START;
495     int key_buf_ptr;
496     char *next_key;
497     char *key_buf;
498     int more;
499     
500     next_key = (char *) xmalloc (KEY_SIZE);
501     key_buf = (char *) xmalloc (key_buf_size);
502     more = heap_read_one (hi, cur_name, key_buf);
503     while (more)                   /* EOF ? */
504     {
505         int nmemb;
506         key_buf_ptr = KEY_SIZE;
507         while (1)
508         {
509             if (!(more = heap_read_one (hi, next_name, next_key)))
510                 break;
511             if (*next_name && strcmp (next_name, cur_name))
512                 break;
513             memcpy (key_buf + key_buf_ptr, next_key, KEY_SIZE);
514             key_buf_ptr += KEY_SIZE;
515             if (key_buf_ptr+(int) KEY_SIZE >= key_buf_size)
516             {
517                 char *new_key_buf;
518                 new_key_buf = (char *) xmalloc (key_buf_size + INP_BUF_ADD);
519                 memcpy (new_key_buf, key_buf, key_buf_size);
520                 key_buf_size += INP_BUF_ADD;
521                 xfree (key_buf);
522                 key_buf = new_key_buf;
523             }
524         }
525         no_diffs++;
526         nmemb = key_buf_ptr / KEY_SIZE;
527         assert (nmemb * (int) KEY_SIZE == key_buf_ptr);
528         if ((info = dict_lookup (hi->dict, cur_name)))
529         {
530             ISAM_P isam_p, isam_p2;
531             memcpy (&isam_p, info+1, sizeof(ISAM_P));
532             isam_p2 = is_merge (hi->isam, isam_p, nmemb, key_buf);
533             if (!isam_p2)
534             {
535                 no_deletions++;
536                 if (!dict_delete (hi->dict, cur_name))
537                     abort ();
538             }
539             else 
540             {
541                 no_updates++;
542                 if (isam_p2 != isam_p)
543                     dict_insert (hi->dict, cur_name, sizeof(ISAM_P), &isam_p2);
544             }
545         }
546         else
547         {
548             ISAM_P isam_p;
549             no_insertions++;
550             isam_p = is_merge (hi->isam, 0, nmemb, key_buf);
551             dict_insert (hi->dict, cur_name, sizeof(ISAM_P), &isam_p);
552         }
553         memcpy (key_buf, next_key, KEY_SIZE);
554         strcpy (cur_name, next_name);
555     }
556     return 0;
557 }
558
559 #endif
560
561 int heap_inps (struct heap_info *hi)
562 {
563     struct heap_cread_info hci;
564     ISAMS_I isams_i = (ISAMS_I) xmalloc (sizeof(*isams_i));
565
566     hci.key = (char *) xmalloc (KEY_SIZE);
567     hci.mode = 1;
568     hci.hi = hi;
569     hci.more = heap_read_one (hi, hci.cur_name, hci.key);
570
571     isams_i->clientData = &hci;
572     isams_i->read_item = heap_cread_item;
573
574     while (hci.more)
575     {
576         char this_name[INP_NAME_MAX];
577         ISAMS_P isams_p;
578         char *dict_info;
579
580         strcpy (this_name, hci.cur_name);
581         assert (hci.cur_name[1]);
582         no_diffs++;
583         if (!(dict_info = dict_lookup (hi->dict, hci.cur_name)))
584         {
585             isams_p = isams_merge (hi->isams, isams_i);
586             no_insertions++;
587             dict_insert (hi->dict, this_name, sizeof(ISAMS_P), &isams_p);
588         }
589         else
590         {
591             logf (LOG_FATAL, "isams doesn't support this kind of update");
592             break;
593         }
594     }
595     xfree (isams_i);
596     return 0;
597
598
599 struct progressInfo {
600     time_t   startTime;
601     time_t   lastTime;
602     off_t    totalBytes;
603     off_t    totalOffset;
604 };
605
606 void progressFunc (struct key_file *keyp, void *info)
607 {
608     struct progressInfo *p = (struct progressInfo *) info;
609     time_t now, remaining;
610
611     if (keyp->buf_size <= 0 || p->totalBytes <= 0)
612         return ;
613     time (&now);
614
615     if (now >= p->lastTime+10)
616     {
617         p->lastTime = now;
618         remaining = (time_t) ((now - p->startTime)*
619             ((double) p->totalBytes/p->totalOffset - 1.0));
620         if (remaining <= 130)
621             logf (LOG_LOG, "Merge %2.1f%% completed; %ld seconds remaining",
622                  (100.0*p->totalOffset) / p->totalBytes, (long) remaining);
623         else
624             logf (LOG_LOG, "Merge %2.1f%% completed; %ld minutes remaining",
625                  (100.0*p->totalOffset) / p->totalBytes, (long) remaining/60);
626     }
627     p->totalOffset += keyp->buf_size;
628 }
629
630 #ifndef R_OK
631 #define R_OK 4
632 #endif
633
634 void zebra_index_merge (ZebraHandle zh)
635 {
636     struct key_file **kf;
637     char rbuf[1024];
638     int i, r;
639     struct heap_info *hi;
640     struct progressInfo progressInfo;
641     int nkeys = zh->reg->key_file_no;
642     
643     if (nkeys < 0)
644     {
645         char fname[1024];
646         nkeys = 0;
647         while (1)
648         {
649             extract_get_fname_tmp  (zh, fname, nkeys+1);
650             if (access (fname, R_OK) == -1)
651                 break;
652             nkeys++;
653         }
654         if (!nkeys)
655             return ;
656     }
657     kf = (struct key_file **) xmalloc ((1+nkeys) * sizeof(*kf));
658     progressInfo.totalBytes = 0;
659     progressInfo.totalOffset = 0;
660     time (&progressInfo.startTime);
661     time (&progressInfo.lastTime);
662     for (i = 1; i<=nkeys; i++)
663     {
664         kf[i] = key_file_init (i, 8192, zh->res);
665         kf[i]->readHandler = progressFunc;
666         kf[i]->readInfo = &progressInfo;
667         progressInfo.totalBytes += kf[i]->length;
668         progressInfo.totalOffset += kf[i]->buf_size;
669     }
670     hi = key_heap_init (nkeys, key_qsort_compare);
671     hi->dict = zh->reg->dict;
672     hi->isams = zh->reg->isams;
673 #if ZMBOL
674     hi->isam = zh->reg->isam;
675     hi->isamc = zh->reg->isamc;
676     hi->isamd = zh->reg->isamd;
677 #endif
678     
679     for (i = 1; i<=nkeys; i++)
680         if ((r = key_file_read (kf[i], rbuf)))
681             key_heap_insert (hi, rbuf, r, kf[i]);
682     if (zh->reg->isams)
683         heap_inps (hi);
684 #if ZMBOL
685     else if (zh->reg->isamc)
686         heap_inpc (hi);
687     else if (zh->reg->isam)
688         heap_inp (hi);
689     else if (zh->reg->isamd)
690         heap_inpd (hi);
691 #endif
692         
693     for (i = 1; i<=nkeys; i++)
694     {
695         extract_get_fname_tmp  (zh, rbuf, i);
696         unlink (rbuf);
697     }
698     logf (LOG_LOG, "Iterations . . .%7d", no_iterations);
699     logf (LOG_LOG, "Distinct words .%7d", no_diffs);
700     logf (LOG_LOG, "Updates. . . . .%7d", no_updates);
701     logf (LOG_LOG, "Deletions. . . .%7d", no_deletions);
702     logf (LOG_LOG, "Insertions . . .%7d", no_insertions);
703     zh->reg->key_file_no = 0;
704
705     key_heap_destroy (hi, nkeys);
706     for (i = 1; i<=nkeys; i++)
707         key_file_destroy (kf[i]);
708     xfree (kf);
709 }
710
711