Introduce spec for zebra_register_check
[idzebra-moved-to-github.git] / index / zebraapi.c
1 /* This file is part of the Zebra server.
2    Copyright (C) 1994-2011 Index Data
3
4 Zebra is free software; you can redistribute it and/or modify it under
5 the terms of the GNU General Public License as published by the Free
6 Software Foundation; either version 2, or (at your option) any later
7 version.
8
9 Zebra is distributed in the hope that it will be useful, but WITHOUT ANY
10 WARRANTY; without even the implied warranty of MERCHANTABILITY or
11 FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
12 for more details.
13
14 You should have received a copy of the GNU General Public License
15 along with this program; if not, write to the Free Software
16 Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA  02110-1301  USA
17
18 */
19
20 #include <assert.h>
21 #include <stdio.h>
22 #include <limits.h>
23 #ifdef WIN32
24 #include <io.h>
25 #include <process.h>
26 #include <direct.h>
27 #endif
28 #if HAVE_UNISTD_H
29 #include <unistd.h>
30 #endif
31
32 #include <yaz/diagbib1.h>
33 #include <yaz/pquery.h>
34 #include <yaz/sortspec.h>
35 #include "index.h"
36 #include "rank.h"
37 #include "orddict.h"
38 #include <charmap.h>
39 #include <idzebra/api.h>
40 #include <yaz/oid_db.h>
41
42 #define DEFAULT_APPROX_LIMIT 2000000000
43
44 /* simple asserts to validate the most essential input args */
45 #define ASSERTZH assert(zh && zh->service)
46 #define ASSERTZHRES assert(zh && zh->service && zh->res)
47 #define ASSERTZS assert(zs)
48
49 static int log_level = 0;
50 static int log_level_initialized = 0;
51
52 static void zebra_open_res(ZebraHandle zh);
53 static void zebra_close_res(ZebraHandle zh);
54
55 static ZEBRA_RES zebra_check_handle(ZebraHandle zh)
56 {
57     if (zh)
58         return ZEBRA_OK;
59     return ZEBRA_FAIL;
60 }
61
62 #define ZEBRA_CHECK_HANDLE(zh) if (zebra_check_handle(zh) != ZEBRA_OK) return ZEBRA_FAIL
63
64 static int zebra_chdir(ZebraService zs)
65 {
66     const char *dir ;
67     int r;
68     ASSERTZS;
69     yaz_log(log_level, "zebra_chdir");
70     dir = res_get(zs->global_res, "chdir");
71     if (!dir)
72         return 0;
73     yaz_log(YLOG_DEBUG, "chdir %s", dir);
74 #ifdef WIN32
75     r = _chdir(dir);
76 #else
77     r = chdir(dir);
78 #endif
79     if (r)
80         yaz_log(YLOG_FATAL|YLOG_ERRNO, "chdir %s", dir);
81     return r;
82 }
83
84 static ZEBRA_RES zebra_flush_reg(ZebraHandle zh)
85 {
86     ZEBRA_CHECK_HANDLE(zh);
87     yaz_log(log_level, "zebra_flush_reg");
88     zebraExplain_flush(zh->reg->zei, zh);
89
90     key_block_flush(zh->reg->key_block, 1);
91
92     zebra_index_merge(zh);
93     return ZEBRA_OK;
94 }
95
96 static struct zebra_register *zebra_register_open(ZebraService zs, 
97                                                   const char *name,
98                                                   int rw, int useshadow,
99                                                   Res res,
100                                                   const char *reg_path);
101 static void zebra_register_close(ZebraService zs, struct zebra_register *reg);
102
103 const char *zebra_get_encoding(ZebraHandle zh)
104 {
105     assert(zh && zh->session_res);
106     return res_get_def(zh->session_res, "encoding", "ISO-8859-1");
107 }
108
109 ZebraHandle zebra_open(ZebraService zs, Res res)
110 {
111     ZebraHandle zh;
112     const char *default_encoding;
113     if (!log_level_initialized)
114     {
115         log_level = yaz_log_module_level("zebraapi");
116         log_level_initialized = 1;
117     }
118
119     yaz_log(log_level, "zebra_open");
120
121     if (!zs)
122         return 0;
123
124     zh = (ZebraHandle) xmalloc(sizeof(*zh));
125     yaz_log(YLOG_DEBUG, "zebra_open zs=%p returns %p", zs, zh);
126
127     zh->service = zs;
128     zh->reg = 0;          /* no register attached yet */
129     zh->sets = 0;
130     zh->destroyed = 0;
131     zh->errCode = 0;
132     zh->errString = 0;
133     zh->res = 0; 
134     zh->session_res = res_open(zs->global_res, res);
135     zh->user_perm = 0;
136     zh->dbaccesslist = 0;
137
138     zh->reg_name = xstrdup("");
139     zh->path_reg = 0;
140     zh->num_basenames = 0;
141     zh->basenames = 0;
142
143     zh->approx_limit = DEFAULT_APPROX_LIMIT;
144     zh->trans_no = 0;
145     zh->trans_w_no = 0;
146
147     zh->lock_normal = 0;
148     zh->lock_shadow = 0;
149
150     zh->shadow_enable = 1;
151     zh->m_staticrank = 0;
152     zh->m_segment_indexing = 0;
153
154     zh->break_handler_func = 0;
155     zh->break_handler_data = 0;
156
157     default_encoding = zebra_get_encoding(zh);
158
159     zh->iconv_to_utf8 =
160         yaz_iconv_open("UTF-8", default_encoding);
161     if (zh->iconv_to_utf8 == 0)
162         yaz_log(YLOG_WARN, "iconv: %s to UTF-8 unsupported",
163                 default_encoding);
164     zh->iconv_from_utf8 =
165         yaz_iconv_open(default_encoding, "UTF-8");
166     if (zh->iconv_to_utf8 == 0)
167         yaz_log(YLOG_WARN, "iconv: UTF-8 to %s unsupported",
168                 default_encoding);
169
170     zh->record_encoding = 0;
171
172     zebra_mutex_cond_lock(&zs->session_lock);
173
174     zh->next = zs->sessions;
175     zs->sessions = zh;
176
177     zebra_mutex_cond_unlock(&zs->session_lock);
178
179     zh->store_data_buf = 0;
180
181     zh->m_limit = zebra_limit_create(1, 0);
182
183     zh->nmem_error = nmem_create();
184
185     return zh;
186 }
187
188 ZebraService zebra_start(const char *configName)
189 {
190     return zebra_start_res(configName, 0, 0);
191 }
192
193 ZebraService zebra_start_res(const char *configName, Res def_res, Res over_res)
194 {
195     Res res;
196     char version_str[16];
197     char system_str[80];
198
199     zebra_flock_init();
200
201     if (!log_level_initialized)
202     {
203         log_level = yaz_log_module_level("zebraapi");
204         log_level_initialized = 1;
205     }
206
207     *system_str = '\0';
208     *version_str = '\0';
209     zebra_get_version(version_str, system_str);
210
211     yaz_log(YLOG_LOG, "zebra_start %s %s", version_str, system_str);
212     if (configName)
213         yaz_log(YLOG_LOG, "config %s", configName);
214
215     if ((res = res_open(def_res, over_res)))
216     {
217         const char *passwd_plain = 0;
218         const char *passwd_encrypt = 0;
219         const char *dbaccess = 0;
220         ZebraService zh = 0;
221
222         if (configName)
223         {
224             ZEBRA_RES ret = res_read_file(res, configName);
225             if (ret != ZEBRA_OK)
226             {
227                 res_close(res);
228                 return 0;
229             }
230             if (zebra_check_res(res))
231             {
232                 yaz_log(YLOG_FATAL, "Configuration error(s) for %s",
233                         configName);
234                 return 0;
235             }
236         }
237         else
238         {
239             zebra_check_res(res);
240         }
241
242         zh = xmalloc(sizeof(*zh));
243         zh->global_res = res;
244         zh->sessions = 0;
245         
246         if (zebra_chdir(zh))
247         {
248             xfree(zh);
249             return 0;
250         }
251         
252         zebra_mutex_cond_init(&zh->session_lock);
253         passwd_plain = res_get(zh->global_res, "passwd");
254         passwd_encrypt = res_get(zh->global_res, "passwd.c");
255         dbaccess = res_get(zh->global_res, "dbaccess");
256
257         if (!passwd_plain && !passwd_encrypt)
258             zh->passwd_db = NULL;
259         else 
260         {
261             zh->passwd_db = passwd_db_open();
262             if (!zh->passwd_db)
263                 yaz_log(YLOG_WARN|YLOG_ERRNO, "passwd_db_open failed");
264             else
265             {
266                 if (passwd_plain)
267                     passwd_db_file_plain(zh->passwd_db, passwd_plain);
268                 if (passwd_encrypt)
269                     passwd_db_file_crypt(zh->passwd_db, passwd_encrypt);
270             }
271         }
272
273         if (!dbaccess)
274             zh->dbaccess = NULL;
275         else {
276             zh->dbaccess = res_open(NULL, NULL);
277             if (res_read_file(zh->dbaccess, dbaccess) != ZEBRA_OK) {
278                 yaz_log(YLOG_FATAL, "Failed to read %s", dbaccess);
279                 return NULL;
280             }
281         }
282
283         zh->timing = yaz_timing_create();
284         zh->path_root = res_get(zh->global_res, "root");
285         zh->nmem = nmem_create();
286         zh->record_classes = recTypeClass_create(zh->global_res, zh->nmem);
287
288         if (1)
289         {
290             const char *module_path = res_get(res, "modulePath");
291             if (module_path)
292                 recTypeClass_load_modules(&zh->record_classes, zh->nmem,
293                                           module_path);
294         }
295         return zh;
296     }
297     return 0;
298 }
299
300 void zebra_filter_info(ZebraService zs, void *cd,
301                        void(*cb)(void *cd, const char *name))
302 {
303     ASSERTZS;
304     assert(cb);
305     recTypeClass_info(zs->record_classes, cd, cb);
306 }
307
308 void zebra_pidfname(ZebraService zs, char *path)
309 {
310     ASSERTZS;
311     zebra_lock_prefix(zs->global_res, path);
312     strcat(path, "zebrasrv.pid");
313 }
314
315 Dict dict_open_res(BFiles bfs, const char *name, int cache, int rw,
316                    int compact_flag, Res res)
317 {
318     int page_size = 4096;
319     char resource_str[200];
320     sprintf(resource_str, "dict.%.100s.pagesize", name);
321     assert(bfs);
322     assert(name);
323
324     if (res_get_int(res, resource_str, &page_size) == ZEBRA_OK)
325         yaz_log(YLOG_LOG, "Using custom dictionary page size %d for %s",
326                 page_size, name);
327     return dict_open(bfs, name, cache, rw, compact_flag, page_size);
328 }
329
330 static
331 struct zebra_register *zebra_register_open(ZebraService zs, const char *name,
332                                            int rw, int useshadow, Res res,
333                                            const char *reg_path)
334 {
335     struct zebra_register *reg;
336     int record_compression = REC_COMPRESS_NONE;
337     const char *compression_str = 0;
338     const char *profilePath;
339     int sort_type = ZEBRA_SORT_TYPE_FLAT;
340     ZEBRA_RES ret = ZEBRA_OK;
341
342     ASSERTZS;
343     
344     reg = xmalloc(sizeof(*reg));
345
346     assert(name);
347     reg->name = xstrdup(name);
348
349     reg->seqno = 0;
350     reg->last_val = 0;
351
352     assert(res);
353
354     yaz_log(YLOG_DEBUG, "zebra_register_open rw=%d useshadow=%d p=%p n=%s rp=%s",
355             rw, useshadow, reg, name, reg_path ? reg_path : "(none)");
356     
357     reg->dh = data1_create();
358     if (!reg->dh)
359     {
360         xfree(reg->name);
361         xfree(reg);
362         return 0;
363     }
364     reg->bfs = bfs_create(res_get(res, "register"), reg_path);
365     if (!reg->bfs)
366     {
367         data1_destroy(reg->dh);
368         xfree(reg->name);
369         xfree(reg);
370         return 0;
371     }
372     if (useshadow)
373     {
374         if (bf_cache(reg->bfs, res_get(res, "shadow")) == ZEBRA_FAIL)
375         {
376             bfs_destroy(reg->bfs);
377             data1_destroy(reg->dh);
378             xfree(reg->name);
379             xfree(reg);
380             return 0;
381         }
382     }
383
384     profilePath = res_get_def(res, "profilePath", 0);
385
386     data1_set_tabpath(reg->dh, profilePath);
387     data1_set_tabroot(reg->dh, reg_path);
388     reg->recTypes = recTypes_init(zs->record_classes, reg->dh);
389
390     reg->zebra_maps =
391         zebra_maps_open(res, reg_path, profilePath);
392     if (!reg->zebra_maps)
393     {
394         recTypes_destroy(reg->recTypes);
395         bfs_destroy(reg->bfs);
396         data1_destroy(reg->dh);
397         xfree(reg->name);
398         xfree(reg);
399         return 0;
400     }
401     reg->rank_classes = NULL;
402
403     reg->key_block = 0;
404     reg->keys = zebra_rec_keys_open();
405
406     reg->sortKeys = zebra_rec_keys_open();
407
408     reg->records = 0;
409     reg->dict = 0;
410     reg->sort_index = 0;
411     reg->isams = 0;
412     reg->matchDict = 0;
413     reg->isamc = 0;
414     reg->isamb = 0;
415     reg->zei = 0;
416     
417     /* installing rank classes */
418     zebraRankInstall(reg, rank_1_class);
419     zebraRankInstall(reg, rank_2_class);
420     zebraRankInstall(reg, rank_similarity_class);
421     zebraRankInstall(reg, rank_static_class);
422
423     compression_str = res_get_def(res, "recordCompression", "none");
424     if (!strcmp(compression_str, "none"))
425         record_compression = REC_COMPRESS_NONE;
426     else if (!strcmp(compression_str, "bzip2"))
427         record_compression = REC_COMPRESS_BZIP2;
428     else if (!strcmp(compression_str, "zlib"))
429         record_compression = REC_COMPRESS_ZLIB;
430     else
431     {
432         yaz_log(YLOG_FATAL, "invalid recordCompression: %s", compression_str);
433         ret = ZEBRA_FAIL;
434     }
435
436     if (!rec_check_compression_method(record_compression))
437     {
438         yaz_log(YLOG_FATAL, "unsupported recordCompression: %s",
439                 compression_str);
440         ret = ZEBRA_FAIL;
441     }
442
443     {
444         const char *index_fname = res_get_def(res, "index", "default.idx");
445         if (index_fname && *index_fname && strcmp(index_fname, "none"))
446         {
447             if (zebra_maps_read_file(reg->zebra_maps, index_fname) != ZEBRA_OK)
448                 ret = ZEBRA_FAIL;
449         }
450         else
451         {
452             zebra_maps_define_default_sort(reg->zebra_maps);
453         }
454     }
455
456     if (!(reg->records = rec_open(reg->bfs, rw, record_compression)))
457     {
458         yaz_log(YLOG_WARN, "rec_open failed");
459         ret = ZEBRA_FAIL;
460     }
461     if (rw)
462     {
463         reg->matchDict = dict_open_res(reg->bfs, GMATCH_DICT, 20, 1, 0, res);
464     }
465     if (!(reg->dict = dict_open_res(reg->bfs, FNAME_DICT, 40, rw, 0, res)))
466     {
467         yaz_log(YLOG_WARN, "dict_open failed");
468         ret = ZEBRA_FAIL;
469     }
470
471     
472     if (res_get_match(res, "sortindex", "f", "f"))
473         sort_type = ZEBRA_SORT_TYPE_FLAT;
474     else if (res_get_match(res, "sortindex", "i", "f"))
475         sort_type = ZEBRA_SORT_TYPE_ISAMB;
476     else if (res_get_match(res, "sortindex", "m", "f"))
477         sort_type = ZEBRA_SORT_TYPE_MULTI;
478     else
479     {
480         yaz_log(YLOG_WARN, "bad_value for 'sortindex'");
481         ret = ZEBRA_FAIL;
482     }
483
484
485     if (!(reg->sort_index = zebra_sort_open(reg->bfs, rw, sort_type)))
486     {
487         yaz_log(YLOG_WARN, "zebra_sort_open failed");
488         ret = ZEBRA_FAIL;
489     }
490     if (res_get_match(res, "isam", "s", ISAM_DEFAULT))
491     {
492         struct ISAMS_M_s isams_m;
493         if (!(reg->isams = isams_open(reg->bfs, FNAME_ISAMS, rw,
494                                       key_isams_m(res, &isams_m))))
495         {
496             yaz_log(YLOG_WARN, "isams_open failed");
497             ret = ZEBRA_FAIL;
498         }
499     }
500     if (res_get_match(res, "isam", "c", ISAM_DEFAULT))
501     {
502         struct ISAMC_M_s isamc_m;
503         if (!(reg->isamc = isamc_open(reg->bfs, FNAME_ISAMC,
504                                       rw, key_isamc_m(res, &isamc_m))))
505         {
506             yaz_log(YLOG_WARN, "isamc_open failed");
507             ret = ZEBRA_FAIL;
508         }
509     }
510     if (res_get_match(res, "isam", "b", ISAM_DEFAULT))
511     {
512         struct ISAMC_M_s isamc_m;
513         
514         if (!(reg->isamb = isamb_open(reg->bfs, "isamb",
515                                       rw, key_isamc_m(res, &isamc_m), 0)))
516         {
517             yaz_log(YLOG_WARN, "isamb_open failed");
518             ret = ZEBRA_FAIL;
519         }
520     }
521     if (res_get_match(res, "isam", "bc", ISAM_DEFAULT))
522     {
523         struct ISAMC_M_s isamc_m;
524         
525         if (!(reg->isamb = isamb_open(reg->bfs, "isamb",
526                                       rw, key_isamc_m(res, &isamc_m), 1)))
527         {
528             yaz_log(YLOG_WARN, "isamb_open failed");
529             ret = ZEBRA_FAIL;
530         }
531     }
532     if (res_get_match(res, "isam", "null", ISAM_DEFAULT))
533     {
534         struct ISAMC_M_s isamc_m;
535         
536         if (!(reg->isamb = isamb_open(reg->bfs, "isamb",
537                                       rw, key_isamc_m(res, &isamc_m), -1)))
538         {
539             yaz_log(YLOG_WARN, "isamb_open failed");
540             ret = ZEBRA_FAIL;
541         }
542     }
543     if (ret == ZEBRA_OK)
544     {
545         reg->zei = zebraExplain_open(reg->records, reg->dh,
546                                      res, rw, reg,
547                                      zebra_extract_explain);
548         if (!reg->zei)
549         {
550             yaz_log(YLOG_WARN, "Cannot obtain EXPLAIN information");
551             ret = ZEBRA_FAIL;
552         }
553     }
554     
555     if (ret != ZEBRA_OK)
556     {
557         zebra_register_close(zs, reg);
558         return 0;
559     }
560     yaz_log(YLOG_DEBUG, "zebra_register_open ok p=%p", reg);
561     return reg;
562 }
563
564 ZEBRA_RES zebra_admin_shutdown(ZebraHandle zh)
565 {
566     ZEBRA_CHECK_HANDLE(zh);
567     yaz_log(log_level, "zebra_admin_shutdown");
568
569     zebra_mutex_cond_lock(&zh->service->session_lock);
570     zh->service->stop_flag = 1;
571     zebra_mutex_cond_unlock(&zh->service->session_lock);
572     return ZEBRA_OK;
573 }
574
575 ZEBRA_RES zebra_admin_start(ZebraHandle zh)
576 {
577     ZebraService zs;
578     ZEBRA_CHECK_HANDLE(zh);
579     yaz_log(log_level, "zebra_admin_start");
580     zs = zh->service;
581     zebra_mutex_cond_lock(&zs->session_lock);
582     zebra_mutex_cond_unlock(&zs->session_lock);
583     return ZEBRA_OK;
584 }
585
586 static void zebra_register_close(ZebraService zs, struct zebra_register *reg)
587 {
588     ASSERTZS;
589     assert(reg);
590     yaz_log(YLOG_DEBUG, "zebra_register_close p=%p", reg);
591     reg->stop_flag = 0;
592     zebra_chdir(zs);
593     
594     zebraExplain_close(reg->zei);
595     dict_close(reg->dict);
596     if (reg->matchDict)
597         dict_close(reg->matchDict);
598     zebra_sort_close(reg->sort_index);
599     if (reg->isams)
600         isams_close(reg->isams);
601     if (reg->isamc)
602         isamc_close(reg->isamc);
603     if (reg->isamb)
604         isamb_close(reg->isamb);
605     rec_close(&reg->records);
606
607     recTypes_destroy(reg->recTypes);
608     zebra_maps_close(reg->zebra_maps);
609     zebraRankDestroy(reg);
610     bfs_destroy(reg->bfs);
611     data1_destroy(reg->dh);
612
613     zebra_rec_keys_close(reg->keys);
614     zebra_rec_keys_close(reg->sortKeys);
615
616     key_block_destroy(&reg->key_block);
617     xfree(reg->name);
618     xfree(reg);
619 }
620
621 ZEBRA_RES zebra_stop(ZebraService zs)
622 {
623     if (!zs)
624         return ZEBRA_OK;
625     while (zs->sessions)
626     {
627         zebra_close(zs->sessions);
628     }
629         
630     zebra_mutex_cond_destroy(&zs->session_lock);
631
632     if (zs->passwd_db)
633         passwd_db_close(zs->passwd_db);
634
635     recTypeClass_destroy(zs->record_classes);
636     nmem_destroy(zs->nmem);
637     res_close(zs->global_res);
638
639     yaz_timing_stop(zs->timing);
640     yaz_log(YLOG_LOG, "zebra_stop: %4.2f %4.2f %4.2f",
641             yaz_timing_get_real(zs->timing),
642             yaz_timing_get_user(zs->timing),
643             yaz_timing_get_sys(zs->timing));
644     
645
646     yaz_timing_destroy(&zs->timing);
647     xfree(zs);
648     return ZEBRA_OK;
649 }
650
651 ZEBRA_RES zebra_close(ZebraHandle zh)
652 {
653     ZebraService zs;
654     struct zebra_session **sp;
655     int i;
656
657     yaz_log(log_level, "zebra_close");
658     ZEBRA_CHECK_HANDLE(zh);
659
660     zh->errCode = 0;
661     
662     zs = zh->service;
663     yaz_log(YLOG_DEBUG, "zebra_close zh=%p", zh);
664     resultSetDestroy(zh, -1, 0, 0);
665
666     if (zh->reg)
667         zebra_register_close(zh->service, zh->reg);
668     zebra_close_res(zh);
669     res_close(zh->session_res);
670
671     xfree(zh->record_encoding);
672
673     xfree(zh->dbaccesslist);
674
675     for (i = 0; i < zh->num_basenames; i++)
676         xfree(zh->basenames[i]);
677     xfree(zh->basenames);
678
679     if (zh->iconv_to_utf8 != 0)
680         yaz_iconv_close(zh->iconv_to_utf8);
681     if (zh->iconv_from_utf8 != 0)
682         yaz_iconv_close(zh->iconv_from_utf8);
683
684     zebra_mutex_cond_lock(&zs->session_lock);
685     zebra_lock_destroy(zh->lock_normal);
686     zebra_lock_destroy(zh->lock_shadow);
687     sp = &zs->sessions;
688     while (1)
689     {
690         assert(*sp);
691         if (*sp == zh)
692         {
693             *sp = (*sp)->next;
694             break;
695         }
696         sp = &(*sp)->next;
697     }
698     zebra_mutex_cond_unlock(&zs->session_lock);
699     xfree(zh->reg_name);
700     xfree(zh->user_perm);
701     zh->service = 0; /* more likely to trigger an assert */
702
703     zebra_limit_destroy(zh->m_limit);
704
705     nmem_destroy(zh->nmem_error);
706
707     xfree(zh->path_reg);
708     xfree(zh);
709     return ZEBRA_OK;
710 }
711
712 struct map_baseinfo {
713     ZebraHandle zh;
714     NMEM mem;
715     int num_bases;
716     char **basenames;
717     int new_num_bases;
718     char **new_basenames;
719     int new_num_max;
720 };
721
722 static void zebra_open_res(ZebraHandle zh)
723 {
724     char fname[512];
725     ASSERTZH;
726     zh->errCode = 0;
727
728     if (zh->path_reg)
729     {
730         sprintf(fname, "%.200s/zebra.cfg", zh->path_reg);
731         zh->res = res_open(zh->session_res, 0);
732         res_read_file(zh->res, fname);
733     }
734     else if (*zh->reg_name == 0)
735     {
736         zh->res = res_open(zh->session_res, 0);
737     }
738     else
739     {
740         yaz_log(YLOG_WARN, "no register root specified");
741         zh->res = 0;  /* no path for register - fail! */
742     }
743 }
744
745 static void zebra_close_res(ZebraHandle zh)
746 {
747     ASSERTZH;
748     zh->errCode = 0;
749     res_close(zh->res);
750     zh->res = 0;
751 }
752
753 static void zebra_select_register(ZebraHandle zh, const char *new_reg)
754 {
755     ASSERTZH;
756     zh->errCode = 0;
757     if (zh->res && strcmp(zh->reg_name, new_reg) == 0)
758         return;
759     if (!zh->res)
760     {
761         assert(zh->reg == 0);
762         assert(*zh->reg_name == 0);
763     }
764     else
765     {
766         if (zh->reg)
767         {
768             resultSetInvalidate(zh);
769             zebra_register_close(zh->service, zh->reg);
770             zh->reg = 0;
771         }
772         zebra_close_res(zh);
773     }
774     xfree(zh->reg_name);
775     zh->reg_name = xstrdup(new_reg);
776
777     xfree(zh->path_reg);
778     zh->path_reg = 0;
779     if (zh->service->path_root)
780     {
781         zh->path_reg = xmalloc(strlen(zh->service->path_root) + 
782                                strlen(zh->reg_name) + 3);
783         strcpy(zh->path_reg, zh->service->path_root);
784         if (*zh->reg_name)
785         {
786             strcat(zh->path_reg, "/");
787             strcat(zh->path_reg, zh->reg_name);
788         }
789     }
790     zebra_open_res(zh);
791     
792     if (zh->lock_normal)
793         zebra_lock_destroy(zh->lock_normal);
794     zh->lock_normal = 0;
795
796     if (zh->lock_shadow)
797         zebra_lock_destroy(zh->lock_shadow);
798     zh->lock_shadow = 0;
799
800     if (zh->res)
801     {
802         char fname[512];
803         const char *lock_area = res_get(zh->res, "lockDir");
804         
805         if (!lock_area && zh->path_reg)
806             res_set(zh->res, "lockDir", zh->path_reg);
807         sprintf(fname, "norm.%s.LCK", zh->reg_name);
808         zh->lock_normal =
809             zebra_lock_create(res_get(zh->res, "lockDir"), fname);
810         
811         sprintf(fname, "shadow.%s.LCK", zh->reg_name);
812         zh->lock_shadow =
813             zebra_lock_create(res_get(zh->res, "lockDir"), fname);
814
815         if (!zh->lock_normal || !zh->lock_shadow)
816         {
817             if (zh->lock_normal)
818             {
819                 zebra_lock_destroy(zh->lock_normal);
820                 zh->lock_normal = 0;
821             }
822             if (zh->lock_shadow)
823             {
824                 zebra_lock_destroy(zh->lock_shadow);
825                 zh->lock_shadow = 0;
826             }
827             zebra_close_res(zh);
828         }
829     }
830     if (zh->res)
831     {
832         int approx = 0;
833         if (res_get_int(zh->res, "estimatehits", &approx) == ZEBRA_OK)
834             zebra_set_approx_limit(zh, approx);
835     }
836     if (zh->res)
837     {
838         if (res_get_int(zh->res, "staticrank", &zh->m_staticrank) == ZEBRA_OK)
839             yaz_log(YLOG_LOG, "static rank set and is %d", zh->m_staticrank);
840     }
841     if (zh->res)
842     {
843         if (res_get_int(zh->res, "segment", &zh->m_segment_indexing) == 
844             ZEBRA_OK)
845         {
846             yaz_log(YLOG_DEBUG, "segment indexing set and is %d",
847                     zh->m_segment_indexing);
848         }
849     }
850 }
851
852 void map_basenames_func(void *vp, const char *name, const char *value)
853 {
854     struct map_baseinfo *p = (struct map_baseinfo *) vp;
855     int i, no;
856     char fromdb[128], todb[8][128];
857
858     assert(value);
859     assert(name);
860     assert(vp);
861     
862     no =
863         sscanf(value, "%127s %127s %127s %127s %127s %127s %127s %127s %127s",
864                fromdb,  todb[0], todb[1], todb[2], todb[3], todb[4],
865                todb[5], todb[6], todb[7]);
866     if (no < 2)
867         return ;
868     no--;
869     for (i = 0; i<p->num_bases; i++)
870         if (p->basenames[i] && !STRCASECMP(p->basenames[i], fromdb))
871         {
872             p->basenames[i] = 0;
873             for (i = 0; i < no; i++)
874             {
875                 if (p->new_num_bases == p->new_num_max)
876                     return;
877                 p->new_basenames[(p->new_num_bases)++] = 
878                     nmem_strdup(p->mem, todb[i]);
879             }
880             return;
881         }
882 }
883
884 int zebra_select_default_database(ZebraHandle zh)
885 {
886     if (!zh->res)
887     {
888         /* no database has been selected - so we select based on
889            resource setting (including group)
890         */
891         const char *group = res_get(zh->session_res, "group");
892         const char *v = res_get_prefix(zh->session_res,
893                                        "database", group, "Default");
894         return zebra_select_database(zh, v);
895     }
896     return 0;
897 }
898
899 void map_basenames(ZebraHandle zh, ODR stream,
900                    int *num_bases, char ***basenames)
901 {
902     struct map_baseinfo info;
903     struct map_baseinfo *p = &info;
904     int i;
905     ASSERTZH;
906     yaz_log(log_level, "map_basenames ");
907     assert(stream);
908
909     info.zh = zh;
910
911     info.num_bases = *num_bases;
912     info.basenames = *basenames;
913     info.new_num_max = 128;
914     info.new_num_bases = 0;
915     info.new_basenames = (char **)
916         odr_malloc(stream, sizeof(*info.new_basenames) * info.new_num_max);
917     info.mem = stream->mem;
918
919     res_trav(zh->session_res, "mapdb", &info, map_basenames_func);
920     
921     for (i = 0; i<p->num_bases; i++)
922         if (p->basenames[i] && p->new_num_bases < p->new_num_max)
923         {
924             p->new_basenames[(p->new_num_bases)++] = 
925                 nmem_strdup(p->mem, p->basenames[i]);
926         }
927     *num_bases = info.new_num_bases;
928     *basenames = info.new_basenames;
929     for (i = 0; i<*num_bases; i++)
930         yaz_log(YLOG_DEBUG, "base %s", (*basenames)[i]);
931 }
932
933 ZEBRA_RES zebra_select_database(ZebraHandle zh, const char *basename)
934 {
935     ZEBRA_CHECK_HANDLE(zh);
936
937     yaz_log(log_level, "zebra_select_database %s",basename);
938     assert(basename);
939     return zebra_select_databases(zh, 1, &basename);
940 }
941
942 ZEBRA_RES zebra_select_databases(ZebraHandle zh, int num_bases,
943                                  const char **basenames)
944 {
945     int i;
946     const char *cp;
947     int len = 0;
948     char *new_reg = 0;
949
950     ZEBRA_CHECK_HANDLE(zh);
951     assert(basenames);
952
953     yaz_log(log_level, "zebra_select_databases n=%d [0]=%s",
954             num_bases,basenames[0]);
955     zh->errCode = 0;
956     
957     if (num_bases < 1)
958     {
959         zh->errCode = YAZ_BIB1_COMBI_OF_SPECIFIED_DATABASES_UNSUPP;
960         return ZEBRA_FAIL;
961     }
962
963     /* Check if the user has access to all databases (Seb) */
964     /* You could argue that this should happen later, after we have
965      * determined that the database(s) exist. */
966     if (zh->dbaccesslist) {
967         for (i = 0; i < num_bases; i++) {
968             const char *db = basenames[i];
969             char *p, *pp;
970             for (p = zh->dbaccesslist; p && *p; p = pp) {
971                 int len;
972                 if ((pp = strchr(p, '+'))) {
973                     len = pp - p;
974                     pp++;
975                 }
976                 else
977                     len = strlen(p);
978                 if (len == strlen(db) && !strncmp(db, p, len))
979                     break;
980             }
981             if (!p) {
982                 zh->errCode = YAZ_BIB1_ACCESS_TO_SPECIFIED_DATABASE_DENIED;
983                 return ZEBRA_FAIL;
984             }
985         }
986     }
987
988     for (i = 0; i < zh->num_basenames; i++)
989         xfree(zh->basenames[i]);
990     xfree(zh->basenames);
991     
992     zh->num_basenames = num_bases;
993     zh->basenames = xmalloc(zh->num_basenames * sizeof(*zh->basenames));
994     for (i = 0; i < zh->num_basenames; i++)
995         zh->basenames[i] = xstrdup(basenames[i]);
996
997     cp = strrchr(basenames[0], '/');
998     if (cp)
999     {
1000         len = cp - basenames[0];
1001         new_reg = xmalloc(len + 1);
1002         memcpy(new_reg, basenames[0], len);
1003         new_reg[len] = '\0';
1004     }
1005     else
1006         new_reg = xstrdup("");
1007     for (i = 1; i<num_bases; i++)
1008     {
1009         const char *cp1;
1010
1011         cp1 = strrchr(basenames[i], '/');
1012         if (cp)
1013         {
1014             if (!cp1)
1015             {
1016                 zh->errCode = YAZ_BIB1_COMBI_OF_SPECIFIED_DATABASES_UNSUPP;
1017                 return -1;
1018             }
1019             if (len != cp1 - basenames[i] ||
1020                 memcmp(basenames[i], new_reg, len))
1021             {
1022                 zh->errCode = YAZ_BIB1_COMBI_OF_SPECIFIED_DATABASES_UNSUPP;
1023                 return -1;
1024             }
1025         }
1026         else
1027         {
1028             if (cp1)
1029             {
1030                 zh->errCode = YAZ_BIB1_COMBI_OF_SPECIFIED_DATABASES_UNSUPP;
1031                 return ZEBRA_FAIL;
1032             }
1033         }
1034     }
1035     zebra_select_register(zh, new_reg);
1036     xfree(new_reg);
1037     if (!zh->res)
1038     {
1039         zh->errCode = YAZ_BIB1_DATABASE_UNAVAILABLE;
1040         return ZEBRA_FAIL;
1041     }
1042     if (!zh->lock_normal || !zh->lock_shadow)
1043     {
1044         zh->errCode = YAZ_BIB1_TEMPORARY_SYSTEM_ERROR;
1045         return ZEBRA_FAIL;
1046     }
1047     return ZEBRA_OK;
1048 }
1049
1050 ZEBRA_RES zebra_set_approx_limit(ZebraHandle zh, zint approx_limit)
1051 {
1052     if (approx_limit == 0)
1053         approx_limit = DEFAULT_APPROX_LIMIT;
1054     zh->approx_limit = approx_limit;
1055     return ZEBRA_OK;
1056 }
1057
1058 void zebra_set_partial_result(ZebraHandle zh)
1059 {
1060     zh->partial_result = 1;
1061 }
1062
1063
1064 ZEBRA_RES zebra_set_break_handler(ZebraHandle zh,
1065                                   int (*f)(void *client_data),
1066                                   void *client_data)
1067 {
1068     zh->break_handler_func = f;
1069     zh->break_handler_data = client_data;
1070     return ZEBRA_OK;
1071 }
1072
1073 ZEBRA_RES zebra_search_RPN_x(ZebraHandle zh, ODR o, Z_RPNQuery *query,
1074                              const char *setname, zint *hits,
1075                              int *estimated_hit_count,
1076                              int *partial_resultset)
1077 {
1078     ZEBRA_RES r;
1079     
1080     ZEBRA_CHECK_HANDLE(zh);
1081
1082     assert(o);
1083     assert(query);
1084     assert(hits);
1085     assert(setname);
1086     yaz_log(log_level, "zebra_search_rpn");
1087
1088     zh->partial_result = 0;
1089
1090     if (zebra_begin_read(zh) == ZEBRA_FAIL)
1091         return ZEBRA_FAIL;
1092
1093     r = resultSetAddRPN(zh, odr_extract_mem(o), query, 
1094                         zh->num_basenames, zh->basenames, setname,
1095                         hits, estimated_hit_count);
1096
1097     *partial_resultset = zh->partial_result;
1098     zebra_end_read(zh);
1099     return r;
1100 }
1101
1102 ZEBRA_RES zebra_search_RPN(ZebraHandle zh, ODR o, Z_RPNQuery *query,
1103                            const char *setname, zint *hits)
1104 {
1105     int estimated_hit_count;
1106     int partial_resultset;
1107     return zebra_search_RPN_x(zh, o, query, setname, hits,
1108                               &estimated_hit_count,
1109                               &partial_resultset);
1110 }
1111
1112 ZEBRA_RES zebra_records_retrieve(ZebraHandle zh, ODR stream,
1113                                  const char *setname,
1114                                  Z_RecordComposition *comp,
1115                                  const Odr_oid *input_format, int num_recs,
1116                                  ZebraRetrievalRecord *recs)
1117 {
1118     ZebraMetaRecord *poset;
1119     int i;
1120     ZEBRA_RES ret = ZEBRA_OK;
1121     zint *pos_array;
1122
1123     ZEBRA_CHECK_HANDLE(zh);
1124     assert(stream);
1125     assert(setname);
1126     assert(recs);
1127     assert(num_recs>0);
1128
1129     yaz_log(log_level, "zebra_records_retrieve n=%d", num_recs);
1130
1131     if (!zh->res)
1132     {
1133         zebra_setError(zh, YAZ_BIB1_SPECIFIED_RESULT_SET_DOES_NOT_EXIST,
1134                        setname);
1135         return ZEBRA_FAIL;
1136     }
1137     
1138     if (zebra_begin_read(zh) == ZEBRA_FAIL)
1139         return ZEBRA_FAIL;
1140
1141     pos_array = (zint *) xmalloc(num_recs * sizeof(*pos_array));
1142     for (i = 0; i<num_recs; i++)
1143         pos_array[i] = recs[i].position;
1144     poset = zebra_meta_records_create(zh, setname, num_recs, pos_array);
1145     if (!poset)
1146     {
1147         yaz_log(YLOG_DEBUG, "zebraPosSetCreate error");
1148         zebra_setError(zh, YAZ_BIB1_SPECIFIED_RESULT_SET_DOES_NOT_EXIST,
1149                        setname);
1150         ret = ZEBRA_FAIL;
1151     }
1152     else
1153     {
1154         WRBUF addinfo_w = wrbuf_alloc();
1155         for (i = 0; i < num_recs; i++)
1156         {
1157             recs[i].errCode = 0;
1158             recs[i].errString = 0;
1159             recs[i].format = 0;
1160             recs[i].len = 0;
1161             recs[i].buf = 0;
1162             recs[i].base = 0;
1163             recs[i].sysno = poset[i].sysno;
1164             if (poset[i].term)
1165             {
1166                 recs[i].format = yaz_oid_recsyn_sutrs;
1167                 recs[i].len = strlen(poset[i].term);
1168                 recs[i].buf = poset[i].term;
1169                 recs[i].base = poset[i].db;
1170             }
1171             else if (poset[i].sysno)
1172             {
1173                 char *buf;
1174                 int len = 0;
1175                 zebra_snippets *hit_snippet = zebra_snippets_create();
1176
1177                 /* we disable hit snippets for now. It does not work well
1178                    and it slows retrieval down a lot */
1179 #if 0
1180                 zebra_snippets_hit_vector(zh, setname, poset[i].sysno, 
1181                                           hit_snippet);
1182 #endif
1183                 wrbuf_rewind(addinfo_w);
1184                 recs[i].errCode =
1185                     zebra_record_fetch(zh, setname,
1186                                        poset[i].sysno, poset[i].score,
1187                                        stream, input_format, comp,
1188                                        &recs[i].format, &buf, &len,
1189                                        &recs[i].base, addinfo_w);
1190                 
1191                 if (wrbuf_len(addinfo_w))
1192                     recs[i].errString =
1193                         odr_strdup(stream, wrbuf_cstr(addinfo_w));
1194                 recs[i].len = len;
1195                 if (len > 0)
1196                 {
1197                     recs[i].buf = (char*) odr_malloc(stream, len);
1198                     memcpy(recs[i].buf, buf, len);
1199                 }
1200                 else
1201                     recs[i].buf = buf;
1202                 recs[i].score = poset[i].score;
1203                 zebra_snippets_destroy(hit_snippet);
1204             }
1205             else
1206             {
1207                 /* only need to set it once */
1208                 if (pos_array[i] < zh->approx_limit && ret == ZEBRA_OK)
1209                 {
1210                     zebra_setError_zint(zh,
1211                                         YAZ_BIB1_PRESENT_REQUEST_OUT_OF_RANGE,
1212                                         pos_array[i]);
1213                     ret = ZEBRA_FAIL;
1214                     break;
1215                 }
1216             }
1217         }
1218         zebra_meta_records_destroy(zh, poset, num_recs);
1219         wrbuf_destroy(addinfo_w);
1220     }
1221     zebra_end_read(zh);
1222     xfree(pos_array);
1223     return ret;
1224 }
1225
1226 ZEBRA_RES zebra_scan_PQF(ZebraHandle zh, ODR stream, const char *query,
1227                          int *position,
1228                          int *num_entries, ZebraScanEntry **entries,
1229                          int *is_partial,
1230                          const char *setname)
1231 {
1232     YAZ_PQF_Parser pqf_parser = yaz_pqf_create();
1233     Z_AttributesPlusTerm *zapt;
1234     Odr_oid *attributeSet;
1235     ZEBRA_RES res;
1236     
1237     if (!(zapt = yaz_pqf_scan(pqf_parser, stream, &attributeSet, query)))
1238     {
1239         res = ZEBRA_FAIL;
1240         zh->errCode = YAZ_BIB1_SCAN_MALFORMED_SCAN;
1241     }
1242     else
1243     {
1244         res = zebra_scan(zh, stream, zapt, yaz_oid_attset_bib_1,
1245                          position, num_entries, entries, is_partial,
1246                          setname);
1247     }
1248     yaz_pqf_destroy(pqf_parser);
1249     return res;
1250 }
1251
1252 ZEBRA_RES zebra_scan(ZebraHandle zh, ODR stream, Z_AttributesPlusTerm *zapt,
1253                      const Odr_oid *attributeset,
1254                      int *position,
1255                      int *num_entries, ZebraScanEntry **entries,
1256                      int *is_partial,
1257                      const char *setname)
1258 {
1259     ZEBRA_RES res;
1260
1261     ZEBRA_CHECK_HANDLE(zh);
1262
1263     assert(stream);
1264     assert(zapt);
1265     assert(position);
1266     assert(num_entries);
1267     assert(is_partial);
1268     assert(entries);
1269     yaz_log(log_level, "zebra_scan");
1270
1271     if (zebra_begin_read(zh) == ZEBRA_FAIL)
1272     {
1273         *entries = 0;
1274         *num_entries = 0;
1275         return ZEBRA_FAIL;
1276     }
1277
1278     res = rpn_scan(zh, stream, zapt, attributeset,
1279                    zh->num_basenames, zh->basenames, position,
1280                    num_entries, entries, is_partial, setname);
1281     zebra_end_read(zh);
1282     return res;
1283 }
1284
1285 ZEBRA_RES zebra_sort(ZebraHandle zh, ODR stream,
1286                      int num_input_setnames, const char **input_setnames,
1287                      const char *output_setname,
1288                      Z_SortKeySpecList *sort_sequence,
1289                      int *sort_status)
1290 {
1291     ZEBRA_RES res;
1292     ZEBRA_CHECK_HANDLE(zh);
1293     assert(stream);
1294     assert(num_input_setnames>0);
1295     assert(input_setnames);
1296     assert(sort_sequence);
1297     assert(sort_status);
1298     yaz_log(log_level, "zebra_sort");
1299
1300     if (zebra_begin_read(zh) == ZEBRA_FAIL)
1301         return ZEBRA_FAIL;
1302     res = resultSetSort(zh, stream->mem, num_input_setnames, input_setnames,
1303                         output_setname, sort_sequence, sort_status);
1304     zebra_end_read(zh);
1305     return res;
1306 }
1307
1308 int zebra_deleteResultSet(ZebraHandle zh, int function,
1309                           int num_setnames, char **setnames,
1310                           int *statuses)
1311 {
1312     int i, status;
1313     ASSERTZH;
1314     yaz_log(log_level, "zebra_deleteResultSet n=%d", num_setnames);
1315
1316     if (zebra_begin_read(zh))
1317         return Z_DeleteStatus_systemProblemAtTarget;
1318     switch (function)
1319     {
1320     case Z_DeleteResultSetRequest_list:
1321         assert(num_setnames>0);
1322         assert(setnames);
1323         resultSetDestroy(zh, num_setnames, setnames, statuses);
1324         break;
1325     case Z_DeleteResultSetRequest_all:
1326         resultSetDestroy(zh, -1, 0, statuses);
1327         break;
1328     }
1329     zebra_end_read(zh);
1330     status = Z_DeleteStatus_success;
1331     for (i = 0; i<num_setnames; i++)
1332         if (statuses[i] == Z_DeleteStatus_resultSetDidNotExist)
1333             status = statuses[i];
1334     return status;
1335 }
1336
1337 int zebra_errCode(ZebraHandle zh)
1338 {
1339     if (zh)
1340     {
1341         yaz_log(log_level, "zebra_errCode: %d",zh->errCode);
1342         return zh->errCode;
1343     }
1344     yaz_log(log_level, "zebra_errCode: o");
1345     return 0; 
1346 }
1347
1348 const char *zebra_errString(ZebraHandle zh)
1349 {
1350     const char *e = 0;
1351     if (zh)
1352         e= diagbib1_str(zh->errCode);
1353     yaz_log(log_level, "zebra_errString: %s",e);
1354     return e;
1355 }
1356
1357 char *zebra_errAdd(ZebraHandle zh)
1358 {
1359     char *a = 0;
1360     if (zh)
1361         a= zh->errString;
1362     yaz_log(log_level, "zebra_errAdd: %s",a);
1363     return a;
1364 }
1365
1366 ZEBRA_RES zebra_auth(ZebraHandle zh, const char *user, const char *pass)
1367 {
1368     const char *p;
1369     const char *astring;
1370     char u[40];
1371     ZebraService zs;
1372
1373     ZEBRA_CHECK_HANDLE(zh);
1374
1375     zs = zh->service;
1376     
1377     sprintf(u, "perm.%.30s", user ? user : "anonymous");
1378     p = res_get(zs->global_res, u);
1379     xfree(zh->user_perm);
1380     zh->user_perm = xstrdup(p ? p : "r");
1381
1382     /* Determine database access list */
1383     astring = res_get(zs->dbaccess, user ? user : "anonymous");
1384     if (astring)
1385         zh->dbaccesslist = xstrdup(astring);
1386     else
1387         zh->dbaccesslist = 0;
1388
1389     /* users that don't require a password .. */
1390     if (zh->user_perm && strchr(zh->user_perm, 'a'))
1391         return ZEBRA_OK;
1392     
1393     if (!zs->passwd_db || !passwd_db_auth(zs->passwd_db, user, pass))
1394         return ZEBRA_OK;
1395     return ZEBRA_FAIL;
1396 }
1397
1398 ZEBRA_RES zebra_admin_import_begin(ZebraHandle zh, const char *database,
1399                                    const char *record_type)
1400 {
1401     yaz_log(log_level, "zebra_admin_import_begin db=%s rt=%s", 
1402             database, record_type);
1403     if (zebra_select_database(zh, database) == ZEBRA_FAIL)
1404         return ZEBRA_FAIL;
1405     return zebra_begin_trans(zh, 1);
1406 }
1407
1408 ZEBRA_RES zebra_admin_import_end(ZebraHandle zh)
1409 {
1410     ZEBRA_CHECK_HANDLE(zh);
1411     yaz_log(log_level, "zebra_admin_import_end");
1412     return zebra_end_trans(zh);
1413 }
1414
1415 ZEBRA_RES zebra_admin_import_segment(ZebraHandle zh, Z_Segment *segment)
1416 {
1417     ZEBRA_RES res = ZEBRA_OK;
1418     zint sysno;
1419     int i;
1420     ZEBRA_CHECK_HANDLE(zh);
1421     yaz_log(log_level, "zebra_admin_import_segment");
1422
1423     for (i = 0; i<segment->num_segmentRecords; i++)
1424     {
1425         Z_NamePlusRecord *npr = segment->segmentRecords[i];
1426
1427         if (npr->which == Z_NamePlusRecord_intermediateFragment)
1428         {
1429             Z_FragmentSyntax *fragment = npr->u.intermediateFragment;
1430             if (fragment->which == Z_FragmentSyntax_notExternallyTagged)
1431             {
1432                 Odr_oct *oct = fragment->u.notExternallyTagged;
1433                 sysno = 0;
1434                 
1435                 if(zebra_update_record(
1436                        zh, 
1437                        action_update,
1438                        0, /* record Type */
1439                        &sysno,
1440                        0, /* match */
1441                        0, /* fname */
1442                        (const char *) oct->buf, oct->len) == ZEBRA_FAIL)
1443                     res = ZEBRA_FAIL;
1444             }
1445         }
1446     }
1447     return res;
1448 }
1449
1450 int delete_w_handle(const char *info, void *handle)
1451 {
1452     ZebraHandle zh = (ZebraHandle) handle;
1453     ISAM_P pos;
1454
1455     if (*info == sizeof(pos))
1456     {
1457         memcpy(&pos, info+1, sizeof(pos));
1458         isamb_unlink(zh->reg->isamb, pos);
1459     }
1460     return 0;
1461 }
1462
1463 int delete_w_all_handle(const char *info, void *handle)
1464 {
1465     ZebraHandle zh = (ZebraHandle) handle;
1466     ISAM_P pos;
1467
1468     if (*info == sizeof(pos))
1469     {
1470         ISAMB_PP pt;
1471         memcpy(&pos, info+1, sizeof(pos));
1472         pt = isamb_pp_open(zh->reg->isamb, pos, 2);
1473         if (pt)
1474         {
1475             struct it_key key;
1476             key.mem[0] = 0;
1477             while (isamb_pp_read(pt, &key))
1478             {
1479                 Record rec;
1480                 rec = rec_get(zh->reg->records, key.mem[0]);
1481                 rec_del(zh->reg->records, &rec);
1482             }
1483             isamb_pp_close(pt);
1484         }
1485     }
1486     return delete_w_handle(info, handle);
1487 }
1488
1489 static int delete_SU_handle(void *handle, int ord,
1490                             const char *index_type, const char *string_index,
1491                             zinfo_index_category_t cat)
1492 {
1493     ZebraHandle zh = (ZebraHandle) handle;
1494     char ord_buf[20];
1495     int ord_len;
1496 #if 0
1497     yaz_log(YLOG_LOG, "ord=%d index_type=%s index=%s cat=%d", ord,
1498             index_type, string_index, (int) cat);
1499 #endif
1500     ord_len = key_SU_encode(ord, ord_buf);
1501     ord_buf[ord_len] = '\0';
1502
1503     assert(zh->reg->isamb);
1504     assert(zh->reg->records);
1505     dict_delete_subtree(zh->reg->dict, ord_buf,
1506                         zh, 
1507                         !strcmp(string_index, "_ALLRECORDS") ?
1508                         delete_w_all_handle : delete_w_handle);
1509     return 0;
1510 }
1511
1512 ZEBRA_RES zebra_drop_database(ZebraHandle zh, const char *db)
1513 {
1514     ZEBRA_RES ret = ZEBRA_OK;
1515
1516     yaz_log(log_level, "zebra_drop_database %s", db);
1517     ZEBRA_CHECK_HANDLE(zh);
1518
1519     if (zebra_select_database(zh, db) == ZEBRA_FAIL)
1520         return ZEBRA_FAIL;
1521     if (zebra_begin_trans(zh, 1) == ZEBRA_FAIL)
1522         return ZEBRA_FAIL;
1523     if (zh->reg->isamb)
1524     {
1525         int db_ord;
1526         if (zebraExplain_curDatabase(zh->reg->zei, db))
1527         {
1528             zebra_setError(zh, YAZ_BIB1_DATABASE_DOES_NOT_EXIST, db);
1529             ret = ZEBRA_FAIL;
1530         }
1531         else
1532         {
1533             db_ord = zebraExplain_get_database_ord(zh->reg->zei);
1534             dict_delete_subtree_ord(zh->reg->matchDict, db_ord,
1535                                     0 /* handle */, 0 /* func */);
1536             zebraExplain_trav_ord(zh->reg->zei, zh, delete_SU_handle);
1537             zebraExplain_removeDatabase(zh->reg->zei, zh);
1538             zebra_remove_file_match(zh);
1539         }
1540     }
1541     else
1542     {
1543         yaz_log(YLOG_WARN, "drop database only supported for isam:b");
1544         zebra_setError(zh, YAZ_BIB1_ES_IMMEDIATE_EXECUTION_FAILED,
1545                        "drop database only supported for isam:b");
1546         ret = ZEBRA_FAIL;
1547     }
1548     if (zebra_end_trans(zh) != ZEBRA_OK)
1549     {
1550         yaz_log(YLOG_WARN, "zebra_end_trans failed");
1551         ret = ZEBRA_FAIL;
1552     }
1553     return ret;
1554 }
1555
1556 ZEBRA_RES zebra_create_database(ZebraHandle zh, const char *db)
1557 {
1558     yaz_log(log_level, "zebra_create_database %s", db);
1559     ZEBRA_CHECK_HANDLE(zh);
1560     assert(db);
1561
1562     if (zebra_select_database(zh, db) == ZEBRA_FAIL)
1563         return ZEBRA_FAIL;
1564     if (zebra_begin_trans(zh, 1))
1565         return ZEBRA_FAIL;
1566
1567     /* announce database */
1568     if (zebraExplain_newDatabase(zh->reg->zei, db, 0 
1569                                  /* explainDatabase */))
1570     {
1571         if (zebra_end_trans(zh) != ZEBRA_OK)
1572         {
1573             yaz_log(YLOG_WARN, "zebra_end_trans failed");
1574         }
1575         zebra_setError(zh, YAZ_BIB1_ES_IMMEDIATE_EXECUTION_FAILED, db);
1576         return ZEBRA_FAIL;
1577     }
1578     return zebra_end_trans(zh);
1579 }
1580
1581 int zebra_string_norm(ZebraHandle zh, const char *index_type,
1582                       const char *input_str, int input_len,
1583                       char *output_str, int output_len)
1584 {
1585     WRBUF wrbuf;
1586     zebra_map_t zm = zebra_map_get(zh->reg->zebra_maps, index_type);
1587     ASSERTZH;
1588     assert(input_str);
1589     assert(output_str);
1590     yaz_log(log_level, "zebra_string_norm ");
1591
1592     if (!zh->reg->zebra_maps)
1593         return -1;
1594     wrbuf = zebra_replace(zm, "", input_str, input_len);
1595     if (!wrbuf)
1596         return -2;
1597     if (wrbuf_len(wrbuf) >= output_len)
1598         return -3;
1599     if (wrbuf_len(wrbuf))
1600         memcpy(output_str, wrbuf_buf(wrbuf), wrbuf_len(wrbuf));
1601     output_str[wrbuf_len(wrbuf)] = '\0';
1602     return wrbuf_len(wrbuf);
1603 }
1604
1605 /** \brief set register state (state*.LCK)
1606     \param zh Zebra handle
1607     \param val state
1608     \param seqno sequence number
1609     
1610     val is one of:
1611     d=writing to shadow(shadow enabled); writing to register (shadow disabled)
1612     o=reading only
1613     c=commit (writing to register, reading from shadow, shadow mode only)
1614 */
1615 static void zebra_set_state(ZebraHandle zh, int val, int seqno)
1616 {
1617     char state_fname[256];
1618     char *fname;
1619     long p = getpid();
1620     FILE *f;
1621     ASSERTZH;
1622     yaz_log(log_level, "zebra_set_state v=%c seq=%d", val, seqno);
1623
1624     sprintf(state_fname, "state.%s.LCK", zh->reg_name);
1625     fname = zebra_mk_fname(res_get(zh->res, "lockDir"), state_fname);
1626     f = fopen(fname, "w");
1627
1628     yaz_log(YLOG_DEBUG, "zebra_set_state: %c %d %ld", val, seqno, p);
1629     fprintf(f, "%c %d %ld\n", val, seqno, p);
1630     fclose(f);
1631     xfree(fname);
1632 }
1633
1634 static void zebra_get_state(ZebraHandle zh, char *val, int *seqno)
1635 {
1636     char state_fname[256];
1637     char *fname;
1638     FILE *f;
1639
1640     ASSERTZH;
1641     yaz_log(log_level, "zebra_get_state ");
1642
1643     sprintf(state_fname, "state.%s.LCK", zh->reg_name);
1644     fname = zebra_mk_fname(res_get(zh->res, "lockDir"), state_fname);
1645     f = fopen(fname, "r");
1646     *val = 'o';
1647     *seqno = 0;
1648
1649     if (f)
1650     {
1651         if (fscanf(f, "%c %d", val, seqno) != 2)
1652         {
1653             yaz_log(YLOG_ERRNO|YLOG_WARN, "fscan fail %s",
1654                     state_fname);
1655         }
1656         fclose(f);
1657     }
1658     xfree(fname);
1659 }
1660
1661 ZEBRA_RES zebra_begin_read(ZebraHandle zh)
1662 {
1663     return zebra_begin_trans(zh, 0);
1664 }
1665
1666 ZEBRA_RES zebra_end_read(ZebraHandle zh)
1667 {
1668     return zebra_end_trans(zh);
1669 }
1670
1671 static void read_res_for_transaction(ZebraHandle zh)
1672 {
1673     const char *group = res_get(zh->res, "group");
1674     const char *v;
1675     /* FIXME - do we still use groups ?? */
1676     
1677     zh->m_group = group;
1678     v = res_get_prefix(zh->res, "followLinks", group, "1");
1679     zh->m_follow_links = atoi(v);
1680
1681     zh->m_record_id = res_get_prefix(zh->res, "recordId", group, 0);
1682     zh->m_record_type = res_get_prefix(zh->res, "recordType", group, 0);
1683
1684     v = res_get_prefix(zh->res, "storeKeys", group, "1");
1685     zh->m_store_keys = atoi(v);
1686
1687     v = res_get_prefix(zh->res, "storeData", group, "1");
1688     zh->m_store_data = atoi(v);
1689
1690     v = res_get_prefix(zh->res, "explainDatabase", group, "0");
1691     zh->m_explain_database = atoi(v);
1692
1693     v = res_get_prefix(zh->res, "openRW", group, "1");
1694     zh->m_flag_rw = atoi(v);
1695
1696     v = res_get_prefix(zh->res, "fileVerboseLimit", group, "1000");
1697     zh->m_file_verbose_limit = atoi(v);
1698 }
1699
1700 ZEBRA_RES zebra_begin_trans(ZebraHandle zh, int rw)
1701 {
1702     ZEBRA_CHECK_HANDLE(zh);
1703     zebra_select_default_database(zh);
1704     if (!zh->res)
1705     {
1706         zebra_setError(zh, YAZ_BIB1_TEMPORARY_SYSTEM_ERROR,
1707                        "zebra_begin_trans: no database selected");
1708         return ZEBRA_FAIL;
1709     }
1710     ASSERTZHRES;
1711     yaz_log(log_level, "zebra_begin_trans rw=%d",rw);
1712
1713     if (zh->user_perm)
1714     {
1715         if (rw && !strchr(zh->user_perm, 'w'))
1716         {
1717             zebra_setError(
1718                 zh,
1719                 YAZ_BIB1_ES_PERMISSION_DENIED_ON_ES_CANNOT_MODIFY_OR_DELETE,
1720                 0);
1721             return ZEBRA_FAIL;
1722         }
1723     }
1724
1725     assert(zh->res);
1726     if (rw)
1727     {
1728         int seqno = 0;
1729         char val = '?';
1730         const char *rval = 0;
1731         
1732         (zh->trans_no++);
1733         if (zh->trans_w_no)
1734         {
1735             read_res_for_transaction(zh);
1736             return 0;
1737         }
1738         if (zh->trans_no != 1)
1739         {
1740             zebra_setError(zh, YAZ_BIB1_TEMPORARY_SYSTEM_ERROR,
1741                            "zebra_begin_trans: no write trans within read");
1742             return ZEBRA_FAIL;
1743         }
1744         if (zh->reg)
1745         {
1746             resultSetInvalidate(zh);
1747             zebra_register_close(zh->service, zh->reg);
1748         }
1749         zh->trans_w_no = zh->trans_no;
1750
1751         zh->records_inserted = 0;
1752         zh->records_updated = 0;
1753         zh->records_deleted = 0;
1754         zh->records_processed = 0;
1755         zh->records_skipped = 0;
1756         
1757 #if HAVE_SYS_TIMES_H
1758         times(&zh->tms1);
1759 #endif
1760         /* lock */
1761         if (zh->shadow_enable)
1762             rval = res_get(zh->res, "shadow");
1763         
1764         if (rval)
1765         {
1766             zebra_lock_r(zh->lock_normal);
1767             zebra_lock_w(zh->lock_shadow);
1768         }
1769         else
1770         {
1771             zebra_lock_w(zh->lock_normal);
1772             zebra_lock_w(zh->lock_shadow);
1773         }
1774         zebra_get_state(zh, &val, &seqno);
1775         if (val != 'o')
1776         {
1777             /* either we didn't finish commit or shadow is dirty */
1778             if (!rval)
1779             {
1780                 yaz_log(YLOG_WARN, "previous transaction did not finish "
1781                         "(shadow disabled)");
1782             }
1783             zebra_unlock(zh->lock_shadow);
1784             zebra_unlock(zh->lock_normal);
1785             if (zebra_commit(zh))
1786             {
1787                 zh->trans_no--;
1788                 zh->trans_w_no = 0;
1789                 return ZEBRA_FAIL;
1790             }
1791             if (rval)
1792             {
1793                 zebra_lock_r(zh->lock_normal);
1794                 zebra_lock_w(zh->lock_shadow);
1795             }
1796             else
1797             {
1798                 zebra_lock_w(zh->lock_normal);
1799                 zebra_lock_w(zh->lock_shadow);
1800             }
1801         }
1802
1803         zebra_set_state(zh, 'd', seqno);
1804         
1805         zh->reg = zebra_register_open(zh->service, zh->reg_name,
1806                                       1, rval ? 1 : 0, zh->res,
1807                                       zh->path_reg);
1808         if (zh->reg)
1809             zh->reg->seqno = seqno;
1810         else
1811         {
1812             zebra_set_state(zh, 'o', seqno);
1813             
1814             zebra_unlock(zh->lock_shadow);
1815             zebra_unlock(zh->lock_normal);
1816
1817             zh->trans_no--;
1818             zh->trans_w_no = 0;
1819
1820             zebra_setError(zh, YAZ_BIB1_TEMPORARY_SYSTEM_ERROR,
1821                            "zebra_begin_trans: cannot open register");
1822             yaz_log(YLOG_FATAL, "%s", zh->errString);
1823             return ZEBRA_FAIL;
1824         }
1825         zebraExplain_curDatabase(zh->reg->zei, zh->basenames[0]);
1826     }
1827     else
1828     {
1829         int dirty = 0;
1830         char val;
1831         int seqno;
1832         
1833         (zh->trans_no)++;
1834         
1835         if (zh->trans_no != 1)
1836         {
1837             return zebra_flush_reg(zh);
1838         }
1839 #if HAVE_SYS_TIMES_H
1840         times(&zh->tms1);
1841 #endif
1842         if (!zh->res)
1843         {
1844             (zh->trans_no)--;
1845             zh->errCode = YAZ_BIB1_DATABASE_UNAVAILABLE;
1846             return ZEBRA_FAIL;
1847         }
1848         if (!zh->lock_normal || !zh->lock_shadow)
1849         {
1850             (zh->trans_no)--;
1851             zh->errCode = YAZ_BIB1_TEMPORARY_SYSTEM_ERROR;
1852             return ZEBRA_FAIL;
1853         }
1854         zebra_get_state(zh, &val, &seqno);
1855         if (val == 'd')
1856             val = 'o';
1857         
1858         if (!zh->reg)
1859             dirty = 1;
1860         else if (seqno != zh->reg->seqno)
1861         {
1862             yaz_log(YLOG_DEBUG, "reopen seqno cur/old %d/%d",
1863                     seqno, zh->reg->seqno);
1864             dirty = 1;
1865         }
1866         else if (zh->reg->last_val != val)
1867         {
1868             yaz_log(YLOG_DEBUG, "reopen last cur/old %d/%d",
1869                     val, zh->reg->last_val);
1870             dirty = 1;
1871         }
1872         if (!dirty)
1873             return ZEBRA_OK;
1874         
1875         if (val == 'c')
1876             zebra_lock_r(zh->lock_shadow);
1877         else
1878             zebra_lock_r(zh->lock_normal);
1879         
1880         if (zh->reg)
1881         {
1882             resultSetInvalidate(zh);
1883             zebra_register_close(zh->service, zh->reg);
1884         }
1885         zh->reg = zebra_register_open(zh->service, zh->reg_name,
1886                                       0, val == 'c' ? 1 : 0,
1887                                       zh->res, zh->path_reg);
1888         if (!zh->reg)
1889         {
1890             zebra_unlock(zh->lock_normal);
1891             zebra_unlock(zh->lock_shadow);
1892             zh->trans_no--;
1893             zh->errCode = YAZ_BIB1_DATABASE_UNAVAILABLE;
1894             return ZEBRA_FAIL;
1895         }
1896         zh->reg->last_val = val;
1897         zh->reg->seqno = seqno;
1898     }
1899     read_res_for_transaction(zh);
1900     return ZEBRA_OK;
1901 }
1902
1903 ZEBRA_RES zebra_end_trans(ZebraHandle zh)
1904 {
1905     ZebraTransactionStatus dummy;
1906
1907     yaz_log(log_level, "zebra_end_trans");
1908     ZEBRA_CHECK_HANDLE(zh);
1909     return zebra_end_transaction(zh, &dummy);
1910 }
1911
1912 ZEBRA_RES zebra_end_transaction(ZebraHandle zh, ZebraTransactionStatus *status)
1913 {
1914     char val;
1915     int seqno;
1916     const char *rval;
1917
1918     ZEBRA_CHECK_HANDLE(zh);
1919
1920     assert(status);
1921     yaz_log(log_level, "zebra_end_transaction");
1922
1923     status->processed = 0;
1924     status->inserted  = 0;
1925     status->updated   = 0;
1926     status->deleted   = 0;
1927     status->utime     = 0;
1928     status->stime     = 0;
1929
1930     if (!zh->res || !zh->reg)
1931     {
1932         zebra_setError(zh, YAZ_BIB1_TEMPORARY_SYSTEM_ERROR,
1933                        "zebra_end_trans: no open transaction");
1934         return ZEBRA_FAIL;
1935     }
1936     if (zh->trans_no != zh->trans_w_no)
1937     {
1938         zh->trans_no--;
1939         if (zh->trans_no != 0)
1940             return ZEBRA_OK;
1941
1942         /* release read lock */
1943
1944         zebra_unlock(zh->lock_normal);
1945         zebra_unlock(zh->lock_shadow);
1946     }
1947     else
1948     {   /* release write lock */
1949         zh->trans_no--;
1950         zh->trans_w_no = 0;
1951         
1952         yaz_log(YLOG_DEBUG, "zebra_end_trans");
1953         rval = res_get(zh->res, "shadow");
1954         
1955         zebraExplain_runNumberIncrement(zh->reg->zei, 1);
1956         
1957         zebra_flush_reg(zh);
1958         
1959         resultSetInvalidate(zh);
1960
1961         zebra_register_close(zh->service, zh->reg);
1962         zh->reg = 0;
1963         
1964         yaz_log(YLOG_LOG, "Records: "ZINT_FORMAT" i/u/d "
1965                 ZINT_FORMAT"/"ZINT_FORMAT"/"ZINT_FORMAT, 
1966                 zh->records_processed, zh->records_inserted,
1967                 zh->records_updated, zh->records_deleted);
1968         
1969         status->processed = zh->records_processed;
1970         status->inserted = zh->records_inserted;
1971         status->updated = zh->records_updated;
1972         status->deleted = zh->records_deleted;
1973         
1974         zebra_get_state(zh, &val, &seqno);
1975         if (val != 'd')
1976         {
1977             BFiles bfs = bfs_create(rval, zh->path_reg);
1978             bf_commitClean(bfs, rval);
1979             bfs_destroy(bfs);
1980         }
1981         if (!rval)
1982             seqno++;
1983         zebra_set_state(zh, 'o', seqno);
1984         zebra_unlock(zh->lock_shadow);
1985         zebra_unlock(zh->lock_normal);
1986         
1987     }
1988 #if HAVE_SYS_TIMES_H
1989     times(&zh->tms2);
1990     yaz_log(log_level, "user/system: %ld/%ld",
1991             (long) (zh->tms2.tms_utime - zh->tms1.tms_utime),
1992             (long) (zh->tms2.tms_stime - zh->tms1.tms_stime));
1993     
1994     status->utime = (long) (zh->tms2.tms_utime - zh->tms1.tms_utime);
1995     status->stime = (long) (zh->tms2.tms_stime - zh->tms1.tms_stime);
1996 #endif
1997     return ZEBRA_OK;
1998 }
1999
2000 ZEBRA_RES zebra_repository_update(ZebraHandle zh, const char *path)
2001 {
2002     return zebra_repository_index(zh, path, action_update);
2003 }
2004
2005 ZEBRA_RES zebra_repository_delete(ZebraHandle zh, const char *path)
2006 {
2007     return zebra_repository_index(zh, path, action_delete);
2008 }
2009
2010 ZEBRA_RES zebra_repository_index(ZebraHandle zh, const char *path,
2011                                  enum zebra_recctrl_action_t action)
2012 {
2013     ASSERTZH;
2014     assert(path);
2015
2016     if (action == action_update)
2017         yaz_log(log_level, "updating %s", path);
2018     else if (action == action_delete)
2019         yaz_log(log_level, "deleting %s", path);
2020     else if (action == action_a_delete)
2021         yaz_log(log_level, "attempt deleting %s", path);
2022     else
2023         yaz_log(log_level, "update action=%d", (int) action);
2024
2025     if (zh->m_record_id && !strcmp(zh->m_record_id, "file"))
2026         return zebra_update_file_match(zh, path);
2027     else
2028         return zebra_update_from_path(zh, path, action);
2029 }
2030
2031 ZEBRA_RES zebra_repository_show(ZebraHandle zh, const char *path)
2032 {
2033     ASSERTZH;
2034     assert(path);
2035     yaz_log(log_level, "zebra_repository_show");
2036     repositoryShow(zh, path);
2037     return ZEBRA_OK;
2038 }
2039
2040 static ZEBRA_RES zebra_commit_ex(ZebraHandle zh, int clean_only)
2041 {
2042     int seqno;
2043     char val;
2044     const char *rval;
2045     BFiles bfs;
2046     ZEBRA_RES res = ZEBRA_OK;
2047
2048     ASSERTZH;
2049
2050     yaz_log(log_level, "zebra_commit_ex clean_only=%d", clean_only);
2051     zebra_select_default_database(zh);
2052     if (!zh->res)
2053     {
2054         zh->errCode = YAZ_BIB1_DATABASE_UNAVAILABLE;
2055         return ZEBRA_FAIL;
2056     }
2057     rval = res_get(zh->res, "shadow");    
2058     if (!rval)
2059     {
2060         yaz_log(YLOG_WARN, "Cannot perform commit - No shadow area defined");
2061         return ZEBRA_OK;
2062     }
2063
2064     zebra_lock_w(zh->lock_normal);
2065     zebra_lock_r(zh->lock_shadow);
2066
2067     bfs = bfs_create(res_get(zh->res, "register"), zh->path_reg);
2068     if (!bfs)
2069     {
2070         zebra_unlock(zh->lock_shadow);
2071         zebra_unlock(zh->lock_normal);
2072         return ZEBRA_FAIL;
2073     }
2074     zebra_get_state(zh, &val, &seqno);
2075
2076     if (val == 'd')
2077     {
2078         /* shadow area is dirty and so we must throw it away */
2079         yaz_log(YLOG_WARN, "previous transaction didn't reach commit");
2080         clean_only = 1;
2081     }
2082     else if (val == 'c')
2083     {
2084         /* commit has started. We can not remove it anymore */
2085         clean_only = 0;
2086     }
2087
2088     if (rval && *rval)
2089         bf_cache(bfs, rval);
2090     if (bf_commitExists(bfs))
2091     {
2092         if (clean_only)
2093             zebra_set_state(zh, 'd', seqno);
2094         else
2095         {
2096             zebra_set_state(zh, 'c', seqno);
2097             
2098             yaz_log(log_level, "commit start");
2099             if (bf_commitExec(bfs))
2100                 res = ZEBRA_FAIL;
2101         }
2102         if (res == ZEBRA_OK)
2103         {
2104             seqno++;
2105             zebra_set_state(zh, 'o', seqno);
2106             
2107             zebra_unlock(zh->lock_shadow);
2108             zebra_unlock(zh->lock_normal);
2109             
2110             zebra_lock_w(zh->lock_shadow);
2111             bf_commitClean(bfs, rval);
2112             zebra_unlock(zh->lock_shadow);
2113         }
2114         else
2115         {
2116             zebra_unlock(zh->lock_shadow);
2117             zebra_unlock(zh->lock_normal);
2118             yaz_log(YLOG_WARN, "zebra_commit: failed");
2119         }
2120     }
2121     else
2122     {
2123         zebra_unlock(zh->lock_shadow);
2124         zebra_unlock(zh->lock_normal);
2125         yaz_log(log_level, "nothing to commit");
2126     }
2127     bfs_destroy(bfs);
2128
2129     return res;
2130 }
2131
2132 ZEBRA_RES zebra_clean(ZebraHandle zh)
2133 {
2134     yaz_log(log_level, "zebra_clean");
2135     ZEBRA_CHECK_HANDLE(zh);
2136     return zebra_commit_ex(zh, 1);
2137 }
2138
2139 ZEBRA_RES zebra_commit(ZebraHandle zh)
2140 {
2141     yaz_log(log_level, "zebra_commit");
2142     ZEBRA_CHECK_HANDLE(zh);
2143     return zebra_commit_ex(zh, 0);
2144 }
2145
2146
2147 ZEBRA_RES zebra_init(ZebraHandle zh)
2148 {
2149     const char *rval;
2150     BFiles bfs = 0;
2151
2152     yaz_log(log_level, "zebra_init");
2153
2154     ZEBRA_CHECK_HANDLE(zh);
2155
2156     zebra_select_default_database(zh);
2157     if (!zh->res)
2158     {
2159         zebra_setError(zh, YAZ_BIB1_TEMPORARY_SYSTEM_ERROR,
2160                        "cannot select default database");
2161         return ZEBRA_FAIL;
2162     }
2163     rval = res_get(zh->res, "shadow");
2164
2165     bfs = bfs_create(res_get(zh->res, "register"), zh->path_reg);
2166     if (!bfs)
2167     {
2168         zebra_setError(zh, YAZ_BIB1_TEMPORARY_SYSTEM_ERROR, "bfs_create");
2169         return ZEBRA_FAIL;
2170     }
2171     if (rval && *rval)
2172         bf_cache(bfs, rval);
2173     
2174     bf_reset(bfs);
2175     bfs_destroy(bfs);
2176     zebra_set_state(zh, 'o', 0);
2177     return ZEBRA_OK;
2178 }
2179
2180 ZEBRA_RES zebra_compact(ZebraHandle zh)
2181 {
2182     BFiles bfs;
2183
2184     yaz_log(log_level, "zebra_compact");
2185     ZEBRA_CHECK_HANDLE(zh);
2186     if (!zh->res)
2187     {
2188         zh->errCode = YAZ_BIB1_DATABASE_UNAVAILABLE;
2189         return ZEBRA_FAIL;
2190     }
2191     bfs = bfs_create(res_get(zh->res, "register"), zh->path_reg);
2192     inv_compact(bfs);
2193     bfs_destroy(bfs);
2194     return ZEBRA_OK;
2195 }
2196
2197 #define ZEBRA_CHECK_DICT 1
2198 #define ZEBRA_CHECK_ISAM 2
2199
2200 static ZEBRA_RES zebra_record_check(ZebraHandle zh, Record rec,
2201                                     zint *no_keys, int message_limit,
2202                                     unsigned flags,
2203                                     zint *no_long_dict_entries,
2204                                     zint *no_failed_dict_lookups,
2205                                     zint *no_invalid_keys,
2206                                     zint *no_invalid_dict_infos,
2207                                     zint *no_invalid_isam_entries)
2208 {
2209     ZEBRA_RES res = ZEBRA_OK;
2210     zebra_rec_keys_t keys = zebra_rec_keys_open();
2211     zebra_rec_keys_set_buf(keys, rec->info[recInfo_delKeys],
2212                            rec->size[recInfo_delKeys], 0);
2213     
2214     *no_keys = 0;
2215     if (!zebra_rec_keys_rewind(keys))
2216     {
2217         ;
2218     }
2219     else
2220     {
2221         size_t slen;
2222         const char *str;
2223         struct it_key key_in;
2224         NMEM nmem = nmem_create();
2225
2226         while (zebra_rec_keys_read(keys, &str, &slen, &key_in))
2227         {
2228             int do_fail = 0;
2229             int ord = CAST_ZINT_TO_INT(key_in.mem[0]);
2230             char ord_buf[IT_MAX_WORD+20];
2231             int ord_len = key_SU_encode(ord, ord_buf);
2232             char *info = 0;
2233
2234             (*no_keys)++;
2235
2236             if (key_in.len < 2 || key_in.len > IT_KEY_LEVEL_MAX)
2237             {
2238                 res = ZEBRA_FAIL;
2239                 (*no_invalid_keys)++;
2240                 if (*no_invalid_keys <= message_limit)
2241                 {
2242                     do_fail = 1;
2243                     yaz_log(YLOG_WARN, "Record " ZINT_FORMAT
2244                             ": unexpected key length %d",
2245                             rec->sysno, key_in.len);
2246                 }
2247             }
2248             if (ord_len + slen >= sizeof(ord_buf)-1)
2249             {
2250                 res = ZEBRA_FAIL;
2251                 (*no_long_dict_entries)++;
2252                 if (*no_long_dict_entries <= message_limit)
2253                 {
2254                     do_fail = 1;
2255                     /* so bad it can not fit into our ord_buf */
2256                     yaz_log(YLOG_WARN, "Record " ZINT_FORMAT
2257                             ": long dictionary entry %d + %d",
2258                             rec->sysno, ord_len, (int) slen);
2259                 }
2260                 continue;
2261             }
2262             memcpy(ord_buf + ord_len, str, slen);
2263             ord_buf[ord_len + slen] = '\0'; 
2264             if (ord_len + slen >= IT_MAX_WORD)
2265             {
2266                 res = ZEBRA_FAIL;
2267                 (*no_long_dict_entries)++;
2268                 if (*no_long_dict_entries <= message_limit)
2269                 {
2270                     do_fail = 1;
2271                     yaz_log(YLOG_WARN, "Record " ZINT_FORMAT 
2272                             ": long dictionary entry %d + %d",
2273                             rec->sysno, (int) ord_len, (int) slen);
2274                 }
2275             }
2276             if ((flags & ZEBRA_CHECK_DICT) == 0)
2277                 continue;
2278             info = dict_lookup(zh->reg->dict, ord_buf);
2279             if (!info)
2280             {
2281                 res = ZEBRA_FAIL;
2282                 (*no_failed_dict_lookups)++;
2283                 if (*no_failed_dict_lookups <= message_limit)
2284                 {
2285                     do_fail = 1;
2286                     yaz_log(YLOG_WARN, "Record " ZINT_FORMAT
2287                             ": term do not exist in dictionary", rec->sysno);
2288                 }
2289             }
2290             else if (flags & ZEBRA_CHECK_ISAM)
2291             {
2292                 ISAM_P pos;
2293
2294                 if (*info != sizeof(pos))
2295                 {
2296                     res = ZEBRA_FAIL;
2297                     (*no_invalid_dict_infos)++;
2298                     if (*no_invalid_dict_infos <= message_limit)
2299                     {
2300                         do_fail = 1;
2301                         yaz_log(YLOG_WARN, "Record " ZINT_FORMAT 
2302                                 ": long dictionary entry %d + %d",
2303                                 rec->sysno, (int) ord_len, (int) slen);
2304                     }
2305                 }
2306                 else
2307                 {
2308                     int scope = 1;
2309                     memcpy(&pos, info+1, sizeof(pos));
2310                     if (zh->reg->isamb)
2311                     {
2312                         ISAMB_PP ispt = isamb_pp_open(zh->reg->isamb, pos,
2313                                                       scope);
2314                         if (!ispt)
2315                         {
2316                             res = ZEBRA_FAIL;
2317                             (*no_invalid_isam_entries)++;
2318                             if (*no_invalid_isam_entries <= message_limit)
2319                             {
2320                                 do_fail = 1;
2321                                 yaz_log(YLOG_WARN, "Record " ZINT_FORMAT 
2322                                         ": isamb_pp_open entry " ZINT_FORMAT
2323                                         " not found",
2324                                         rec->sysno, pos);
2325                             }
2326                         }
2327                         else if (zh->m_staticrank)
2328                         {
2329                             isamb_pp_close(ispt);
2330                         }
2331                         else
2332                         {
2333                             struct it_key until_key;
2334                             struct it_key isam_key;
2335                             int r;
2336                             int i = 0;
2337                             
2338                             until_key.len = key_in.len - 1;
2339                             for (i = 0; i < until_key.len; i++)
2340                                 until_key.mem[i] = key_in.mem[i+1];
2341                             
2342                             if (until_key.mem[0] == 0)
2343                                 until_key.mem[0] = rec->sysno;
2344                             r = isamb_pp_forward(ispt, &isam_key, &until_key);
2345                             if (r != 1)
2346                             {
2347                                 res = ZEBRA_FAIL;
2348                                 (*no_invalid_isam_entries)++;
2349                                 if (*no_invalid_isam_entries <= message_limit)
2350                                 {
2351                                     do_fail = 1;
2352                                     yaz_log(YLOG_WARN, "Record " ZINT_FORMAT 
2353                                             ": isamb_pp_forward " ZINT_FORMAT
2354                                             " returned no entry",
2355                                             rec->sysno, pos);
2356                                 }
2357                             }
2358                             else
2359                             {
2360                                 int cmp = key_compare(&until_key, &isam_key);
2361                                 if (cmp != 0)
2362                                 {
2363                                     res = ZEBRA_FAIL;
2364                                     (*no_invalid_isam_entries)++;
2365                                     if (*no_invalid_isam_entries
2366                                         <= message_limit)
2367                                     {
2368                                         do_fail = 1;
2369                                         yaz_log(YLOG_WARN, "Record "
2370                                                 ZINT_FORMAT 
2371                                                 ": isamb_pp_forward "
2372                                                 ZINT_FORMAT
2373                                                 " returned different entry",
2374                                                 rec->sysno, pos);
2375
2376                                         key_logdump_txt(YLOG_LOG,
2377                                                         &until_key,
2378                                                         "until");
2379
2380                                         key_logdump_txt(YLOG_LOG,
2381                                                         &isam_key,
2382                                                         "isam");
2383
2384                                     }
2385                                 }
2386                             }
2387                             isamb_pp_close(ispt);
2388                         }
2389
2390                     }
2391                 }
2392             }
2393             if (do_fail)
2394             {
2395                 zebra_it_key_str_dump(zh, &key_in, str,
2396                                       slen, nmem, YLOG_LOG);
2397                 nmem_reset(nmem);
2398             }
2399         }
2400         nmem_destroy(nmem);
2401     }
2402     zebra_rec_keys_close(keys);
2403     return res;
2404 }
2405
2406 ZEBRA_RES zebra_register_check(ZebraHandle zh, const char *spec)
2407 {
2408     ZEBRA_RES res = ZEBRA_FAIL;
2409     unsigned flags = 0;
2410     int message_limit = 10;
2411     
2412     if (!spec || *spec == '\0'
2413         || !strcmp(spec, "dict") || !strcmp(spec, "default"))
2414         flags = ZEBRA_CHECK_DICT;
2415     else if (!strcmp(spec, "isam") || !strcmp(spec, "full"))
2416         flags = ZEBRA_CHECK_DICT|ZEBRA_CHECK_ISAM;
2417     else if (!strcmp(spec, "quick"))
2418         flags = 0;
2419     else
2420         return ZEBRA_FAIL;
2421
2422     yaz_log(YLOG_LOG, "zebra_register_check begin flags=%u message_limit=%d",
2423             flags, message_limit);
2424     if (zebra_begin_read(zh) == ZEBRA_OK)
2425     {
2426         zint no_records_total = 0;
2427         zint no_records_fail = 0;
2428         zint total_keys = 0;
2429
2430         if (zh->reg)
2431         {
2432             Record rec = rec_get_root(zh->reg->records);
2433             
2434             zint no_long_dict_entries = 0;
2435             zint no_failed_dict_lookups = 0;
2436             zint no_invalid_keys = 0;
2437             zint no_invalid_dict_infos = 0;
2438             zint no_invalid_isam_entries = 0;
2439
2440             res = ZEBRA_OK;
2441             while (rec)
2442             {
2443                 Record r1;
2444                 zint no_keys;
2445
2446                 if (zebra_record_check(zh, rec, &no_keys, message_limit,
2447                                        flags,
2448                                        &no_long_dict_entries,
2449                                        &no_failed_dict_lookups,
2450                                        &no_invalid_keys,
2451                                        &no_invalid_dict_infos,
2452                                        &no_invalid_isam_entries
2453                         )
2454                     != ZEBRA_OK)
2455                 {
2456                     res = ZEBRA_FAIL;
2457                     no_records_fail++;
2458                 }
2459
2460                 r1 = rec_get_next(zh->reg->records, rec);
2461                 rec_free(&rec);
2462                 rec = r1;
2463                 no_records_total++;
2464                 total_keys += no_keys;
2465             }
2466             yaz_log(YLOG_LOG, "records total:        " ZINT_FORMAT,
2467                     no_records_total);
2468             yaz_log(YLOG_LOG, "records fail:         " ZINT_FORMAT,
2469                     no_records_fail);
2470             yaz_log(YLOG_LOG, "total keys:           " ZINT_FORMAT,
2471                     total_keys);
2472             yaz_log(YLOG_LOG, "long dict entries:    " ZINT_FORMAT,
2473                     no_long_dict_entries);
2474             if (flags & ZEBRA_CHECK_DICT)
2475             {
2476                 yaz_log(YLOG_LOG, "failed dict lookups:  " ZINT_FORMAT,
2477                         no_failed_dict_lookups);
2478                 yaz_log(YLOG_LOG, "invalid dict infos:   " ZINT_FORMAT,
2479                         no_invalid_dict_infos);
2480             }
2481             if (flags & ZEBRA_CHECK_ISAM)
2482                 yaz_log(YLOG_LOG, "invalid isam entries: " ZINT_FORMAT,
2483                         no_invalid_isam_entries);
2484         }
2485         zebra_end_read(zh);
2486     }
2487     yaz_log(YLOG_LOG, "zebra_register_check end ret=%d", res);
2488     return res;
2489 }
2490
2491 void zebra_result(ZebraHandle zh, int *code, char **addinfo)
2492 {
2493     yaz_log(log_level, "zebra_result");
2494     if (zh)
2495     {
2496         *code = zh->errCode;
2497         *addinfo = zh->errString;
2498     }
2499     else
2500     {
2501         *code = YAZ_BIB1_TEMPORARY_SYSTEM_ERROR;
2502         *addinfo ="ZebraHandle is NULL";
2503     }
2504 }
2505
2506 void zebra_shadow_enable(ZebraHandle zh, int value)
2507 {
2508     ASSERTZH;
2509     yaz_log(log_level, "zebra_shadow_enable");
2510     zh->shadow_enable = value;
2511 }
2512
2513 ZEBRA_RES zebra_octet_term_encoding(ZebraHandle zh, const char *encoding)
2514 {
2515     yaz_log(log_level, "zebra_octet_term_encoding %s", encoding);
2516     ZEBRA_CHECK_HANDLE(zh);
2517     assert(encoding);
2518
2519     if (zh->iconv_to_utf8 != 0)
2520         yaz_iconv_close(zh->iconv_to_utf8);
2521     if (zh->iconv_from_utf8 != 0)
2522         yaz_iconv_close(zh->iconv_from_utf8);
2523     
2524     zh->iconv_to_utf8 =
2525         yaz_iconv_open("UTF-8", encoding);
2526     if (zh->iconv_to_utf8 == 0)
2527         yaz_log(YLOG_WARN, "iconv: %s to UTF-8 unsupported", encoding);
2528     zh->iconv_from_utf8 =
2529         yaz_iconv_open(encoding, "UTF-8");
2530     if (zh->iconv_to_utf8 == 0)
2531         yaz_log(YLOG_WARN, "iconv: UTF-8 to %s unsupported", encoding);
2532
2533     return ZEBRA_OK;
2534 }
2535
2536 ZEBRA_RES zebra_record_encoding(ZebraHandle zh, const char *encoding)
2537 {
2538     yaz_log(log_level, "zebra_record_encoding");
2539     ZEBRA_CHECK_HANDLE(zh);
2540     xfree(zh->record_encoding);
2541     zh->record_encoding = 0;
2542     if (encoding)
2543         zh->record_encoding = xstrdup(encoding);
2544     return ZEBRA_OK;
2545 }
2546
2547 void zebra_set_resource(ZebraHandle zh, const char *name, const char *value)
2548 {
2549     assert(name);
2550     assert(value);
2551     yaz_log(log_level, "zebra_set_resource %s:%s", name, value);
2552     ASSERTZH;
2553     res_set(zh->res, name, value);
2554 }
2555
2556 const char *zebra_get_resource(ZebraHandle zh,
2557                                const char *name, const char *defaultvalue)
2558 {
2559     const char *v;
2560     ASSERTZH;
2561     assert(name);
2562     v = res_get_def(zh->res, name,(char *)defaultvalue);
2563     yaz_log(log_level, "zebra_get_resource %s:%s", name, v);
2564     return v;
2565 }
2566
2567 /* moved from zebra_api_ext.c by pop */
2568 /* FIXME: Should this really be public??? -Heikki */
2569
2570 int zebra_trans_no(ZebraHandle zh)
2571 {
2572     yaz_log(log_level, "zebra_trans_no");
2573     ASSERTZH;
2574     return zh->trans_no;
2575 }
2576
2577 int zebra_get_shadow_enable(ZebraHandle zh)
2578 {
2579     yaz_log(log_level, "zebra_get_shadow_enable");
2580     ASSERTZH;
2581     return zh->shadow_enable;
2582 }
2583
2584 void zebra_set_shadow_enable(ZebraHandle zh, int value)
2585 {
2586     yaz_log(log_level, "zebra_set_shadow_enable %d",value);
2587     ASSERTZH;
2588     zh->shadow_enable = value;
2589 }
2590
2591 ZEBRA_RES zebra_add_record(ZebraHandle zh,
2592                            const char *buf, int buf_size)
2593 {
2594     return zebra_update_record(zh, action_update, 
2595                                0 /* record type */,
2596                                0 /* sysno */ ,
2597                                0 /* match */, 
2598                                0 /* fname */,
2599                                buf, buf_size);
2600 }
2601
2602 ZEBRA_RES zebra_update_record(ZebraHandle zh, 
2603                               enum zebra_recctrl_action_t action,
2604                               const char *recordType,
2605                               zint *sysno, const char *match,
2606                               const char *fname,
2607                               const char *buf, int buf_size)
2608 {
2609     ZEBRA_RES res;
2610
2611     ZEBRA_CHECK_HANDLE(zh);
2612
2613     assert(buf);
2614
2615     yaz_log(log_level, "zebra_update_record");
2616     if (sysno)
2617         yaz_log(log_level, " sysno=" ZINT_FORMAT, *sysno);
2618
2619     if (buf_size < 1)
2620         buf_size = strlen(buf);
2621
2622     if (zebra_begin_trans(zh, 1) == ZEBRA_FAIL)
2623         return ZEBRA_FAIL;
2624     res = zebra_buffer_extract_record(zh, buf, buf_size, 
2625                                       action,
2626                                       recordType,
2627                                       sysno,   
2628                                       match, 
2629                                       fname);
2630     if (zebra_end_trans(zh) != ZEBRA_OK)
2631     {
2632         yaz_log(YLOG_WARN, "zebra_end_trans failed");
2633         res = ZEBRA_FAIL;
2634     }
2635     return res; 
2636 }
2637
2638 /* ---------------------------------------------------------------------------
2639    Searching 
2640 */
2641
2642 ZEBRA_RES zebra_search_PQF(ZebraHandle zh, const char *pqf_query,
2643                            const char *setname, zint *hits)
2644 {
2645     zint lhits = 0;
2646     ZEBRA_RES res = ZEBRA_OK;
2647     Z_RPNQuery *query;
2648     ODR odr;
2649
2650
2651     ZEBRA_CHECK_HANDLE(zh);
2652
2653     odr = odr_createmem(ODR_ENCODE);
2654
2655     assert(pqf_query);
2656     assert(setname);
2657
2658     yaz_log(log_level, "zebra_search_PQF s=%s q=%s", setname, pqf_query);
2659     
2660     query = p_query_rpn(odr, pqf_query);
2661     
2662     if (!query)
2663     {
2664         yaz_log(YLOG_WARN, "bad query %s\n", pqf_query);
2665         zh->errCode = YAZ_BIB1_MALFORMED_QUERY;
2666         res = ZEBRA_FAIL;
2667     }
2668     else
2669         res = zebra_search_RPN(zh, odr, query, setname, &lhits);
2670     
2671     odr_destroy(odr);
2672
2673     yaz_log(log_level, "Hits: " ZINT_FORMAT, lhits);
2674
2675     if (hits)
2676         *hits = lhits;
2677
2678     return res;
2679 }
2680
2681 /* ---------------------------------------------------------------------------
2682    Sort - a simplified interface, with optional read locks.
2683 */
2684 int zebra_sort_by_specstr(ZebraHandle zh, ODR stream,
2685                           const char *sort_spec,
2686                           const char *output_setname,
2687                           const char **input_setnames) 
2688 {
2689     int num_input_setnames = 0;
2690     int sort_status = 0;
2691     Z_SortKeySpecList *sort_sequence;
2692
2693     ZEBRA_CHECK_HANDLE(zh);
2694     assert(stream);
2695     assert(sort_spec);
2696     assert(output_setname);
2697     assert(input_setnames);
2698     sort_sequence = yaz_sort_spec(stream, sort_spec);
2699     yaz_log(log_level, "sort (FIXME) ");
2700     if (!sort_sequence)
2701     {
2702         yaz_log(YLOG_WARN, "invalid sort specs '%s'", sort_spec);
2703         zh->errCode = YAZ_BIB1_CANNOT_SORT_ACCORDING_TO_SEQUENCE;
2704         return -1;
2705     }
2706     
2707     /* we can do this, since the perl typemap code for char** will 
2708        put a NULL at the end of list */
2709     while (input_setnames[num_input_setnames]) num_input_setnames++;
2710
2711     if (zebra_begin_read(zh))
2712         return -1;
2713     
2714     resultSetSort(zh, stream->mem, num_input_setnames, input_setnames,
2715                   output_setname, sort_sequence, &sort_status);
2716     
2717     zebra_end_read(zh);
2718     return sort_status;
2719 }
2720
2721 /* ---------------------------------------------------------------------------
2722    Get BFS for Zebra system (to make alternative storage methods)
2723 */
2724 struct BFiles_struct *zebra_get_bfs(ZebraHandle zh)
2725 {
2726     if (zh && zh->reg)
2727         return zh->reg->bfs;
2728     return 0;
2729 }
2730
2731
2732 /* ---------------------------------------------------------------------------
2733    Set limit for search/scan
2734 */
2735 ZEBRA_RES zebra_set_limit(ZebraHandle zh, int complement_flag, zint *ids)
2736 {
2737     ZEBRA_CHECK_HANDLE(zh);
2738     zebra_limit_destroy(zh->m_limit);
2739     zh->m_limit = zebra_limit_create(complement_flag, ids);
2740     return ZEBRA_OK;
2741 }
2742
2743 /*
2744   Set Error code + addinfo
2745 */
2746 void zebra_setError(ZebraHandle zh, int code, const char *addinfo)
2747 {
2748     if (!zh)
2749         return;
2750     zh->errCode = code;
2751     nmem_reset(zh->nmem_error);
2752     zh->errString = addinfo ? nmem_strdup(zh->nmem_error, addinfo) : 0;
2753 }
2754
2755 void zebra_setError_zint(ZebraHandle zh, int code, zint i)
2756 {
2757     char vstr[60];
2758     sprintf(vstr, ZINT_FORMAT, i);
2759
2760     zh->errCode = code;
2761     nmem_reset(zh->nmem_error);
2762     zh->errString = nmem_strdup(zh->nmem_error, vstr);
2763 }
2764
2765 void zebra_lock_prefix(Res res, char *path)
2766 {
2767     const char *lock_dir = res_get_def(res, "lockDir", "");
2768     
2769     strcpy(path, lock_dir);
2770     if (*path && path[strlen(path)-1] != '/')
2771         strcat(path, "/");
2772 }
2773
2774 /*
2775  * Local variables:
2776  * c-basic-offset: 4
2777  * c-file-style: "Stroustrup"
2778  * indent-tabs-mode: nil
2779  * End:
2780  * vim: shiftwidth=4 tabstop=8 expandtab
2781  */
2782