Fixes for update
[idzebra-moved-to-github.git] / index / kinput.c
1 /*
2  * Copyright (C) 1994-2002, Index Data
3  * All rights reserved.
4  * Sebastian Hammer, Adam Dickmeiss, Heikki Levanto
5  *
6  * $Id: kinput.c,v 1.46 2002-02-20 23:07:54 adam Exp $
7  *
8  * Bugs
9  *  - Allocates a lot of memory for the merge process, but never releases it.
10  *    Doesn't matter, as the program terminates soon after.  
11  
12  */
13  
14 #include <fcntl.h>
15 #ifdef WIN32
16 #include <io.h>
17 #else
18 #include <unistd.h>
19 #endif
20 #include <stdlib.h>
21 #include <string.h>
22 #include <stdio.h>
23 #include <assert.h>
24
25 #include "index.h"
26 #include "zserver.h"
27
28 #define KEY_SIZE (1+sizeof(struct it_key))
29 #define INP_NAME_MAX 768
30 #define INP_BUF_START 60000
31 #define INP_BUF_ADD  400000
32
33 static int no_diffs   = 0;
34 static int no_updates = 0;
35 static int no_deletions = 0;
36 static int no_insertions = 0;
37 static int no_iterations = 0;
38
39 struct key_file {
40     int   no;            /* file no */
41     off_t offset;        /* file offset */
42     unsigned char *buf;  /* buffer block */
43     size_t buf_size;     /* number of read bytes in block */
44     size_t chunk;        /* number of bytes allocated */
45     size_t buf_ptr;      /* current position in buffer */
46     char *prev_name;     /* last word read */
47     int   sysno;         /* last sysno */
48     int   seqno;         /* last seqno */
49     off_t length;        /* length of file */
50                          /* handler invoked in each read */
51     void (*readHandler)(struct key_file *keyp, void *rinfo);
52     void *readInfo;
53     Res res;
54 };
55
56 void getFnameTmp (Res res, char *fname, int no)
57 {
58     const char *pre;
59     
60     pre = res_get_def (res, "keyTmpDir", ".");
61     sprintf (fname, "%s/key%d.tmp", pre, no);
62 }
63
64 void extract_get_fname_tmp (ZebraHandle zh, char *fname, int no)
65 {
66     const char *pre;
67     
68     pre = res_get_def (zh->service->res, "keyTmpDir", ".");
69     sprintf (fname, "%s/key%d.tmp", pre, no);
70 }
71
72 void key_file_chunk_read (struct key_file *f)
73 {
74     int nr = 0, r = 0, fd;
75     char fname[1024];
76     getFnameTmp (f->res, fname, f->no);
77     fd = open (fname, O_BINARY|O_RDONLY);
78
79     f->buf_ptr = 0;
80     f->buf_size = 0;
81     if (fd == -1)
82     {
83         logf (LOG_WARN|LOG_ERRNO, "cannot open %s", fname);
84         return ;
85     }
86     if (!f->length)
87     {
88         if ((f->length = lseek (fd, 0L, SEEK_END)) == (off_t) -1)
89         {
90             logf (LOG_WARN|LOG_ERRNO, "cannot seek %s", fname);
91             close (fd);
92             return ;
93         }
94     }
95     if (lseek (fd, f->offset, SEEK_SET) == -1)
96     {
97         logf (LOG_WARN|LOG_ERRNO, "cannot seek %s", fname);
98         close(fd);
99         return ;
100     }
101     while (f->chunk - nr > 0)
102     {
103         r = read (fd, f->buf + nr, f->chunk - nr);
104         if (r <= 0)
105             break;
106         nr += r;
107     }
108     if (r == -1)
109     {
110         logf (LOG_WARN|LOG_ERRNO, "read of %s", fname);
111         close (fd);
112         return;
113     }
114     f->buf_size = nr;
115     if (f->readHandler)
116         (*f->readHandler)(f, f->readInfo);
117     close (fd);
118 }
119
120 void key_file_destroy (struct key_file *f)
121 {
122     xfree (f->buf);
123     xfree (f->prev_name);
124     xfree (f);
125 }
126
127 struct key_file *key_file_init (int no, int chunk, Res res)
128 {
129     struct key_file *f;
130
131     f = (struct key_file *) xmalloc (sizeof(*f));
132     f->res = res;
133     f->sysno = 0;
134     f->seqno = 0;
135     f->no = no;
136     f->chunk = chunk;
137     f->offset = 0;
138     f->length = 0;
139     f->readHandler = NULL;
140     f->buf = (unsigned char *) xmalloc (f->chunk);
141     f->prev_name = (char *) xmalloc (INP_NAME_MAX);
142     *f->prev_name = '\0';
143     key_file_chunk_read (f);
144     return f;
145 }
146
147 int key_file_getc (struct key_file *f)
148 {
149     if (f->buf_ptr < f->buf_size)
150         return f->buf[(f->buf_ptr)++];
151     if (f->buf_size < f->chunk)
152         return EOF;
153     f->offset += f->buf_size;
154     key_file_chunk_read (f);
155     if (f->buf_ptr < f->buf_size)
156         return f->buf[(f->buf_ptr)++];
157     else
158         return EOF;
159 }
160
161 int key_file_decode (struct key_file *f)
162 {
163     int c, d;
164
165     c = key_file_getc (f);
166     switch (c & 192) 
167     {
168     case 0:
169         d = c;
170         break;
171     case 64:
172         d = ((c&63) << 8) + (key_file_getc (f) & 0xff);
173         break;
174     case 128:
175         d = ((c&63) << 8) + (key_file_getc (f) & 0xff);
176         d = (d << 8) + (key_file_getc (f) & 0xff);
177         break;
178     case 192:
179         d = ((c&63) << 8) + (key_file_getc (f) & 0xff);
180         d = (d << 8) + (key_file_getc (f) & 0xff);
181         d = (d << 8) + (key_file_getc (f) & 0xff);
182         break;
183     }
184     return d;
185 }
186
187 int key_file_read (struct key_file *f, char *key)
188 {
189     int i, d, c;
190     struct it_key itkey;
191
192     c = key_file_getc (f);
193     if (c == 0)
194     {
195         strcpy (key, f->prev_name);
196         i = 1+strlen (key);
197     }
198     else if (c == EOF)
199         return 0;
200     else
201     {
202         i = 0;
203         key[i++] = c;
204         while ((key[i++] = key_file_getc (f)))
205             ;
206         strcpy (f->prev_name, key);
207         f->sysno = 0;
208     }
209     d = key_file_decode (f);
210     key[i++] = d & 1;
211     d = d >> 1;
212     itkey.sysno = d + f->sysno;
213     if (d) 
214     {
215         f->sysno = itkey.sysno;
216         f->seqno = 0;
217     }
218     d = key_file_decode (f);
219     itkey.seqno = d + f->seqno;
220     f->seqno = itkey.seqno;
221     memcpy (key + i, &itkey, sizeof(struct it_key));
222     return i + sizeof (struct it_key);
223 }
224
225 struct heap_info {
226     struct {
227         struct key_file **file;
228         char   **buf;
229     } info;
230     int    heapnum;
231     int    *ptr;
232     int    (*cmp)(const void *p1, const void *p2);
233     Dict dict;
234     ISAMS isams;
235 #if ZMBOL
236     ISAM isam;
237     ISAMC isamc;
238     ISAMD isamd;
239 #endif
240 };
241
242 struct heap_info *key_heap_init (int nkeys,
243                                  int (*cmp)(const void *p1, const void *p2))
244 {
245     struct heap_info *hi;
246     int i;
247
248     hi = (struct heap_info *) xmalloc (sizeof(*hi));
249     hi->info.file = (struct key_file **)
250         xmalloc (sizeof(*hi->info.file) * (1+nkeys));
251     hi->info.buf = (char **) xmalloc (sizeof(*hi->info.buf) * (1+nkeys));
252     hi->heapnum = 0;
253     hi->ptr = (int *) xmalloc (sizeof(*hi->ptr) * (1+nkeys));
254     hi->cmp = cmp;
255     for (i = 0; i<= nkeys; i++)
256     {
257         hi->ptr[i] = i;
258         hi->info.buf[i] = (char *) xmalloc (INP_NAME_MAX);
259     }
260     return hi;
261 }
262
263 void key_heap_destroy (struct heap_info *hi, int nkeys)
264 {
265     int i;
266     yaz_log (LOG_LOG, "key_heap_destroy");
267     for (i = 0; i<=nkeys; i++)
268         xfree (hi->info.buf[i]);
269     
270     xfree (hi->info.buf);
271     xfree (hi->ptr);
272     xfree (hi->info.file);
273     xfree (hi);
274 }
275
276 static void key_heap_swap (struct heap_info *hi, int i1, int i2)
277 {
278     int swap;
279
280     swap = hi->ptr[i1];
281     hi->ptr[i1] = hi->ptr[i2];
282     hi->ptr[i2] = swap;
283 }
284
285
286 static void key_heap_delete (struct heap_info *hi)
287 {
288     int cur = 1, child = 2;
289
290     assert (hi->heapnum > 0);
291
292     key_heap_swap (hi, 1, hi->heapnum);
293     hi->heapnum--;
294     while (child <= hi->heapnum) {
295         if (child < hi->heapnum &&
296             (*hi->cmp)(&hi->info.buf[hi->ptr[child]],
297                        &hi->info.buf[hi->ptr[child+1]]) > 0)
298             child++;
299         if ((*hi->cmp)(&hi->info.buf[hi->ptr[cur]],
300                        &hi->info.buf[hi->ptr[child]]) > 0)
301         {            
302             key_heap_swap (hi, cur, child);
303             cur = child;
304             child = 2*cur;
305         }
306         else
307             break;
308     }
309 }
310
311 static void key_heap_insert (struct heap_info *hi, const char *buf, int nbytes,
312                              struct key_file *kf)
313 {
314     int cur, parent;
315
316     cur = ++(hi->heapnum);
317     memcpy (hi->info.buf[hi->ptr[cur]], buf, nbytes);
318     hi->info.file[hi->ptr[cur]] = kf;
319
320     parent = cur/2;
321     while (parent && (*hi->cmp)(&hi->info.buf[hi->ptr[parent]],
322                                 &hi->info.buf[hi->ptr[cur]]) > 0)
323     {
324         key_heap_swap (hi, cur, parent);
325         cur = parent;
326         parent = cur/2;
327     }
328 }
329
330 static int heap_read_one (struct heap_info *hi, char *name, char *key)
331 {
332     int n, r;
333     char rbuf[INP_NAME_MAX];
334     struct key_file *kf;
335
336     if (!hi->heapnum)
337         return 0;
338     n = hi->ptr[1];
339     strcpy (name, hi->info.buf[n]);
340     kf = hi->info.file[n];
341     r = strlen(name);
342     memcpy (key, hi->info.buf[n] + r+1, KEY_SIZE);
343     key_heap_delete (hi);
344     if ((r = key_file_read (kf, rbuf)))
345         key_heap_insert (hi, rbuf, r, kf);
346     no_iterations++;
347     return 1;
348 }
349
350 struct heap_cread_info {
351     char prev_name[INP_NAME_MAX];
352     char cur_name[INP_NAME_MAX];
353     char *key;
354     struct heap_info *hi;
355     int mode;
356     int more;
357 };
358       
359 int heap_cread_item (void *vp, char **dst, int *insertMode)
360 {
361     struct heap_cread_info *p = (struct heap_cread_info *) vp;
362     struct heap_info *hi = p->hi;
363
364     if (p->mode == 1)
365     {
366         *insertMode = p->key[0];
367         memcpy (*dst, p->key+1, sizeof(struct it_key));
368         (*dst) += sizeof(struct it_key);
369         p->mode = 2;
370         return 1;
371     }
372     strcpy (p->prev_name, p->cur_name);
373     if (!(p->more = heap_read_one (hi, p->cur_name, p->key)))
374         return 0;
375     if (*p->cur_name && strcmp (p->cur_name, p->prev_name))
376     {
377         p->mode = 1;
378         return 0;
379     }
380     *insertMode = p->key[0];
381     memcpy (*dst, p->key+1, sizeof(struct it_key));
382     (*dst) += sizeof(struct it_key);
383     return 1;
384 }
385
386 #if ZMBOL
387 int heap_inpc (struct heap_info *hi)
388 {
389     struct heap_cread_info hci;
390     ISAMC_I isamc_i = (ISAMC_I) xmalloc (sizeof(*isamc_i));
391
392     hci.key = (char *) xmalloc (KEY_SIZE);
393     hci.mode = 1;
394     hci.hi = hi;
395     hci.more = heap_read_one (hi, hci.cur_name, hci.key);
396
397     isamc_i->clientData = &hci;
398     isamc_i->read_item = heap_cread_item;
399
400     while (hci.more)
401     {
402         char this_name[INP_NAME_MAX];
403         ISAMC_P isamc_p, isamc_p2;
404         char *dict_info;
405
406         strcpy (this_name, hci.cur_name);
407         assert (hci.cur_name[1]);
408         no_diffs++;
409         if ((dict_info = dict_lookup (hi->dict, hci.cur_name)))
410         {
411             memcpy (&isamc_p, dict_info+1, sizeof(ISAMC_P));
412             isamc_p2 = isc_merge (hi->isamc, isamc_p, isamc_i);
413             if (!isamc_p2)
414             {
415                 no_deletions++;
416                 if (!dict_delete (hi->dict, this_name))
417                     abort();
418             }
419             else 
420             {
421                 no_updates++;
422                 if (isamc_p2 != isamc_p)
423                     dict_insert (hi->dict, this_name,
424                                  sizeof(ISAMC_P), &isamc_p2);
425             }
426         } 
427         else
428         {
429             isamc_p = isc_merge (hi->isamc, 0, isamc_i);
430             no_insertions++;
431             dict_insert (hi->dict, this_name, sizeof(ISAMC_P), &isamc_p);
432         }
433     }
434     xfree (isamc_i);
435     xfree (hci.key);
436     return 0;
437
438
439 int heap_inpd (struct heap_info *hi)
440 {
441     struct heap_cread_info hci;
442     ISAMD_I isamd_i = (ISAMD_I) xmalloc (sizeof(*isamd_i));
443
444     hci.key = (char *) xmalloc (KEY_SIZE);
445     hci.mode = 1;
446     hci.hi = hi;
447     hci.more = heap_read_one (hi, hci.cur_name, hci.key);
448
449     isamd_i->clientData = &hci;
450     isamd_i->read_item = heap_cread_item;
451
452     while (hci.more)
453     {
454         char this_name[INP_NAME_MAX];
455         ISAMD_P isamd_p, isamd_p2;
456         char *dict_info;
457
458         strcpy (this_name, hci.cur_name);
459         assert (hci.cur_name[1]);
460         no_diffs++;
461         if ((dict_info = dict_lookup (hi->dict, hci.cur_name)))
462         {
463             memcpy (&isamd_p, dict_info+1, sizeof(ISAMD_P));
464             isamd_p2 = isamd_append (hi->isamd, isamd_p, isamd_i);
465             if (!isamd_p2)
466             {
467                 no_deletions++;
468                 if (!dict_delete (hi->dict, this_name))
469                     abort();
470             }
471             else 
472             {
473                 no_updates++;
474                 if (isamd_p2 != isamd_p)
475                     dict_insert (hi->dict, this_name,
476                                  sizeof(ISAMD_P), &isamd_p2);
477             }
478         } 
479         else
480         {
481             isamd_p = isamd_append (hi->isamd, 0, isamd_i);
482             no_insertions++;
483             dict_insert (hi->dict, this_name, sizeof(ISAMD_P), &isamd_p);
484         }
485     }
486     xfree (isamd_i);
487     return 0;
488
489
490 int heap_inp (struct heap_info *hi)
491 {
492     char *info;
493     char next_name[INP_NAME_MAX];
494     char cur_name[INP_NAME_MAX];
495     int key_buf_size = INP_BUF_START;
496     int key_buf_ptr;
497     char *next_key;
498     char *key_buf;
499     int more;
500     
501     next_key = (char *) xmalloc (KEY_SIZE);
502     key_buf = (char *) xmalloc (key_buf_size);
503     more = heap_read_one (hi, cur_name, key_buf);
504     while (more)                   /* EOF ? */
505     {
506         int nmemb;
507         key_buf_ptr = KEY_SIZE;
508         while (1)
509         {
510             if (!(more = heap_read_one (hi, next_name, next_key)))
511                 break;
512             if (*next_name && strcmp (next_name, cur_name))
513                 break;
514             memcpy (key_buf + key_buf_ptr, next_key, KEY_SIZE);
515             key_buf_ptr += KEY_SIZE;
516             if (key_buf_ptr+(int) KEY_SIZE >= key_buf_size)
517             {
518                 char *new_key_buf;
519                 new_key_buf = (char *) xmalloc (key_buf_size + INP_BUF_ADD);
520                 memcpy (new_key_buf, key_buf, key_buf_size);
521                 key_buf_size += INP_BUF_ADD;
522                 xfree (key_buf);
523                 key_buf = new_key_buf;
524             }
525         }
526         no_diffs++;
527         nmemb = key_buf_ptr / KEY_SIZE;
528         assert (nmemb * (int) KEY_SIZE == key_buf_ptr);
529         if ((info = dict_lookup (hi->dict, cur_name)))
530         {
531             ISAM_P isam_p, isam_p2;
532             memcpy (&isam_p, info+1, sizeof(ISAM_P));
533             isam_p2 = is_merge (hi->isam, isam_p, nmemb, key_buf);
534             if (!isam_p2)
535             {
536                 no_deletions++;
537                 if (!dict_delete (hi->dict, cur_name))
538                     abort ();
539             }
540             else 
541             {
542                 no_updates++;
543                 if (isam_p2 != isam_p)
544                     dict_insert (hi->dict, cur_name, sizeof(ISAM_P), &isam_p2);
545             }
546         }
547         else
548         {
549             ISAM_P isam_p;
550             no_insertions++;
551             isam_p = is_merge (hi->isam, 0, nmemb, key_buf);
552             dict_insert (hi->dict, cur_name, sizeof(ISAM_P), &isam_p);
553         }
554         memcpy (key_buf, next_key, KEY_SIZE);
555         strcpy (cur_name, next_name);
556     }
557     return 0;
558 }
559
560 #endif
561
562 int heap_inps (struct heap_info *hi)
563 {
564     struct heap_cread_info hci;
565     ISAMS_I isams_i = (ISAMS_I) xmalloc (sizeof(*isams_i));
566
567     hci.key = (char *) xmalloc (KEY_SIZE);
568     hci.mode = 1;
569     hci.hi = hi;
570     hci.more = heap_read_one (hi, hci.cur_name, hci.key);
571
572     isams_i->clientData = &hci;
573     isams_i->read_item = heap_cread_item;
574
575     while (hci.more)
576     {
577         char this_name[INP_NAME_MAX];
578         ISAMS_P isams_p;
579         char *dict_info;
580
581         strcpy (this_name, hci.cur_name);
582         assert (hci.cur_name[1]);
583         no_diffs++;
584         if (!(dict_info = dict_lookup (hi->dict, hci.cur_name)))
585         {
586             isams_p = isams_merge (hi->isams, isams_i);
587             no_insertions++;
588             dict_insert (hi->dict, this_name, sizeof(ISAMS_P), &isams_p);
589         }
590         else
591         {
592             logf (LOG_FATAL, "isams doesn't support this kind of update");
593             break;
594         }
595     }
596     xfree (isams_i);
597     return 0;
598
599
600 struct progressInfo {
601     time_t   startTime;
602     time_t   lastTime;
603     off_t    totalBytes;
604     off_t    totalOffset;
605 };
606
607 void progressFunc (struct key_file *keyp, void *info)
608 {
609     struct progressInfo *p = (struct progressInfo *) info;
610     time_t now, remaining;
611
612     if (keyp->buf_size <= 0 || p->totalBytes <= 0)
613         return ;
614     time (&now);
615
616     if (now >= p->lastTime+10)
617     {
618         p->lastTime = now;
619         remaining = (time_t) ((now - p->startTime)*
620             ((double) p->totalBytes/p->totalOffset - 1.0));
621         if (remaining <= 130)
622             logf (LOG_LOG, "Merge %2.1f%% completed; %ld seconds remaining",
623                  (100.0*p->totalOffset) / p->totalBytes, (long) remaining);
624         else
625             logf (LOG_LOG, "Merge %2.1f%% completed; %ld minutes remaining",
626                  (100.0*p->totalOffset) / p->totalBytes, (long) remaining/60);
627     }
628     p->totalOffset += keyp->buf_size;
629 }
630
631 #ifndef R_OK
632 #define R_OK 4
633 #endif
634
635 void zebra_index_merge (ZebraHandle zh)
636 {
637     struct key_file **kf;
638     char rbuf[1024];
639     int i, r;
640     struct heap_info *hi;
641     struct progressInfo progressInfo;
642     int nkeys = zh->key_file_no;
643     
644     if (nkeys < 0)
645     {
646         char fname[1024];
647         nkeys = 0;
648         while (1)
649         {
650             extract_get_fname_tmp  (zh, fname, nkeys+1);
651             if (access (fname, R_OK) == -1)
652                 break;
653             nkeys++;
654         }
655         if (!nkeys)
656             return ;
657     }
658     kf = (struct key_file **) xmalloc ((1+nkeys) * sizeof(*kf));
659     progressInfo.totalBytes = 0;
660     progressInfo.totalOffset = 0;
661     time (&progressInfo.startTime);
662     time (&progressInfo.lastTime);
663     for (i = 1; i<=nkeys; i++)
664     {
665         kf[i] = key_file_init (i, 8192, zh->service->res);
666         kf[i]->readHandler = progressFunc;
667         kf[i]->readInfo = &progressInfo;
668         progressInfo.totalBytes += kf[i]->length;
669         progressInfo.totalOffset += kf[i]->buf_size;
670     }
671     hi = key_heap_init (nkeys, key_qsort_compare);
672     hi->dict = zh->service->dict;
673     hi->isams = zh->service->isams;
674 #if ZMBOL
675     hi->isam = zh->service->isam;
676     hi->isamc = zh->service->isamc;
677     hi->isamd = zh->service->isamd;
678 #endif
679     
680     for (i = 1; i<=nkeys; i++)
681         if ((r = key_file_read (kf[i], rbuf)))
682             key_heap_insert (hi, rbuf, r, kf[i]);
683     if (zh->service->isams)
684         heap_inps (hi);
685 #if ZMBOL
686     else if (zh->service->isamc)
687         heap_inpc (hi);
688     else if (zh->service->isam)
689         heap_inp (hi);
690     else if (zh->service->isamd)
691         heap_inpd (hi);
692 #endif
693         
694     for (i = 1; i<=nkeys; i++)
695     {
696         extract_get_fname_tmp  (zh, rbuf, i);
697         unlink (rbuf);
698     }
699     logf (LOG_LOG, "Iterations . . .%7d", no_iterations);
700     logf (LOG_LOG, "Distinct words .%7d", no_diffs);
701     logf (LOG_LOG, "Updates. . . . .%7d", no_updates);
702     logf (LOG_LOG, "Deletions. . . .%7d", no_deletions);
703     logf (LOG_LOG, "Insertions . . .%7d", no_insertions);
704     zh->key_file_no = 0;
705
706     key_heap_destroy (hi, nkeys);
707     for (i = 1; i<=nkeys; i++)
708         key_file_destroy (kf[i]);
709     xfree (kf);
710 }
711
712