afbb1e9bab723fdb14444d5ee4bb576fb8ce2692
[idzebra-moved-to-github.git] / index / zebraapi.c
1 /* This file is part of the Zebra server.
2    Copyright (C) 2004-2013 Index Data
3
4 Zebra is free software; you can redistribute it and/or modify it under
5 the terms of the GNU General Public License as published by the Free
6 Software Foundation; either version 2, or (at your option) any later
7 version.
8
9 Zebra is distributed in the hope that it will be useful, but WITHOUT ANY
10 WARRANTY; without even the implied warranty of MERCHANTABILITY or
11 FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
12 for more details.
13
14 You should have received a copy of the GNU General Public License
15 along with this program; if not, write to the Free Software
16 Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA  02110-1301  USA
17
18 */
19
20 #if HAVE_CONFIG_H
21 #include <config.h>
22 #endif
23 #include <assert.h>
24 #include <stdio.h>
25 #include <limits.h>
26 #ifdef WIN32
27 #include <io.h>
28 #include <process.h>
29 #include <direct.h>
30 #endif
31 #if HAVE_UNISTD_H
32 #include <unistd.h>
33 #endif
34
35 #include <yaz/diagbib1.h>
36 #include <yaz/pquery.h>
37 #include <yaz/sortspec.h>
38 #include "index.h"
39 #include "rank.h"
40 #include "orddict.h"
41 #include <charmap.h>
42 #include <idzebra/api.h>
43 #include <yaz/oid_db.h>
44
45 #define DEFAULT_APPROX_LIMIT 2000000000
46
47 /* simple asserts to validate the most essential input args */
48 #define ASSERTZH assert(zh && zh->service)
49 #define ASSERTZHRES assert(zh && zh->service && zh->res)
50 #define ASSERTZS assert(zs)
51
52 static int log_level = 0;
53 static int log_level_initialized = 0;
54
55 static void zebra_open_res(ZebraHandle zh);
56 static void zebra_close_res(ZebraHandle zh);
57
58 static ZEBRA_RES zebra_check_handle(ZebraHandle zh)
59 {
60     if (zh)
61         return ZEBRA_OK;
62     return ZEBRA_FAIL;
63 }
64
65 #define ZEBRA_CHECK_HANDLE(zh) if (zebra_check_handle(zh) != ZEBRA_OK) return ZEBRA_FAIL
66
67 static int zebra_chdir(ZebraService zs)
68 {
69     const char *dir ;
70     int r;
71     ASSERTZS;
72     yaz_log(log_level, "zebra_chdir");
73     dir = res_get(zs->global_res, "chdir");
74     if (!dir)
75         return 0;
76     yaz_log(YLOG_DEBUG, "chdir %s", dir);
77 #ifdef WIN32
78     r = _chdir(dir);
79 #else
80     r = chdir(dir);
81 #endif
82     if (r)
83         yaz_log(YLOG_FATAL|YLOG_ERRNO, "chdir %s", dir);
84     return r;
85 }
86
87 static ZEBRA_RES zebra_flush_reg(ZebraHandle zh)
88 {
89     ZEBRA_CHECK_HANDLE(zh);
90     yaz_log(log_level, "zebra_flush_reg");
91     zebraExplain_flush(zh->reg->zei, zh);
92
93     key_block_flush(zh->reg->key_block, 1);
94
95     zebra_index_merge(zh);
96     return ZEBRA_OK;
97 }
98
99 static struct zebra_register *zebra_register_open(ZebraService zs,
100                                                   const char *name,
101                                                   int rw, int useshadow,
102                                                   Res res,
103                                                   const char *reg_path);
104 static void zebra_register_close(ZebraService zs, struct zebra_register *reg);
105
106 const char *zebra_get_encoding(ZebraHandle zh)
107 {
108     assert(zh && zh->session_res);
109     return res_get_def(zh->session_res, "encoding", "ISO-8859-1");
110 }
111
112 ZebraHandle zebra_open(ZebraService zs, Res res)
113 {
114     ZebraHandle zh;
115     const char *default_encoding;
116     if (!log_level_initialized)
117     {
118         log_level = yaz_log_module_level("zebraapi");
119         log_level_initialized = 1;
120     }
121
122     yaz_log(log_level, "zebra_open");
123
124     if (!zs)
125         return 0;
126
127     zh = (ZebraHandle) xmalloc(sizeof(*zh));
128     yaz_log(YLOG_DEBUG, "zebra_open zs=%p returns %p", zs, zh);
129
130     zh->service = zs;
131     zh->reg = 0;          /* no register attached yet */
132     zh->sets = 0;
133     zh->destroyed = 0;
134     zh->errCode = 0;
135     zh->errString = 0;
136     zh->res = 0;
137     zh->session_res = res_open(zs->global_res, res);
138     zh->user_perm = 0;
139     zh->dbaccesslist = 0;
140
141     zh->reg_name = xstrdup("");
142     zh->path_reg = 0;
143     zh->num_basenames = 0;
144     zh->basenames = 0;
145
146     zh->approx_limit = DEFAULT_APPROX_LIMIT;
147     zh->trans_no = 0;
148     zh->trans_w_no = 0;
149
150     zh->lock_normal = 0;
151     zh->lock_shadow = 0;
152
153     zh->shadow_enable = 1;
154     zh->m_staticrank = 0;
155     zh->m_segment_indexing = 0;
156
157     zh->break_handler_func = 0;
158     zh->break_handler_data = 0;
159
160     default_encoding = zebra_get_encoding(zh);
161
162     zh->iconv_to_utf8 =
163         yaz_iconv_open("UTF-8", default_encoding);
164     if (zh->iconv_to_utf8 == 0)
165         yaz_log(YLOG_WARN, "iconv: %s to UTF-8 unsupported",
166                 default_encoding);
167     zh->iconv_from_utf8 =
168         yaz_iconv_open(default_encoding, "UTF-8");
169     if (zh->iconv_to_utf8 == 0)
170         yaz_log(YLOG_WARN, "iconv: UTF-8 to %s unsupported",
171                 default_encoding);
172
173     zh->record_encoding = 0;
174
175     zebra_mutex_cond_lock(&zs->session_lock);
176
177     zh->next = zs->sessions;
178     zs->sessions = zh;
179
180     zebra_mutex_cond_unlock(&zs->session_lock);
181
182     zh->store_data_buf = 0;
183
184     zh->m_limit = zebra_limit_create(1, 0);
185
186     zh->nmem_error = nmem_create();
187
188     return zh;
189 }
190
191 ZebraService zebra_start(const char *configName)
192 {
193     return zebra_start_res(configName, 0, 0);
194 }
195
196 ZebraService zebra_start_res(const char *configName, Res def_res, Res over_res)
197 {
198     Res res;
199     char version_str[16];
200     char system_str[80];
201
202     zebra_flock_init();
203
204     if (!log_level_initialized)
205     {
206         log_level = yaz_log_module_level("zebraapi");
207         log_level_initialized = 1;
208     }
209
210     *system_str = '\0';
211     *version_str = '\0';
212     zebra_get_version(version_str, system_str);
213
214     yaz_log(YLOG_LOG, "zebra_start %s %s", version_str, system_str);
215     if (configName)
216         yaz_log(YLOG_LOG, "config %s", configName);
217
218     yaz_log_xml_errors(0, YLOG_LOG);
219
220     if ((res = res_open(def_res, over_res)))
221     {
222         const char *passwd_plain = 0;
223         const char *passwd_encrypt = 0;
224         const char *dbaccess = 0;
225         ZebraService zh = 0;
226
227         if (configName)
228         {
229             ZEBRA_RES ret = res_read_file(res, configName);
230             if (ret != ZEBRA_OK)
231             {
232                 res_close(res);
233                 return 0;
234             }
235             if (zebra_check_res(res))
236             {
237                 yaz_log(YLOG_FATAL, "Configuration error(s) for %s",
238                         configName);
239                 return 0;
240             }
241         }
242         else
243         {
244             zebra_check_res(res);
245         }
246
247         zh = xmalloc(sizeof(*zh));
248         zh->global_res = res;
249         zh->sessions = 0;
250
251         if (zebra_chdir(zh))
252         {
253             xfree(zh);
254             return 0;
255         }
256
257         zebra_mutex_cond_init(&zh->session_lock);
258         passwd_plain = res_get(zh->global_res, "passwd");
259         passwd_encrypt = res_get(zh->global_res, "passwd.c");
260         dbaccess = res_get(zh->global_res, "dbaccess");
261
262         if (!passwd_plain && !passwd_encrypt)
263             zh->passwd_db = NULL;
264         else
265         {
266             zh->passwd_db = passwd_db_open();
267             if (!zh->passwd_db)
268                 yaz_log(YLOG_WARN|YLOG_ERRNO, "passwd_db_open failed");
269             else
270             {
271                 if (passwd_plain)
272                     passwd_db_file_plain(zh->passwd_db, passwd_plain);
273                 if (passwd_encrypt)
274                     passwd_db_file_crypt(zh->passwd_db, passwd_encrypt);
275             }
276         }
277
278         if (!dbaccess)
279             zh->dbaccess = NULL;
280         else {
281             zh->dbaccess = res_open(NULL, NULL);
282             if (res_read_file(zh->dbaccess, dbaccess) != ZEBRA_OK) {
283                 yaz_log(YLOG_FATAL, "Failed to read %s", dbaccess);
284                 return NULL;
285             }
286         }
287
288         zh->timing = yaz_timing_create();
289         zh->path_root = res_get(zh->global_res, "root");
290         zh->nmem = nmem_create();
291         zh->record_classes = recTypeClass_create(zh->global_res, zh->nmem);
292
293         if (1)
294         {
295             const char *module_path = res_get(res, "modulePath");
296             if (module_path)
297                 recTypeClass_load_modules(&zh->record_classes, zh->nmem,
298                                           module_path);
299         }
300         return zh;
301     }
302     return 0;
303 }
304
305 void zebra_filter_info(ZebraService zs, void *cd,
306                        void(*cb)(void *cd, const char *name))
307 {
308     ASSERTZS;
309     assert(cb);
310     recTypeClass_info(zs->record_classes, cd, cb);
311 }
312
313 void zebra_pidfname(ZebraService zs, char *path)
314 {
315     ASSERTZS;
316     zebra_lock_prefix(zs->global_res, path);
317     strcat(path, "zebrasrv.pid");
318 }
319
320 Dict dict_open_res(BFiles bfs, const char *name, int cache, int rw,
321                    int compact_flag, Res res)
322 {
323     int page_size = 4096;
324     char resource_str[200];
325     sprintf(resource_str, "dict.%.100s.pagesize", name);
326     assert(bfs);
327     assert(name);
328
329     if (res_get_int(res, resource_str, &page_size) == ZEBRA_OK)
330         yaz_log(YLOG_LOG, "Using custom dictionary page size %d for %s",
331                 page_size, name);
332     return dict_open(bfs, name, cache, rw, compact_flag, page_size);
333 }
334
335 static
336 struct zebra_register *zebra_register_open(ZebraService zs, const char *name,
337                                            int rw, int useshadow, Res res,
338                                            const char *reg_path)
339 {
340     struct zebra_register *reg;
341     int record_compression = REC_COMPRESS_NONE;
342     const char *compression_str = 0;
343     const char *profilePath;
344     int sort_type = ZEBRA_SORT_TYPE_FLAT;
345     ZEBRA_RES ret = ZEBRA_OK;
346
347     ASSERTZS;
348
349     reg = xmalloc(sizeof(*reg));
350
351     assert(name);
352     reg->name = xstrdup(name);
353
354     reg->seqno = 0;
355     reg->last_val = 0;
356
357     assert(res);
358
359     yaz_log(YLOG_DEBUG, "zebra_register_open rw=%d useshadow=%d p=%p n=%s rp=%s",
360             rw, useshadow, reg, name, reg_path ? reg_path : "(none)");
361
362     reg->dh = data1_create();
363     if (!reg->dh)
364     {
365         xfree(reg->name);
366         xfree(reg);
367         return 0;
368     }
369     reg->bfs = bfs_create(res_get(res, "register"), reg_path);
370     if (!reg->bfs)
371     {
372         data1_destroy(reg->dh);
373         xfree(reg->name);
374         xfree(reg);
375         return 0;
376     }
377     if (useshadow)
378     {
379         if (bf_cache(reg->bfs, res_get(res, "shadow")) == ZEBRA_FAIL)
380         {
381             bfs_destroy(reg->bfs);
382             data1_destroy(reg->dh);
383             xfree(reg->name);
384             xfree(reg);
385             return 0;
386         }
387     }
388
389     profilePath = res_get_def(res, "profilePath", 0);
390
391     data1_set_tabpath(reg->dh, profilePath);
392     data1_set_tabroot(reg->dh, reg_path);
393     reg->recTypes = recTypes_init(zs->record_classes, reg->dh);
394
395     reg->zebra_maps =
396         zebra_maps_open(res, reg_path, profilePath);
397     if (!reg->zebra_maps)
398     {
399         recTypes_destroy(reg->recTypes);
400         bfs_destroy(reg->bfs);
401         data1_destroy(reg->dh);
402         xfree(reg->name);
403         xfree(reg);
404         return 0;
405     }
406     reg->rank_classes = NULL;
407
408     reg->key_block = 0;
409     reg->keys = zebra_rec_keys_open();
410
411     reg->sortKeys = zebra_rec_keys_open();
412
413     reg->records = 0;
414     reg->dict = 0;
415     reg->sort_index = 0;
416     reg->isams = 0;
417     reg->matchDict = 0;
418     reg->isamc = 0;
419     reg->isamb = 0;
420     reg->zei = 0;
421
422     /* installing rank classes */
423     zebraRankInstall(reg, rank_1_class);
424     zebraRankInstall(reg, rank_2_class);
425     zebraRankInstall(reg, rank_similarity_class);
426     zebraRankInstall(reg, rank_static_class);
427
428     compression_str = res_get_def(res, "recordCompression", "none");
429     if (!strcmp(compression_str, "none"))
430         record_compression = REC_COMPRESS_NONE;
431     else if (!strcmp(compression_str, "bzip2"))
432         record_compression = REC_COMPRESS_BZIP2;
433     else if (!strcmp(compression_str, "zlib"))
434         record_compression = REC_COMPRESS_ZLIB;
435     else
436     {
437         yaz_log(YLOG_FATAL, "invalid recordCompression: %s", compression_str);
438         ret = ZEBRA_FAIL;
439     }
440
441     if (!rec_check_compression_method(record_compression))
442     {
443         yaz_log(YLOG_FATAL, "unsupported recordCompression: %s",
444                 compression_str);
445         ret = ZEBRA_FAIL;
446     }
447
448     {
449         const char *index_fname = res_get_def(res, "index", "default.idx");
450         if (index_fname && *index_fname && strcmp(index_fname, "none"))
451         {
452             if (zebra_maps_read_file(reg->zebra_maps, index_fname) != ZEBRA_OK)
453                 ret = ZEBRA_FAIL;
454         }
455         else
456         {
457             zebra_maps_define_default_sort(reg->zebra_maps);
458         }
459     }
460
461     if (!(reg->records = rec_open(reg->bfs, rw, record_compression)))
462     {
463         yaz_log(YLOG_WARN, "rec_open failed");
464         ret = ZEBRA_FAIL;
465     }
466     if (rw)
467     {
468         reg->matchDict = dict_open_res(reg->bfs, GMATCH_DICT, 20, 1, 0, res);
469     }
470     if (!(reg->dict = dict_open_res(reg->bfs, FNAME_DICT, 40, rw, 0, res)))
471     {
472         yaz_log(YLOG_WARN, "dict_open failed");
473         ret = ZEBRA_FAIL;
474     }
475
476
477     if (res_get_match(res, "sortindex", "f", "f"))
478         sort_type = ZEBRA_SORT_TYPE_FLAT;
479     else if (res_get_match(res, "sortindex", "i", "f"))
480         sort_type = ZEBRA_SORT_TYPE_ISAMB;
481     else if (res_get_match(res, "sortindex", "m", "f"))
482         sort_type = ZEBRA_SORT_TYPE_MULTI;
483     else
484     {
485         yaz_log(YLOG_WARN, "bad_value for 'sortindex'");
486         ret = ZEBRA_FAIL;
487     }
488
489
490     if (!(reg->sort_index = zebra_sort_open(reg->bfs, rw, sort_type)))
491     {
492         yaz_log(YLOG_WARN, "zebra_sort_open failed");
493         ret = ZEBRA_FAIL;
494     }
495     if (res_get_match(res, "isam", "s", ISAM_DEFAULT))
496     {
497         struct ISAMS_M_s isams_m;
498         if (!(reg->isams = isams_open(reg->bfs, FNAME_ISAMS, rw,
499                                       key_isams_m(res, &isams_m))))
500         {
501             yaz_log(YLOG_WARN, "isams_open failed");
502             ret = ZEBRA_FAIL;
503         }
504     }
505     if (res_get_match(res, "isam", "c", ISAM_DEFAULT))
506     {
507         struct ISAMC_M_s isamc_m;
508         if (!(reg->isamc = isamc_open(reg->bfs, FNAME_ISAMC,
509                                       rw, key_isamc_m(res, &isamc_m))))
510         {
511             yaz_log(YLOG_WARN, "isamc_open failed");
512             ret = ZEBRA_FAIL;
513         }
514     }
515     if (res_get_match(res, "isam", "b", ISAM_DEFAULT))
516     {
517         struct ISAMC_M_s isamc_m;
518
519         if (!(reg->isamb = isamb_open(reg->bfs, "isamb",
520                                       rw, key_isamc_m(res, &isamc_m), 0)))
521         {
522             yaz_log(YLOG_WARN, "isamb_open failed");
523             ret = ZEBRA_FAIL;
524         }
525     }
526     if (res_get_match(res, "isam", "bc", ISAM_DEFAULT))
527     {
528         struct ISAMC_M_s isamc_m;
529
530         if (!(reg->isamb = isamb_open(reg->bfs, "isamb",
531                                       rw, key_isamc_m(res, &isamc_m), 1)))
532         {
533             yaz_log(YLOG_WARN, "isamb_open failed");
534             ret = ZEBRA_FAIL;
535         }
536     }
537     if (res_get_match(res, "isam", "null", ISAM_DEFAULT))
538     {
539         struct ISAMC_M_s isamc_m;
540
541         if (!(reg->isamb = isamb_open(reg->bfs, "isamb",
542                                       rw, key_isamc_m(res, &isamc_m), -1)))
543         {
544             yaz_log(YLOG_WARN, "isamb_open failed");
545             ret = ZEBRA_FAIL;
546         }
547     }
548     if (ret == ZEBRA_OK)
549     {
550         reg->zei = zebraExplain_open(reg->records, reg->dh,
551                                      res, rw, reg,
552                                      zebra_extract_explain);
553         if (!reg->zei)
554         {
555             yaz_log(YLOG_WARN, "Cannot obtain EXPLAIN information");
556             ret = ZEBRA_FAIL;
557         }
558     }
559
560     if (ret != ZEBRA_OK)
561     {
562         zebra_register_close(zs, reg);
563         return 0;
564     }
565     yaz_log(YLOG_DEBUG, "zebra_register_open ok p=%p", reg);
566     return reg;
567 }
568
569 ZEBRA_RES zebra_admin_shutdown(ZebraHandle zh)
570 {
571     ZEBRA_CHECK_HANDLE(zh);
572     yaz_log(log_level, "zebra_admin_shutdown");
573
574     zebra_mutex_cond_lock(&zh->service->session_lock);
575     zh->service->stop_flag = 1;
576     zebra_mutex_cond_unlock(&zh->service->session_lock);
577     return ZEBRA_OK;
578 }
579
580 ZEBRA_RES zebra_admin_start(ZebraHandle zh)
581 {
582     ZebraService zs;
583     ZEBRA_CHECK_HANDLE(zh);
584     yaz_log(log_level, "zebra_admin_start");
585     zs = zh->service;
586     zebra_mutex_cond_lock(&zs->session_lock);
587     zebra_mutex_cond_unlock(&zs->session_lock);
588     return ZEBRA_OK;
589 }
590
591 static void zebra_register_close(ZebraService zs, struct zebra_register *reg)
592 {
593     ASSERTZS;
594     assert(reg);
595     yaz_log(YLOG_DEBUG, "zebra_register_close p=%p", reg);
596     reg->stop_flag = 0;
597     zebra_chdir(zs);
598
599     zebraExplain_close(reg->zei);
600     dict_close(reg->dict);
601     if (reg->matchDict)
602         dict_close(reg->matchDict);
603     zebra_sort_close(reg->sort_index);
604     if (reg->isams)
605         isams_close(reg->isams);
606     if (reg->isamc)
607         isamc_close(reg->isamc);
608     if (reg->isamb)
609         isamb_close(reg->isamb);
610     rec_close(&reg->records);
611
612     recTypes_destroy(reg->recTypes);
613     zebra_maps_close(reg->zebra_maps);
614     zebraRankDestroy(reg);
615     bfs_destroy(reg->bfs);
616     data1_destroy(reg->dh);
617
618     zebra_rec_keys_close(reg->keys);
619     zebra_rec_keys_close(reg->sortKeys);
620
621     key_block_destroy(&reg->key_block);
622     xfree(reg->name);
623     xfree(reg);
624 }
625
626 ZEBRA_RES zebra_stop(ZebraService zs)
627 {
628     if (!zs)
629         return ZEBRA_OK;
630     while (zs->sessions)
631     {
632         zebra_close(zs->sessions);
633     }
634
635     zebra_mutex_cond_destroy(&zs->session_lock);
636
637     if (zs->passwd_db)
638         passwd_db_close(zs->passwd_db);
639
640     recTypeClass_destroy(zs->record_classes);
641     nmem_destroy(zs->nmem);
642     res_close(zs->global_res);
643
644     yaz_timing_stop(zs->timing);
645     yaz_log(YLOG_LOG, "zebra_stop: %4.2f %4.2f %4.2f",
646             yaz_timing_get_real(zs->timing),
647             yaz_timing_get_user(zs->timing),
648             yaz_timing_get_sys(zs->timing));
649
650
651     yaz_timing_destroy(&zs->timing);
652     xfree(zs);
653     return ZEBRA_OK;
654 }
655
656 ZEBRA_RES zebra_close(ZebraHandle zh)
657 {
658     ZebraService zs;
659     struct zebra_session **sp;
660     int i;
661
662     yaz_log(log_level, "zebra_close");
663     ZEBRA_CHECK_HANDLE(zh);
664
665     zh->errCode = 0;
666
667     zs = zh->service;
668     yaz_log(YLOG_DEBUG, "zebra_close zh=%p", zh);
669     resultSetDestroy(zh, -1, 0, 0);
670
671     if (zh->reg)
672         zebra_register_close(zh->service, zh->reg);
673     zebra_close_res(zh);
674     res_close(zh->session_res);
675
676     xfree(zh->record_encoding);
677
678     xfree(zh->dbaccesslist);
679
680     for (i = 0; i < zh->num_basenames; i++)
681         xfree(zh->basenames[i]);
682     xfree(zh->basenames);
683
684     if (zh->iconv_to_utf8 != 0)
685         yaz_iconv_close(zh->iconv_to_utf8);
686     if (zh->iconv_from_utf8 != 0)
687         yaz_iconv_close(zh->iconv_from_utf8);
688
689     zebra_mutex_cond_lock(&zs->session_lock);
690     zebra_lock_destroy(zh->lock_normal);
691     zebra_lock_destroy(zh->lock_shadow);
692     sp = &zs->sessions;
693     while (1)
694     {
695         assert(*sp);
696         if (*sp == zh)
697         {
698             *sp = (*sp)->next;
699             break;
700         }
701         sp = &(*sp)->next;
702     }
703     zebra_mutex_cond_unlock(&zs->session_lock);
704     xfree(zh->reg_name);
705     xfree(zh->user_perm);
706     zh->service = 0; /* more likely to trigger an assert */
707
708     zebra_limit_destroy(zh->m_limit);
709
710     nmem_destroy(zh->nmem_error);
711
712     xfree(zh->path_reg);
713     xfree(zh);
714     return ZEBRA_OK;
715 }
716
717 struct map_baseinfo {
718     ZebraHandle zh;
719     NMEM mem;
720     int num_bases;
721     char **basenames;
722     int new_num_bases;
723     char **new_basenames;
724     int new_num_max;
725 };
726
727 static void zebra_open_res(ZebraHandle zh)
728 {
729     char fname[512];
730     ASSERTZH;
731     zh->errCode = 0;
732
733     if (zh->path_reg)
734     {
735         sprintf(fname, "%.200s/zebra.cfg", zh->path_reg);
736         zh->res = res_open(zh->session_res, 0);
737         res_read_file(zh->res, fname);
738     }
739     else if (*zh->reg_name == 0)
740     {
741         zh->res = res_open(zh->session_res, 0);
742     }
743     else
744     {
745         yaz_log(YLOG_WARN, "no register root specified");
746         zh->res = 0;  /* no path for register - fail! */
747     }
748 }
749
750 static void zebra_close_res(ZebraHandle zh)
751 {
752     ASSERTZH;
753     zh->errCode = 0;
754     res_close(zh->res);
755     zh->res = 0;
756 }
757
758 static void zebra_select_register(ZebraHandle zh, const char *new_reg)
759 {
760     ASSERTZH;
761     zh->errCode = 0;
762     if (zh->res && strcmp(zh->reg_name, new_reg) == 0)
763         return;
764     if (!zh->res)
765     {
766         assert(zh->reg == 0);
767         assert(*zh->reg_name == 0);
768     }
769     else
770     {
771         if (zh->reg)
772         {
773             resultSetInvalidate(zh);
774             zebra_register_close(zh->service, zh->reg);
775             zh->reg = 0;
776         }
777         zebra_close_res(zh);
778     }
779     xfree(zh->reg_name);
780     zh->reg_name = xstrdup(new_reg);
781
782     xfree(zh->path_reg);
783     zh->path_reg = 0;
784     if (zh->service->path_root)
785     {
786         zh->path_reg = xmalloc(strlen(zh->service->path_root) +
787                                strlen(zh->reg_name) + 3);
788         strcpy(zh->path_reg, zh->service->path_root);
789         if (*zh->reg_name)
790         {
791             strcat(zh->path_reg, "/");
792             strcat(zh->path_reg, zh->reg_name);
793         }
794     }
795     zebra_open_res(zh);
796
797     if (zh->lock_normal)
798         zebra_lock_destroy(zh->lock_normal);
799     zh->lock_normal = 0;
800
801     if (zh->lock_shadow)
802         zebra_lock_destroy(zh->lock_shadow);
803     zh->lock_shadow = 0;
804
805     if (zh->res)
806     {
807         char fname[512];
808         const char *lock_area = res_get(zh->res, "lockDir");
809
810         if (!lock_area && zh->path_reg)
811             res_set(zh->res, "lockDir", zh->path_reg);
812         sprintf(fname, "norm.%s.LCK", zh->reg_name);
813         zh->lock_normal =
814             zebra_lock_create(res_get(zh->res, "lockDir"), fname);
815
816         sprintf(fname, "shadow.%s.LCK", zh->reg_name);
817         zh->lock_shadow =
818             zebra_lock_create(res_get(zh->res, "lockDir"), fname);
819
820         if (!zh->lock_normal || !zh->lock_shadow)
821         {
822             if (zh->lock_normal)
823             {
824                 zebra_lock_destroy(zh->lock_normal);
825                 zh->lock_normal = 0;
826             }
827             if (zh->lock_shadow)
828             {
829                 zebra_lock_destroy(zh->lock_shadow);
830                 zh->lock_shadow = 0;
831             }
832             zebra_close_res(zh);
833         }
834     }
835     if (zh->res)
836     {
837         int approx = 0;
838         if (res_get_int(zh->res, "estimatehits", &approx) == ZEBRA_OK)
839             zebra_set_approx_limit(zh, approx);
840     }
841     if (zh->res)
842     {
843         if (res_get_int(zh->res, "staticrank", &zh->m_staticrank) == ZEBRA_OK)
844             yaz_log(YLOG_LOG, "static rank set and is %d", zh->m_staticrank);
845     }
846     if (zh->res)
847     {
848         if (res_get_int(zh->res, "segment", &zh->m_segment_indexing) ==
849             ZEBRA_OK)
850         {
851             yaz_log(YLOG_DEBUG, "segment indexing set and is %d",
852                     zh->m_segment_indexing);
853         }
854     }
855 }
856
857 void map_basenames_func(void *vp, const char *name, const char *value)
858 {
859     struct map_baseinfo *p = (struct map_baseinfo *) vp;
860     int i, no;
861     char fromdb[128], todb[8][128];
862
863     assert(value);
864     assert(name);
865     assert(vp);
866
867     no =
868         sscanf(value, "%127s %127s %127s %127s %127s %127s %127s %127s %127s",
869                fromdb,  todb[0], todb[1], todb[2], todb[3], todb[4],
870                todb[5], todb[6], todb[7]);
871     if (no < 2)
872         return ;
873     no--;
874     for (i = 0; i<p->num_bases; i++)
875         if (p->basenames[i] && !STRCASECMP(p->basenames[i], fromdb))
876         {
877             p->basenames[i] = 0;
878             for (i = 0; i < no; i++)
879             {
880                 if (p->new_num_bases == p->new_num_max)
881                     return;
882                 p->new_basenames[(p->new_num_bases)++] =
883                     nmem_strdup(p->mem, todb[i]);
884             }
885             return;
886         }
887 }
888
889 int zebra_select_default_database(ZebraHandle zh)
890 {
891     if (!zh->res)
892     {
893         /* no database has been selected - so we select based on
894            resource setting (including group)
895         */
896         const char *group = res_get(zh->session_res, "group");
897         const char *v = res_get_prefix(zh->session_res,
898                                        "database", group, "Default");
899         return zebra_select_database(zh, v);
900     }
901     return 0;
902 }
903
904 void map_basenames(ZebraHandle zh, ODR stream,
905                    int *num_bases, char ***basenames)
906 {
907     struct map_baseinfo info;
908     struct map_baseinfo *p = &info;
909     int i;
910     ASSERTZH;
911     yaz_log(log_level, "map_basenames ");
912     assert(stream);
913
914     info.zh = zh;
915
916     info.num_bases = *num_bases;
917     info.basenames = *basenames;
918     info.new_num_max = 128;
919     info.new_num_bases = 0;
920     info.new_basenames = (char **)
921         odr_malloc(stream, sizeof(*info.new_basenames) * info.new_num_max);
922     info.mem = stream->mem;
923
924     res_trav(zh->session_res, "mapdb", &info, map_basenames_func);
925
926     for (i = 0; i<p->num_bases; i++)
927         if (p->basenames[i] && p->new_num_bases < p->new_num_max)
928         {
929             p->new_basenames[(p->new_num_bases)++] =
930                 nmem_strdup(p->mem, p->basenames[i]);
931         }
932     *num_bases = info.new_num_bases;
933     *basenames = info.new_basenames;
934     for (i = 0; i<*num_bases; i++)
935         yaz_log(YLOG_DEBUG, "base %s", (*basenames)[i]);
936 }
937
938 ZEBRA_RES zebra_select_database(ZebraHandle zh, const char *basename)
939 {
940     ZEBRA_CHECK_HANDLE(zh);
941
942     yaz_log(log_level, "zebra_select_database %s",basename);
943     assert(basename);
944     return zebra_select_databases(zh, 1, &basename);
945 }
946
947 ZEBRA_RES zebra_select_databases(ZebraHandle zh, int num_bases,
948                                  const char **basenames)
949 {
950     int i;
951     const char *cp;
952     int len = 0;
953     char *new_reg = 0;
954
955     ZEBRA_CHECK_HANDLE(zh);
956     assert(basenames);
957
958     yaz_log(log_level, "zebra_select_databases n=%d [0]=%s",
959             num_bases,basenames[0]);
960     zh->errCode = 0;
961
962     if (num_bases < 1)
963     {
964         zh->errCode = YAZ_BIB1_COMBI_OF_SPECIFIED_DATABASES_UNSUPP;
965         return ZEBRA_FAIL;
966     }
967
968     /* Check if the user has access to all databases (Seb) */
969     /* You could argue that this should happen later, after we have
970      * determined that the database(s) exist. */
971     if (zh->dbaccesslist) {
972         for (i = 0; i < num_bases; i++) {
973             const char *db = basenames[i];
974             char *p, *pp;
975             for (p = zh->dbaccesslist; p && *p; p = pp) {
976                 int len;
977                 if ((pp = strchr(p, '+'))) {
978                     len = pp - p;
979                     pp++;
980                 }
981                 else
982                     len = strlen(p);
983                 if (len == strlen(db) && !strncmp(db, p, len))
984                     break;
985             }
986             if (!p) {
987                 zh->errCode = YAZ_BIB1_ACCESS_TO_SPECIFIED_DATABASE_DENIED;
988                 return ZEBRA_FAIL;
989             }
990         }
991     }
992
993     for (i = 0; i < zh->num_basenames; i++)
994         xfree(zh->basenames[i]);
995     xfree(zh->basenames);
996
997     zh->num_basenames = num_bases;
998     zh->basenames = xmalloc(zh->num_basenames * sizeof(*zh->basenames));
999     for (i = 0; i < zh->num_basenames; i++)
1000         zh->basenames[i] = xstrdup(basenames[i]);
1001
1002     cp = strrchr(basenames[0], '/');
1003     if (cp)
1004     {
1005         len = cp - basenames[0];
1006         new_reg = xmalloc(len + 1);
1007         memcpy(new_reg, basenames[0], len);
1008         new_reg[len] = '\0';
1009     }
1010     else
1011         new_reg = xstrdup("");
1012     for (i = 1; i<num_bases; i++)
1013     {
1014         const char *cp1;
1015
1016         cp1 = strrchr(basenames[i], '/');
1017         if (cp)
1018         {
1019             if (!cp1)
1020             {
1021                 zh->errCode = YAZ_BIB1_COMBI_OF_SPECIFIED_DATABASES_UNSUPP;
1022                 return -1;
1023             }
1024             if (len != cp1 - basenames[i] ||
1025                 memcmp(basenames[i], new_reg, len))
1026             {
1027                 zh->errCode = YAZ_BIB1_COMBI_OF_SPECIFIED_DATABASES_UNSUPP;
1028                 return -1;
1029             }
1030         }
1031         else
1032         {
1033             if (cp1)
1034             {
1035                 zh->errCode = YAZ_BIB1_COMBI_OF_SPECIFIED_DATABASES_UNSUPP;
1036                 return ZEBRA_FAIL;
1037             }
1038         }
1039     }
1040     zebra_select_register(zh, new_reg);
1041     xfree(new_reg);
1042     if (!zh->res)
1043     {
1044         zh->errCode = YAZ_BIB1_DATABASE_UNAVAILABLE;
1045         return ZEBRA_FAIL;
1046     }
1047     if (!zh->lock_normal || !zh->lock_shadow)
1048     {
1049         zh->errCode = YAZ_BIB1_TEMPORARY_SYSTEM_ERROR;
1050         return ZEBRA_FAIL;
1051     }
1052     return ZEBRA_OK;
1053 }
1054
1055 ZEBRA_RES zebra_set_approx_limit(ZebraHandle zh, zint approx_limit)
1056 {
1057     if (approx_limit == 0)
1058         approx_limit = DEFAULT_APPROX_LIMIT;
1059     zh->approx_limit = approx_limit;
1060     return ZEBRA_OK;
1061 }
1062
1063 void zebra_set_partial_result(ZebraHandle zh)
1064 {
1065     zh->partial_result = 1;
1066 }
1067
1068
1069 ZEBRA_RES zebra_set_break_handler(ZebraHandle zh,
1070                                   int (*f)(void *client_data),
1071                                   void *client_data)
1072 {
1073     zh->break_handler_func = f;
1074     zh->break_handler_data = client_data;
1075     return ZEBRA_OK;
1076 }
1077
1078 ZEBRA_RES zebra_search_RPN_x(ZebraHandle zh, ODR o, Z_RPNQuery *query,
1079                              const char *setname, zint *hits,
1080                              int *estimated_hit_count,
1081                              int *partial_resultset)
1082 {
1083     ZEBRA_RES r;
1084
1085     ZEBRA_CHECK_HANDLE(zh);
1086
1087     assert(o);
1088     assert(query);
1089     assert(hits);
1090     assert(setname);
1091     yaz_log(log_level, "zebra_search_rpn");
1092
1093     zh->partial_result = 0;
1094
1095     if (zebra_begin_read(zh) == ZEBRA_FAIL)
1096         return ZEBRA_FAIL;
1097
1098     r = resultSetAddRPN(zh, odr_extract_mem(o), query,
1099                         zh->num_basenames, zh->basenames, setname,
1100                         hits, estimated_hit_count);
1101
1102     *partial_resultset = zh->partial_result;
1103     zebra_end_read(zh);
1104     return r;
1105 }
1106
1107 ZEBRA_RES zebra_search_RPN(ZebraHandle zh, ODR o, Z_RPNQuery *query,
1108                            const char *setname, zint *hits)
1109 {
1110     int estimated_hit_count;
1111     int partial_resultset;
1112     return zebra_search_RPN_x(zh, o, query, setname, hits,
1113                               &estimated_hit_count,
1114                               &partial_resultset);
1115 }
1116
1117 ZEBRA_RES zebra_records_retrieve(ZebraHandle zh, ODR stream,
1118                                  const char *setname,
1119                                  Z_RecordComposition *comp,
1120                                  const Odr_oid *input_format, int num_recs,
1121                                  ZebraRetrievalRecord *recs)
1122 {
1123     ZebraMetaRecord *poset;
1124     int i;
1125     ZEBRA_RES ret = ZEBRA_OK;
1126     zint *pos_array;
1127
1128     ZEBRA_CHECK_HANDLE(zh);
1129     assert(stream);
1130     assert(setname);
1131     assert(recs);
1132     assert(num_recs>0);
1133
1134     yaz_log(log_level, "zebra_records_retrieve n=%d", num_recs);
1135
1136     if (!zh->res)
1137     {
1138         zebra_setError(zh, YAZ_BIB1_SPECIFIED_RESULT_SET_DOES_NOT_EXIST,
1139                        setname);
1140         return ZEBRA_FAIL;
1141     }
1142
1143     if (zebra_begin_read(zh) == ZEBRA_FAIL)
1144         return ZEBRA_FAIL;
1145
1146     pos_array = (zint *) xmalloc(num_recs * sizeof(*pos_array));
1147     for (i = 0; i<num_recs; i++)
1148         pos_array[i] = recs[i].position;
1149     poset = zebra_meta_records_create(zh, setname, num_recs, pos_array);
1150     if (!poset)
1151     {
1152         yaz_log(YLOG_DEBUG, "zebraPosSetCreate error");
1153         zebra_setError(zh, YAZ_BIB1_SPECIFIED_RESULT_SET_DOES_NOT_EXIST,
1154                        setname);
1155         ret = ZEBRA_FAIL;
1156     }
1157     else
1158     {
1159         WRBUF addinfo_w = wrbuf_alloc();
1160         for (i = 0; i < num_recs; i++)
1161         {
1162             recs[i].errCode = 0;
1163             recs[i].errString = 0;
1164             recs[i].format = 0;
1165             recs[i].len = 0;
1166             recs[i].buf = 0;
1167             recs[i].base = 0;
1168             recs[i].sysno = poset[i].sysno;
1169             if (poset[i].term)
1170             {
1171                 recs[i].format = yaz_oid_recsyn_sutrs;
1172                 recs[i].len = strlen(poset[i].term);
1173                 recs[i].buf = poset[i].term;
1174                 recs[i].base = poset[i].db;
1175             }
1176             else if (poset[i].sysno)
1177             {
1178                 char *buf;
1179                 int len = 0;
1180                 zebra_snippets *hit_snippet = zebra_snippets_create();
1181
1182                 /* we disable hit snippets for now. It does not work well
1183                    and it slows retrieval down a lot */
1184 #if 0
1185                 zebra_snippets_hit_vector(zh, setname, poset[i].sysno,
1186                                           hit_snippet);
1187 #endif
1188                 wrbuf_rewind(addinfo_w);
1189                 recs[i].errCode =
1190                     zebra_record_fetch(zh, setname,
1191                                        poset[i].sysno, poset[i].score,
1192                                        stream, input_format, comp,
1193                                        &recs[i].format, &buf, &len,
1194                                        &recs[i].base, addinfo_w);
1195
1196                 if (wrbuf_len(addinfo_w))
1197                     recs[i].errString =
1198                         odr_strdup(stream, wrbuf_cstr(addinfo_w));
1199                 recs[i].len = len;
1200                 if (len > 0)
1201                 {
1202                     recs[i].buf = (char*) odr_malloc(stream, len);
1203                     memcpy(recs[i].buf, buf, len);
1204                 }
1205                 else
1206                     recs[i].buf = buf;
1207                 recs[i].score = poset[i].score;
1208                 zebra_snippets_destroy(hit_snippet);
1209             }
1210             else
1211             {
1212                 /* only need to set it once */
1213                 if (pos_array[i] < zh->approx_limit && ret == ZEBRA_OK)
1214                 {
1215                     zebra_setError_zint(zh,
1216                                         YAZ_BIB1_PRESENT_REQUEST_OUT_OF_RANGE,
1217                                         pos_array[i]);
1218                     ret = ZEBRA_FAIL;
1219                     break;
1220                 }
1221             }
1222         }
1223         zebra_meta_records_destroy(zh, poset, num_recs);
1224         wrbuf_destroy(addinfo_w);
1225     }
1226     zebra_end_read(zh);
1227     xfree(pos_array);
1228     return ret;
1229 }
1230
1231 ZEBRA_RES zebra_scan_PQF(ZebraHandle zh, ODR stream, const char *query,
1232                          int *position,
1233                          int *num_entries, ZebraScanEntry **entries,
1234                          int *is_partial,
1235                          const char *setname)
1236 {
1237     YAZ_PQF_Parser pqf_parser = yaz_pqf_create();
1238     Z_AttributesPlusTerm *zapt;
1239     Odr_oid *attributeSet;
1240     ZEBRA_RES res;
1241
1242     if (!(zapt = yaz_pqf_scan(pqf_parser, stream, &attributeSet, query)))
1243     {
1244         res = ZEBRA_FAIL;
1245         zh->errCode = YAZ_BIB1_SCAN_MALFORMED_SCAN;
1246     }
1247     else
1248     {
1249         res = zebra_scan(zh, stream, zapt, yaz_oid_attset_bib_1,
1250                          position, num_entries, entries, is_partial,
1251                          setname);
1252     }
1253     yaz_pqf_destroy(pqf_parser);
1254     return res;
1255 }
1256
1257 ZEBRA_RES zebra_scan(ZebraHandle zh, ODR stream, Z_AttributesPlusTerm *zapt,
1258                      const Odr_oid *attributeset,
1259                      int *position,
1260                      int *num_entries, ZebraScanEntry **entries,
1261                      int *is_partial,
1262                      const char *setname)
1263 {
1264     ZEBRA_RES res;
1265
1266     ZEBRA_CHECK_HANDLE(zh);
1267
1268     assert(stream);
1269     assert(zapt);
1270     assert(position);
1271     assert(num_entries);
1272     assert(is_partial);
1273     assert(entries);
1274     yaz_log(log_level, "zebra_scan");
1275
1276     if (zebra_begin_read(zh) == ZEBRA_FAIL)
1277     {
1278         *entries = 0;
1279         *num_entries = 0;
1280         return ZEBRA_FAIL;
1281     }
1282
1283     res = rpn_scan(zh, stream, zapt, attributeset,
1284                    zh->num_basenames, zh->basenames, position,
1285                    num_entries, entries, is_partial, setname);
1286     zebra_end_read(zh);
1287     return res;
1288 }
1289
1290 ZEBRA_RES zebra_sort(ZebraHandle zh, ODR stream,
1291                      int num_input_setnames, const char **input_setnames,
1292                      const char *output_setname,
1293                      Z_SortKeySpecList *sort_sequence,
1294                      int *sort_status)
1295 {
1296     ZEBRA_RES res;
1297     ZEBRA_CHECK_HANDLE(zh);
1298     assert(stream);
1299     assert(num_input_setnames>0);
1300     assert(input_setnames);
1301     assert(sort_sequence);
1302     assert(sort_status);
1303     yaz_log(log_level, "zebra_sort");
1304
1305     if (zebra_begin_read(zh) == ZEBRA_FAIL)
1306         return ZEBRA_FAIL;
1307     res = resultSetSort(zh, stream->mem, num_input_setnames, input_setnames,
1308                         output_setname, sort_sequence, sort_status);
1309     zebra_end_read(zh);
1310     return res;
1311 }
1312
1313 int zebra_deleteResultSet(ZebraHandle zh, int function,
1314                           int num_setnames, char **setnames,
1315                           int *statuses)
1316 {
1317     int i, status;
1318     ASSERTZH;
1319     yaz_log(log_level, "zebra_deleteResultSet n=%d", num_setnames);
1320
1321     if (zebra_begin_read(zh))
1322         return Z_DeleteStatus_systemProblemAtTarget;
1323     switch (function)
1324     {
1325     case Z_DeleteResultSetRequest_list:
1326         assert(num_setnames>0);
1327         assert(setnames);
1328         resultSetDestroy(zh, num_setnames, setnames, statuses);
1329         break;
1330     case Z_DeleteResultSetRequest_all:
1331         resultSetDestroy(zh, -1, 0, statuses);
1332         break;
1333     }
1334     zebra_end_read(zh);
1335     status = Z_DeleteStatus_success;
1336     for (i = 0; i<num_setnames; i++)
1337         if (statuses[i] == Z_DeleteStatus_resultSetDidNotExist)
1338             status = statuses[i];
1339     return status;
1340 }
1341
1342 int zebra_errCode(ZebraHandle zh)
1343 {
1344     if (zh)
1345     {
1346         yaz_log(log_level, "zebra_errCode: %d",zh->errCode);
1347         return zh->errCode;
1348     }
1349     yaz_log(log_level, "zebra_errCode: o");
1350     return 0;
1351 }
1352
1353 const char *zebra_errString(ZebraHandle zh)
1354 {
1355     const char *e = 0;
1356     if (zh)
1357         e= diagbib1_str(zh->errCode);
1358     yaz_log(log_level, "zebra_errString: %s",e);
1359     return e;
1360 }
1361
1362 char *zebra_errAdd(ZebraHandle zh)
1363 {
1364     char *a = 0;
1365     if (zh)
1366         a= zh->errString;
1367     yaz_log(log_level, "zebra_errAdd: %s",a);
1368     return a;
1369 }
1370
1371 ZEBRA_RES zebra_auth(ZebraHandle zh, const char *user, const char *pass)
1372 {
1373     const char *p;
1374     const char *astring;
1375     char u[40];
1376     ZebraService zs;
1377
1378     ZEBRA_CHECK_HANDLE(zh);
1379
1380     zs = zh->service;
1381
1382     sprintf(u, "perm.%.30s", user ? user : "anonymous");
1383     p = res_get(zs->global_res, u);
1384     xfree(zh->user_perm);
1385     zh->user_perm = xstrdup(p ? p : "r");
1386
1387     /* Determine database access list */
1388     astring = res_get(zs->dbaccess, user ? user : "anonymous");
1389     if (astring)
1390         zh->dbaccesslist = xstrdup(astring);
1391     else
1392         zh->dbaccesslist = 0;
1393
1394     /* users that don't require a password .. */
1395     if (zh->user_perm && strchr(zh->user_perm, 'a'))
1396         return ZEBRA_OK;
1397
1398     if (!zs->passwd_db || !passwd_db_auth(zs->passwd_db, user, pass))
1399         return ZEBRA_OK;
1400     return ZEBRA_FAIL;
1401 }
1402
1403 ZEBRA_RES zebra_admin_import_begin(ZebraHandle zh, const char *database,
1404                                    const char *record_type)
1405 {
1406     yaz_log(log_level, "zebra_admin_import_begin db=%s rt=%s",
1407             database, record_type);
1408     if (zebra_select_database(zh, database) == ZEBRA_FAIL)
1409         return ZEBRA_FAIL;
1410     return zebra_begin_trans(zh, 1);
1411 }
1412
1413 ZEBRA_RES zebra_admin_import_end(ZebraHandle zh)
1414 {
1415     ZEBRA_CHECK_HANDLE(zh);
1416     yaz_log(log_level, "zebra_admin_import_end");
1417     return zebra_end_trans(zh);
1418 }
1419
1420 ZEBRA_RES zebra_admin_import_segment(ZebraHandle zh, Z_Segment *segment)
1421 {
1422     ZEBRA_RES res = ZEBRA_OK;
1423     zint sysno;
1424     int i;
1425     ZEBRA_CHECK_HANDLE(zh);
1426     yaz_log(log_level, "zebra_admin_import_segment");
1427
1428     for (i = 0; i<segment->num_segmentRecords; i++)
1429     {
1430         Z_NamePlusRecord *npr = segment->segmentRecords[i];
1431
1432         if (npr->which == Z_NamePlusRecord_intermediateFragment)
1433         {
1434             Z_FragmentSyntax *fragment = npr->u.intermediateFragment;
1435             if (fragment->which == Z_FragmentSyntax_notExternallyTagged)
1436             {
1437                 Odr_oct *oct = fragment->u.notExternallyTagged;
1438                 sysno = 0;
1439
1440                 if(zebra_update_record(
1441                        zh,
1442                        action_update,
1443                        0, /* record Type */
1444                        &sysno,
1445                        0, /* match */
1446                        0, /* fname */
1447                        (const char *) oct->buf, oct->len) == ZEBRA_FAIL)
1448                     res = ZEBRA_FAIL;
1449             }
1450         }
1451     }
1452     return res;
1453 }
1454
1455 int delete_w_handle(const char *info, void *handle)
1456 {
1457     ZebraHandle zh = (ZebraHandle) handle;
1458     ISAM_P pos;
1459
1460     if (*info == sizeof(pos))
1461     {
1462         memcpy(&pos, info+1, sizeof(pos));
1463         isamb_unlink(zh->reg->isamb, pos);
1464     }
1465     return 0;
1466 }
1467
1468 int delete_w_all_handle(const char *info, void *handle)
1469 {
1470     ZebraHandle zh = (ZebraHandle) handle;
1471     ISAM_P pos;
1472
1473     if (*info == sizeof(pos))
1474     {
1475         ISAMB_PP pt;
1476         memcpy(&pos, info+1, sizeof(pos));
1477         pt = isamb_pp_open(zh->reg->isamb, pos, 2);
1478         if (pt)
1479         {
1480             struct it_key key;
1481             key.mem[0] = 0;
1482             while (isamb_pp_read(pt, &key))
1483             {
1484                 Record rec;
1485                 rec = rec_get(zh->reg->records, key.mem[0]);
1486                 rec_del(zh->reg->records, &rec);
1487             }
1488             isamb_pp_close(pt);
1489         }
1490     }
1491     return delete_w_handle(info, handle);
1492 }
1493
1494 static int delete_SU_handle(void *handle, int ord,
1495                             const char *index_type, const char *string_index,
1496                             zinfo_index_category_t cat)
1497 {
1498     ZebraHandle zh = (ZebraHandle) handle;
1499     char ord_buf[20];
1500     int ord_len;
1501 #if 0
1502     yaz_log(YLOG_LOG, "ord=%d index_type=%s index=%s cat=%d", ord,
1503             index_type, string_index, (int) cat);
1504 #endif
1505     ord_len = key_SU_encode(ord, ord_buf);
1506     ord_buf[ord_len] = '\0';
1507
1508     assert(zh->reg->isamb);
1509     assert(zh->reg->records);
1510     dict_delete_subtree(zh->reg->dict, ord_buf,
1511                         zh,
1512                         !strcmp(string_index, "_ALLRECORDS") ?
1513                         delete_w_all_handle : delete_w_handle);
1514     return 0;
1515 }
1516
1517 ZEBRA_RES zebra_drop_database(ZebraHandle zh, const char *db)
1518 {
1519     ZEBRA_RES ret = ZEBRA_OK;
1520
1521     yaz_log(log_level, "zebra_drop_database %s", db);
1522     ZEBRA_CHECK_HANDLE(zh);
1523
1524     if (zebra_select_database(zh, db) == ZEBRA_FAIL)
1525         return ZEBRA_FAIL;
1526     if (zebra_begin_trans(zh, 1) == ZEBRA_FAIL)
1527         return ZEBRA_FAIL;
1528     if (zh->reg->isamb)
1529     {
1530         int db_ord;
1531         if (zebraExplain_curDatabase(zh->reg->zei, db))
1532         {
1533             zebra_setError(zh, YAZ_BIB1_DATABASE_DOES_NOT_EXIST, db);
1534             ret = ZEBRA_FAIL;
1535         }
1536         else
1537         {
1538             db_ord = zebraExplain_get_database_ord(zh->reg->zei);
1539             dict_delete_subtree_ord(zh->reg->matchDict, db_ord,
1540                                     0 /* handle */, 0 /* func */);
1541             zebraExplain_trav_ord(zh->reg->zei, zh, delete_SU_handle);
1542             zebraExplain_removeDatabase(zh->reg->zei, zh);
1543             zebra_remove_file_match(zh);
1544         }
1545     }
1546     else
1547     {
1548         yaz_log(YLOG_WARN, "drop database only supported for isam:b");
1549         zebra_setError(zh, YAZ_BIB1_ES_IMMEDIATE_EXECUTION_FAILED,
1550                        "drop database only supported for isam:b");
1551         ret = ZEBRA_FAIL;
1552     }
1553     if (zebra_end_trans(zh) != ZEBRA_OK)
1554     {
1555         yaz_log(YLOG_WARN, "zebra_end_trans failed");
1556         ret = ZEBRA_FAIL;
1557     }
1558     return ret;
1559 }
1560
1561 ZEBRA_RES zebra_create_database(ZebraHandle zh, const char *db)
1562 {
1563     yaz_log(log_level, "zebra_create_database %s", db);
1564     ZEBRA_CHECK_HANDLE(zh);
1565     assert(db);
1566
1567     if (zebra_select_database(zh, db) == ZEBRA_FAIL)
1568         return ZEBRA_FAIL;
1569     if (zebra_begin_trans(zh, 1))
1570         return ZEBRA_FAIL;
1571
1572     /* announce database */
1573     if (zebraExplain_newDatabase(zh->reg->zei, db, 0
1574                                  /* explainDatabase */))
1575     {
1576         if (zebra_end_trans(zh) != ZEBRA_OK)
1577         {
1578             yaz_log(YLOG_WARN, "zebra_end_trans failed");
1579         }
1580         zebra_setError(zh, YAZ_BIB1_ES_IMMEDIATE_EXECUTION_FAILED, db);
1581         return ZEBRA_FAIL;
1582     }
1583     return zebra_end_trans(zh);
1584 }
1585
1586 int zebra_string_norm(ZebraHandle zh, const char *index_type,
1587                       const char *input_str, int input_len,
1588                       char *output_str, int output_len)
1589 {
1590     WRBUF wrbuf;
1591     zebra_map_t zm = zebra_map_get(zh->reg->zebra_maps, index_type);
1592     ASSERTZH;
1593     assert(input_str);
1594     assert(output_str);
1595     yaz_log(log_level, "zebra_string_norm ");
1596
1597     if (!zh->reg->zebra_maps)
1598         return -1;
1599     wrbuf = zebra_replace(zm, "", input_str, input_len);
1600     if (!wrbuf)
1601         return -2;
1602     if (wrbuf_len(wrbuf) >= output_len)
1603         return -3;
1604     if (wrbuf_len(wrbuf))
1605         memcpy(output_str, wrbuf_buf(wrbuf), wrbuf_len(wrbuf));
1606     output_str[wrbuf_len(wrbuf)] = '\0';
1607     return wrbuf_len(wrbuf);
1608 }
1609
1610 /** \brief set register state (state*.LCK)
1611     \param zh Zebra handle
1612     \param val state
1613     \param seqno sequence number
1614
1615     val is one of:
1616     d=writing to shadow(shadow enabled); writing to register (shadow disabled)
1617     o=reading only
1618     c=commit (writing to register, reading from shadow, shadow mode only)
1619 */
1620 static void zebra_set_state(ZebraHandle zh, int val, int seqno)
1621 {
1622     char state_fname[256];
1623     char *fname;
1624     long p = getpid();
1625     FILE *f;
1626     ASSERTZH;
1627     yaz_log(log_level, "zebra_set_state v=%c seq=%d", val, seqno);
1628
1629     sprintf(state_fname, "state.%s.LCK", zh->reg_name);
1630     fname = zebra_mk_fname(res_get(zh->res, "lockDir"), state_fname);
1631     f = fopen(fname, "w");
1632     if (!f)
1633     {
1634         yaz_log(YLOG_FATAL|YLOG_ERRNO, "open %s w", state_fname);
1635         exit(1);
1636     }
1637     yaz_log(YLOG_DEBUG, "zebra_set_state: %c %d %ld", val, seqno, p);
1638     fprintf(f, "%c %d %ld\n", val, seqno, p);
1639     fclose(f);
1640     xfree(fname);
1641 }
1642
1643 static void zebra_get_state(ZebraHandle zh, char *val, int *seqno)
1644 {
1645     char state_fname[256];
1646     char *fname;
1647     FILE *f;
1648
1649     ASSERTZH;
1650     yaz_log(log_level, "zebra_get_state ");
1651
1652     sprintf(state_fname, "state.%s.LCK", zh->reg_name);
1653     fname = zebra_mk_fname(res_get(zh->res, "lockDir"), state_fname);
1654     f = fopen(fname, "r");
1655     *val = 'o';
1656     *seqno = 0;
1657
1658     if (f)
1659     {
1660         if (fscanf(f, "%c %d", val, seqno) != 2)
1661         {
1662             yaz_log(YLOG_ERRNO|YLOG_WARN, "fscan fail %s",
1663                     state_fname);
1664         }
1665         fclose(f);
1666     }
1667     xfree(fname);
1668 }
1669
1670 ZEBRA_RES zebra_begin_read(ZebraHandle zh)
1671 {
1672     return zebra_begin_trans(zh, 0);
1673 }
1674
1675 ZEBRA_RES zebra_end_read(ZebraHandle zh)
1676 {
1677     return zebra_end_trans(zh);
1678 }
1679
1680 static void read_res_for_transaction(ZebraHandle zh)
1681 {
1682     const char *group = res_get(zh->res, "group");
1683     const char *v;
1684     /* FIXME - do we still use groups ?? */
1685
1686     zh->m_group = group;
1687     v = res_get_prefix(zh->res, "followLinks", group, "1");
1688     zh->m_follow_links = atoi(v);
1689
1690     zh->m_record_id = res_get_prefix(zh->res, "recordId", group, 0);
1691     zh->m_record_type = res_get_prefix(zh->res, "recordType", group, 0);
1692
1693     v = res_get_prefix(zh->res, "storeKeys", group, "1");
1694     zh->m_store_keys = atoi(v);
1695
1696     v = res_get_prefix(zh->res, "storeData", group, "1");
1697     zh->m_store_data = atoi(v);
1698
1699     v = res_get_prefix(zh->res, "explainDatabase", group, "0");
1700     zh->m_explain_database = atoi(v);
1701
1702     v = res_get_prefix(zh->res, "openRW", group, "1");
1703     zh->m_flag_rw = atoi(v);
1704
1705     v = res_get_prefix(zh->res, "fileVerboseLimit", group, "1000");
1706     zh->m_file_verbose_limit = atoi(v);
1707 }
1708
1709 ZEBRA_RES zebra_begin_trans(ZebraHandle zh, int rw)
1710 {
1711     ZEBRA_CHECK_HANDLE(zh);
1712     zebra_select_default_database(zh);
1713     if (!zh->res)
1714     {
1715         zebra_setError(zh, YAZ_BIB1_TEMPORARY_SYSTEM_ERROR,
1716                        "zebra_begin_trans: no database selected");
1717         return ZEBRA_FAIL;
1718     }
1719     ASSERTZHRES;
1720     yaz_log(log_level, "zebra_begin_trans rw=%d",rw);
1721
1722     if (zh->user_perm)
1723     {
1724         if (rw && !strchr(zh->user_perm, 'w'))
1725         {
1726             zebra_setError(
1727                 zh,
1728                 YAZ_BIB1_ES_PERMISSION_DENIED_ON_ES_CANNOT_MODIFY_OR_DELETE,
1729                 0);
1730             return ZEBRA_FAIL;
1731         }
1732     }
1733
1734     assert(zh->res);
1735     if (rw)
1736     {
1737         int seqno = 0;
1738         char val = '?';
1739         const char *rval = 0;
1740
1741         (zh->trans_no++);
1742         if (zh->trans_w_no)
1743         {
1744             read_res_for_transaction(zh);
1745             return 0;
1746         }
1747         if (zh->trans_no != 1)
1748         {
1749             zebra_setError(zh, YAZ_BIB1_TEMPORARY_SYSTEM_ERROR,
1750                            "zebra_begin_trans: no write trans within read");
1751             return ZEBRA_FAIL;
1752         }
1753         if (zh->reg)
1754         {
1755             resultSetInvalidate(zh);
1756             zebra_register_close(zh->service, zh->reg);
1757         }
1758         zh->trans_w_no = zh->trans_no;
1759
1760         zh->records_inserted = 0;
1761         zh->records_updated = 0;
1762         zh->records_deleted = 0;
1763         zh->records_processed = 0;
1764         zh->records_skipped = 0;
1765
1766 #if HAVE_SYS_TIMES_H
1767         times(&zh->tms1);
1768 #endif
1769         /* lock */
1770         if (zh->shadow_enable)
1771             rval = res_get(zh->res, "shadow");
1772
1773         if (rval)
1774         {
1775             zebra_lock_r(zh->lock_normal);
1776             zebra_lock_w(zh->lock_shadow);
1777         }
1778         else
1779         {
1780             zebra_lock_w(zh->lock_normal);
1781             zebra_lock_w(zh->lock_shadow);
1782         }
1783         zebra_get_state(zh, &val, &seqno);
1784         if (val != 'o')
1785         {
1786             /* either we didn't finish commit or shadow is dirty */
1787             if (!rval)
1788             {
1789                 yaz_log(YLOG_WARN, "previous transaction did not finish "
1790                         "(shadow disabled)");
1791             }
1792             zebra_unlock(zh->lock_shadow);
1793             zebra_unlock(zh->lock_normal);
1794             if (zebra_commit(zh))
1795             {
1796                 zh->trans_no--;
1797                 zh->trans_w_no = 0;
1798                 return ZEBRA_FAIL;
1799             }
1800             if (rval)
1801             {
1802                 zebra_lock_r(zh->lock_normal);
1803                 zebra_lock_w(zh->lock_shadow);
1804             }
1805             else
1806             {
1807                 zebra_lock_w(zh->lock_normal);
1808                 zebra_lock_w(zh->lock_shadow);
1809             }
1810         }
1811
1812         zebra_set_state(zh, 'd', seqno);
1813
1814         zh->reg = zebra_register_open(zh->service, zh->reg_name,
1815                                       1, rval ? 1 : 0, zh->res,
1816                                       zh->path_reg);
1817         if (!zh->reg)
1818         {
1819             zebra_unlock(zh->lock_shadow);
1820             zebra_unlock(zh->lock_normal);
1821
1822             zh->trans_no--;
1823             zh->trans_w_no = 0;
1824
1825             zebra_setError(zh, YAZ_BIB1_TEMPORARY_SYSTEM_ERROR,
1826                            "zebra_begin_trans: cannot open register");
1827             yaz_log(YLOG_FATAL, "%s", zh->errString);
1828             return ZEBRA_FAIL;
1829         }
1830         zh->reg->seqno = seqno;
1831         zebraExplain_curDatabase(zh->reg->zei, zh->basenames[0]);
1832     }
1833     else
1834     {
1835         int dirty = 0;
1836         char val;
1837         int seqno;
1838
1839         (zh->trans_no)++;
1840
1841         if (zh->trans_no != 1)
1842         {
1843             return zebra_flush_reg(zh);
1844         }
1845 #if HAVE_SYS_TIMES_H
1846         times(&zh->tms1);
1847 #endif
1848         if (!zh->res)
1849         {
1850             (zh->trans_no)--;
1851             zh->errCode = YAZ_BIB1_DATABASE_UNAVAILABLE;
1852             return ZEBRA_FAIL;
1853         }
1854         if (!zh->lock_normal || !zh->lock_shadow)
1855         {
1856             (zh->trans_no)--;
1857             zh->errCode = YAZ_BIB1_TEMPORARY_SYSTEM_ERROR;
1858             return ZEBRA_FAIL;
1859         }
1860         zebra_get_state(zh, &val, &seqno);
1861         if (val == 'd')
1862             val = 'o';
1863
1864         if (!zh->reg)
1865             dirty = 1;
1866         else if (seqno != zh->reg->seqno)
1867         {
1868             yaz_log(YLOG_DEBUG, "reopen seqno cur/old %d/%d",
1869                     seqno, zh->reg->seqno);
1870             dirty = 1;
1871         }
1872         else if (zh->reg->last_val != val)
1873         {
1874             yaz_log(YLOG_DEBUG, "reopen last cur/old %d/%d",
1875                     val, zh->reg->last_val);
1876             dirty = 1;
1877         }
1878         if (!dirty)
1879             return ZEBRA_OK;
1880
1881         if (val == 'c')
1882             zebra_lock_r(zh->lock_shadow);
1883         else
1884             zebra_lock_r(zh->lock_normal);
1885
1886         if (zh->reg)
1887         {
1888             resultSetInvalidate(zh);
1889             zebra_register_close(zh->service, zh->reg);
1890         }
1891         zh->reg = zebra_register_open(zh->service, zh->reg_name,
1892                                       0, val == 'c' ? 1 : 0,
1893                                       zh->res, zh->path_reg);
1894         if (!zh->reg)
1895         {
1896             zebra_unlock(zh->lock_normal);
1897             zebra_unlock(zh->lock_shadow);
1898             zh->trans_no--;
1899             zh->errCode = YAZ_BIB1_DATABASE_UNAVAILABLE;
1900             return ZEBRA_FAIL;
1901         }
1902         zh->reg->last_val = val;
1903         zh->reg->seqno = seqno;
1904     }
1905     read_res_for_transaction(zh);
1906     return ZEBRA_OK;
1907 }
1908
1909 ZEBRA_RES zebra_end_trans(ZebraHandle zh)
1910 {
1911     ZebraTransactionStatus dummy;
1912
1913     yaz_log(log_level, "zebra_end_trans");
1914     ZEBRA_CHECK_HANDLE(zh);
1915     return zebra_end_transaction(zh, &dummy);
1916 }
1917
1918 ZEBRA_RES zebra_end_transaction(ZebraHandle zh, ZebraTransactionStatus *status)
1919 {
1920     char val;
1921     int seqno;
1922     const char *rval;
1923
1924     ZEBRA_CHECK_HANDLE(zh);
1925
1926     assert(status);
1927     yaz_log(log_level, "zebra_end_transaction");
1928
1929     status->processed = 0;
1930     status->inserted  = 0;
1931     status->updated   = 0;
1932     status->deleted   = 0;
1933     status->utime     = 0;
1934     status->stime     = 0;
1935
1936     if (!zh->res || !zh->reg)
1937     {
1938         zebra_setError(zh, YAZ_BIB1_TEMPORARY_SYSTEM_ERROR,
1939                        "zebra_end_trans: no open transaction");
1940         return ZEBRA_FAIL;
1941     }
1942     if (zh->trans_no != zh->trans_w_no)
1943     {
1944         zh->trans_no--;
1945         if (zh->trans_no != 0)
1946             return ZEBRA_OK;
1947
1948         /* release read lock */
1949
1950         zebra_unlock(zh->lock_normal);
1951         zebra_unlock(zh->lock_shadow);
1952     }
1953     else
1954     {   /* release write lock */
1955         zh->trans_no--;
1956         zh->trans_w_no = 0;
1957
1958         yaz_log(YLOG_DEBUG, "zebra_end_trans");
1959         rval = res_get(zh->res, "shadow");
1960
1961         zebraExplain_runNumberIncrement(zh->reg->zei, 1);
1962
1963         zebra_flush_reg(zh);
1964
1965         resultSetInvalidate(zh);
1966
1967         zebra_register_close(zh->service, zh->reg);
1968         zh->reg = 0;
1969
1970         yaz_log(YLOG_LOG, "Records: "ZINT_FORMAT" i/u/d "
1971                 ZINT_FORMAT"/"ZINT_FORMAT"/"ZINT_FORMAT,
1972                 zh->records_processed, zh->records_inserted,
1973                 zh->records_updated, zh->records_deleted);
1974
1975         status->processed = zh->records_processed;
1976         status->inserted = zh->records_inserted;
1977         status->updated = zh->records_updated;
1978         status->deleted = zh->records_deleted;
1979
1980         zebra_get_state(zh, &val, &seqno);
1981         if (val != 'd')
1982         {
1983             BFiles bfs = bfs_create(rval, zh->path_reg);
1984             bf_commitClean(bfs, rval);
1985             bfs_destroy(bfs);
1986         }
1987         if (!rval)
1988             seqno++;
1989         zebra_set_state(zh, 'o', seqno);
1990         zebra_unlock(zh->lock_shadow);
1991         zebra_unlock(zh->lock_normal);
1992
1993     }
1994 #if HAVE_SYS_TIMES_H
1995     times(&zh->tms2);
1996     yaz_log(log_level, "user/system: %ld/%ld",
1997             (long) (zh->tms2.tms_utime - zh->tms1.tms_utime),
1998             (long) (zh->tms2.tms_stime - zh->tms1.tms_stime));
1999
2000     status->utime = (long) (zh->tms2.tms_utime - zh->tms1.tms_utime);
2001     status->stime = (long) (zh->tms2.tms_stime - zh->tms1.tms_stime);
2002 #endif
2003     return ZEBRA_OK;
2004 }
2005
2006 ZEBRA_RES zebra_repository_update(ZebraHandle zh, const char *path)
2007 {
2008     return zebra_repository_index(zh, path, action_update);
2009 }
2010
2011 ZEBRA_RES zebra_repository_delete(ZebraHandle zh, const char *path)
2012 {
2013     return zebra_repository_index(zh, path, action_delete);
2014 }
2015
2016 ZEBRA_RES zebra_repository_index(ZebraHandle zh, const char *path,
2017                                  enum zebra_recctrl_action_t action)
2018 {
2019     ASSERTZH;
2020     assert(path);
2021
2022     if (action == action_update)
2023         yaz_log(log_level, "updating %s", path);
2024     else if (action == action_delete)
2025         yaz_log(log_level, "deleting %s", path);
2026     else if (action == action_a_delete)
2027         yaz_log(log_level, "attempt deleting %s", path);
2028     else
2029         yaz_log(log_level, "update action=%d", (int) action);
2030
2031     if (zh->m_record_id && !strcmp(zh->m_record_id, "file"))
2032         return zebra_update_file_match(zh, path);
2033     else
2034         return zebra_update_from_path(zh, path, action);
2035 }
2036
2037 ZEBRA_RES zebra_repository_show(ZebraHandle zh, const char *path)
2038 {
2039     ASSERTZH;
2040     assert(path);
2041     yaz_log(log_level, "zebra_repository_show");
2042     repositoryShow(zh, path);
2043     return ZEBRA_OK;
2044 }
2045
2046 static ZEBRA_RES zebra_commit_ex(ZebraHandle zh, int clean_only)
2047 {
2048     int seqno;
2049     char val;
2050     const char *rval;
2051     BFiles bfs;
2052     ZEBRA_RES res = ZEBRA_OK;
2053
2054     ASSERTZH;
2055
2056     yaz_log(log_level, "zebra_commit_ex clean_only=%d", clean_only);
2057     zebra_select_default_database(zh);
2058     if (!zh->res)
2059     {
2060         zh->errCode = YAZ_BIB1_DATABASE_UNAVAILABLE;
2061         return ZEBRA_FAIL;
2062     }
2063     rval = res_get(zh->res, "shadow");
2064     if (!rval)
2065     {
2066         yaz_log(YLOG_WARN, "Cannot perform commit - No shadow area defined");
2067         return ZEBRA_OK;
2068     }
2069
2070     zebra_lock_w(zh->lock_normal);
2071     zebra_lock_r(zh->lock_shadow);
2072
2073     bfs = bfs_create(res_get(zh->res, "register"), zh->path_reg);
2074     if (!bfs)
2075     {
2076         zebra_unlock(zh->lock_shadow);
2077         zebra_unlock(zh->lock_normal);
2078         return ZEBRA_FAIL;
2079     }
2080     zebra_get_state(zh, &val, &seqno);
2081
2082     if (val == 'd')
2083     {
2084         /* shadow area is dirty and so we must throw it away */
2085         yaz_log(YLOG_WARN, "previous transaction didn't reach commit");
2086         clean_only = 1;
2087     }
2088     else if (val == 'c')
2089     {
2090         /* commit has started. We can not remove it anymore */
2091         clean_only = 0;
2092     }
2093
2094     if (rval && *rval)
2095         bf_cache(bfs, rval);
2096     if (bf_commitExists(bfs))
2097     {
2098         if (clean_only)
2099             zebra_set_state(zh, 'd', seqno);
2100         else
2101         {
2102             zebra_set_state(zh, 'c', seqno);
2103
2104             yaz_log(log_level, "commit start");
2105             if (bf_commitExec(bfs))
2106                 res = ZEBRA_FAIL;
2107         }
2108         if (res == ZEBRA_OK)
2109         {
2110             seqno++;
2111             zebra_set_state(zh, 'o', seqno);
2112
2113             zebra_unlock(zh->lock_shadow);
2114             zebra_unlock(zh->lock_normal);
2115
2116             zebra_lock_w(zh->lock_shadow);
2117             bf_commitClean(bfs, rval);
2118             zebra_unlock(zh->lock_shadow);
2119         }
2120         else
2121         {
2122             zebra_unlock(zh->lock_shadow);
2123             zebra_unlock(zh->lock_normal);
2124             yaz_log(YLOG_WARN, "zebra_commit: failed");
2125         }
2126     }
2127     else
2128     {
2129         zebra_unlock(zh->lock_shadow);
2130         zebra_unlock(zh->lock_normal);
2131         yaz_log(log_level, "nothing to commit");
2132     }
2133     bfs_destroy(bfs);
2134
2135     return res;
2136 }
2137
2138 ZEBRA_RES zebra_clean(ZebraHandle zh)
2139 {
2140     yaz_log(log_level, "zebra_clean");
2141     ZEBRA_CHECK_HANDLE(zh);
2142     return zebra_commit_ex(zh, 1);
2143 }
2144
2145 ZEBRA_RES zebra_commit(ZebraHandle zh)
2146 {
2147     yaz_log(log_level, "zebra_commit");
2148     ZEBRA_CHECK_HANDLE(zh);
2149     return zebra_commit_ex(zh, 0);
2150 }
2151
2152
2153 ZEBRA_RES zebra_init(ZebraHandle zh)
2154 {
2155     const char *rval;
2156     BFiles bfs = 0;
2157
2158     yaz_log(log_level, "zebra_init");
2159
2160     ZEBRA_CHECK_HANDLE(zh);
2161
2162     zebra_select_default_database(zh);
2163     if (!zh->res)
2164     {
2165         zebra_setError(zh, YAZ_BIB1_TEMPORARY_SYSTEM_ERROR,
2166                        "cannot select default database");
2167         return ZEBRA_FAIL;
2168     }
2169     rval = res_get(zh->res, "shadow");
2170
2171     bfs = bfs_create(res_get(zh->res, "register"), zh->path_reg);
2172     if (!bfs)
2173     {
2174         zebra_setError(zh, YAZ_BIB1_TEMPORARY_SYSTEM_ERROR, "bfs_create");
2175         return ZEBRA_FAIL;
2176     }
2177     if (rval && *rval)
2178         bf_cache(bfs, rval);
2179
2180     bf_reset(bfs);
2181     bfs_destroy(bfs);
2182     zebra_set_state(zh, 'o', 0);
2183     return ZEBRA_OK;
2184 }
2185
2186 ZEBRA_RES zebra_compact(ZebraHandle zh)
2187 {
2188     BFiles bfs;
2189
2190     yaz_log(log_level, "zebra_compact");
2191     ZEBRA_CHECK_HANDLE(zh);
2192     if (!zh->res)
2193     {
2194         zh->errCode = YAZ_BIB1_DATABASE_UNAVAILABLE;
2195         return ZEBRA_FAIL;
2196     }
2197     bfs = bfs_create(res_get(zh->res, "register"), zh->path_reg);
2198     inv_compact(bfs);
2199     bfs_destroy(bfs);
2200     return ZEBRA_OK;
2201 }
2202
2203 #define ZEBRA_CHECK_DICT 1
2204 #define ZEBRA_CHECK_ISAM 2
2205
2206 static ZEBRA_RES zebra_record_check(ZebraHandle zh, Record rec,
2207                                     zint *no_keys, int message_limit,
2208                                     unsigned flags,
2209                                     zint *no_long_dict_entries,
2210                                     zint *no_failed_dict_lookups,
2211                                     zint *no_invalid_keys,
2212                                     zint *no_invalid_dict_infos,
2213                                     zint *no_invalid_isam_entries)
2214 {
2215     ZEBRA_RES res = ZEBRA_OK;
2216     zebra_rec_keys_t keys = zebra_rec_keys_open();
2217     zebra_rec_keys_set_buf(keys, rec->info[recInfo_delKeys],
2218                            rec->size[recInfo_delKeys], 0);
2219
2220     *no_keys = 0;
2221     if (!zebra_rec_keys_rewind(keys))
2222     {
2223         ;
2224     }
2225     else
2226     {
2227         size_t slen;
2228         const char *str;
2229         struct it_key key_in;
2230         NMEM nmem = nmem_create();
2231
2232         while (zebra_rec_keys_read(keys, &str, &slen, &key_in))
2233         {
2234             int do_fail = 0;
2235             int ord = CAST_ZINT_TO_INT(key_in.mem[0]);
2236             char ord_buf[IT_MAX_WORD+20];
2237             int ord_len = key_SU_encode(ord, ord_buf);
2238             char *info = 0;
2239
2240             (*no_keys)++;
2241
2242             if (key_in.len < 2 || key_in.len > IT_KEY_LEVEL_MAX)
2243             {
2244                 res = ZEBRA_FAIL;
2245                 (*no_invalid_keys)++;
2246                 if (*no_invalid_keys <= message_limit)
2247                 {
2248                     do_fail = 1;
2249                     yaz_log(YLOG_WARN, "Record " ZINT_FORMAT
2250                             ": unexpected key length %d",
2251                             rec->sysno, key_in.len);
2252                 }
2253             }
2254             if (ord_len + slen >= sizeof(ord_buf)-1)
2255             {
2256                 res = ZEBRA_FAIL;
2257                 (*no_long_dict_entries)++;
2258                 if (*no_long_dict_entries <= message_limit)
2259                 {
2260                     do_fail = 1;
2261                     /* so bad it can not fit into our ord_buf */
2262                     yaz_log(YLOG_WARN, "Record " ZINT_FORMAT
2263                             ": long dictionary entry %d + %d",
2264                             rec->sysno, ord_len, (int) slen);
2265                 }
2266                 continue;
2267             }
2268             memcpy(ord_buf + ord_len, str, slen);
2269             ord_buf[ord_len + slen] = '\0';
2270             if (slen > IT_MAX_WORD || ord_len > 4)
2271             {
2272                 res = ZEBRA_FAIL;
2273                 (*no_long_dict_entries)++;
2274                 if (*no_long_dict_entries <= message_limit)
2275                 {
2276                     do_fail = 1;
2277                     yaz_log(YLOG_WARN, "Record " ZINT_FORMAT
2278                             ": long dictionary entry %d + %d",
2279                             rec->sysno, (int) ord_len, (int) slen);
2280                 }
2281             }
2282             if ((flags & ZEBRA_CHECK_DICT) == 0)
2283                 continue;
2284             info = dict_lookup(zh->reg->dict, ord_buf);
2285             if (!info)
2286             {
2287                 res = ZEBRA_FAIL;
2288                 (*no_failed_dict_lookups)++;
2289                 if (*no_failed_dict_lookups <= message_limit)
2290                 {
2291                     do_fail = 1;
2292                     yaz_log(YLOG_WARN, "Record " ZINT_FORMAT
2293                             ": term do not exist in dictionary", rec->sysno);
2294                 }
2295             }
2296             else if (flags & ZEBRA_CHECK_ISAM)
2297             {
2298                 ISAM_P pos;
2299
2300                 if (*info != sizeof(pos))
2301                 {
2302                     res = ZEBRA_FAIL;
2303                     (*no_invalid_dict_infos)++;
2304                     if (*no_invalid_dict_infos <= message_limit)
2305                     {
2306                         do_fail = 1;
2307                         yaz_log(YLOG_WARN, "Record " ZINT_FORMAT
2308                                 ": long dictionary entry %d + %d",
2309                                 rec->sysno, (int) ord_len, (int) slen);
2310                     }
2311                 }
2312                 else
2313                 {
2314                     int scope = 1;
2315                     memcpy(&pos, info+1, sizeof(pos));
2316                     if (zh->reg->isamb)
2317                     {
2318                         ISAMB_PP ispt = isamb_pp_open(zh->reg->isamb, pos,
2319                                                       scope);
2320                         if (!ispt)
2321                         {
2322                             res = ZEBRA_FAIL;
2323                             (*no_invalid_isam_entries)++;
2324                             if (*no_invalid_isam_entries <= message_limit)
2325                             {
2326                                 do_fail = 1;
2327                                 yaz_log(YLOG_WARN, "Record " ZINT_FORMAT
2328                                         ": isamb_pp_open entry " ZINT_FORMAT
2329                                         " not found",
2330                                         rec->sysno, pos);
2331                             }
2332                         }
2333                         else if (zh->m_staticrank)
2334                         {
2335                             isamb_pp_close(ispt);
2336                         }
2337                         else
2338                         {
2339                             struct it_key until_key;
2340                             struct it_key isam_key;
2341                             int r;
2342                             int i = 0;
2343
2344                             until_key.len = key_in.len - 1;
2345                             for (i = 0; i < until_key.len; i++)
2346                                 until_key.mem[i] = key_in.mem[i+1];
2347
2348                             if (until_key.mem[0] == 0)
2349                                 until_key.mem[0] = rec->sysno;
2350                             r = isamb_pp_forward(ispt, &isam_key, &until_key);
2351                             if (r != 1)
2352                             {
2353                                 res = ZEBRA_FAIL;
2354                                 (*no_invalid_isam_entries)++;
2355                                 if (*no_invalid_isam_entries <= message_limit)
2356                                 {
2357                                     do_fail = 1;
2358                                     yaz_log(YLOG_WARN, "Record " ZINT_FORMAT
2359                                             ": isamb_pp_forward " ZINT_FORMAT
2360                                             " returned no entry",
2361                                             rec->sysno, pos);
2362                                 }
2363                             }
2364                             else
2365                             {
2366                                 int cmp = key_compare(&until_key, &isam_key);
2367                                 if (cmp != 0)
2368                                 {
2369                                     res = ZEBRA_FAIL;
2370                                     (*no_invalid_isam_entries)++;
2371                                     if (*no_invalid_isam_entries
2372                                         <= message_limit)
2373                                     {
2374                                         do_fail = 1;
2375                                         yaz_log(YLOG_WARN, "Record "
2376                                                 ZINT_FORMAT
2377                                                 ": isamb_pp_forward "
2378                                                 ZINT_FORMAT
2379                                                 " returned different entry",
2380                                                 rec->sysno, pos);
2381
2382                                         key_logdump_txt(YLOG_LOG,
2383                                                         &until_key,
2384                                                         "until");
2385
2386                                         key_logdump_txt(YLOG_LOG,
2387                                                         &isam_key,
2388                                                         "isam");
2389
2390                                     }
2391                                 }
2392                             }
2393                             isamb_pp_close(ispt);
2394                         }
2395
2396                     }
2397                 }
2398             }
2399             if (do_fail)
2400             {
2401                 zebra_it_key_str_dump(zh, &key_in, str,
2402                                       slen, nmem, YLOG_LOG);
2403                 nmem_reset(nmem);
2404             }
2405         }
2406         nmem_destroy(nmem);
2407     }
2408     zebra_rec_keys_close(keys);
2409     return res;
2410 }
2411
2412 ZEBRA_RES zebra_register_check(ZebraHandle zh, const char *spec)
2413 {
2414     ZEBRA_RES res = ZEBRA_FAIL;
2415     unsigned flags = 0;
2416
2417     if (!spec || *spec == '\0'
2418         || !strcmp(spec, "dict") || !strcmp(spec, "default"))
2419         flags = ZEBRA_CHECK_DICT;
2420     else if (!strcmp(spec, "isam") || !strcmp(spec, "full"))
2421         flags = ZEBRA_CHECK_DICT|ZEBRA_CHECK_ISAM;
2422     else if (!strcmp(spec, "quick"))
2423         flags = 0;
2424     else
2425     {
2426         yaz_log(YLOG_WARN, "Unknown check spec: %s", spec);
2427         return ZEBRA_FAIL;
2428     }
2429
2430     yaz_log(YLOG_LOG, "zebra_register_check begin flags=%u", flags);
2431     if (zebra_begin_read(zh) == ZEBRA_OK)
2432     {
2433         zint no_records_total = 0;
2434         zint no_records_fail = 0;
2435         zint total_keys = 0;
2436         int message_limit = zh->m_file_verbose_limit;
2437
2438         if (zh->reg)
2439         {
2440             Record rec = rec_get_root(zh->reg->records);
2441
2442             zint no_long_dict_entries = 0;
2443             zint no_failed_dict_lookups = 0;
2444             zint no_invalid_keys = 0;
2445             zint no_invalid_dict_infos = 0;
2446             zint no_invalid_isam_entries = 0;
2447
2448             res = ZEBRA_OK;
2449             while (rec)
2450             {
2451                 Record r1;
2452                 zint no_keys;
2453
2454                 if (zebra_record_check(zh, rec, &no_keys, message_limit,
2455                                        flags,
2456                                        &no_long_dict_entries,
2457                                        &no_failed_dict_lookups,
2458                                        &no_invalid_keys,
2459                                        &no_invalid_dict_infos,
2460                                        &no_invalid_isam_entries
2461                         )
2462                     != ZEBRA_OK)
2463                 {
2464                     res = ZEBRA_FAIL;
2465                     no_records_fail++;
2466                 }
2467
2468                 r1 = rec_get_next(zh->reg->records, rec);
2469                 rec_free(&rec);
2470                 rec = r1;
2471                 no_records_total++;
2472                 total_keys += no_keys;
2473             }
2474             yaz_log(YLOG_LOG, "records total:        " ZINT_FORMAT,
2475                     no_records_total);
2476             yaz_log(YLOG_LOG, "records fail:         " ZINT_FORMAT,
2477                     no_records_fail);
2478             yaz_log(YLOG_LOG, "total keys:           " ZINT_FORMAT,
2479                     total_keys);
2480             yaz_log(YLOG_LOG, "long dict entries:    " ZINT_FORMAT,
2481                     no_long_dict_entries);
2482             if (flags & ZEBRA_CHECK_DICT)
2483             {
2484                 yaz_log(YLOG_LOG, "failed dict lookups:  " ZINT_FORMAT,
2485                         no_failed_dict_lookups);
2486                 yaz_log(YLOG_LOG, "invalid dict infos:   " ZINT_FORMAT,
2487                         no_invalid_dict_infos);
2488             }
2489             if (flags & ZEBRA_CHECK_ISAM)
2490                 yaz_log(YLOG_LOG, "invalid isam entries: " ZINT_FORMAT,
2491                         no_invalid_isam_entries);
2492         }
2493         zebra_end_read(zh);
2494     }
2495     yaz_log(YLOG_LOG, "zebra_register_check end ret=%d", res);
2496     return res;
2497 }
2498
2499 void zebra_result(ZebraHandle zh, int *code, char **addinfo)
2500 {
2501     yaz_log(log_level, "zebra_result");
2502     if (zh)
2503     {
2504         *code = zh->errCode;
2505         *addinfo = zh->errString;
2506     }
2507     else
2508     {
2509         *code = YAZ_BIB1_TEMPORARY_SYSTEM_ERROR;
2510         *addinfo ="ZebraHandle is NULL";
2511     }
2512 }
2513
2514 void zebra_shadow_enable(ZebraHandle zh, int value)
2515 {
2516     ASSERTZH;
2517     yaz_log(log_level, "zebra_shadow_enable");
2518     zh->shadow_enable = value;
2519 }
2520
2521 ZEBRA_RES zebra_octet_term_encoding(ZebraHandle zh, const char *encoding)
2522 {
2523     yaz_log(log_level, "zebra_octet_term_encoding %s", encoding);
2524     ZEBRA_CHECK_HANDLE(zh);
2525     assert(encoding);
2526
2527     if (zh->iconv_to_utf8 != 0)
2528         yaz_iconv_close(zh->iconv_to_utf8);
2529     if (zh->iconv_from_utf8 != 0)
2530         yaz_iconv_close(zh->iconv_from_utf8);
2531
2532     zh->iconv_to_utf8 =
2533         yaz_iconv_open("UTF-8", encoding);
2534     if (zh->iconv_to_utf8 == 0)
2535         yaz_log(YLOG_WARN, "iconv: %s to UTF-8 unsupported", encoding);
2536     zh->iconv_from_utf8 =
2537         yaz_iconv_open(encoding, "UTF-8");
2538     if (zh->iconv_to_utf8 == 0)
2539         yaz_log(YLOG_WARN, "iconv: UTF-8 to %s unsupported", encoding);
2540
2541     return ZEBRA_OK;
2542 }
2543
2544 ZEBRA_RES zebra_record_encoding(ZebraHandle zh, const char *encoding)
2545 {
2546     yaz_log(log_level, "zebra_record_encoding");
2547     ZEBRA_CHECK_HANDLE(zh);
2548     xfree(zh->record_encoding);
2549     zh->record_encoding = 0;
2550     if (encoding)
2551         zh->record_encoding = xstrdup(encoding);
2552     return ZEBRA_OK;
2553 }
2554
2555 void zebra_set_resource(ZebraHandle zh, const char *name, const char *value)
2556 {
2557     assert(name);
2558     assert(value);
2559     yaz_log(log_level, "zebra_set_resource %s:%s", name, value);
2560     ASSERTZH;
2561     res_set(zh->res, name, value);
2562 }
2563
2564 const char *zebra_get_resource(ZebraHandle zh,
2565                                const char *name, const char *defaultvalue)
2566 {
2567     const char *v;
2568     ASSERTZH;
2569     assert(name);
2570     v = res_get_def(zh->res, name,(char *)defaultvalue);
2571     yaz_log(log_level, "zebra_get_resource %s:%s", name, v);
2572     return v;
2573 }
2574
2575 /* moved from zebra_api_ext.c by pop */
2576 /* FIXME: Should this really be public??? -Heikki */
2577
2578 int zebra_trans_no(ZebraHandle zh)
2579 {
2580     yaz_log(log_level, "zebra_trans_no");
2581     ASSERTZH;
2582     return zh->trans_no;
2583 }
2584
2585 int zebra_get_shadow_enable(ZebraHandle zh)
2586 {
2587     yaz_log(log_level, "zebra_get_shadow_enable");
2588     ASSERTZH;
2589     return zh->shadow_enable;
2590 }
2591
2592 void zebra_set_shadow_enable(ZebraHandle zh, int value)
2593 {
2594     yaz_log(log_level, "zebra_set_shadow_enable %d",value);
2595     ASSERTZH;
2596     zh->shadow_enable = value;
2597 }
2598
2599 ZEBRA_RES zebra_add_record(ZebraHandle zh,
2600                            const char *buf, int buf_size)
2601 {
2602     return zebra_update_record(zh, action_update,
2603                                0 /* record type */,
2604                                0 /* sysno */ ,
2605                                0 /* match */,
2606                                0 /* fname */,
2607                                buf, buf_size);
2608 }
2609
2610 ZEBRA_RES zebra_update_record(ZebraHandle zh,
2611                               enum zebra_recctrl_action_t action,
2612                               const char *recordType,
2613                               zint *sysno, const char *match,
2614                               const char *fname,
2615                               const char *buf, int buf_size)
2616 {
2617     ZEBRA_RES res;
2618
2619     ZEBRA_CHECK_HANDLE(zh);
2620
2621     assert(buf);
2622
2623     yaz_log(log_level, "zebra_update_record");
2624     if (sysno)
2625         yaz_log(log_level, " sysno=" ZINT_FORMAT, *sysno);
2626
2627     if (buf_size < 1)
2628         buf_size = strlen(buf);
2629
2630     if (zebra_begin_trans(zh, 1) == ZEBRA_FAIL)
2631         return ZEBRA_FAIL;
2632     res = zebra_buffer_extract_record(zh, buf, buf_size,
2633                                       action,
2634                                       recordType,
2635                                       sysno,
2636                                       match,
2637                                       fname);
2638     if (zebra_end_trans(zh) != ZEBRA_OK)
2639     {
2640         yaz_log(YLOG_WARN, "zebra_end_trans failed");
2641         res = ZEBRA_FAIL;
2642     }
2643     return res;
2644 }
2645
2646 /* ---------------------------------------------------------------------------
2647    Searching
2648 */
2649
2650 ZEBRA_RES zebra_search_PQF(ZebraHandle zh, const char *pqf_query,
2651                            const char *setname, zint *hits)
2652 {
2653     zint lhits = 0;
2654     ZEBRA_RES res = ZEBRA_OK;
2655     Z_RPNQuery *query;
2656     ODR odr;
2657
2658
2659     ZEBRA_CHECK_HANDLE(zh);
2660
2661     odr = odr_createmem(ODR_ENCODE);
2662
2663     assert(pqf_query);
2664     assert(setname);
2665
2666     yaz_log(log_level, "zebra_search_PQF s=%s q=%s", setname, pqf_query);
2667
2668     query = p_query_rpn(odr, pqf_query);
2669
2670     if (!query)
2671     {
2672         yaz_log(YLOG_WARN, "bad query %s\n", pqf_query);
2673         zh->errCode = YAZ_BIB1_MALFORMED_QUERY;
2674         res = ZEBRA_FAIL;
2675     }
2676     else
2677         res = zebra_search_RPN(zh, odr, query, setname, &lhits);
2678
2679     odr_destroy(odr);
2680
2681     yaz_log(log_level, "Hits: " ZINT_FORMAT, lhits);
2682
2683     if (hits)
2684         *hits = lhits;
2685
2686     return res;
2687 }
2688
2689 /* ---------------------------------------------------------------------------
2690    Sort - a simplified interface, with optional read locks.
2691 */
2692 int zebra_sort_by_specstr(ZebraHandle zh, ODR stream,
2693                           const char *sort_spec,
2694                           const char *output_setname,
2695                           const char **input_setnames)
2696 {
2697     int num_input_setnames = 0;
2698     int sort_status = 0;
2699     Z_SortKeySpecList *sort_sequence;
2700
2701     ZEBRA_CHECK_HANDLE(zh);
2702     assert(stream);
2703     assert(sort_spec);
2704     assert(output_setname);
2705     assert(input_setnames);
2706     sort_sequence = yaz_sort_spec(stream, sort_spec);
2707     yaz_log(log_level, "sort (FIXME) ");
2708     if (!sort_sequence)
2709     {
2710         yaz_log(YLOG_WARN, "invalid sort specs '%s'", sort_spec);
2711         zh->errCode = YAZ_BIB1_CANNOT_SORT_ACCORDING_TO_SEQUENCE;
2712         return -1;
2713     }
2714
2715     /* we can do this, since the perl typemap code for char** will
2716        put a NULL at the end of list */
2717     while (input_setnames[num_input_setnames]) num_input_setnames++;
2718
2719     if (zebra_begin_read(zh))
2720         return -1;
2721
2722     resultSetSort(zh, stream->mem, num_input_setnames, input_setnames,
2723                   output_setname, sort_sequence, &sort_status);
2724
2725     zebra_end_read(zh);
2726     return sort_status;
2727 }
2728
2729 /* ---------------------------------------------------------------------------
2730    Get BFS for Zebra system (to make alternative storage methods)
2731 */
2732 struct BFiles_struct *zebra_get_bfs(ZebraHandle zh)
2733 {
2734     if (zh && zh->reg)
2735         return zh->reg->bfs;
2736     return 0;
2737 }
2738
2739
2740 /* ---------------------------------------------------------------------------
2741    Set limit for search/scan
2742 */
2743 ZEBRA_RES zebra_set_limit(ZebraHandle zh, int complement_flag, zint *ids)
2744 {
2745     ZEBRA_CHECK_HANDLE(zh);
2746     zebra_limit_destroy(zh->m_limit);
2747     zh->m_limit = zebra_limit_create(complement_flag, ids);
2748     return ZEBRA_OK;
2749 }
2750
2751 /*
2752   Set Error code + addinfo
2753 */
2754 void zebra_setError(ZebraHandle zh, int code, const char *addinfo)
2755 {
2756     if (!zh)
2757         return;
2758     zh->errCode = code;
2759     nmem_reset(zh->nmem_error);
2760     zh->errString = addinfo ? nmem_strdup(zh->nmem_error, addinfo) : 0;
2761 }
2762
2763 void zebra_setError_zint(ZebraHandle zh, int code, zint i)
2764 {
2765     char vstr[60];
2766     sprintf(vstr, ZINT_FORMAT, i);
2767
2768     zh->errCode = code;
2769     nmem_reset(zh->nmem_error);
2770     zh->errString = nmem_strdup(zh->nmem_error, vstr);
2771 }
2772
2773 void zebra_lock_prefix(Res res, char *path)
2774 {
2775     const char *lock_dir = res_get_def(res, "lockDir", "");
2776
2777     strcpy(path, lock_dir);
2778     if (*path && path[strlen(path)-1] != '/')
2779         strcat(path, "/");
2780 }
2781
2782 /*
2783  * Local variables:
2784  * c-basic-offset: 4
2785  * c-file-style: "Stroustrup"
2786  * indent-tabs-mode: nil
2787  * End:
2788  * vim: shiftwidth=4 tabstop=8 expandtab
2789  */
2790