Fix snippets for complete fields, bug #4590
[idzebra-moved-to-github.git] / index / extract.c
1 /* This file is part of the Zebra server.
2    Copyright (C) 1994-2011 Index Data
3
4 Zebra is free software; you can redistribute it and/or modify it under
5 the terms of the GNU General Public License as published by the Free
6 Software Foundation; either version 2, or (at your option) any later
7 version.
8
9 Zebra is distributed in the hope that it will be useful, but WITHOUT ANY
10 WARRANTY; without even the implied warranty of MERCHANTABILITY or
11 FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
12 for more details.
13
14 You should have received a copy of the GNU General Public License
15 along with this program; if not, write to the Free Software
16 Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA  02110-1301  USA
17
18 */
19
20 /** \file
21     \brief indexes records and extract tokens for indexing and sorting
22 */
23
24 #if HAVE_CONFIG_H
25 #include <config.h>
26 #endif
27 #include <stdio.h>
28 #include <assert.h>
29 #include <ctype.h>
30 #ifdef WIN32
31 #include <io.h>
32 #endif
33 #if HAVE_UNISTD_H
34 #include <unistd.h>
35 #endif
36 #include <fcntl.h>
37
38
39 #include "index.h"
40 #include "orddict.h"
41 #include <direntz.h>
42 #include <charmap.h>
43 #include <yaz/snprintf.h>
44
/* log level for the "extract" module; assigned in zebra_init_log_level */
static int log_level_extract = 0;
/* log level for the "indexdetails" module; assigned in zebra_init_log_level */
static int log_level_details = 0;
/* non-zero once zebra_init_log_level has resolved the two levels above */
static int log_level_initialized = 0;
48
/* 1 if we eliminate identical delete/insert keys */
/* eventually the 0-case code will be removed */
51 #define FLUSH2 1
52
53 #if FLUSH2
54 static void extract_flush_record_keys2(ZebraHandle zh, zint sysno,
55                                        zebra_rec_keys_t ins_keys,
56                                        zint ins_rank,
57                                        zebra_rec_keys_t del_keys,
58                                        zint del_rank);
59 #else
60 static void extract_flush_record_keys(ZebraHandle zh, zint sysno,
61                                       int cmd,
62                                       zebra_rec_keys_t reckeys,
63                                       zint staticrank);
64 #endif
65
66 static void zebra_init_log_level(void)
67 {
68     if (!log_level_initialized)
69     {
70         log_level_initialized = 1;
71
72         log_level_extract = yaz_log_module_level("extract");
73         log_level_details = yaz_log_module_level("indexdetails");
74     }
75 }
76
77 static WRBUF wrbuf_hex_str(const char *cstr)
78 {
79     size_t i;
80     WRBUF w = wrbuf_alloc();
81     for (i = 0; cstr[i]; i++)
82     {
83         if (cstr[i] < ' ' || cstr[i] > 126)
84             wrbuf_printf(w, "\\%02X", cstr[i] & 0xff);
85         else
86             wrbuf_putc(w, cstr[i]);
87     }
88     return w;
89 }
90
91
92 static void extract_flush_sort_keys(ZebraHandle zh, zint sysno,
93                                     int cmd, zebra_rec_keys_t skp);
94 static void extract_schema_add(struct recExtractCtrl *p, Odr_oid *oid);
95 static void extract_token_add(RecWord *p);
96
97 static void check_log_limit(ZebraHandle zh)
98 {
99     if (zh->records_processed + zh->records_skipped == zh->m_file_verbose_limit)
100     {
101         yaz_log(YLOG_LOG, "More than %d file log entries. Omitting rest",
102                 zh->m_file_verbose_limit);
103     }
104 }
105
106 static void logRecord(ZebraHandle zh)
107 {
108     check_log_limit(zh);
109     ++zh->records_processed;
110     if (!(zh->records_processed % 1000))
111     {
112         yaz_log(YLOG_LOG, "Records: "ZINT_FORMAT" i/u/d "
113                 ZINT_FORMAT"/"ZINT_FORMAT"/"ZINT_FORMAT, 
114                 zh->records_processed, zh->records_inserted, 
115                 zh->records_updated, zh->records_deleted);
116     }
117 }
118
119 static void init_extractCtrl(ZebraHandle zh, struct recExtractCtrl *ctrl)
120 {
121     ctrl->flagShowRecords = !zh->m_flag_rw;
122 }
123
124
125 static void extract_add_index_string(RecWord *p, 
126                                       zinfo_index_category_t cat,
127                                       const char *str, int length);
128
129 static void extract_set_store_data_prepare(struct recExtractCtrl *p);
130
131 static void extract_init(struct recExtractCtrl *p, RecWord *w)
132 {
133     w->seqno = 1;
134     w->index_name = "any";
135     w->index_type = "w";
136     w->extractCtrl = p;
137     w->record_id = 0;
138     w->section_id = 0;
139     w->segment = 0;
140 }
141
/** \brief context for the snippet token callbacks

    extract_snippet stores a pointer to one of these in the handle
    member of recExtractCtrl; the snippet_* functions read it back.
*/
struct snip_rec_info {
    ZebraHandle zh;            /* handle used to reach zh->reg (maps, explain info) */
    zebra_snippets *snippets;  /* collection that extracted terms are appended to */
};
146
147
148 static void snippet_add_complete_field(RecWord *p, int ord,
149                                        zebra_map_t zm)
150 {
151     struct snip_rec_info *h = p->extractCtrl->handle;
152     if (p->term_len && p->term_buf && zebra_maps_is_index(zm))
153         zebra_snippets_appendn(h->snippets, p->seqno, 0, ord,
154                                p->term_buf, p->term_len);
155     p->seqno++;
156 }
157
158 static void snippet_add_incomplete_field(RecWord *p, int ord, zebra_map_t zm)
159 {
160     struct snip_rec_info *h = p->extractCtrl->handle;
161     const char *b = p->term_buf;
162     int remain = p->term_len;
163     int first = 1;
164     const char **map = 0;
165     const char *start = b;
166     const char *last = b;
167
168     if (remain > 0)
169         map = zebra_maps_input(zm, &b, remain, 0);
170
171     while (map)
172     {
173         char buf[IT_MAX_WORD+1];
174         int i, remain;
175
176         /* Skip spaces */
177         while (map && *map && **map == *CHR_SPACE)
178         {
179             remain = p->term_len - (b - p->term_buf);
180             last = b;
181             if (remain > 0)
182                 map = zebra_maps_input(zm, &b, remain, 0);
183             else
184                 map = 0;
185         }
186         if (!map)
187             break;
188         if (start != last && zebra_maps_is_index(zm))
189         {
190             zebra_snippets_appendn(h->snippets, p->seqno, 1, ord,
191                                    start, last - start);
192
193         }
194         start = last;
195
196         i = 0;
197         while (map && *map && **map != *CHR_SPACE)
198         {
199             const char *cp = *map;
200
201             while (i < IT_MAX_WORD && *cp)
202                 buf[i++] = *(cp++);
203             remain = p->term_len - (b - p->term_buf);
204             last = b;
205             if (remain > 0)
206                 map = zebra_maps_input(zm, &b, remain, 0);
207             else
208                 map = 0;
209         }
210         if (!i)
211             return;
212
213         if (first)
214         {   
215             first = 0;
216             if (zebra_maps_is_first_in_field(zm))
217             {
218                 /* first in field marker */
219                 p->seqno++;
220             }
221         }
222         if (start != last && zebra_maps_is_index(zm))
223             zebra_snippets_appendn(h->snippets, p->seqno, 0, ord,
224                                    start, last - start);
225         start = last;
226         p->seqno++;
227     }
228
229 }
230
231 static void snippet_add_icu(RecWord *p, int ord, zebra_map_t zm)
232 {
233     struct snip_rec_info *h = p->extractCtrl->handle;
234
235     const char *res_buf = 0;
236     size_t res_len = 0;
237
238     const char *display_buf = 0;
239     size_t display_len = 0;
240
241     zebra_map_tokenize_start(zm, p->term_buf, p->term_len);
242     while (zebra_map_tokenize_next(zm, &res_buf, &res_len,
243                                    &display_buf, &display_len))
244     {
245         if (zebra_maps_is_index(zm))
246             zebra_snippets_appendn(h->snippets, p->seqno, 0, ord,
247                                    display_buf, display_len);
248         p->seqno++;
249     }
250 }
251
252 static void snippet_token_add(RecWord *p)
253 {
254     struct snip_rec_info *h = p->extractCtrl->handle;
255     ZebraHandle zh = h->zh;
256     zebra_map_t zm = zebra_map_get_or_add(zh->reg->zebra_maps, p->index_type);
257
258     if (zm)
259     {
260         ZebraExplainInfo zei = zh->reg->zei;
261         int ch = zebraExplain_lookup_attr_str(
262             zei, zinfo_index_category_index, p->index_type, p->index_name);
263
264         if (zebra_maps_is_icu(zm))
265             snippet_add_icu(p, ch, zm);
266         else
267         {
268             if (zebra_maps_is_complete(zm))
269                 snippet_add_complete_field(p, ch, zm);
270             else
271                 snippet_add_incomplete_field(p, ch, zm);
272         }
273     }
274 }
275
276 static void snippet_schema_add(
277     struct recExtractCtrl *p, Odr_oid *oid)
278 {
279
280 }
281
282 void extract_snippet(ZebraHandle zh, zebra_snippets *sn,
283                      struct ZebraRecStream *stream,
284                      RecType rt, void *recTypeClientData)
285 {
286     struct recExtractCtrl extractCtrl;
287     struct snip_rec_info info;
288     int r;
289
290     extractCtrl.stream = stream;
291     extractCtrl.first_record = 1;
292     extractCtrl.init = extract_init;
293     extractCtrl.tokenAdd = snippet_token_add;
294     extractCtrl.schemaAdd = snippet_schema_add;
295     assert(zh->reg);
296     assert(zh->reg->dh);
297
298     extractCtrl.dh = zh->reg->dh;
299     
300     info.zh = zh;
301     info.snippets = sn;
302     extractCtrl.handle = &info;
303     extractCtrl.match_criteria[0] = '\0';
304     extractCtrl.staticrank = 0;
305     extractCtrl.action = action_insert;
306     
307     init_extractCtrl(zh, &extractCtrl);
308
309     extractCtrl.setStoreData = 0;
310
311     r = (*rt->extract)(recTypeClientData, &extractCtrl);
312
313 }
314
315 static void searchRecordKey(ZebraHandle zh,
316                             zebra_rec_keys_t reckeys,
317                             const char *index_name,
318                             const char **ws, int ws_length)
319 {
320     int i;
321     int ch = -1;
322     zinfo_index_category_t cat = zinfo_index_category_index;
323
324     for (i = 0; i<ws_length; i++)
325         ws[i] = NULL;
326
327     if (ch < 0)
328         ch = zebraExplain_lookup_attr_str(zh->reg->zei, cat, "0", index_name);
329     if (ch < 0)
330         ch = zebraExplain_lookup_attr_str(zh->reg->zei, cat, "p", index_name);
331     if (ch < 0)
332         ch = zebraExplain_lookup_attr_str(zh->reg->zei, cat, "w", index_name);
333
334     if (ch < 0)
335         return ;
336
337     if (zebra_rec_keys_rewind(reckeys))
338     {
339         zint startSeq = -1;
340         const char *str;
341         size_t slen;
342         struct it_key key;
343         zint seqno;
344         while (zebra_rec_keys_read(reckeys, &str, &slen, &key))
345         {
346             assert(key.len <= IT_KEY_LEVEL_MAX && key.len > 2);
347
348             seqno = key.mem[key.len-1];
349             
350             if (key.mem[0] == ch)
351             {
352                 zint woff;
353                 
354                 if (startSeq == -1)
355                     startSeq = seqno;
356                 woff = seqno - startSeq;
357                 if (woff >= 0 && woff < ws_length)
358                     ws[woff] = str;
359             }
360         }
361     }
362 }
363
364 #define FILE_MATCH_BLANK "\t "
365
366 static char *get_match_from_spec(ZebraHandle zh,
367                           zebra_rec_keys_t reckeys,
368                           const char *fname, const char *spec)
369 {
370     static char dstBuf[2048];      /* static here ??? */
371     char *dst = dstBuf;
372     const char *s = spec;
373
374     while (1)
375     {
376         for (; *s && strchr(FILE_MATCH_BLANK, *s); s++)
377             ;
378         if (!*s)
379             break;
380         if (*s == '(')
381         {
382             const char *ws[32];
383             char attset_str[64], attname_str[64];
384             int i;
385             int first = 1;
386             
387             for (s++; strchr(FILE_MATCH_BLANK, *s); s++)
388                 ;
389             for (i = 0; *s && *s != ',' && *s != ')' && 
390                      !strchr(FILE_MATCH_BLANK, *s); s++)
391                 if (i+1 < sizeof(attset_str))
392                     attset_str[i++] = *s;
393             attset_str[i] = '\0';
394             
395             for (; strchr(FILE_MATCH_BLANK, *s); s++)
396                 ;
397             if (*s != ',')
398                 strcpy(attname_str, attset_str);
399             else
400             {
401                 for (s++; strchr(FILE_MATCH_BLANK, *s); s++)
402                     ;
403                 for (i = 0; *s && *s != ')' && 
404                          !strchr(FILE_MATCH_BLANK, *s); s++)
405                     if (i+1 < sizeof(attname_str))
406                         attname_str[i++] = *s;
407                 attname_str[i] = '\0';
408             }
409             if (*s != ')')
410             {
411                 yaz_log(YLOG_WARN, "Missing ) in match criteria %s in group %s",
412                       spec, zh->m_group ? zh->m_group : "none");
413                 return NULL;
414             }
415             s++;
416
417             searchRecordKey(zh, reckeys, attname_str, ws, 32);
418             if (0) /* for debugging */
419             {   
420                 for (i = 0; i<32; i++)
421                 {
422                     if (ws[i])
423                     {
424                         WRBUF w = wrbuf_hex_str(ws[i]);
425                         yaz_log(YLOG_LOG, "ws[%d] = %s", i, wrbuf_cstr(w));
426                         wrbuf_destroy(w);
427                     }
428                 }
429             }
430
431             for (i = 0; i<32; i++)
432                 if (ws[i])
433                 {
434                     if (first)
435                     {
436                         *dst++ = ' ';
437                         first = 0;
438                     }
439                     strcpy(dst, ws[i]);
440                     dst += strlen(ws[i]);
441                 }
442             if (first)
443             {
444                 yaz_log(YLOG_WARN, "Record didn't contain match"
445                       " fields in (%s,%s)", attset_str, attname_str);
446                 return NULL;
447             }
448         }
449         else if (*s == '$')
450         {
451             int spec_len;
452             char special[64];
453             const char *spec_src = NULL;
454             const char *s1 = ++s;
455             while (*s1 && !strchr(FILE_MATCH_BLANK, *s1))
456                 s1++;
457
458             spec_len = s1 - s;
459             if (spec_len > sizeof(special)-1)
460                 spec_len = sizeof(special)-1;
461             memcpy(special, s, spec_len);
462             special[spec_len] = '\0';
463             s = s1;
464
465             if (!strcmp(special, "group"))
466                 spec_src = zh->m_group;
467             else if (!strcmp(special, "database"))
468                 spec_src = zh->basenames[0];
469             else if (!strcmp(special, "filename")) {
470                 spec_src = fname;
471             }
472             else if (!strcmp(special, "type"))
473                 spec_src = zh->m_record_type;
474             else 
475                 spec_src = NULL;
476             if (spec_src)
477             {
478                 strcpy(dst, spec_src);
479                 dst += strlen(spec_src);
480             }
481         }
482         else if (*s == '\"' || *s == '\'')
483         {
484             int stopMarker = *s++;
485             char tmpString[64];
486             int i = 0;
487
488             while (*s && *s != stopMarker)
489             {
490                 if (i+1 < sizeof(tmpString))
491                     tmpString[i++] = *s++;
492             }
493             if (*s)
494                 s++;
495             tmpString[i] = '\0';
496             strcpy(dst, tmpString);
497             dst += strlen(tmpString);
498         }
499         else
500         {
501             yaz_log(YLOG_WARN, "Syntax error in match criteria %s in group %s",
502                   spec, zh->m_group ? zh->m_group : "none");
503             return NULL;
504         }
505         *dst++ = 1;
506     }
507     if (dst == dstBuf)
508     {
509         yaz_log(YLOG_WARN, "No match criteria for record %s in group %s",
510               fname, zh->m_group ? zh->m_group : "none");
511         return NULL;
512     }
513     *dst = '\0';
514
515     if (0) /* for debugging */
516     {
517         WRBUF w = wrbuf_hex_str(dstBuf);
518         yaz_log(YLOG_LOG, "get_match_from_spec %s", wrbuf_cstr(w));
519         wrbuf_destroy(w);
520     }
521
522     return dstBuf;
523 }
524
/* Record logging context.
   NOTE(review): no use of this struct is visible in this part of the
   file; the field descriptions below are inferred from the names only
   and should be confirmed against the users of the struct. */
struct recordLogInfo {
    const char *fname;           /* presumably the file the record came from */
    int recordOffset;            /* presumably the record's offset in that file */
    struct recordGroup *rGroup;  /* presumably the record group in effect */
};
530
531 /** \brief add the always-matches index entry and map to real record ID
532     \param ctrl record control
533     \param record_id custom record ID
534     \param sysno system record ID
535     
536     This function serves two purposes.. It adds the always matches
537     entry and makes a pointer from the custom record ID (if defined)
538     back to the system record ID (sysno)
539     See zebra_recid_to_sysno .
540   */
541 static void all_matches_add(struct recExtractCtrl *ctrl, zint record_id,
542                             zint sysno)
543 {
544     RecWord word;
545     extract_init(ctrl, &word);
546     word.record_id = record_id;
547     /* we use the seqno as placeholder for a way to get back to
548        record database from _ALLRECORDS.. This is used if a custom
549        RECORD was defined */
550     word.seqno = sysno;
551     word.index_name = "_ALLRECORDS";
552     word.index_type = "w";
553
554     extract_add_index_string(&word, zinfo_index_category_alwaysmatches,
555                               "", 0);
556 }
557
558 /* forward declaration */
559 ZEBRA_RES zebra_extract_records_stream(ZebraHandle zh, 
560                                        struct ZebraRecStream *stream,
561                                        enum zebra_recctrl_action_t action,
562                                        const char *recordType,
563                                        zint *sysno,
564                                        const char *match_criteria,
565                                        const char *fname,
566                                        RecType recType,
567                                        void *recTypeClientData);
568
569
570 ZEBRA_RES zebra_extract_file(ZebraHandle zh, zint *sysno, const char *fname, 
571                              enum zebra_recctrl_action_t action)
572 {
573     ZEBRA_RES r = ZEBRA_OK;
574     int i, fd;
575     char gprefix[128];
576     char ext[128];
577     char ext_res[128];
578     struct file_read_info *fi = 0;
579     const char *original_record_type = 0;
580     RecType recType;
581     void *recTypeClientData;
582     struct ZebraRecStream stream, *streamp;
583
584     zebra_init_log_level();
585
586     if (!zh->m_group || !*zh->m_group)
587         *gprefix = '\0';
588     else
589         sprintf(gprefix, "%s.", zh->m_group);
590     
591     yaz_log(log_level_extract, "zebra_extract_file %s", fname);
592
593     /* determine file extension */
594     *ext = '\0';
595     for (i = strlen(fname); --i >= 0; )
596         if (fname[i] == '/')
597             break;
598         else if (fname[i] == '.')
599         {
600             strcpy(ext, fname+i+1);
601             break;
602         }
603     /* determine file type - depending on extension */
604     original_record_type = zh->m_record_type;
605     if (!zh->m_record_type)
606     {
607         sprintf(ext_res, "%srecordType.%s", gprefix, ext);
608         zh->m_record_type = res_get(zh->res, ext_res);
609     }
610     if (!zh->m_record_type)
611     {
612         check_log_limit(zh);
613         if (zh->records_processed + zh->records_skipped
614             < zh->m_file_verbose_limit)
615             yaz_log(YLOG_LOG, "? %s", fname);
616         zh->records_skipped++;
617         return 0;
618     }
619     /* determine match criteria */
620     if (!zh->m_record_id)
621     {
622         sprintf(ext_res, "%srecordId.%s", gprefix, ext);
623         zh->m_record_id = res_get(zh->res, ext_res);
624     }
625
626     if (!(recType =
627           recType_byName(zh->reg->recTypes, zh->res, zh->m_record_type,
628                           &recTypeClientData)))
629     {
630         yaz_log(YLOG_WARN, "No such record type: %s", zh->m_record_type);
631         return ZEBRA_FAIL;
632     }
633
634     switch(recType->version)
635     {
636     case 0:
637         break;
638     default:
639         yaz_log(YLOG_WARN, "Bad filter version: %s", zh->m_record_type);
640     }
641     if (sysno && (action == action_delete || action == action_a_delete))
642     {
643         streamp = 0;
644         fi = 0;
645     }
646     else
647     {
648         char full_rep[1024];
649
650         if (zh->path_reg && !yaz_is_abspath(fname))
651         {
652             strcpy(full_rep, zh->path_reg);
653             strcat(full_rep, "/");
654             strcat(full_rep, fname);
655         }
656         else
657             strcpy(full_rep, fname);
658         
659         if ((fd = open(full_rep, O_BINARY|O_RDONLY)) == -1)
660         {
661             yaz_log(YLOG_WARN|YLOG_ERRNO, "open %s", full_rep);
662             zh->m_record_type = original_record_type;
663             return ZEBRA_FAIL;
664         }
665         streamp = &stream;
666         zebra_create_stream_fd(streamp, fd, 0);
667     }
668     r = zebra_extract_records_stream(zh, streamp,
669                                      action,
670                                      zh->m_record_type,
671                                      sysno,
672                                      0, /*match_criteria */
673                                      fname,
674                                      recType, recTypeClientData);
675     if (streamp)
676         stream.destroy(streamp);
677     zh->m_record_type = original_record_type;
678     return r;
679 }
680
681 /*
682   If sysno is provided, then it's used to identify the reocord.
683   If not, and match_criteria is provided, then sysno is guessed
684   If not, and a record is provided, then sysno is got from there
685   
686  */
687
688 ZEBRA_RES zebra_buffer_extract_record(ZebraHandle zh, 
689                                       const char *buf, size_t buf_size,
690                                       enum zebra_recctrl_action_t action,
691                                       const char *recordType,
692                                       zint *sysno,
693                                       const char *match_criteria,
694                                       const char *fname)
695 {
696     struct ZebraRecStream stream;
697     ZEBRA_RES res;
698     void *clientData;
699     RecType recType = 0;
700
701     if (recordType && *recordType)
702     {
703         yaz_log(log_level_extract,
704                 "Record type explicitly specified: %s", recordType);
705         recType = recType_byName(zh->reg->recTypes, zh->res, recordType,
706                                   &clientData);
707     } 
708     else
709     {
710         if (!(zh->m_record_type))
711         {
712             yaz_log(YLOG_WARN, "No such record type defined");
713             return ZEBRA_FAIL;
714         }
715         yaz_log(log_level_extract, "Get record type from rgroup: %s",
716                 zh->m_record_type);
717         recType = recType_byName(zh->reg->recTypes, zh->res,
718                                   zh->m_record_type, &clientData);
719         recordType = zh->m_record_type;
720     }
721     
722     if (!recType)
723     {
724         yaz_log(YLOG_WARN, "No such record type: %s", recordType);
725         return ZEBRA_FAIL;
726     }
727
728     zebra_create_stream_mem(&stream, buf, buf_size);
729
730     res = zebra_extract_records_stream(zh, &stream,
731                                        action,
732                                        recordType,
733                                        sysno,
734                                        match_criteria,
735                                        fname,
736                                        recType, clientData);
737     stream.destroy(&stream);
738     return res;
739 }
740
741 static ZEBRA_RES zebra_extract_record_stream(ZebraHandle zh, 
742                                              struct ZebraRecStream *stream,
743                                              enum zebra_recctrl_action_t action,
744                                              const char *recordType,
745                                              zint *sysno,
746                                              const char *match_criteria,
747                                              const char *fname,
748                                              RecType recType,
749                                              void *recTypeClientData,
750                                              int *more)
751     
752 {
753     zint sysno0 = 0;
754     RecordAttr *recordAttr;
755     struct recExtractCtrl extractCtrl;
756     int r;
757     const char *matchStr = 0;
758     Record rec;
759     off_t start_offset = 0, end_offset = 0;
760     const char *pr_fname = fname;  /* filename to print .. */
761     int show_progress = zh->records_processed + zh->records_skipped 
762         < zh->m_file_verbose_limit ? 1:0;
763
764     zebra_init_log_level();
765
766     if (!pr_fname)
767         pr_fname = "<no file>";  /* make it printable if file is omitted */
768
769     zebra_rec_keys_reset(zh->reg->keys);
770     zebra_rec_keys_reset(zh->reg->sortKeys);
771
772     if (zebraExplain_curDatabase(zh->reg->zei, zh->basenames[0]))
773     {
774         if (zebraExplain_newDatabase(zh->reg->zei, zh->basenames[0], 
775                                       zh->m_explain_database))
776             return ZEBRA_FAIL;
777     }
778
779     if (stream)
780     {
781         off_t null_offset = 0;
782         extractCtrl.stream = stream;
783
784         start_offset = stream->tellf(stream);
785
786         extractCtrl.first_record = start_offset ? 0 : 1;
787         
788         stream->endf(stream, &null_offset);;
789
790         extractCtrl.init = extract_init;
791         extractCtrl.tokenAdd = extract_token_add;
792         extractCtrl.schemaAdd = extract_schema_add;
793         extractCtrl.dh = zh->reg->dh;
794         extractCtrl.handle = zh;
795         extractCtrl.match_criteria[0] = '\0';
796         extractCtrl.staticrank = 0;
797         extractCtrl.action = action;
798
799         init_extractCtrl(zh, &extractCtrl);
800
801         extract_set_store_data_prepare(&extractCtrl);
802         
803         r = (*recType->extract)(recTypeClientData, &extractCtrl);
804
805         if (action == action_update)
806         {
807             action = extractCtrl.action;
808         }
809         
810         switch (r)
811         {
812         case RECCTRL_EXTRACT_EOF:
813             return ZEBRA_FAIL;
814         case RECCTRL_EXTRACT_ERROR_GENERIC:
815             /* error occured during extraction ... */
816             yaz_log(YLOG_WARN, "extract error: generic");
817             return ZEBRA_FAIL;
818         case RECCTRL_EXTRACT_ERROR_NO_SUCH_FILTER:
819             /* error occured during extraction ... */
820             yaz_log(YLOG_WARN, "extract error: no such filter");
821             return ZEBRA_FAIL;
822         case RECCTRL_EXTRACT_SKIP:
823             if (show_progress)
824                 yaz_log(YLOG_LOG, "skip %s %s " ZINT_FORMAT,
825                          recordType, pr_fname, (zint) start_offset);
826             *more = 1;
827             
828             end_offset = stream->endf(stream, 0);
829             if (end_offset)
830                 stream->seekf(stream, end_offset);
831
832             return ZEBRA_OK;
833         case RECCTRL_EXTRACT_OK:
834             break;
835         default:
836             yaz_log(YLOG_WARN, "extract error: unknown error: %d", r);
837             return ZEBRA_FAIL;
838         }
839         end_offset = stream->endf(stream, 0);
840         if (end_offset)
841             stream->seekf(stream, end_offset);
842         else
843             end_offset = stream->tellf(stream);
844
845         if (extractCtrl.match_criteria[0])
846             match_criteria = extractCtrl.match_criteria;
847     }
848
849     *more = 1;
850
851     if (zh->m_flag_rw == 0)
852     {
853         yaz_log(YLOG_LOG, "test %s %s " ZINT_FORMAT, recordType,
854                 pr_fname, (zint) start_offset);
855         /* test mode .. Do not perform match */
856         return ZEBRA_OK;
857     }
858         
859     if (!sysno)
860     {
861         sysno = &sysno0;
862         
863         if (match_criteria && *match_criteria)
864             matchStr = match_criteria;
865         else
866         {
867             if (zh->m_record_id && *zh->m_record_id)
868             {
869                 matchStr = get_match_from_spec(zh, zh->reg->keys, pr_fname, 
870                                                zh->m_record_id);
871                 if (!matchStr)
872                 {
873                     yaz_log(YLOG_LOG, "error %s %s " ZINT_FORMAT, recordType,
874                              pr_fname, (zint) start_offset);
875                     return ZEBRA_FAIL;
876                 }
877                 if (0 && matchStr)
878                 {
879                     WRBUF w = wrbuf_alloc();
880                     size_t i;
881                     for (i = 0; i < strlen(matchStr); i++)
882                     {
883                         wrbuf_printf(w, "%02X", matchStr[i] & 0xff);
884                     }
885                     yaz_log(YLOG_LOG, "Got match %s", wrbuf_cstr(w));
886                     wrbuf_destroy(w);
887                 }
888             }
889         }
890         if (matchStr) 
891         {
892             int db_ord = zebraExplain_get_database_ord(zh->reg->zei);
893             char *rinfo = dict_lookup_ord(zh->reg->matchDict, db_ord,
894                                           matchStr);
895
896             
897             if (log_level_extract)
898             {
899                 WRBUF w = wrbuf_hex_str(matchStr);
900                 yaz_log(log_level_extract, "matchStr: %s", wrbuf_cstr(w));
901                 wrbuf_destroy(w);
902             }
903             if (rinfo)
904             {
905                 assert(*rinfo == sizeof(*sysno));
906                 memcpy(sysno, rinfo+1, sizeof(*sysno));
907             }
908        }
909     }
910
911     if (! *sysno)
912     {
913         /* new record AKA does not exist already */
914         if (action == action_delete)
915         {
916             yaz_log(YLOG_LOG, "delete %s %s " ZINT_FORMAT, recordType,
917                     pr_fname, (zint) start_offset);
918             yaz_log(YLOG_WARN, "cannot delete record above (seems new)");
919             return ZEBRA_FAIL;
920         }
921         else if (action == action_a_delete)
922         {
923             if (show_progress)
924                 yaz_log(YLOG_LOG, "adelete %s %s " ZINT_FORMAT, recordType,
925                         pr_fname, (zint) start_offset);
926             return ZEBRA_OK;
927         }
928         else if (action == action_replace)
929         {
930             yaz_log(YLOG_LOG, "update %s %s " ZINT_FORMAT, recordType,
931                          pr_fname, (zint) start_offset);
932             yaz_log(YLOG_WARN, "cannot update record above (seems new)");
933             return ZEBRA_FAIL;
934         }
935         if (show_progress)
936             yaz_log(YLOG_LOG, "add %s %s " ZINT_FORMAT, recordType, pr_fname,
937                      (zint) start_offset);
938         rec = rec_new(zh->reg->records);
939
940         *sysno = rec->sysno;
941
942
943         if (stream)
944         {
945             all_matches_add(&extractCtrl,
946                             zebra_rec_keys_get_custom_record_id(zh->reg->keys),
947                             *sysno);
948         }
949
950
951         recordAttr = rec_init_attr(zh->reg->zei, rec);
952         if (extractCtrl.staticrank < 0)
953         {
954             yaz_log(YLOG_WARN, "Negative staticrank for record. Set to 0");
955             extractCtrl.staticrank = 0;
956         }
957
958         if (matchStr)
959         {
960             int db_ord = zebraExplain_get_database_ord(zh->reg->zei);
961             dict_insert_ord(zh->reg->matchDict, db_ord, matchStr,
962                             sizeof(*sysno), sysno);
963         }
964
965         extract_flush_sort_keys(zh, *sysno, 1, zh->reg->sortKeys);
966 #if FLUSH2
967         extract_flush_record_keys2(zh, *sysno,
968                                    zh->reg->keys, extractCtrl.staticrank,
969                                    0, recordAttr->staticrank);
970 #else
971         extract_flush_record_keys(zh, *sysno, 1, zh->reg->keys,
972                                   extractCtrl.staticrank);
973 #endif
974         recordAttr->staticrank = extractCtrl.staticrank;
975         zh->records_inserted++;
976     } 
977     else
978     {
979         /* record already exists */
980         zebra_rec_keys_t delkeys = zebra_rec_keys_open();
981         zebra_rec_keys_t sortKeys = zebra_rec_keys_open();
982         if (action == action_insert)
983         {
984             yaz_log(YLOG_LOG, "skipped %s %s " ZINT_FORMAT, 
985                          recordType, pr_fname, (zint) start_offset);
986             logRecord(zh);
987             return ZEBRA_FAIL;
988         }
989
990         rec = rec_get(zh->reg->records, *sysno);
991         assert(rec);
992
993         if (stream)
994         {
995             all_matches_add(&extractCtrl,
996                             zebra_rec_keys_get_custom_record_id(zh->reg->keys),
997                             *sysno);
998         }
999         
1000         recordAttr = rec_init_attr(zh->reg->zei, rec);
1001
1002         /* decrease total size */
1003         zebraExplain_recordBytesIncrement(zh->reg->zei,
1004                                            - recordAttr->recordSize);
1005
1006         zebra_rec_keys_set_buf(delkeys,
1007                                rec->info[recInfo_delKeys],
1008                                rec->size[recInfo_delKeys],
1009                                0);
1010         zebra_rec_keys_set_buf(sortKeys,
1011                                rec->info[recInfo_sortKeys],
1012                                rec->size[recInfo_sortKeys],
1013                                0);
1014
1015         extract_flush_sort_keys(zh, *sysno, 0, sortKeys);
1016 #if !FLUSH2
1017         extract_flush_record_keys(zh, *sysno, 0, delkeys,
1018                                   recordAttr->staticrank);
1019 #endif
1020         if (action == action_delete || action == action_a_delete)
1021         {
1022             /* record going to be deleted */
1023 #if FLUSH2
1024             extract_flush_record_keys2(zh, *sysno, 0, recordAttr->staticrank,
1025                                        delkeys, recordAttr->staticrank);
1026 #endif       
1027             if (zebra_rec_keys_empty(delkeys))
1028             {
1029                 yaz_log(YLOG_LOG, "delete %s %s " ZINT_FORMAT, recordType,
1030                         pr_fname, (zint) start_offset);
1031                 yaz_log(YLOG_WARN, "cannot delete file above, "
1032                         "storeKeys false (3)");
1033             }
1034             else
1035             {
1036                 if (show_progress)
1037                     yaz_log(YLOG_LOG, "delete %s %s " ZINT_FORMAT, recordType,
1038                             pr_fname, (zint) start_offset);
1039                 zh->records_deleted++;
1040                 if (matchStr)
1041                 {
1042                     int db_ord = zebraExplain_get_database_ord(zh->reg->zei);
1043                     dict_delete_ord(zh->reg->matchDict, db_ord, matchStr);
1044                 }
1045                 rec_del(zh->reg->records, &rec);
1046             }
1047             zebra_rec_keys_close(delkeys);
1048             zebra_rec_keys_close(sortKeys);
1049             rec_free(&rec);
1050             logRecord(zh);
1051             return ZEBRA_OK;
1052         }
1053         else
1054         {   /* update or special_update */
1055             if (show_progress)
1056                 yaz_log(YLOG_LOG, "update %s %s " ZINT_FORMAT, recordType,
1057                         pr_fname, (zint) start_offset);
1058             extract_flush_sort_keys(zh, *sysno, 1, zh->reg->sortKeys);
1059
1060 #if FLUSH2
1061             extract_flush_record_keys2(zh, *sysno,
1062                                        zh->reg->keys, extractCtrl.staticrank,
1063                                        delkeys, recordAttr->staticrank);
1064 #else
1065             extract_flush_record_keys(zh, *sysno, 1, 
1066                                       zh->reg->keys, extractCtrl.staticrank);
1067 #endif
1068             recordAttr->staticrank = extractCtrl.staticrank;
1069             zh->records_updated++;
1070         }
1071         zebra_rec_keys_close(delkeys);
1072         zebra_rec_keys_close(sortKeys);
1073     }
1074     /* update file type */
1075     xfree(rec->info[recInfo_fileType]);
1076     rec->info[recInfo_fileType] =
1077         rec_strdup(recordType, &rec->size[recInfo_fileType]);
1078
1079     /* update filename */
1080     xfree(rec->info[recInfo_filename]);
1081     rec->info[recInfo_filename] =
1082         rec_strdup(fname, &rec->size[recInfo_filename]);
1083
1084     /* update delete keys */
1085     xfree(rec->info[recInfo_delKeys]);
1086     if (!zebra_rec_keys_empty(zh->reg->keys) && zh->m_store_keys == 1)
1087     {
1088         zebra_rec_keys_get_buf(zh->reg->keys,
1089                                &rec->info[recInfo_delKeys],
1090                                &rec->size[recInfo_delKeys]);
1091     }
1092     else
1093     {
1094         rec->info[recInfo_delKeys] = NULL;
1095         rec->size[recInfo_delKeys] = 0;
1096     }
1097     /* update sort keys */
1098     xfree(rec->info[recInfo_sortKeys]);
1099
1100     zebra_rec_keys_get_buf(zh->reg->sortKeys,
1101                            &rec->info[recInfo_sortKeys],
1102                            &rec->size[recInfo_sortKeys]);
1103
1104     if (stream)
1105     {
1106         recordAttr->recordSize = end_offset - start_offset;
1107         zebraExplain_recordBytesIncrement(zh->reg->zei,
1108                                           recordAttr->recordSize);
1109     }
1110
1111     /* set run-number for this record */
1112     recordAttr->runNumber =
1113         zebraExplain_runNumberIncrement(zh->reg->zei, 0);
1114
1115     /* update store data */
1116     xfree(rec->info[recInfo_storeData]);
1117
1118     /* update store data */
1119     if (zh->store_data_buf)
1120     {
1121         rec->size[recInfo_storeData] = zh->store_data_size;
1122         rec->info[recInfo_storeData] = zh->store_data_buf;
1123         zh->store_data_buf = 0;
1124         recordAttr->recordSize = zh->store_data_size;
1125     }
1126     else if (zh->m_store_data)
1127     {
1128         off_t cur_offset = stream->tellf(stream);
1129
1130         rec->size[recInfo_storeData] = recordAttr->recordSize;
1131         rec->info[recInfo_storeData] = (char *)
1132             xmalloc(recordAttr->recordSize);
1133         stream->seekf(stream, start_offset);
1134         stream->readf(stream, rec->info[recInfo_storeData],
1135                       recordAttr->recordSize);
1136         stream->seekf(stream, cur_offset);
1137     }
1138     else
1139     {
1140         rec->info[recInfo_storeData] = NULL;
1141         rec->size[recInfo_storeData] = 0;
1142     }
1143     /* update database name */
1144     xfree(rec->info[recInfo_databaseName]);
1145     rec->info[recInfo_databaseName] =
1146         rec_strdup(zh->basenames[0], &rec->size[recInfo_databaseName]); 
1147
1148     /* update offset */
1149     recordAttr->recordOffset = start_offset;
1150     
1151     /* commit this record */
1152     rec_put(zh->reg->records, &rec);
1153     logRecord(zh);
1154     return ZEBRA_OK;
1155 }
1156
1157 /** \brief extracts records from stream
1158     \param zh Zebra Handle
1159     \param stream stream that we read from
1160     \param action (action_insert, action_replace, action_delete, ..)
1161     \param recordType Record filter type "grs.xml", etc.
1162     \param sysno pointer to sysno if already known; NULL otherwise
1163     \param match_criteria (NULL if not already given)
1164     \param fname filename that we read from (for logging purposes only)
1165     \param recType record type
1166     \param recTypeClientData client data for record type
1167     \returns ZEBRA_OK for success; ZEBRA_FAIL for failure
1168 */
1169 ZEBRA_RES zebra_extract_records_stream(ZebraHandle zh, 
1170                                        struct ZebraRecStream *stream,
1171                                        enum zebra_recctrl_action_t action,
1172                                        const char *recordType,
1173                                        zint *sysno,
1174                                        const char *match_criteria,
1175                                        const char *fname,
1176                                        RecType recType,
1177                                        void *recTypeClientData)
1178 {
1179     ZEBRA_RES res = ZEBRA_OK;
1180     while (1)
1181     {
1182         int more = 0;
1183         res = zebra_extract_record_stream(zh, stream,
1184                                           action,
1185                                           recordType,
1186                                           sysno,
1187                                           match_criteria,
1188                                           fname,
1189                                           recType, recTypeClientData, &more);
1190         if (!more)
1191         {
1192             res = ZEBRA_OK;
1193             break;
1194         }
1195         if (res != ZEBRA_OK)
1196             break;
1197         if (sysno)
1198             break;
1199     }
1200     return res;
1201 }
1202
1203 ZEBRA_RES zebra_extract_explain(void *handle, Record rec, data1_node *n)
1204 {
1205     ZebraHandle zh = (ZebraHandle) handle;
1206     struct recExtractCtrl extractCtrl;
1207
1208     if (zebraExplain_curDatabase(zh->reg->zei,
1209                                   rec->info[recInfo_databaseName]))
1210     {
1211         abort();
1212         if (zebraExplain_newDatabase(zh->reg->zei,
1213                                       rec->info[recInfo_databaseName], 0))
1214             abort();
1215     }
1216
1217     zebra_rec_keys_reset(zh->reg->keys);
1218     zebra_rec_keys_reset(zh->reg->sortKeys);
1219
1220     extractCtrl.init = extract_init;
1221     extractCtrl.tokenAdd = extract_token_add;
1222     extractCtrl.schemaAdd = extract_schema_add;
1223     extractCtrl.dh = zh->reg->dh;
1224
1225     init_extractCtrl(zh, &extractCtrl);
1226
1227     extractCtrl.flagShowRecords = 0;
1228     extractCtrl.match_criteria[0] = '\0';
1229     extractCtrl.staticrank = 0;
1230     extractCtrl.action = action_update;
1231
1232     extractCtrl.handle = handle;
1233     extractCtrl.first_record = 1;
1234     
1235     extract_set_store_data_prepare(&extractCtrl);
1236
1237     if (n)
1238         grs_extract_tree(&extractCtrl, n);
1239
1240     if (rec->size[recInfo_delKeys])
1241     {
1242         zebra_rec_keys_t delkeys = zebra_rec_keys_open();
1243         
1244         zebra_rec_keys_t sortkeys = zebra_rec_keys_open();
1245
1246         zebra_rec_keys_set_buf(delkeys, rec->info[recInfo_delKeys],
1247                                rec->size[recInfo_delKeys],
1248                                0);
1249 #if FLUSH2
1250         extract_flush_record_keys2(zh, rec->sysno, 
1251                                    zh->reg->keys, 0, delkeys, 0);
1252 #else
1253         extract_flush_record_keys(zh, rec->sysno, 0, delkeys, 0);
1254         extract_flush_record_keys(zh, rec->sysno, 1, zh->reg->keys, 0);
1255 #endif
1256         zebra_rec_keys_close(delkeys);
1257
1258         zebra_rec_keys_set_buf(sortkeys, rec->info[recInfo_sortKeys],
1259                                rec->size[recInfo_sortKeys],
1260                                0);
1261
1262         extract_flush_sort_keys(zh, rec->sysno, 0, sortkeys);
1263         zebra_rec_keys_close(sortkeys);
1264     }
1265     else
1266     {
1267 #if FLUSH2
1268         extract_flush_record_keys2(zh, rec->sysno, zh->reg->keys, 0, 0, 0);
1269 #else
1270         extract_flush_record_keys(zh, rec->sysno, 1, zh->reg->keys, 0);        
1271 #endif
1272     }
1273     extract_flush_sort_keys(zh, rec->sysno, 1, zh->reg->sortKeys);
1274     
1275     xfree(rec->info[recInfo_delKeys]);
1276     zebra_rec_keys_get_buf(zh->reg->keys,
1277                            &rec->info[recInfo_delKeys], 
1278                            &rec->size[recInfo_delKeys]);
1279
1280     xfree(rec->info[recInfo_sortKeys]);
1281     zebra_rec_keys_get_buf(zh->reg->sortKeys,
1282                            &rec->info[recInfo_sortKeys],
1283                            &rec->size[recInfo_sortKeys]);
1284     return ZEBRA_OK;
1285 }
1286
1287 void zebra_it_key_str_dump(ZebraHandle zh, struct it_key *key,
1288                            const char *str, size_t slen, NMEM nmem, int level)
1289 {
1290     char keystr[200]; /* room for zints to print */
1291     char *dst_term = 0;
1292     int ord = CAST_ZINT_TO_INT(key->mem[0]);
1293     const char *index_type;
1294     int i;
1295     const char *string_index;
1296     
1297     zebraExplain_lookup_ord(zh->reg->zei, ord, &index_type,
1298                             0/* db */, &string_index);
1299     assert(index_type);
1300     zebra_term_untrans_iconv(zh, nmem, index_type,
1301                              &dst_term, str);
1302     *keystr = '\0';
1303     for (i = 0; i < key->len; i++)
1304     {
1305         sprintf(keystr + strlen(keystr), ZINT_FORMAT " ", key->mem[i]);
1306     }
1307     
1308     if (*str < CHR_BASE_CHAR)
1309     {
1310         int i;
1311         char dst_buf[200]; /* room for special chars */
1312         
1313         strcpy(dst_buf , "?");
1314         
1315         if (!strcmp(str, ""))
1316             strcpy(dst_buf, "alwaysmatches");
1317         if (!strcmp(str, FIRST_IN_FIELD_STR))
1318             strcpy(dst_buf, "firstinfield");
1319         else if (!strcmp(str, CHR_UNKNOWN))
1320             strcpy(dst_buf, "unknown");
1321         else if (!strcmp(str, CHR_SPACE))
1322             strcpy(dst_buf, "space");
1323         
1324         for (i = 0; i<slen; i++)
1325         {
1326             sprintf(dst_buf + strlen(dst_buf), " %d", str[i] & 0xff);
1327         }
1328         yaz_log(level, "%s%s %s %s", keystr, index_type,
1329                 string_index, dst_buf);
1330         
1331     }
1332     else
1333         yaz_log(level, "%s%s %s \"%s\"", keystr, index_type,
1334                 string_index, dst_term);
1335 }
1336
1337 void extract_rec_keys_log(ZebraHandle zh, int is_insert,
1338                           zebra_rec_keys_t reckeys,
1339                           int level)
1340 {
1341     if (zebra_rec_keys_rewind(reckeys))
1342     {
1343         size_t slen;
1344         const char *str;
1345         struct it_key key;
1346         NMEM nmem = nmem_create();
1347
1348         while(zebra_rec_keys_read(reckeys, &str, &slen, &key))
1349         {
1350             zebra_it_key_str_dump(zh, &key, str, slen, nmem, level);
1351             nmem_reset(nmem);
1352         }
1353         nmem_destroy(nmem);
1354     }
1355 }
1356
1357 void extract_rec_keys_adjust(ZebraHandle zh, int is_insert,
1358                              zebra_rec_keys_t reckeys)
1359 {
1360     ZebraExplainInfo zei = zh->reg->zei;
1361     struct ord_stat {
1362         int no;
1363         int ord;
1364         struct ord_stat *next;
1365     };
1366
1367     if (zebra_rec_keys_rewind(reckeys))
1368     {
1369         struct ord_stat *ord_list = 0;
1370         struct ord_stat *p;
1371         size_t slen;
1372         const char *str;
1373         struct it_key key_in;
1374         while(zebra_rec_keys_read(reckeys, &str, &slen, &key_in))
1375         {
1376             int ord = CAST_ZINT_TO_INT(key_in.mem[0]);
1377
1378             for (p = ord_list; p ; p = p->next)
1379                 if (p->ord == ord)
1380                 {
1381                     p->no++;
1382                     break;
1383                 }
1384             if (!p)
1385             {
1386                 p = xmalloc(sizeof(*p));
1387                 p->no = 1;
1388                 p->ord = ord;
1389                 p->next = ord_list;
1390                 ord_list = p;
1391             }
1392         }
1393
1394         p = ord_list;
1395         while (p)
1396         {
1397             struct ord_stat *p1 = p;
1398
1399             if (is_insert)
1400                 zebraExplain_ord_adjust_occurrences(zei, p->ord, p->no, 1);
1401             else
1402                 zebraExplain_ord_adjust_occurrences(zei, p->ord, - p->no, -1);
1403             p = p->next;
1404             xfree(p1);
1405         }
1406     }
1407 }
1408
1409 #if FLUSH2
1410 static void extract_flush_record_keys2(
1411     ZebraHandle zh, zint sysno,
1412     zebra_rec_keys_t ins_keys, zint ins_rank,
1413     zebra_rec_keys_t del_keys, zint del_rank)
1414 {
1415     ZebraExplainInfo zei = zh->reg->zei;
1416     int normal = 0;
1417     int optimized = 0;
1418
1419     if (!zh->reg->key_block)
1420     {
1421         int mem = 1024*1024 * atoi( res_get_def( zh->res, "memmax", "8"));
1422         const char *key_tmp_dir = res_get_def(zh->res, "keyTmpDir", ".");
1423         int use_threads = atoi(res_get_def(zh->res, "threads", "1"));
1424         zh->reg->key_block = key_block_create(mem, key_tmp_dir, use_threads);
1425     }
1426
1427     if (ins_keys)
1428     {
1429         extract_rec_keys_adjust(zh, 1, ins_keys);
1430         if (!del_keys)
1431             zebraExplain_recordCountIncrement(zei, 1);
1432         zebra_rec_keys_rewind(ins_keys);
1433     }
1434     if (del_keys)
1435     {
1436         extract_rec_keys_adjust(zh, 0, del_keys);
1437         if (!ins_keys)
1438             zebraExplain_recordCountIncrement(zei, -1);
1439         zebra_rec_keys_rewind(del_keys);
1440     }
1441
1442     while (1)
1443     {
1444         size_t del_slen;
1445         const char *del_str;
1446         struct it_key del_key_in;
1447         int del = 0;
1448
1449         size_t ins_slen;
1450         const char *ins_str;
1451         struct it_key ins_key_in;
1452         int ins = 0;
1453
1454         if (del_keys)
1455             del = zebra_rec_keys_read(del_keys, &del_str, &del_slen,
1456                                       &del_key_in);
1457         if (ins_keys)
1458             ins = zebra_rec_keys_read(ins_keys, &ins_str, &ins_slen,
1459                                       &ins_key_in);
1460
1461         if (del && ins && ins_rank == del_rank
1462             && !key_compare(&del_key_in, &ins_key_in) 
1463             && ins_slen == del_slen && !memcmp(del_str, ins_str, del_slen))
1464         {
1465             optimized++;
1466             continue;
1467         }
1468         if (!del && !ins)
1469             break;
1470         
1471         normal++;
1472         if (del)
1473             key_block_write(zh->reg->key_block, sysno, 
1474                             &del_key_in, 0, del_str, del_slen,
1475                             del_rank, zh->m_staticrank);
1476         if (ins)
1477             key_block_write(zh->reg->key_block, sysno, 
1478                             &ins_key_in, 1, ins_str, ins_slen,
1479                             ins_rank, zh->m_staticrank);
1480     }
1481     yaz_log(log_level_extract, "normal=%d optimized=%d", normal, optimized);
1482 }
1483 #else
1484 static void extract_flush_record_keys(
1485     ZebraHandle zh, zint sysno, int cmd,
1486     zebra_rec_keys_t reckeys,
1487     zint staticrank)
1488 {
1489     ZebraExplainInfo zei = zh->reg->zei;
1490
1491     extract_rec_keys_adjust(zh, cmd, reckeys);
1492
1493     if (log_level_details)
1494     {
1495         yaz_log(log_level_details, "Keys for record " ZINT_FORMAT " %s",
1496                 sysno, cmd ? "insert" : "delete");
1497         extract_rec_keys_log(zh, cmd, reckeys, log_level_details);
1498     }
1499
1500     if (!zh->reg->key_block)
1501     {
1502         int mem = 1024*1024 * atoi( res_get_def( zh->res, "memmax", "8"));
1503         const char *key_tmp_dir = res_get_def(zh->res, "keyTmpDir", ".");
1504         int use_threads = atoi(res_get_def(zh->res, "threads", "1"));
1505         zh->reg->key_block = key_block_create(mem, key_tmp_dir, use_threads);
1506     }
1507     zebraExplain_recordCountIncrement(zei, cmd ? 1 : -1);
1508
1509 #if 0
1510     yaz_log(YLOG_LOG, "sysno=" ZINT_FORMAT " cmd=%d", sysno, cmd);
1511     print_rec_keys(zh, reckeys);
1512 #endif
1513     if (zebra_rec_keys_rewind(reckeys))
1514     {
1515         size_t slen;
1516         const char *str;
1517         struct it_key key_in;
1518         while(zebra_rec_keys_read(reckeys, &str, &slen, &key_in))
1519         {
1520             key_block_write(zh->reg->key_block, sysno, 
1521                             &key_in, cmd, str, slen,
1522                             staticrank, zh->m_staticrank);
1523         }
1524     }
1525 }
1526 #endif
1527
1528 ZEBRA_RES zebra_rec_keys_to_snippets1(ZebraHandle zh,
1529                                      zebra_rec_keys_t reckeys,
1530                                      zebra_snippets *snippets)
1531 {
1532     NMEM nmem = nmem_create();
1533     if (zebra_rec_keys_rewind(reckeys)) 
1534     {
1535         const char *str;
1536         size_t slen;
1537         struct it_key key;
1538         while (zebra_rec_keys_read(reckeys, &str, &slen, &key))
1539         {
1540             char *dst_term = 0;
1541             int ord;
1542             zint seqno;
1543             const char *index_type;
1544
1545             assert(key.len <= IT_KEY_LEVEL_MAX && key.len > 2);
1546             seqno = key.mem[key.len-1];
1547             ord = CAST_ZINT_TO_INT(key.mem[0]);
1548             
1549             zebraExplain_lookup_ord(zh->reg->zei, ord, &index_type,
1550                                     0/* db */, 0 /* string_index */);
1551             assert(index_type);
1552             zebra_term_untrans_iconv(zh, nmem, index_type,
1553                                      &dst_term, str);
1554             zebra_snippets_append(snippets, seqno, 0, ord, dst_term);
1555             nmem_reset(nmem);
1556         }
1557     }
1558     nmem_destroy(nmem);
1559     return ZEBRA_OK;
1560 }
1561
1562 void print_rec_keys(ZebraHandle zh, zebra_rec_keys_t reckeys)
1563 {
1564     yaz_log(YLOG_LOG, "print_rec_keys");
1565     if (zebra_rec_keys_rewind(reckeys))
1566     {
1567         const char *str;
1568         size_t slen;
1569         struct it_key key;
1570         while (zebra_rec_keys_read(reckeys, &str, &slen, &key))
1571         {
1572             char dst_buf[IT_MAX_WORD];
1573             zint seqno;
1574             const char *index_type;
1575             int ord = CAST_ZINT_TO_INT(key.mem[0]);
1576             const char *db = 0;
1577             assert(key.len <= IT_KEY_LEVEL_MAX && key.len > 2);
1578
1579             zebraExplain_lookup_ord(zh->reg->zei, ord, &index_type, &db, 0);
1580             
1581             seqno = key.mem[key.len-1];
1582             
1583             zebra_term_untrans(zh, index_type, dst_buf, str);
1584             
1585             yaz_log(YLOG_LOG, "ord=%d seqno=" ZINT_FORMAT 
1586                     " term=%s", ord, seqno, dst_buf); 
1587         }
1588     }
1589 }
1590
1591 static void extract_add_index_string(RecWord *p, zinfo_index_category_t cat,
1592                                      const char *str, int length)
1593 {
1594     struct it_key key;
1595     ZebraHandle zh = p->extractCtrl->handle;
1596     ZebraExplainInfo zei = zh->reg->zei;
1597     int ch, i;
1598
1599     ch = zebraExplain_lookup_attr_str(zei, cat, p->index_type, p->index_name);
1600     if (ch < 0)
1601         ch = zebraExplain_add_attr_str(zei, cat, p->index_type, p->index_name);
1602
1603     i = 0;
1604     key.mem[i++] = ch;
1605     key.mem[i++] = p->record_id;
1606     key.mem[i++] = p->section_id;
1607
1608     if (zh->m_segment_indexing)
1609         key.mem[i++] = p->segment;
1610     key.mem[i++] = p->seqno;
1611     key.len = i;
1612
1613     zebra_rec_keys_write(zh->reg->keys, str, length, &key);
1614 }
1615
1616 static void extract_add_sort_string(RecWord *p, const char *str, int length)
1617 {
1618     struct it_key key;
1619     ZebraHandle zh = p->extractCtrl->handle;
1620     ZebraExplainInfo zei = zh->reg->zei;
1621     int ch;
1622     zinfo_index_category_t cat = zinfo_index_category_sort;
1623
1624     ch = zebraExplain_lookup_attr_str(zei, cat, p->index_type, p->index_name);
1625     if (ch < 0)
1626         ch = zebraExplain_add_attr_str(zei, cat, p->index_type, p->index_name);
1627     key.len = 3;
1628     key.mem[0] = ch;
1629     key.mem[1] = p->record_id;
1630     key.mem[2] = p->section_id;
1631
1632     zebra_rec_keys_write(zh->reg->sortKeys, str, length, &key);
1633 }
1634
1635 static void extract_add_staticrank_string(RecWord *p,
1636                                           const char *str, int length)
1637 {
1638     char valz[40];
1639     struct recExtractCtrl *ctrl = p->extractCtrl;
1640
1641     if (length > sizeof(valz)-1)
1642         length = sizeof(valz)-1;
1643
1644     memcpy(valz, str, length);
1645     valz[length] = '\0';
1646     ctrl->staticrank = atozint(valz);
1647 }
1648
1649 static void extract_add_string(RecWord *p, zebra_map_t zm,
1650                                const char *string, int length)
1651 {
1652     assert(length > 0);
1653
1654     if (!p->index_name)
1655         return;
1656     if (log_level_details)
1657     {
1658
1659         WRBUF w = wrbuf_alloc();
1660         
1661         wrbuf_write_escaped(w, string, length);
1662         yaz_log(log_level_details, "extract_add_string: %s", wrbuf_cstr(w));
1663         wrbuf_destroy(w);
1664     }
1665     if (zebra_maps_is_index(zm))
1666     {
1667         extract_add_index_string(p, zinfo_index_category_index,
1668                                  string, length);
1669         if (zebra_maps_is_alwaysmatches(zm))
1670         {
1671             RecWord word;
1672             memcpy(&word, p, sizeof(word));
1673
1674             word.seqno = 1;
1675             extract_add_index_string(
1676                 &word, zinfo_index_category_alwaysmatches, "", 0);
1677         }
1678     }
1679     else if (zebra_maps_is_sort(zm))
1680     {
1681         extract_add_sort_string(p, string, length);
1682     }
1683     else if (zebra_maps_is_staticrank(zm))
1684     {
1685         extract_add_staticrank_string(p, string, length);
1686     }
1687 }
1688
1689 static void extract_add_incomplete_field(RecWord *p, zebra_map_t zm)
1690 {
1691     const char *b = p->term_buf;
1692     int remain = p->term_len;
1693     int first = 1;
1694     const char **map = 0;
1695     
1696     if (remain > 0)
1697         map = zebra_maps_input(zm, &b, remain, 0);
1698
1699     while (map)
1700     {
1701         char buf[IT_MAX_WORD+1];
1702         int i, remain;
1703
1704         /* Skip spaces */
1705         while (map && *map && **map == *CHR_SPACE)
1706         {
1707             remain = p->term_len - (b - p->term_buf);
1708             if (remain > 0)
1709                 map = zebra_maps_input(zm, &b, remain, 0);
1710             else
1711                 map = 0;
1712         }
1713         if (!map)
1714             break;
1715         i = 0;
1716         while (map && *map && **map != *CHR_SPACE)
1717         {
1718             const char *cp = *map;
1719
1720             while (i < IT_MAX_WORD && *cp)
1721                 buf[i++] = *(cp++);
1722             remain = p->term_len - (b - p->term_buf);
1723             if (remain > 0)
1724                 map = zebra_maps_input(zm, &b, remain, 0);
1725             else
1726                 map = 0;
1727         }
1728         if (!i)
1729             return;
1730
1731         if (first)
1732         {   
1733             first = 0;
1734             if (zebra_maps_is_first_in_field(zm))
1735             {
1736                 /* first in field marker */
1737                 extract_add_string(p, zm, FIRST_IN_FIELD_STR, FIRST_IN_FIELD_LEN);
1738                 p->seqno++;
1739             }
1740         }
1741         extract_add_string(p, zm, buf, i);
1742         p->seqno++;
1743     }
1744 }
1745
1746 static void extract_add_complete_field(RecWord *p, zebra_map_t zm)
1747 {
1748     const char *b = p->term_buf;
1749     char buf[IT_MAX_WORD+1];
1750     const char **map = 0;
1751     int i = 0, remain = p->term_len;
1752
1753     if (remain > 0)
1754         map = zebra_maps_input(zm, &b, remain, 1);
1755
1756     while (remain > 0 && i < IT_MAX_WORD)
1757     {
1758         while (map && *map && **map == *CHR_SPACE)
1759         {
1760             remain = p->term_len - (b - p->term_buf);
1761
1762             if (remain > 0)
1763             {
1764                 int first = i ? 0 : 1;  /* first position */
1765                 map = zebra_maps_input(zm, &b, remain, first);
1766             }
1767             else
1768                 map = 0;
1769         }
1770         if (!map)
1771             break;
1772
1773         if (i && i < IT_MAX_WORD)
1774             buf[i++] = *CHR_SPACE;
1775         while (map && *map && **map != *CHR_SPACE)
1776         {
1777             const char *cp = *map;
1778
1779             if (**map == *CHR_CUT)
1780             {
1781                 i = 0;
1782             }
1783             else
1784             {
1785                 if (i >= IT_MAX_WORD)
1786                     break;
1787                 while (i < IT_MAX_WORD && *cp)
1788                     buf[i++] = *(cp++);
1789             }
1790             remain = p->term_len  - (b - p->term_buf);
1791             if (remain > 0)
1792             {
1793                 map = zebra_maps_input(zm, &b, remain, 0);
1794             }
1795             else
1796                 map = 0;
1797         }
1798     }
1799     if (!i)
1800         return;
1801     extract_add_string(p, zm, buf, i);
1802     p->seqno++;
1803 }
1804
1805 static void extract_add_icu(RecWord *p, zebra_map_t zm)
1806 {
1807     const char *res_buf = 0;
1808     size_t res_len = 0;
1809
1810     zebra_map_tokenize_start(zm, p->term_buf, p->term_len);
1811     while (zebra_map_tokenize_next(zm, &res_buf, &res_len, 0, 0))
1812     {
1813         extract_add_string(p, zm, res_buf, res_len);
1814         p->seqno++;
1815     }
1816 }
1817
1818
1819 /** \brief top-level indexing handler for recctrl system
1820     \param p token data to be indexed
1821
1822     Call sequence:
1823     extract_token_add
1824     extract_add_{in}_complete / extract_add_icu
1825     extract_add_string
1826     
1827     extract_add_index_string
1828     or
1829     extract_add_sort_string
1830     or
1831     extract_add_staticrank_string
1832     
1833 */
1834 static void extract_token_add(RecWord *p)
1835 {
1836     ZebraHandle zh = p->extractCtrl->handle;
1837     zebra_map_t zm = zebra_map_get_or_add(zh->reg->zebra_maps, p->index_type);
1838     WRBUF wrbuf;
1839
1840     if (log_level_details)
1841     {
1842         yaz_log(log_level_details, "extract_token_add "
1843                 "type=%s index=%s seqno=" ZINT_FORMAT " s=%.*s",
1844                 p->index_type, p->index_name, 
1845                 p->seqno, p->term_len, p->term_buf);
1846     }
1847     if ((wrbuf = zebra_replace(zm, 0, p->term_buf, p->term_len)))
1848     {
1849         p->term_buf = wrbuf_buf(wrbuf);
1850         p->term_len = wrbuf_len(wrbuf);
1851     }
1852     if (zebra_maps_is_icu(zm))
1853     {
1854         extract_add_icu(p, zm);
1855     }
1856     else
1857     {
1858         if (zebra_maps_is_complete(zm))
1859             extract_add_complete_field(p, zm);
1860         else
1861             extract_add_incomplete_field(p, zm);
1862     }
1863 }
1864
1865 static void extract_set_store_data_cb(struct recExtractCtrl *p,
1866                                       void *buf, size_t sz)
1867 {
1868     ZebraHandle zh = (ZebraHandle) p->handle;
1869
1870     xfree(zh->store_data_buf);
1871     zh->store_data_buf = 0;
1872     zh->store_data_size = 0;
1873     if (buf && sz)
1874     {
1875         zh->store_data_buf = xmalloc(sz);
1876         zh->store_data_size = sz;
1877         memcpy(zh->store_data_buf, buf, sz);
1878     }
1879 }
1880
1881 static void extract_set_store_data_prepare(struct recExtractCtrl *p)
1882 {
1883     ZebraHandle zh = (ZebraHandle) p->handle;
1884     xfree(zh->store_data_buf);
1885     zh->store_data_buf = 0;
1886     zh->store_data_size = 0;
1887     p->setStoreData = extract_set_store_data_cb;
1888 }
1889
1890 static void extract_schema_add(struct recExtractCtrl *p, Odr_oid *oid)
1891 {
1892     ZebraHandle zh = (ZebraHandle) p->handle;
1893     zebraExplain_addSchema(zh->reg->zei, oid);
1894 }
1895
1896 void extract_flush_sort_keys(ZebraHandle zh, zint sysno,
1897                              int cmd, zebra_rec_keys_t reckeys)
1898 {
1899 #if 0
1900     yaz_log(YLOG_LOG, "extract_flush_sort_keys cmd=%d sysno=" ZINT_FORMAT,
1901             cmd, sysno);
1902     extract_rec_keys_log(zh, cmd, reckeys, YLOG_LOG);
1903 #endif
1904
1905     if (zebra_rec_keys_rewind(reckeys))
1906     {
1907         zebra_sort_index_t si = zh->reg->sort_index;
1908         size_t slen;
1909         const char *str;
1910         struct it_key key_in;
1911
1912         NMEM nmem = nmem_create();
1913         struct sort_add_ent {
1914             int ord;
1915             int cmd;
1916             struct sort_add_ent *next;
1917             WRBUF wrbuf;
1918             zint sysno;
1919             zint section_id;
1920         };
1921         struct sort_add_ent *sort_ent_list = 0;
1922
1923         while (zebra_rec_keys_read(reckeys, &str, &slen, &key_in))
1924         {
1925             int ord = CAST_ZINT_TO_INT(key_in.mem[0]);
1926             zint filter_sysno = key_in.mem[1];
1927             zint section_id = key_in.mem[2];
1928
1929             struct sort_add_ent **e = &sort_ent_list;
1930             for (; *e; e = &(*e)->next)
1931                 if ((*e)->ord == ord && section_id == (*e)->section_id)
1932                     break;
1933             if (!*e)
1934             {
1935                 *e = nmem_malloc(nmem, sizeof(**e));
1936                 (*e)->next = 0;
1937                 (*e)->wrbuf = wrbuf_alloc();
1938                 (*e)->ord = ord;
1939                 (*e)->cmd = cmd;
1940                 (*e)->sysno = filter_sysno ? filter_sysno : sysno;
1941                 (*e)->section_id = section_id;
1942             }
1943             
1944             wrbuf_write((*e)->wrbuf, str, slen);
1945             wrbuf_putc((*e)->wrbuf, '\0');
1946         }
1947         if (sort_ent_list)
1948         {
1949             zint last_sysno = 0;
1950             struct sort_add_ent *e = sort_ent_list;
1951             for (; e; e = e->next)
1952             {
1953                 if (last_sysno != e->sysno)
1954                 {
1955                     zebra_sort_sysno(si, e->sysno);
1956                     last_sysno = e->sysno;
1957                 }
1958                 zebra_sort_type(si, e->ord);
1959                 if (e->cmd == 1)
1960                     zebra_sort_add(si, e->section_id, e->wrbuf);
1961                 else
1962                     zebra_sort_delete(si, e->section_id);
1963                 wrbuf_destroy(e->wrbuf);
1964             }
1965         }
1966         nmem_destroy(nmem);
1967     }
1968 }
1969
1970 /*
1971  * Local variables:
1972  * c-basic-offset: 4
1973  * c-file-style: "Stroustrup"
1974  * indent-tabs-mode: nil
1975  * End:
1976  * vim: shiftwidth=4 tabstop=8 expandtab
1977  */
1978