Ensure zebra_start fails if bad .cfg is passed.
[idzebra-moved-to-github.git] / index / zebraapi.c
1 /* $Id: zebraapi.c,v 1.202 2006-02-21 15:23:11 adam Exp $
2    Copyright (C) 1995-2005
3    Index Data ApS
4
5 This file is part of the Zebra server.
6
7 Zebra is free software; you can redistribute it and/or modify it under
8 the terms of the GNU General Public License as published by the Free
9 Software Foundation; either version 2, or (at your option) any later
10 version.
11
12 Zebra is distributed in the hope that it will be useful, but WITHOUT ANY
13 WARRANTY; without even the implied warranty of MERCHANTABILITY or
14 FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
15 for more details.
16
17 You should have received a copy of the GNU General Public License
18 along with Zebra; see the file LICENSE.zebra.  If not, write to the
19 Free Software Foundation, 59 Temple Place - Suite 330, Boston, MA
20 02111-1307, USA.
21 */
22
23 #include <assert.h>
24 #include <stdio.h>
25 #include <limits.h>
26 #ifdef WIN32
27 #include <io.h>
28 #include <process.h>
29 #include <direct.h>
30 #endif
31 #if HAVE_UNISTD_H
32 #include <unistd.h>
33 #endif
34
35 #include <yaz/diagbib1.h>
36 #include <yaz/pquery.h>
37 #include <yaz/sortspec.h>
38 #include "index.h"
39 #include "orddict.h"
40 #include <charmap.h>
41 #include <idzebra/api.h>
42
43 #define DEFAULT_APPROX_LIMIT 2000000000
44
45 /* simple asserts to validate the most essential input args */
46 #define ASSERTZH assert(zh && zh->service)
47 #define ASSERTZHRES assert(zh && zh->service && zh->res)
48 #define ASSERTZS assert(zs)
49
50 static int log_level = 0;
51 static int log_level_initialized = 0;
52
53 static void zebra_open_res(ZebraHandle zh);
54 static void zebra_close_res(ZebraHandle zh);
55
56 static void zebra_chdir (ZebraService zs)
57 {
58     const char *dir ;
59     ASSERTZS;
60     yaz_log(log_level, "zebra_chdir");
61     dir = res_get (zs->global_res, "chdir");
62     if (!dir)
63         return;
64     yaz_log (YLOG_DEBUG, "chdir %s", dir);
65 #ifdef WIN32
66     _chdir(dir);
67 #else
68     chdir (dir);
69 #endif
70 }
71
72 static void zebra_flush_reg (ZebraHandle zh)
73 {
74     ASSERTZH;
75     yaz_log(log_level, "zebra_flush_reg");
76     zebraExplain_flush (zh->reg->zei, zh);
77     
78     extract_flushWriteKeys (zh, 1 /* final */);
79     zebra_index_merge (zh );
80 }
81
82 static struct zebra_register *zebra_register_open(ZebraService zs, 
83                                                   const char *name,
84                                                   int rw, int useshadow,
85                                                   Res res,
86                                                   const char *reg_path);
87 static void zebra_register_close (ZebraService zs, struct zebra_register *reg);
88
89 ZebraHandle zebra_open(ZebraService zs, Res res)
90 {
91     ZebraHandle zh;
92     const char *default_encoding;
93     if (!log_level_initialized)
94     {
95         log_level = yaz_log_module_level("zebraapi");
96         log_level_initialized = 1;
97     }
98
99     yaz_log(log_level, "zebra_open");
100
101     if (!zs)
102         return 0;
103
104     zh = (ZebraHandle) xmalloc(sizeof(*zh));
105     yaz_log (YLOG_DEBUG, "zebra_open zs=%p returns %p", zs, zh);
106
107     zh->service = zs;
108     zh->reg = 0;          /* no register attached yet */
109     zh->sets = 0;
110     zh->destroyed = 0;
111     zh->errCode = 0;
112     zh->errString = 0;
113     zh->res = 0; 
114     zh->session_res = res_open(zs->global_res, res);
115     zh->user_perm = 0;
116     zh->dbaccesslist = 0;
117
118     zh->reg_name = xstrdup ("");
119     zh->path_reg = 0;
120     zh->num_basenames = 0;
121     zh->basenames = 0;
122
123     zh->approx_limit = DEFAULT_APPROX_LIMIT;
124     zh->trans_no = 0;
125     zh->trans_w_no = 0;
126
127     zh->lock_normal = 0;
128     zh->lock_shadow = 0;
129
130     zh->shadow_enable = 1;
131     zh->m_staticrank = 0;
132
133     default_encoding = res_get_def(zh->session_res, "encoding", "ISO-8859-1");
134
135     zh->iconv_to_utf8 =
136         yaz_iconv_open ("UTF-8", default_encoding);
137     if (zh->iconv_to_utf8 == 0)
138         yaz_log (YLOG_WARN, "iconv: %s to UTF-8 unsupported",
139            default_encoding);
140     zh->iconv_from_utf8 =
141         yaz_iconv_open (default_encoding, "UTF-8");
142     if (zh->iconv_to_utf8 == 0)
143         yaz_log (YLOG_WARN, "iconv: UTF-8 to %s unsupported",
144            default_encoding);
145
146     zh->record_encoding = 0;
147
148     zebra_mutex_cond_lock (&zs->session_lock);
149
150     zh->next = zs->sessions;
151     zs->sessions = zh;
152
153     zebra_mutex_cond_unlock (&zs->session_lock);
154
155     zh->store_data_buf = 0;
156
157     zh->m_limit = zebra_limit_create(1, 0);
158
159     zh->nmem_error = nmem_create();
160
161     return zh;
162 }
163
164 ZebraService zebra_start (const char *configName)
165 {
166     return zebra_start_res(configName, 0, 0);
167 }
168
169 ZebraService zebra_start_res (const char *configName, Res def_res, Res over_res)
170 {
171     Res res;
172
173     if (!log_level_initialized)
174     {
175         log_level = yaz_log_module_level("zebraapi");
176         log_level_initialized = 1;
177     }
178     
179     yaz_log(YLOG_LOG, "zebra_start %s %s", ZEBRAVER,
180             configName ? configName : "");
181
182     if ((res = res_open(def_res, over_res)))
183     {
184         const char *passwd_plain = 0;
185         const char *passwd_encrypt = 0;
186         const char *dbaccess = 0;
187         ZebraService zh = 0;
188
189         if (configName)
190         {
191             ZEBRA_RES ret = res_read_file(res, configName);
192             if (ret != ZEBRA_OK)
193             {
194                 res_close(res);
195                 return 0;
196             }
197         }
198         zh = xmalloc(sizeof(*zh));
199         zh->global_res = res;
200         zh->sessions = 0;
201         
202         zebra_chdir (zh);
203         
204         zebra_mutex_cond_init (&zh->session_lock);
205         passwd_plain = res_get (zh->global_res, "passwd");
206         passwd_encrypt = res_get (zh->global_res, "passwd.c");
207         dbaccess = res_get (zh->global_res, "dbaccess");
208
209         if (!passwd_plain && !passwd_encrypt)
210             zh->passwd_db = NULL;
211         else 
212         {
213             zh->passwd_db = passwd_db_open();
214             if (!zh->passwd_db)
215                 yaz_log (YLOG_WARN|YLOG_ERRNO, "passwd_db_open failed");
216             else
217             {
218                 if (passwd_plain)
219                     passwd_db_file_plain(zh->passwd_db, passwd_plain);
220                 if (passwd_encrypt)
221                     passwd_db_file_crypt(zh->passwd_db, passwd_encrypt);
222             }
223         }
224
225         if (!dbaccess)
226             zh->dbaccess = NULL;
227         else {
228             zh->dbaccess = res_open(NULL, NULL);
229             if (res_read_file(zh->dbaccess, dbaccess) != ZEBRA_OK) {
230                 yaz_log(YLOG_FATAL, "Failed to read %s", dbaccess);
231                 return NULL;
232             }
233         }
234
235         zh->path_root = res_get (zh->global_res, "root");
236         zh->nmem = nmem_create();
237         zh->record_classes = recTypeClass_create (zh->global_res, zh->nmem);
238         return zh;
239     }
240     return 0;
241 }
242
243 void zebra_filter_info(ZebraService zs, void *cd,
244                         void (*cb)(void *cd, const char *name))
245 {
246     ASSERTZS;
247     assert(cb);
248     recTypeClass_info(zs->record_classes, cd, cb);
249 }
250
251 void zebra_pidfname(ZebraService zs, char *path)
252 {
253     ASSERTZS;
254     zebra_lock_prefix (zs->global_res, path);
255     strcat(path, "zebrasrv.pid");
256 }
257
258 Dict dict_open_res (BFiles bfs, const char *name, int cache, int rw,
259                     int compact_flag, Res res)
260 {
261     int page_size = 4096;
262     char resource_str[200];
263     sprintf (resource_str, "dict.%.100s.pagesize", name);
264     assert(bfs);
265     assert(name);
266
267     if (res_get_int(res, resource_str, &page_size) == ZEBRA_OK)
268         yaz_log(YLOG_LOG, "Using custom dictionary page size %d for %s",
269                 page_size, name);
270     return dict_open(bfs, name, cache, rw, compact_flag, page_size);
271 }
272
273 static
274 struct zebra_register *zebra_register_open(ZebraService zs, const char *name,
275                                            int rw, int useshadow, Res res,
276                                            const char *reg_path)
277 {
278     struct zebra_register *reg;
279     int record_compression = REC_COMPRESS_NONE;
280     const char *recordCompression = 0;
281     const char *profilePath;
282     char cwd[1024];
283
284     ASSERTZS;
285     
286     reg = xmalloc(sizeof(*reg));
287
288     assert (name);
289     reg->name = xstrdup (name);
290
291     reg->seqno = 0;
292     reg->last_val = 0;
293
294     assert (res);
295
296     yaz_log (YLOG_DEBUG, "zebra_register_open rw=%d useshadow=%d p=%p n=%s rp=%s",
297              rw, useshadow, reg, name, reg_path ? reg_path : "(none)");
298     
299     reg->dh = data1_createx (DATA1_FLAG_XML);
300     if (!reg->dh)
301     {
302         xfree(reg);
303         return 0;
304     }
305     reg->bfs = bfs_create (res_get (res, "register"), reg_path);
306     if (!reg->bfs)
307     {
308         data1_destroy(reg->dh);
309         xfree(reg);
310         return 0;
311     }
312     if (useshadow)
313     {
314         if (bf_cache (reg->bfs, res_get (res, "shadow")) == ZEBRA_FAIL)
315         {
316             bfs_destroy(reg->bfs);
317             data1_destroy(reg->dh);
318             xfree(reg);
319             return 0;
320         }
321     }
322
323     getcwd(cwd, sizeof(cwd)-1);
324     profilePath = res_get_def(res, "profilePath", DEFAULT_PROFILE_PATH);
325     yaz_log(YLOG_DEBUG, "profilePath=%s cwd=%s", profilePath, cwd);
326
327     data1_set_tabpath (reg->dh, profilePath);
328     data1_set_tabroot (reg->dh, reg_path);
329     reg->recTypes = recTypes_init (zs->record_classes, reg->dh);
330
331     reg->zebra_maps = zebra_maps_open (res, reg_path);
332     reg->rank_classes = NULL;
333
334     reg->key_buf = 0;
335
336     reg->keys = zebra_rec_keys_open();
337
338 #if NATTR
339     reg->sortKeys = zebra_rec_keys_open();
340 #else
341     reg->sortKeys.buf = 0;
342     reg->sortKeys.buf_max = 0;
343 #endif
344
345     reg->records = 0;
346     reg->dict = 0;
347     reg->sortIdx = 0;
348     reg->isams = 0;
349     reg->matchDict = 0;
350     reg->isamc = 0;
351     reg->isamb = 0;
352     reg->zei = 0;
353     reg->key_file_no = 0;
354     reg->ptr_i = 0;
355     
356     zebraRankInstall (reg, rank_1_class);
357     zebraRankInstall (reg, rank_zv_class);
358     zebraRankInstall (reg, rank_static_class);
359
360     recordCompression = res_get_def (res, "recordCompression", "none");
361     if (!strcmp (recordCompression, "none"))
362         record_compression = REC_COMPRESS_NONE;
363     if (!strcmp (recordCompression, "bzip2"))
364         record_compression = REC_COMPRESS_BZIP2;
365
366     if (!(reg->records = rec_open (reg->bfs, rw, record_compression)))
367     {
368         yaz_log (YLOG_WARN, "rec_open failed");
369         return 0;
370     }
371     if (rw)
372     {
373         reg->matchDict = dict_open_res (reg->bfs, GMATCH_DICT, 20, 1, 0, res);
374     }
375     if (!(reg->dict = dict_open_res (reg->bfs, FNAME_DICT, 40, rw, 0, res)))
376     {
377         yaz_log (YLOG_WARN, "dict_open failed");
378         return 0;
379     }
380     if (!(reg->sortIdx = sortIdx_open (reg->bfs, rw)))
381     {
382         yaz_log (YLOG_WARN, "sortIdx_open failed");
383         return 0;
384     }
385     if (res_get_match (res, "isam", "s", ISAM_DEFAULT))
386     {
387         struct ISAMS_M_s isams_m;
388         if (!(reg->isams = isams_open (reg->bfs, FNAME_ISAMS, rw,
389                                       key_isams_m(res, &isams_m))))
390         {
391             yaz_log (YLOG_WARN, "isams_open failed");
392             return 0;
393         }
394     }
395     if (res_get_match (res, "isam", "c", ISAM_DEFAULT))
396     {
397         struct ISAMC_M_s isamc_m;
398         if (!(reg->isamc = isamc_open (reg->bfs, FNAME_ISAMC,
399                                     rw, key_isamc_m(res, &isamc_m))))
400         {
401             yaz_log (YLOG_WARN, "isamc_open failed");
402             return 0;
403         }
404     }
405     if (res_get_match (res, "isam", "b", ISAM_DEFAULT))
406     {
407         struct ISAMC_M_s isamc_m;
408         
409         if (!(reg->isamb = isamb_open (reg->bfs, "isamb",
410                                        rw, key_isamc_m(res, &isamc_m), 0)))
411         {
412             yaz_log (YLOG_WARN, "isamb_open failed");
413             return 0;
414         }
415     }
416     if (res_get_match (res, "isam", "bc", ISAM_DEFAULT))
417     {
418         struct ISAMC_M_s isamc_m;
419         
420         if (!(reg->isamb = isamb_open (reg->bfs, "isamb",
421                                        rw, key_isamc_m(res, &isamc_m), 1)))
422         {
423             yaz_log (YLOG_WARN, "isamb_open failed");
424             return 0;
425         }
426     }
427     if (res_get_match (res, "isam", "null", ISAM_DEFAULT))
428     {
429         struct ISAMC_M_s isamc_m;
430         
431         if (!(reg->isamb = isamb_open (reg->bfs, "isamb",
432                                        rw, key_isamc_m(res, &isamc_m), -1)))
433         {
434             yaz_log (YLOG_WARN, "isamb_open failed");
435             return 0;
436         }
437     }
438     reg->zei = zebraExplain_open (reg->records, reg->dh,
439                                   res, rw, reg,
440                                   explain_extract);
441     if (!reg->zei)
442     {
443         yaz_log (YLOG_WARN, "Cannot obtain EXPLAIN information");
444         return 0;
445     }
446
447     reg->active = 2;
448     yaz_log (YLOG_DEBUG, "zebra_register_open ok p=%p", reg);
449     return reg;
450 }
451
452 ZEBRA_RES zebra_admin_shutdown (ZebraHandle zh)
453 {
454     ASSERTZH;
455     yaz_log(log_level, "zebra_admin_shutdown");
456
457     zebra_mutex_cond_lock (&zh->service->session_lock);
458     zh->service->stop_flag = 1;
459     zebra_mutex_cond_unlock (&zh->service->session_lock);
460     return ZEBRA_OK;
461 }
462
463 ZEBRA_RES zebra_admin_start (ZebraHandle zh)
464 {
465     ZebraService zs;
466     ASSERTZH;
467     yaz_log(log_level, "zebra_admin_start");
468     zs = zh->service;
469     zebra_mutex_cond_lock (&zs->session_lock);
470     zebra_mutex_cond_unlock (&zs->session_lock);
471     return ZEBRA_OK;
472 }
473
474 static void zebra_register_close (ZebraService zs, struct zebra_register *reg)
475 {
476     ASSERTZS;
477     assert(reg);
478     yaz_log(YLOG_DEBUG, "zebra_register_close p=%p", reg);
479     reg->stop_flag = 0;
480     zebra_chdir (zs);
481     if (reg->records)
482     {
483         zebraExplain_close (reg->zei);
484         dict_close (reg->dict);
485         if (reg->matchDict)
486             dict_close (reg->matchDict);
487         sortIdx_close (reg->sortIdx);
488         if (reg->isams)
489             isams_close (reg->isams);
490         if (reg->isamc)
491             isamc_close (reg->isamc);
492         if (reg->isamb)
493             isamb_close (reg->isamb);
494         rec_close (&reg->records);
495     }
496
497     recTypes_destroy (reg->recTypes);
498     zebra_maps_close (reg->zebra_maps);
499     zebraRankDestroy (reg);
500     bfs_destroy (reg->bfs);
501     data1_destroy (reg->dh);
502
503     zebra_rec_keys_close(reg->keys);
504 #if NATTR
505     zebra_rec_keys_close(reg->sortKeys);
506 #else
507     xfree(reg->sortKeys.buf);
508 #endif
509
510     xfree(reg->key_buf);
511     xfree(reg->name);
512     xfree(reg);
513 }
514
515 ZEBRA_RES zebra_stop(ZebraService zs)
516 {
517     if (!zs)
518         return ZEBRA_OK;
519     yaz_log (log_level, "zebra_stop");
520
521     while (zs->sessions)
522     {
523         zebra_close (zs->sessions);
524     }
525         
526     zebra_mutex_cond_destroy (&zs->session_lock);
527
528     if (zs->passwd_db)
529         passwd_db_close (zs->passwd_db);
530
531     recTypeClass_destroy(zs->record_classes);
532     nmem_destroy(zs->nmem);
533     res_close (zs->global_res);
534     xfree(zs);
535     return ZEBRA_OK;
536 }
537
538 ZEBRA_RES zebra_close (ZebraHandle zh)
539 {
540     ZebraService zs;
541     struct zebra_session **sp;
542     int i;
543
544     yaz_log(log_level, "zebra_close");
545     if (!zh)
546         return ZEBRA_OK;
547     ASSERTZH;
548     zh->errCode = 0;
549     
550     zs = zh->service;
551     yaz_log (YLOG_DEBUG, "zebra_close zh=%p", zh);
552     resultSetDestroy (zh, -1, 0, 0);
553
554     if (zh->reg)
555         zebra_register_close (zh->service, zh->reg);
556     zebra_close_res (zh);
557     res_close(zh->session_res);
558
559     xfree(zh->record_encoding);
560
561     xfree(zh->dbaccesslist);
562
563     for (i = 0; i < zh->num_basenames; i++)
564         xfree(zh->basenames[i]);
565     xfree(zh->basenames);
566
567     if (zh->iconv_to_utf8 != 0)
568         yaz_iconv_close (zh->iconv_to_utf8);
569     if (zh->iconv_from_utf8 != 0)
570         yaz_iconv_close (zh->iconv_from_utf8);
571
572     zebra_mutex_cond_lock (&zs->session_lock);
573     zebra_lock_destroy (zh->lock_normal);
574     zebra_lock_destroy (zh->lock_shadow);
575     sp = &zs->sessions;
576     while (1)
577     {
578         assert (*sp);
579         if (*sp == zh)
580         {
581             *sp = (*sp)->next;
582             break;
583         }
584         sp = &(*sp)->next;
585     }
586     zebra_mutex_cond_unlock (&zs->session_lock);
587     xfree(zh->reg_name);
588     xfree(zh->user_perm);
589     zh->service = 0; /* more likely to trigger an assert */
590
591     zebra_limit_destroy(zh->m_limit);
592
593     nmem_destroy(zh->nmem_error);
594
595     xfree(zh->path_reg);
596     xfree(zh);
597     return ZEBRA_OK;
598 }
599
600 struct map_baseinfo {
601     ZebraHandle zh;
602     NMEM mem;
603     int num_bases;
604     char **basenames;
605     int new_num_bases;
606     char **new_basenames;
607     int new_num_max;
608 };
609
610 static void zebra_open_res(ZebraHandle zh)
611 {
612     char fname[512];
613     ASSERTZH;
614     zh->errCode = 0;
615
616     if (zh->path_reg)
617     {
618         sprintf(fname, "%.200s/zebra.cfg", zh->path_reg);
619         zh->res = res_open(zh->session_res, 0);
620         res_read_file(zh->res, fname);
621     }
622     else if (*zh->reg_name == 0)
623     {
624         zh->res = res_open(zh->session_res, 0);
625     }
626     else
627     {
628         yaz_log (YLOG_WARN, "no register root specified");
629         zh->res = 0;  /* no path for register - fail! */
630     }
631 }
632
633 static void zebra_close_res (ZebraHandle zh)
634 {
635     ASSERTZH;
636     zh->errCode = 0;
637     res_close (zh->res);
638     zh->res = 0;
639 }
640
641 static void zebra_select_register (ZebraHandle zh, const char *new_reg)
642 {
643     ASSERTZH;
644     zh->errCode = 0;
645     if (zh->res && strcmp (zh->reg_name, new_reg) == 0)
646         return;
647     if (!zh->res)
648     {
649         assert (zh->reg == 0);
650         assert (*zh->reg_name == 0);
651     }
652     else
653     {
654         if (zh->reg)
655         {
656             resultSetInvalidate (zh);
657             zebra_register_close (zh->service, zh->reg);
658             zh->reg = 0;
659         }
660         zebra_close_res(zh);
661     }
662     xfree(zh->reg_name);
663     zh->reg_name = xstrdup (new_reg);
664
665     xfree(zh->path_reg);
666     zh->path_reg = 0;
667     if (zh->service->path_root)
668     {
669         zh->path_reg = xmalloc(strlen(zh->service->path_root) + 
670                                 strlen(zh->reg_name) + 3);
671         strcpy (zh->path_reg, zh->service->path_root);
672         if (*zh->reg_name)
673         {
674             strcat (zh->path_reg, "/");
675             strcat (zh->path_reg, zh->reg_name);
676         }
677     }
678     zebra_open_res(zh);
679     
680     if (zh->lock_normal)
681         zebra_lock_destroy (zh->lock_normal);
682     zh->lock_normal = 0;
683
684     if (zh->lock_shadow)
685         zebra_lock_destroy (zh->lock_shadow);
686     zh->lock_shadow = 0;
687
688     if (zh->res)
689     {
690         char fname[512];
691         const char *lock_area = res_get (zh->res, "lockDir");
692         
693         if (!lock_area && zh->path_reg)
694             res_set (zh->res, "lockDir", zh->path_reg);
695         sprintf (fname, "norm.%s.LCK", zh->reg_name);
696         zh->lock_normal =
697             zebra_lock_create (res_get(zh->res, "lockDir"), fname, 0);
698         
699         sprintf (fname, "shadow.%s.LCK", zh->reg_name);
700         zh->lock_shadow =
701             zebra_lock_create (res_get(zh->res, "lockDir"), fname, 0);
702
703         if (!zh->lock_normal || !zh->lock_shadow)
704         {
705             if (zh->lock_normal)
706             {
707                 zebra_lock_destroy(zh->lock_normal);
708                 zh->lock_normal = 0;
709             }
710             if (zh->lock_shadow)
711             {
712                 zebra_lock_destroy(zh->lock_shadow);
713                 zh->lock_shadow = 0;
714             }
715             zebra_close_res(zh);
716         }
717     }
718     if (zh->res)
719     {
720         int approx = 0;
721         if (res_get_int(zh->res, "estimatehits", &approx) == ZEBRA_OK)
722             zebra_set_approx_limit(zh, approx);
723     }
724     if (zh->res)
725     {
726         if (res_get_int(zh->res, "staticrank", &zh->m_staticrank) == ZEBRA_OK)
727             yaz_log(YLOG_LOG, "static rank set and is %d", zh->m_staticrank);
728         else
729             yaz_log(YLOG_LOG, "static rank unset");
730     }
731 }
732
733 void map_basenames_func (void *vp, const char *name, const char *value)
734 {
735     struct map_baseinfo *p = (struct map_baseinfo *) vp;
736     int i, no;
737     char fromdb[128], todb[8][128];
738
739     assert(value);
740     assert(name);
741     assert(vp);
742     
743     no =
744         sscanf (value, "%127s %127s %127s %127s %127s %127s %127s %127s %127s",
745                 fromdb, todb[0], todb[1], todb[2], todb[3], todb[4],
746                 todb[5], todb[6], todb[7]);
747     if (no < 2)
748         return ;
749     no--;
750     for (i = 0; i<p->num_bases; i++)
751         if (p->basenames[i] && !STRCASECMP (p->basenames[i], fromdb))
752         {
753             p->basenames[i] = 0;
754             for (i = 0; i < no; i++)
755             {
756                 if (p->new_num_bases == p->new_num_max)
757                     return;
758                 p->new_basenames[(p->new_num_bases)++] = 
759                     nmem_strdup (p->mem, todb[i]);
760             }
761             return;
762         }
763 }
764
765 int zebra_select_default_database(ZebraHandle zh)
766 {
767     if (!zh->res)
768     {
769         /* no database has been selected - so we select based on
770            resource setting (including group)
771         */
772         const char *group = res_get(zh->session_res, "group");
773         const char *v = res_get_prefix(zh->session_res,
774                                        "database", group, "Default");
775         return zebra_select_database(zh, v);
776     }
777     return 0;
778 }
779
780 void map_basenames (ZebraHandle zh, ODR stream,
781                     int *num_bases, char ***basenames)
782 {
783     struct map_baseinfo info;
784     struct map_baseinfo *p = &info;
785     int i;
786     ASSERTZH;
787     yaz_log(log_level, "map_basenames ");
788     assert(stream);
789
790     info.zh = zh;
791
792     info.num_bases = *num_bases;
793     info.basenames = *basenames;
794     info.new_num_max = 128;
795     info.new_num_bases = 0;
796     info.new_basenames = (char **)
797         odr_malloc (stream, sizeof(*info.new_basenames) * info.new_num_max);
798     info.mem = stream->mem;
799
800     res_trav (zh->session_res, "mapdb", &info, map_basenames_func);
801     
802     for (i = 0; i<p->num_bases; i++)
803         if (p->basenames[i] && p->new_num_bases < p->new_num_max)
804         {
805             p->new_basenames[(p->new_num_bases)++] = 
806                 nmem_strdup (p->mem, p->basenames[i]);
807         }
808     *num_bases = info.new_num_bases;
809     *basenames = info.new_basenames;
810     for (i = 0; i<*num_bases; i++)
811         yaz_log (YLOG_DEBUG, "base %s", (*basenames)[i]);
812 }
813
814 ZEBRA_RES zebra_select_database (ZebraHandle zh, const char *basename)
815 {
816     ASSERTZH;
817     yaz_log(log_level, "zebra_select_database %s",basename);
818     assert(basename);
819     return zebra_select_databases (zh, 1, &basename);
820 }
821
822 ZEBRA_RES zebra_select_databases (ZebraHandle zh, int num_bases,
823                                   const char **basenames)
824 {
825     int i;
826     const char *cp;
827     int len = 0;
828     char *new_reg = 0;
829     ASSERTZH;
830     assert(basenames);
831
832     yaz_log(log_level, "zebra_select_databases n=%d [0]=%s",
833                     num_bases,basenames[0]);
834     zh->errCode = 0;
835     
836     if (num_bases < 1)
837     {
838         zh->errCode = YAZ_BIB1_COMBI_OF_SPECIFIED_DATABASES_UNSUPP;
839         return ZEBRA_FAIL;
840     }
841
842     /* Check if the user has access to all databases (Seb) */
843     /* You could argue that this should happen later, after we have
844      * determined that the database(s) exist. */
845     if (zh->dbaccesslist) {
846         for (i = 0; i < num_bases; i++) {
847             const char *db = basenames[i];
848             char *p, *pp;
849             for (p = zh->dbaccesslist; p && *p; p = pp) {
850                 int len;
851                 if ((pp = strchr(p, '+'))) {
852                     len = pp - p;
853                     pp++;
854                 }
855                 else
856                     len = strlen(p);
857                 if (len == strlen(db) && !strncmp(db, p, len))
858                     break;
859             }
860             if (!p) {
861                 zh->errCode = YAZ_BIB1_ACCESS_TO_SPECIFIED_DATABASE_DENIED;
862                 return ZEBRA_FAIL;
863             }
864         }
865     }
866
867     for (i = 0; i < zh->num_basenames; i++)
868         xfree(zh->basenames[i]);
869     xfree(zh->basenames);
870     
871     zh->num_basenames = num_bases;
872     zh->basenames = xmalloc(zh->num_basenames * sizeof(*zh->basenames));
873     for (i = 0; i < zh->num_basenames; i++)
874         zh->basenames[i] = xstrdup (basenames[i]);
875
876     cp = strrchr(basenames[0], '/');
877     if (cp)
878     {
879         len = cp - basenames[0];
880         new_reg = xmalloc(len + 1);
881         memcpy (new_reg, basenames[0], len);
882         new_reg[len] = '\0';
883     }
884     else
885         new_reg = xstrdup ("");
886     for (i = 1; i<num_bases; i++)
887     {
888         const char *cp1;
889
890         cp1 = strrchr (basenames[i], '/');
891         if (cp)
892         {
893             if (!cp1)
894             {
895                 zh->errCode = YAZ_BIB1_COMBI_OF_SPECIFIED_DATABASES_UNSUPP;
896                 return -1;
897             }
898             if (len != cp1 - basenames[i] ||
899                 memcmp (basenames[i], new_reg, len))
900             {
901                 zh->errCode = YAZ_BIB1_COMBI_OF_SPECIFIED_DATABASES_UNSUPP;
902                 return -1;
903             }
904         }
905         else
906         {
907             if (cp1)
908             {
909                 zh->errCode = YAZ_BIB1_COMBI_OF_SPECIFIED_DATABASES_UNSUPP;
910                 return ZEBRA_FAIL;
911             }
912         }
913     }
914     zebra_select_register (zh, new_reg);
915     xfree(new_reg);
916     if (!zh->res)
917     {
918         zh->errCode = YAZ_BIB1_DATABASE_UNAVAILABLE;
919         return ZEBRA_FAIL;
920     }
921     if (!zh->lock_normal || !zh->lock_shadow)
922     {
923         zh->errCode = YAZ_BIB1_TEMPORARY_SYSTEM_ERROR;
924         return ZEBRA_FAIL;
925     }
926     return ZEBRA_OK;
927 }
928
929 ZEBRA_RES zebra_set_approx_limit(ZebraHandle zh, zint approx_limit)
930 {
931     if (approx_limit == 0)
932         approx_limit = DEFAULT_APPROX_LIMIT;
933     zh->approx_limit = approx_limit;
934     return ZEBRA_OK;
935 }
936
937 ZEBRA_RES zebra_search_RPN(ZebraHandle zh, ODR o, Z_RPNQuery *query,
938                            const char *setname, zint *hits)
939 {
940     ZEBRA_RES r;
941     ASSERTZH;
942     assert(o);
943     assert(query);
944     assert(hits);
945     assert(setname);
946     yaz_log(log_level, "zebra_search_rpn");
947     zh->hits = 0;
948     *hits = 0;
949
950     if (zebra_begin_read(zh) == ZEBRA_FAIL)
951         return ZEBRA_FAIL;
952
953     r = resultSetAddRPN(zh, odr_extract_mem(o), query, 
954                         zh->num_basenames, zh->basenames, setname);
955     zebra_end_read(zh);
956     *hits = zh->hits;
957     return r;
958 }
959
960 ZEBRA_RES zebra_records_retrieve(ZebraHandle zh, ODR stream,
961                                  const char *setname,
962                                  Z_RecordComposition *comp,
963                                  oid_value input_format, int num_recs,
964                                  ZebraRetrievalRecord *recs)
965 {
966     ZebraMetaRecord *poset;
967     int i;
968     ZEBRA_RES ret = ZEBRA_OK;
969     zint *pos_array;
970     ASSERTZH;
971     assert(stream);
972     assert(setname);
973     assert(recs);
974     assert(num_recs>0);
975
976     yaz_log(log_level, "zebra_records_retrieve n=%d", num_recs);
977
978     if (!zh->res)
979     {
980         zebra_setError(zh, YAZ_BIB1_SPECIFIED_RESULT_SET_DOES_NOT_EXIST,
981                        setname);
982         return ZEBRA_FAIL;
983     }
984     
985     if (zebra_begin_read (zh) == ZEBRA_FAIL)
986         return ZEBRA_FAIL;
987
988     pos_array = (zint *) xmalloc(num_recs * sizeof(*pos_array));
989     for (i = 0; i<num_recs; i++)
990         pos_array[i] = recs[i].position;
991     poset = zebra_meta_records_create(zh, setname, num_recs, pos_array);
992     if (!poset)
993     {
994         yaz_log (YLOG_DEBUG, "zebraPosSetCreate error");
995         zebra_setError(zh, YAZ_BIB1_SPECIFIED_RESULT_SET_DOES_NOT_EXIST,
996                        setname);
997         ret = ZEBRA_FAIL;
998     }
999     else
1000     {
1001         for (i = 0; i<num_recs; i++)
1002         {
1003             if (poset[i].term)
1004             {
1005                 recs[i].errCode = 0;
1006                 recs[i].format = VAL_SUTRS;
1007                 recs[i].len = strlen(poset[i].term);
1008                 recs[i].buf = poset[i].term;
1009                 recs[i].base = poset[i].db;
1010             }
1011             else if (poset[i].sysno)
1012             {
1013                 char *buf;
1014                 int len;
1015                 zebra_snippets *hit_snippet = zebra_snippets_create();
1016
1017                 zebra_snippets_hit_vector(zh, setname, poset[i].sysno, 
1018                                           hit_snippet);
1019
1020                 recs[i].errCode =
1021                     zebra_record_fetch(zh, poset[i].sysno, poset[i].score,
1022                                        hit_snippet,
1023                                        stream, input_format, comp,
1024                                        &recs[i].format, &buf, &len,
1025                                        &recs[i].base, &recs[i].errString);
1026                 
1027                 recs[i].len = len;
1028                 if (len > 0)
1029                 {
1030                     recs[i].buf = (char*) odr_malloc(stream, len);
1031                     memcpy(recs[i].buf, buf, len);
1032                 }
1033                 else
1034                     recs[i].buf = buf;
1035                 recs[i].score = poset[i].score;
1036                 recs[i].sysno = poset[i].sysno;
1037                 zebra_snippets_destroy(hit_snippet);
1038             }
1039             else
1040             {
1041                 /* only need to set it once */
1042                 if (pos_array[i] < zh->approx_limit && ret == ZEBRA_OK)
1043                 {
1044                     zebra_setError_zint(zh,
1045                                         YAZ_BIB1_PRESENT_REQUEST_OUT_OF_RANGE,
1046                                         pos_array[i]);
1047                     ret = ZEBRA_FAIL;
1048                     break;
1049                 }
1050                 recs[i].buf = 0;  /* no record and no error issued */
1051                 recs[i].len = 0;
1052                 recs[i].errCode = 0;
1053                 recs[i].format = VAL_NONE;
1054                 recs[i].sysno = 0;
1055             }
1056         }
1057         zebra_meta_records_destroy(zh, poset, num_recs);
1058     }
1059     zebra_end_read (zh);
1060     xfree(pos_array);
1061     return ret;
1062 }
1063
1064 ZEBRA_RES zebra_scan_PQF(ZebraHandle zh, ODR stream, const char *query,
1065                          int *position,
1066                          int *num_entries, ZebraScanEntry **entries,
1067                          int *is_partial,
1068                          const char *setname)
1069 {
1070     YAZ_PQF_Parser pqf_parser = yaz_pqf_create ();
1071     Z_AttributesPlusTerm *zapt;
1072     int *attributeSet;
1073     ZEBRA_RES res;
1074     
1075     if (!(zapt = yaz_pqf_scan(pqf_parser, stream, &attributeSet, query)))
1076     {
1077         res = ZEBRA_FAIL;
1078         zh->errCode = YAZ_BIB1_SCAN_MALFORMED_SCAN;
1079     }
1080     else
1081         res = zebra_scan(zh, stream, zapt, VAL_BIB1,
1082                          position, num_entries, entries, is_partial,
1083                          setname);
1084     yaz_pqf_destroy (pqf_parser);
1085     return res;
1086 }
1087
1088 ZEBRA_RES zebra_scan(ZebraHandle zh, ODR stream, Z_AttributesPlusTerm *zapt,
1089                      oid_value attributeset,
1090                      int *position,
1091                      int *num_entries, ZebraScanEntry **entries,
1092                      int *is_partial,
1093                      const char *setname)
1094 {
1095     ZEBRA_RES res;
1096     RSET limit_rset = 0;
1097     ASSERTZH;
1098     assert(stream);
1099     assert(zapt);
1100     assert(position);
1101     assert(num_entries);
1102     assert(is_partial);
1103     assert(entries);
1104     yaz_log(log_level, "zebra_scan");
1105
1106     if (zebra_begin_read (zh) == ZEBRA_FAIL)
1107     {
1108         *entries = 0;
1109         *num_entries = 0;
1110         return ZEBRA_FAIL;
1111     }
1112     if (setname)
1113     {
1114         limit_rset = resultSetRef(zh, setname);
1115         if (!limit_rset)
1116         {
1117             zebra_setError(zh, 
1118                            YAZ_BIB1_SPECIFIED_RESULT_SET_DOES_NOT_EXIST,
1119                            setname);
1120             zebra_end_read (zh);
1121             return ZEBRA_FAIL;
1122         }
1123     }
1124     res = rpn_scan (zh, stream, zapt, attributeset,
1125                     zh->num_basenames, zh->basenames, position,
1126                     num_entries, entries, is_partial, limit_rset, 0);
1127     zebra_end_read (zh);
1128     return res;
1129 }
1130
1131 ZEBRA_RES zebra_sort (ZebraHandle zh, ODR stream,
1132                       int num_input_setnames, const char **input_setnames,
1133                       const char *output_setname,
1134                       Z_SortKeySpecList *sort_sequence,
1135                       int *sort_status)
1136 {
1137     ZEBRA_RES res;
1138     ASSERTZH;
1139     assert(stream);
1140     assert(num_input_setnames>0);
1141     assert(input_setnames);
1142     assert(sort_sequence);
1143     assert(sort_status);
1144     yaz_log(log_level, "zebra_sort");
1145
1146     if (zebra_begin_read(zh) == ZEBRA_FAIL)
1147         return ZEBRA_FAIL;
1148     res = resultSetSort(zh, stream->mem, num_input_setnames, input_setnames,
1149                         output_setname, sort_sequence, sort_status);
1150     zebra_end_read(zh);
1151     return res;
1152 }
1153
1154 int zebra_deleteResultSet(ZebraHandle zh, int function,
1155                           int num_setnames, char **setnames,
1156                           int *statuses)
1157 {
1158     int i, status;
1159     ASSERTZH;
1160     assert(statuses);
1161     yaz_log(log_level, "zebra_deleteResultSet n=%d",num_setnames);
1162
1163     if (zebra_begin_read(zh))
1164         return Z_DeleteStatus_systemProblemAtTarget;
1165     switch (function)
1166     {
1167     case Z_DeleteResultSetRequest_list:
1168         assert(num_setnames>0);
1169         assert(setnames);
1170         resultSetDestroy (zh, num_setnames, setnames, statuses);
1171         break;
1172     case Z_DeleteResultSetRequest_all:
1173         resultSetDestroy (zh, -1, 0, statuses);
1174         break;
1175     }
1176     zebra_end_read (zh);
1177     status = Z_DeleteStatus_success;
1178     for (i = 0; i<num_setnames; i++)
1179         if (statuses[i] == Z_DeleteStatus_resultSetDidNotExist)
1180             status = statuses[i];
1181     return status;
1182 }
1183
1184 int zebra_errCode (ZebraHandle zh)
1185 {
1186     if (zh)
1187     {
1188         yaz_log(log_level, "zebra_errCode: %d",zh->errCode);
1189         return zh->errCode;
1190     }
1191     yaz_log(log_level, "zebra_errCode: o");
1192     return 0; 
1193 }
1194
1195 const char *zebra_errString (ZebraHandle zh)
1196 {
1197     const char *e = 0;
1198     if (zh)
1199         e= diagbib1_str (zh->errCode);
1200     yaz_log(log_level, "zebra_errString: %s",e);
1201     return e;
1202 }
1203
1204 char *zebra_errAdd (ZebraHandle zh)
1205 {
1206     char *a = 0;
1207     if (zh)
1208         a= zh->errString;
1209     yaz_log(log_level, "zebra_errAdd: %s",a);
1210     return a;
1211 }
1212
1213 ZEBRA_RES zebra_auth (ZebraHandle zh, const char *user, const char *pass)
1214 {
1215     const char *p;
1216     const char *astring;
1217     char u[40];
1218     ZebraService zs;
1219
1220     ASSERTZH;
1221
1222     zs= zh->service;
1223     
1224     sprintf(u, "perm.%.30s", user ? user : "anonymous");
1225     p = res_get(zs->global_res, u);
1226     xfree(zh->user_perm);
1227     zh->user_perm = xstrdup(p ? p : "r");
1228
1229     /* Determine database access list */
1230     astring = res_get(zs->dbaccess, user ? user : "anonymous");
1231     if (astring)
1232         zh->dbaccesslist = xstrdup(astring);
1233     else
1234         zh->dbaccesslist = 0;
1235
1236     /* users that don't require a password .. */
1237     if (zh->user_perm && strchr(zh->user_perm, 'a'))
1238         return ZEBRA_OK;
1239     
1240     if (!zs->passwd_db || !passwd_db_auth (zs->passwd_db, user, pass))
1241         return ZEBRA_OK;
1242     return ZEBRA_FAIL;
1243 }
1244
1245 ZEBRA_RES zebra_admin_import_begin (ZebraHandle zh, const char *database,
1246                                const char *record_type)
1247 {
1248     ASSERTZH;
1249     yaz_log(log_level, "zebra_admin_import_begin db=%s rt=%s", 
1250                      database, record_type);
1251     if (zebra_select_database(zh, database) == ZEBRA_FAIL)
1252         return ZEBRA_FAIL;
1253     return zebra_begin_trans(zh, 1);
1254 }
1255
1256 ZEBRA_RES zebra_admin_import_end (ZebraHandle zh)
1257 {
1258     ASSERTZH;
1259     yaz_log(log_level, "zebra_admin_import_end");
1260     return zebra_end_trans(zh);
1261 }
1262
1263 ZEBRA_RES zebra_admin_import_segment (ZebraHandle zh, Z_Segment *segment)
1264 {
1265     ZEBRA_RES res = ZEBRA_OK;
1266     SYSNO sysno;
1267     int i;
1268     ASSERTZH;
1269     yaz_log(log_level, "zebra_admin_import_segment");
1270
1271     for (i = 0; i<segment->num_segmentRecords; i++)
1272     {
1273         Z_NamePlusRecord *npr = segment->segmentRecords[i];
1274
1275         if (npr->which == Z_NamePlusRecord_intermediateFragment)
1276         {
1277             Z_FragmentSyntax *fragment = npr->u.intermediateFragment;
1278             if (fragment->which == Z_FragmentSyntax_notExternallyTagged)
1279             {
1280                 Odr_oct *oct = fragment->u.notExternallyTagged;
1281                 sysno = 0;
1282                 
1283                 if (zebra_update_record(zh, 
1284                                         0, /* record Type */
1285                                         &sysno,
1286                                         0, /* match */
1287                                         0, /* fname */
1288                                         (const char *) oct->buf, oct->len,
1289                                         0) == ZEBRA_FAIL)
1290                     res = ZEBRA_FAIL;
1291             }
1292         }
1293     }
1294     return res;
1295 }
1296
1297 ZEBRA_RES zebra_admin_exchange_record(ZebraHandle zh,
1298                                       const char *rec_buf,
1299                                       size_t rec_len,
1300                                       const char *recid_buf, size_t recid_len,
1301                                       int action)
1302     /* 1 = insert. Fail it already exists */
1303     /* 2 = replace. Fail it does not exist */
1304     /* 3 = delete. Fail if does not exist */
1305     /* 4 = update. Insert/replace */
1306 {
1307     ZEBRA_RES res;
1308     SYSNO sysno = 0;
1309     char *rinfo = 0;
1310     char recid_z[256];
1311     int db_ord;
1312     ASSERTZH;
1313     assert(action>0 && action <=4);
1314     assert(rec_buf);
1315
1316     yaz_log(log_level, "zebra_admin_exchange_record ac=%d", action);
1317
1318     if (!recid_buf || recid_len <= 0 || recid_len >= sizeof(recid_z))
1319     {
1320         zebra_setError(zh, YAZ_BIB1_ES_IMMEDIATE_EXECUTION_FAILED,
1321                        "no record ID or empty record ID");
1322         return ZEBRA_FAIL;
1323     }
1324
1325     memcpy (recid_z, recid_buf, recid_len);
1326     recid_z[recid_len] = 0;
1327
1328     if (zebra_begin_trans(zh, 1) == ZEBRA_FAIL)
1329         return ZEBRA_FAIL;
1330
1331     db_ord = zebraExplain_get_database_ord(zh->reg->zei);
1332     rinfo = dict_lookup_ord(zh->reg->matchDict, db_ord, recid_z);
1333     if (rinfo)
1334     {
1335         if (action == 1)  /* fail if insert */
1336         {
1337             zebra_end_trans(zh);
1338             zebra_setError(zh, YAZ_BIB1_ES_IMMEDIATE_EXECUTION_FAILED,
1339                            "Cannot insert record: already exist");
1340             return ZEBRA_FAIL;
1341         }
1342
1343         memcpy (&sysno, rinfo+1, sizeof(sysno));
1344     }
1345     else
1346     {
1347         if (action == 2 || action == 3) /* fail if delete or update */
1348         {
1349             zebra_end_trans(zh);
1350             zebra_setError(zh, YAZ_BIB1_ES_IMMEDIATE_EXECUTION_FAILED,
1351                            "Cannot delete/update record: does not exist");
1352             return ZEBRA_FAIL;
1353         }
1354         action = 1;  /* make it an insert (if it's an update).. */
1355     }
1356     res = buffer_extract_record (zh, rec_buf, rec_len,
1357                                  action == 3 ? 1 : 0 /* delete flag */,
1358                                  0, /* test mode */
1359                                  0, /* recordType */
1360                                  &sysno, 
1361                                  0, /* match */
1362                                  0, /* fname */
1363                                  0, /* force update */
1364                                  1  /* allow update */
1365         );
1366     if (res == ZEBRA_FAIL)
1367     {
1368         zebra_setError(zh, YAZ_BIB1_ES_IMMEDIATE_EXECUTION_FAILED,
1369                        "Unable to parse record");
1370     }
1371     if (action == 1)
1372     {
1373         dict_insert_ord(zh->reg->matchDict, db_ord, recid_z,
1374                         sizeof(sysno), &sysno);
1375     }
1376     else if (action == 3)
1377     {
1378         dict_delete_ord(zh->reg->matchDict, db_ord, recid_z);
1379     }
1380     zebra_end_trans(zh);
1381     return res;
1382 }
1383
1384 int delete_w_handle(const char *info, void *handle)
1385 {
1386     ZebraHandle zh = (ZebraHandle) handle;
1387     ISAM_P pos;
1388     ASSERTZH;
1389
1390     if (*info == sizeof(pos))
1391     {
1392         memcpy (&pos, info+1, sizeof(pos));
1393         isamb_unlink(zh->reg->isamb, pos);
1394     }
1395     return 0;
1396 }
1397
1398 static int delete_SU_handle(void *handle, int ord)
1399 {
1400     ZebraHandle zh = (ZebraHandle) handle;
1401     char ord_buf[20];
1402     int ord_len;
1403
1404     ord_len = key_SU_encode (ord, ord_buf);
1405     ord_buf[ord_len] = '\0';
1406
1407     assert (zh->reg->isamb);
1408     dict_delete_subtree(zh->reg->dict, ord_buf,
1409                         zh, delete_w_handle);
1410     return 0;
1411 }
1412
1413 ZEBRA_RES zebra_drop_database(ZebraHandle zh, const char *db)
1414 {
1415     ZEBRA_RES ret = ZEBRA_OK;
1416     ASSERTZH;
1417     yaz_log(log_level, "zebra_drop_database %s", db);
1418
1419     if (zebra_select_database (zh, db) == ZEBRA_FAIL)
1420         return ZEBRA_FAIL;
1421     if (zebra_begin_trans (zh, 1) == ZEBRA_FAIL)
1422         return ZEBRA_FAIL;
1423     if (zh->reg->isamb)
1424     {
1425         int db_ord;
1426         zebraExplain_curDatabase (zh->reg->zei, db);
1427         db_ord = zebraExplain_get_database_ord(zh->reg->zei);
1428         dict_delete_subtree_ord(zh->reg->matchDict, db_ord,
1429                                 0 /* handle */, 0 /* func */);
1430         zebraExplain_trav_ord(zh->reg->zei, zh, delete_SU_handle);
1431         zebraExplain_removeDatabase(zh->reg->zei, zh);
1432     }
1433     else
1434     {
1435         yaz_log(YLOG_WARN, "drop database only supported for isam:b");
1436         zebra_setError(zh, YAZ_BIB1_ES_IMMEDIATE_EXECUTION_FAILED,
1437                        "drop database only supported for isam:b");
1438         ret = ZEBRA_FAIL;
1439     }
1440     zebra_end_trans (zh);
1441     return ret;
1442 }
1443
1444 ZEBRA_RES zebra_create_database (ZebraHandle zh, const char *db)
1445 {
1446     ASSERTZH;
1447     yaz_log(log_level, "zebra_create_database %s", db);
1448     assert(db);
1449
1450     if (zebra_select_database (zh, db) == ZEBRA_FAIL)
1451         return ZEBRA_FAIL;
1452     if (zebra_begin_trans (zh, 1))
1453         return ZEBRA_FAIL;
1454
1455     /* announce database */
1456     if (zebraExplain_newDatabase (zh->reg->zei, db, 0 
1457                                   /* explainDatabase */))
1458     {
1459         zebra_end_trans (zh);
1460         zebra_setError(zh, YAZ_BIB1_ES_IMMEDIATE_EXECUTION_FAILED, db);
1461         return ZEBRA_FAIL;
1462     }
1463     return zebra_end_trans (zh);
1464 }
1465
1466 int zebra_string_norm (ZebraHandle zh, unsigned reg_id,
1467                        const char *input_str, int input_len,
1468                        char *output_str, int output_len)
1469 {
1470     WRBUF wrbuf;
1471     ASSERTZH;
1472     assert(input_str);
1473     assert(output_str);
1474     yaz_log(log_level, "zebra_string_norm ");
1475
1476     if (!zh->reg->zebra_maps)
1477         return -1;
1478     wrbuf = zebra_replace(zh->reg->zebra_maps, reg_id, "",
1479                           input_str, input_len);
1480     if (!wrbuf)
1481         return -2;
1482     if (wrbuf_len(wrbuf) >= output_len)
1483         return -3;
1484     if (wrbuf_len(wrbuf))
1485         memcpy (output_str, wrbuf_buf(wrbuf), wrbuf_len(wrbuf));
1486     output_str[wrbuf_len(wrbuf)] = '\0';
1487     return wrbuf_len(wrbuf);
1488 }
1489
1490 static void zebra_set_state (ZebraHandle zh, int val, int seqno)
1491 {
1492     char state_fname[256];
1493     char *fname;
1494     long p = getpid();
1495     FILE *f;
1496     ASSERTZH;
1497     yaz_log(log_level, "zebra_set_state v=%d seq=%d", val, seqno);
1498
1499     sprintf (state_fname, "state.%s.LCK", zh->reg_name);
1500     fname = zebra_mk_fname (res_get(zh->res, "lockDir"), state_fname);
1501     f = fopen (fname, "w");
1502
1503     yaz_log (YLOG_DEBUG, "zebra_set_state: %c %d %ld", val, seqno, p);
1504     fprintf (f, "%c %d %ld\n", val, seqno, p);
1505     fclose (f);
1506     xfree(fname);
1507 }
1508
1509 static void zebra_get_state (ZebraHandle zh, char *val, int *seqno)
1510 {
1511     char state_fname[256];
1512     char *fname;
1513     FILE *f;
1514
1515     ASSERTZH;
1516     yaz_log(log_level, "zebra_get_state ");
1517
1518     sprintf (state_fname, "state.%s.LCK", zh->reg_name);
1519     fname = zebra_mk_fname (res_get(zh->res, "lockDir"), state_fname);
1520     f = fopen (fname, "r");
1521     *val = 'o';
1522     *seqno = 0;
1523
1524     if (f)
1525     {
1526         fscanf (f, "%c %d", val, seqno);
1527         fclose (f);
1528     }
1529     xfree(fname);
1530 }
1531
1532 ZEBRA_RES zebra_begin_read (ZebraHandle zh)
1533 {
1534     return zebra_begin_trans(zh, 0);
1535 }
1536
1537 ZEBRA_RES zebra_end_read (ZebraHandle zh)
1538 {
1539     return zebra_end_trans(zh);
1540 }
1541
1542 static void read_res_for_transaction(ZebraHandle zh)
1543 {
1544     const char *group = res_get(zh->res, "group");
1545     const char *v;
1546     /* FIXME - do we still use groups ?? */
1547     
1548     zh->m_group = group;
1549     v = res_get_prefix(zh->res, "followLinks", group, "1");
1550     zh->m_follow_links = atoi(v);
1551
1552     zh->m_record_id = res_get_prefix(zh->res, "recordId", group, 0);
1553     zh->m_record_type = res_get_prefix(zh->res, "recordType", group, 0);
1554
1555     v = res_get_prefix(zh->res, "storeKeys", group, "1");
1556     zh->m_store_keys = atoi(v);
1557
1558     v = res_get_prefix(zh->res, "storeData", group, "1");
1559     zh->m_store_data = atoi(v);
1560
1561     v = res_get_prefix(zh->res, "explainDatabase", group, "0");
1562     zh->m_explain_database = atoi(v);
1563
1564     v = res_get_prefix(zh->res, "openRW", group, "1");
1565     zh->m_flag_rw = atoi(v);
1566
1567     v = res_get_prefix(zh->res, "fileVerboseLimit", group, "100000");
1568     zh->m_file_verbose_limit = atoi(v);
1569 }
1570
1571 ZEBRA_RES zebra_begin_trans(ZebraHandle zh, int rw)
1572 {
1573     ASSERTZH;
1574     zebra_select_default_database(zh);
1575     if (!zh->res)
1576     {
1577         zebra_setError(zh, YAZ_BIB1_TEMPORARY_SYSTEM_ERROR,
1578                        "zebra_begin_trans: no database selected");
1579         return ZEBRA_FAIL;
1580     }
1581     ASSERTZHRES;
1582     yaz_log(log_level, "zebra_begin_trans rw=%d",rw);
1583
1584     if (zh->user_perm)
1585     {
1586         if (rw && !strchr(zh->user_perm, 'w'))
1587         {
1588             zebra_setError(
1589                 zh,
1590                 YAZ_BIB1_ES_PERMISSION_DENIED_ON_ES_CANNOT_MODIFY_OR_DELETE,
1591                 0);
1592             return ZEBRA_FAIL;
1593         }
1594     }
1595
1596     assert (zh->res);
1597     if (rw)
1598     {
1599         int pass;
1600         int seqno = 0;
1601         char val = '?';
1602         const char *rval = 0;
1603         
1604         (zh->trans_no++);
1605         if (zh->trans_w_no)
1606         {
1607             read_res_for_transaction(zh);
1608             return 0;
1609         }
1610         if (zh->trans_no != 1)
1611         {
1612             zebra_setError(zh, YAZ_BIB1_TEMPORARY_SYSTEM_ERROR,
1613                            "zebra_begin_trans: no write trans within read");
1614             return ZEBRA_FAIL;
1615         }
1616         if (zh->reg)
1617         {
1618             resultSetInvalidate (zh);
1619             zebra_register_close (zh->service, zh->reg);
1620         }
1621         zh->trans_w_no = zh->trans_no;
1622
1623         zh->records_inserted = 0;
1624         zh->records_updated = 0;
1625         zh->records_deleted = 0;
1626         zh->records_processed = 0;
1627         
1628 #if HAVE_SYS_TIMES_H
1629         times (&zh->tms1);
1630 #endif
1631         /* lock */
1632         if (zh->shadow_enable)
1633             rval = res_get (zh->res, "shadow");
1634         
1635         for (pass = 0; pass < 2; pass++)
1636         {
1637             if (rval)
1638             {
1639                 zebra_lock_r (zh->lock_normal);
1640                 zebra_lock_w (zh->lock_shadow);
1641             }
1642             else
1643             {
1644                 zebra_lock_w (zh->lock_normal);
1645                 zebra_lock_w (zh->lock_shadow);
1646             }
1647             
1648             zebra_get_state (zh, &val, &seqno);
1649             if (val == 'c')
1650             {
1651                 yaz_log (YLOG_WARN, "previous transaction didn't finish commit");
1652                 zebra_unlock (zh->lock_shadow);
1653                 zebra_unlock (zh->lock_normal);
1654                 zebra_commit (zh);
1655                 continue;
1656             }
1657             else if (val == 'd')
1658             {
1659                 if (rval)
1660                 {
1661                     BFiles bfs = bfs_create (res_get (zh->res, "shadow"),
1662                                              zh->path_reg);
1663                     yaz_log (YLOG_WARN, "previous transaction didn't reach commit");
1664                     bf_commitClean (bfs, rval);
1665                     bfs_destroy (bfs);
1666             }
1667                 else
1668                 {
1669                     yaz_log (YLOG_WARN, "your previous transaction didn't finish");
1670                 }
1671             }
1672             break;
1673         }
1674         if (pass == 2)
1675         {
1676             yaz_log (YLOG_FATAL, "zebra_begin_trans couldn't finish commit");
1677             abort();
1678             return ZEBRA_FAIL;
1679         }
1680         zebra_set_state (zh, 'd', seqno);
1681         
1682         zh->reg = zebra_register_open(zh->service, zh->reg_name,
1683                                       1, rval ? 1 : 0, zh->res,
1684                                       zh->path_reg);
1685         if (zh->reg)
1686             zh->reg->seqno = seqno;
1687         else
1688         {
1689             zebra_set_state (zh, 'o', seqno);
1690             
1691             zebra_unlock (zh->lock_shadow);
1692             zebra_unlock (zh->lock_normal);
1693
1694             zh->trans_no--;
1695             zh->trans_w_no = 0;
1696
1697             zebra_setError(zh, YAZ_BIB1_TEMPORARY_SYSTEM_ERROR,
1698                            "zebra_begin_trans: cannot open register");
1699             yaz_log(YLOG_FATAL, "%s", zh->errString);
1700             return ZEBRA_FAIL;
1701         }
1702         zebraExplain_curDatabase(zh->reg->zei, zh->basenames[0]);
1703     }
1704     else
1705     {
1706         int dirty = 0;
1707         char val;
1708         int seqno;
1709         
1710         (zh->trans_no)++;
1711         
1712         if (zh->trans_no != 1)
1713         {
1714             zebra_flush_reg (zh);
1715             return ZEBRA_OK;
1716         }
1717 #if HAVE_SYS_TIMES_H
1718         times (&zh->tms1);
1719 #endif
1720         if (!zh->res)
1721         {
1722             (zh->trans_no)--;
1723             zh->errCode = YAZ_BIB1_DATABASE_UNAVAILABLE;
1724             return ZEBRA_FAIL;
1725         }
1726         if (!zh->lock_normal || !zh->lock_shadow)
1727         {
1728             (zh->trans_no)--;
1729             zh->errCode = YAZ_BIB1_TEMPORARY_SYSTEM_ERROR;
1730             return ZEBRA_FAIL;
1731         }
1732         zebra_get_state (zh, &val, &seqno);
1733         if (val == 'd')
1734             val = 'o';
1735         
1736         if (!zh->reg)
1737             dirty = 1;
1738         else if (seqno != zh->reg->seqno)
1739         {
1740             yaz_log (YLOG_DEBUG, "reopen seqno cur/old %d/%d",
1741                      seqno, zh->reg->seqno);
1742             dirty = 1;
1743         }
1744         else if (zh->reg->last_val != val)
1745         {
1746             yaz_log (YLOG_DEBUG, "reopen last cur/old %d/%d",
1747                      val, zh->reg->last_val);
1748             dirty = 1;
1749         }
1750         if (!dirty)
1751             return ZEBRA_OK;
1752         
1753         if (val == 'c')
1754             zebra_lock_r (zh->lock_shadow);
1755         else
1756             zebra_lock_r (zh->lock_normal);
1757         
1758         if (zh->reg)
1759         {
1760             resultSetInvalidate (zh);
1761             zebra_register_close (zh->service, zh->reg);
1762         }
1763         zh->reg = zebra_register_open(zh->service, zh->reg_name,
1764                                       0, val == 'c' ? 1 : 0,
1765                                       zh->res, zh->path_reg);
1766         if (!zh->reg)
1767         {
1768             zebra_unlock (zh->lock_normal);
1769             zebra_unlock (zh->lock_shadow);
1770             zh->trans_no--;
1771             zh->errCode = YAZ_BIB1_DATABASE_UNAVAILABLE;
1772             return ZEBRA_FAIL;
1773         }
1774         zh->reg->last_val = val;
1775         zh->reg->seqno = seqno;
1776     }
1777     read_res_for_transaction(zh);
1778     return ZEBRA_OK;
1779 }
1780
1781 ZEBRA_RES zebra_end_trans (ZebraHandle zh)
1782 {
1783     ZebraTransactionStatus dummy;
1784     ASSERTZH;
1785     yaz_log(log_level, "zebra_end_trans");
1786     return zebra_end_transaction(zh, &dummy);
1787 }
1788
1789 ZEBRA_RES zebra_end_transaction (ZebraHandle zh, ZebraTransactionStatus *status)
1790 {
1791     char val;
1792     int seqno;
1793     const char *rval;
1794
1795     ASSERTZH;
1796     assert(status);
1797     yaz_log(log_level, "zebra_end_transaction");
1798
1799     status->processed = 0;
1800     status->inserted  = 0;
1801     status->updated   = 0;
1802     status->deleted   = 0;
1803     status->utime     = 0;
1804     status->stime     = 0;
1805
1806     if (!zh->res || !zh->reg)
1807     {
1808         zebra_setError(zh, YAZ_BIB1_TEMPORARY_SYSTEM_ERROR,
1809                        "zebra_end_trans: no open transaction");
1810         return ZEBRA_FAIL;
1811     }
1812     if (zh->trans_no != zh->trans_w_no)
1813     {
1814         zh->trans_no--;
1815         if (zh->trans_no != 0)
1816             return ZEBRA_OK;
1817
1818         /* release read lock */
1819
1820         zebra_unlock (zh->lock_normal);
1821         zebra_unlock (zh->lock_shadow);
1822     }
1823     else
1824     {   /* release write lock */
1825         zh->trans_no--;
1826         zh->trans_w_no = 0;
1827         
1828         yaz_log (YLOG_DEBUG, "zebra_end_trans");
1829         rval = res_get (zh->res, "shadow");
1830         
1831         zebraExplain_runNumberIncrement (zh->reg->zei, 1);
1832         
1833         zebra_flush_reg (zh);
1834         
1835         resultSetInvalidate (zh);
1836
1837         zebra_register_close (zh->service, zh->reg);
1838         zh->reg = 0;
1839         
1840         yaz_log (YLOG_LOG, "Records: "ZINT_FORMAT" i/u/d "
1841                         ZINT_FORMAT"/"ZINT_FORMAT"/"ZINT_FORMAT, 
1842                  zh->records_processed, zh->records_inserted,
1843                  zh->records_updated, zh->records_deleted);
1844         
1845         status->processed = (int) zh->records_processed;
1846         status->inserted = (int) zh->records_inserted;
1847         status->updated = (int) zh->records_updated;
1848         status->deleted = (int) zh->records_deleted;
1849         
1850         zebra_get_state (zh, &val, &seqno);
1851         if (val != 'd')
1852         {
1853             BFiles bfs = bfs_create (rval, zh->path_reg);
1854             yaz_log (YLOG_DEBUG, "deleting shadow val=%c", val);
1855             bf_commitClean (bfs, rval);
1856             bfs_destroy (bfs);
1857         }
1858         if (!rval)
1859             seqno++;
1860         zebra_set_state (zh, 'o', seqno);
1861         
1862         zebra_unlock (zh->lock_shadow);
1863         zebra_unlock (zh->lock_normal);
1864         
1865     }
1866 #if HAVE_SYS_TIMES_H
1867     times (&zh->tms2);
1868     yaz_log (log_level, "user/system: %ld/%ld",
1869           (long) (zh->tms2.tms_utime - zh->tms1.tms_utime),
1870           (long) (zh->tms2.tms_stime - zh->tms1.tms_stime));
1871     
1872     status->utime = (long) (zh->tms2.tms_utime - zh->tms1.tms_utime);
1873     status->stime = (long) (zh->tms2.tms_stime - zh->tms1.tms_stime);
1874 #endif
1875     return ZEBRA_OK;
1876 }
1877
1878 int zebra_repository_update (ZebraHandle zh, const char *path)
1879 {
1880     ASSERTZH;
1881     assert(path);
1882     yaz_log (log_level, "updating %s", path);
1883     repositoryUpdate (zh, path);
1884     return 0;
1885 }
1886
1887 int zebra_repository_delete (ZebraHandle zh, const char *path)
1888 {
1889     ASSERTZH;
1890     assert(path);
1891     yaz_log (log_level, "deleting %s", path);
1892     repositoryDelete (zh, path);
1893     return 0;
1894 }
1895
1896 int zebra_repository_show (ZebraHandle zh, const char *path)
1897 {
1898     ASSERTZH;
1899     assert(path);
1900     yaz_log(log_level, "zebra_repository_show");
1901     repositoryShow (zh, path);
1902     return 0;
1903 }
1904
1905 static ZEBRA_RES zebra_commit_ex(ZebraHandle zh, int clean_only)
1906 {
1907     int seqno;
1908     char val;
1909     const char *rval;
1910     BFiles bfs;
1911     ASSERTZH;
1912
1913     zebra_select_default_database(zh);
1914     if (!zh->res)
1915     {
1916         zh->errCode = YAZ_BIB1_DATABASE_UNAVAILABLE;
1917         return ZEBRA_FAIL;
1918     }
1919     rval = res_get (zh->res, "shadow");    
1920     if (!rval)
1921     {
1922         yaz_log (YLOG_WARN, "Cannot perform commit - No shadow area defined");
1923         return ZEBRA_OK;
1924     }
1925
1926     zebra_lock_w (zh->lock_normal);
1927     zebra_lock_r (zh->lock_shadow);
1928
1929     bfs = bfs_create (res_get (zh->res, "register"), zh->path_reg);
1930
1931     zebra_get_state (zh, &val, &seqno);
1932
1933     if (rval && *rval)
1934         bf_cache (bfs, rval);
1935     if (bf_commitExists (bfs))
1936     {
1937         if (clean_only)
1938             zebra_set_state (zh, 'd', seqno);
1939         else
1940         {
1941             zebra_set_state (zh, 'c', seqno);
1942             
1943             yaz_log (YLOG_DEBUG, "commit start");
1944             bf_commitExec (bfs);
1945 #ifndef WIN32
1946             sync ();
1947 #endif
1948         }
1949         yaz_log (YLOG_DEBUG, "commit clean");
1950         bf_commitClean (bfs, rval);
1951         seqno++;
1952         zebra_set_state (zh, 'o', seqno);
1953     }
1954     else
1955     {
1956         yaz_log (log_level, "nothing to commit");
1957     }
1958     bfs_destroy (bfs);
1959
1960     zebra_unlock (zh->lock_shadow);
1961     zebra_unlock (zh->lock_normal);
1962     return ZEBRA_OK;
1963 }
1964
1965 ZEBRA_RES zebra_clean(ZebraHandle zh)
1966 {
1967     ASSERTZH;
1968     yaz_log(log_level, "zebra_clean");
1969     return zebra_commit_ex(zh, 1);
1970 }
1971
1972 ZEBRA_RES zebra_commit(ZebraHandle zh)
1973 {
1974     ASSERTZH;
1975     yaz_log(log_level, "zebra_commit");
1976     return zebra_commit_ex(zh, 0);
1977 }
1978
1979 ZEBRA_RES zebra_init(ZebraHandle zh)
1980 {
1981     const char *rval;
1982     BFiles bfs = 0;
1983     ASSERTZH;
1984     yaz_log(log_level, "zebra_init");
1985
1986     zebra_select_default_database(zh);
1987     if (!zh->res)
1988     {
1989         zebra_setError(zh, YAZ_BIB1_TEMPORARY_SYSTEM_ERROR,
1990                        "cannot select default database");
1991         return ZEBRA_FAIL;
1992     }
1993     rval = res_get (zh->res, "shadow");
1994
1995     bfs = bfs_create (res_get (zh->res, "register"), zh->path_reg);
1996     if (!bfs)
1997     {
1998         zebra_setError(zh, YAZ_BIB1_TEMPORARY_SYSTEM_ERROR, "bfs_create");
1999         return ZEBRA_FAIL;
2000     }
2001     if (rval && *rval)
2002         bf_cache (bfs, rval);
2003     
2004     bf_reset (bfs);
2005     bfs_destroy (bfs);
2006     zebra_set_state (zh, 'o', 0);
2007     return ZEBRA_OK;
2008 }
2009
2010 ZEBRA_RES zebra_compact(ZebraHandle zh)
2011 {
2012     BFiles bfs;
2013     ASSERTZH;
2014     yaz_log(log_level, "zebra_compact");
2015     if (!zh->res)
2016     {
2017         zh->errCode = YAZ_BIB1_DATABASE_UNAVAILABLE;
2018         return ZEBRA_FAIL;
2019     }
2020     bfs = bfs_create (res_get (zh->res, "register"), zh->path_reg);
2021     inv_compact (bfs);
2022     bfs_destroy (bfs);
2023     return ZEBRA_OK;
2024 }
2025
2026 void zebra_result(ZebraHandle zh, int *code, char **addinfo)
2027 {
2028     ASSERTZH;
2029     yaz_log(log_level, "zebra_result");
2030     *code = zh->errCode;
2031     *addinfo = zh->errString;
2032 }
2033
2034 void zebra_shadow_enable(ZebraHandle zh, int value)
2035 {
2036     ASSERTZH;
2037     yaz_log(log_level, "zebra_shadow_enable");
2038     zh->shadow_enable = value;
2039 }
2040
2041 ZEBRA_RES zebra_octet_term_encoding(ZebraHandle zh, const char *encoding)
2042 {
2043     ASSERTZH;
2044     assert(encoding);
2045     yaz_log(log_level, "zebra_octet_term_encoding %s", encoding);
2046
2047     if (zh->iconv_to_utf8 != 0)
2048         yaz_iconv_close(zh->iconv_to_utf8);
2049     if (zh->iconv_from_utf8 != 0)
2050         yaz_iconv_close(zh->iconv_from_utf8);
2051     
2052     zh->iconv_to_utf8 =
2053         yaz_iconv_open ("UTF-8", encoding);
2054     if (zh->iconv_to_utf8 == 0)
2055         yaz_log (YLOG_WARN, "iconv: %s to UTF-8 unsupported", encoding);
2056     zh->iconv_from_utf8 =
2057         yaz_iconv_open (encoding, "UTF-8");
2058     if (zh->iconv_to_utf8 == 0)
2059         yaz_log (YLOG_WARN, "iconv: UTF-8 to %s unsupported", encoding);
2060
2061     return ZEBRA_OK;
2062 }
2063
2064 ZEBRA_RES zebra_record_encoding (ZebraHandle zh, const char *encoding)
2065 {
2066     ASSERTZH;
2067     yaz_log(log_level, "zebra_record_encoding");
2068     xfree(zh->record_encoding);
2069     zh->record_encoding = 0;
2070     if (encoding)
2071         zh->record_encoding = xstrdup (encoding);
2072     return ZEBRA_OK;
2073 }
2074
2075 void zebra_set_resource(ZebraHandle zh, const char *name, const char *value)
2076 {
2077     ASSERTZH;
2078     assert(name);
2079     assert(value);
2080     yaz_log(log_level, "zebra_set_resource %s:%s", name, value);
2081     res_set(zh->res, name, value);
2082 }
2083
2084 const char *zebra_get_resource(ZebraHandle zh,
2085                                const char *name, const char *defaultvalue)
2086 {
2087     const char *v;
2088     ASSERTZH;
2089     assert(name);
2090     v = res_get_def (zh->res, name, (char *)defaultvalue);
2091     yaz_log(log_level, "zebra_get_resource %s:%s", name, v);
2092     return v;
2093 }
2094
2095 /* moved from zebra_api_ext.c by pop */
2096 /* FIXME: Should this really be public??? -Heikki */
2097
2098 int zebra_trans_no (ZebraHandle zh)
2099 {
2100     ASSERTZH;
2101     yaz_log(log_level, "zebra_trans_no");
2102     return zh->trans_no;
2103 }
2104
2105 int zebra_get_shadow_enable (ZebraHandle zh)
2106 {
2107     ASSERTZH;
2108     yaz_log(log_level, "zebra_get_shadow_enable");
2109     return zh->shadow_enable;
2110 }
2111
2112 void zebra_set_shadow_enable (ZebraHandle zh, int value)
2113 {
2114     ASSERTZH;
2115     yaz_log(log_level, "zebra_set_shadow_enable %d",value);
2116     zh->shadow_enable = value;
2117 }
2118
2119 /* Used by Perl API.. Added the record buffer dup to zebra_records_retrieve
2120    so that it's identicical to the original api_records_retrieve */
2121 void api_records_retrieve (ZebraHandle zh, ODR stream,
2122                            const char *setname, Z_RecordComposition *comp,
2123                            oid_value input_format, int num_recs,
2124                            ZebraRetrievalRecord *recs)
2125 {
2126     zebra_records_retrieve(zh, stream, setname, comp, input_format,
2127                            num_recs, recs);
2128 }
2129
2130 /* ---------------------------------------------------------------------------
2131   Record insert(=update), delete 
2132
2133   If sysno is provided, then it's used to identify the record.
2134   If not, and match_criteria is provided, then sysno is guessed
2135   If not, and a record is provided, then sysno is got from there
2136 NOTE: Now returns 0 at success and updates sysno, which is an int*
2137   20-jun-2003 Heikki
2138 */
2139
2140 int zebra_add_record(ZebraHandle zh,
2141                      const char *buf, int buf_size)
2142 {
2143     return zebra_update_record(zh, 0, 0 /* sysno */, 0, 0, buf, buf_size, 0);
2144 }
2145
2146 ZEBRA_RES zebra_insert_record (ZebraHandle zh, 
2147                                const char *recordType,
2148                                SYSNO *sysno, const char *match,
2149                                const char *fname,
2150                                const char *buf, int buf_size, int force_update)
2151 {
2152     ZEBRA_RES res;
2153     ASSERTZH;
2154     assert(sysno);
2155     assert(buf);
2156     yaz_log(log_level, "zebra_insert_record sysno=" ZINT_FORMAT, *sysno);
2157
2158     if (buf_size < 1)
2159         buf_size = strlen(buf);
2160
2161     if (zebra_begin_trans(zh, 1) == ZEBRA_FAIL)
2162         return ZEBRA_FAIL;
2163     res = buffer_extract_record (zh, buf, buf_size, 
2164                                  0, /* delete_flag  */
2165                                  0, /* test_mode */
2166                                  recordType,
2167                                  sysno,   
2168                                  match, fname,
2169                                  0, 
2170                                  0); /* allow_update */
2171     zebra_end_trans(zh); 
2172     return res; 
2173 }
2174
2175 ZEBRA_RES zebra_update_record (ZebraHandle zh, 
2176                                const char *recordType,
2177                                SYSNO* sysno, const char *match,
2178                                const char *fname,
2179                                const char *buf, int buf_size,
2180                                int force_update)
2181 {
2182     ZEBRA_RES res;
2183     ASSERTZH;
2184     assert(buf);
2185
2186     yaz_log(log_level, "zebra_update_record");
2187     if (sysno)
2188         yaz_log(log_level, " sysno=" ZINT_FORMAT, *sysno);
2189
2190     if (buf_size < 1) buf_size = strlen(buf);
2191
2192     if (zebra_begin_trans(zh, 1) == ZEBRA_FAIL)
2193         return ZEBRA_FAIL;
2194     res = buffer_extract_record (zh, buf, buf_size, 
2195                                  0, /* delete_flag */
2196                                  0, /* test_mode */
2197                                  recordType,
2198                                  sysno,   
2199                                  match, fname,
2200                                  force_update, 
2201                                  1); /* allow_update */
2202     zebra_end_trans(zh); 
2203     return res; 
2204 }
2205
2206 ZEBRA_RES zebra_delete_record (ZebraHandle zh, 
2207                                const char *recordType,
2208                                SYSNO *sysno, const char *match,
2209                                const char *fname,
2210                                const char *buf, int buf_size,
2211                                int force_update) 
2212 {
2213     ZEBRA_RES res;
2214     ASSERTZH;
2215     assert(sysno);
2216     assert(buf);
2217     yaz_log(log_level, "zebra_delete_record sysno=" ZINT_FORMAT, *sysno);
2218
2219     if (buf_size < 1) buf_size = strlen(buf);
2220
2221     if (zebra_begin_trans(zh, 1) == ZEBRA_FAIL)
2222         return ZEBRA_FAIL;
2223     res = buffer_extract_record (zh, buf, buf_size,
2224                                  1, /* delete_flag */
2225                                  0, /* test_mode */
2226                                  recordType,
2227                                  sysno,
2228                                  match,fname,
2229                                  force_update,
2230                                  1); /* allow_update */
2231     zebra_end_trans(zh);
2232     return res;
2233 }
2234
2235 /* ---------------------------------------------------------------------------
2236   Searching 
2237 */
2238
2239 ZEBRA_RES zebra_search_PQF(ZebraHandle zh, const char *pqf_query,
2240                            const char *setname, zint *hits)
2241 {
2242     zint lhits = 0;
2243     ZEBRA_RES res = ZEBRA_OK;
2244     Z_RPNQuery *query;
2245     ODR odr = odr_createmem(ODR_ENCODE);
2246     ASSERTZH;
2247     assert(pqf_query);
2248     assert(setname);
2249
2250     yaz_log(log_level, "zebra_search_PQF s=%s q=%s", setname, pqf_query);
2251     
2252     query = p_query_rpn (odr, PROTO_Z3950, pqf_query);
2253     
2254     if (!query)
2255     {
2256         yaz_log (YLOG_WARN, "bad query %s\n", pqf_query);
2257         zh->errCode = YAZ_BIB1_MALFORMED_QUERY;
2258         res = ZEBRA_FAIL;
2259     }
2260     else
2261         res = zebra_search_RPN(zh, odr, query, setname, &lhits);
2262     
2263     odr_destroy(odr);
2264
2265     yaz_log(log_level, "Hits: " ZINT_FORMAT, lhits);
2266
2267     if (hits)
2268         *hits = lhits;
2269
2270     return res;
2271 }
2272
2273 /* ---------------------------------------------------------------------------
2274   Sort - a simplified interface, with optional read locks.
2275 */
2276 int zebra_sort_by_specstr (ZebraHandle zh, ODR stream,
2277                            const char *sort_spec,
2278                            const char *output_setname,
2279                            const char **input_setnames) 
2280 {
2281     int num_input_setnames = 0;
2282     int sort_status = 0;
2283     Z_SortKeySpecList *sort_sequence;
2284     ASSERTZH;
2285     assert(stream);
2286     assert(sort_spec);
2287     assert(output_setname);
2288     assert(input_setnames);
2289     sort_sequence = yaz_sort_spec (stream, sort_spec);
2290     yaz_log(log_level, "sort (FIXME) ");
2291     if (!sort_sequence)
2292     {
2293         yaz_log(YLOG_WARN, "invalid sort specs '%s'", sort_spec);
2294         zh->errCode = YAZ_BIB1_CANNOT_SORT_ACCORDING_TO_SEQUENCE;
2295         return -1;
2296     }
2297     
2298     /* we can do this, since the perl typemap code for char** will 
2299        put a NULL at the end of list */
2300     while (input_setnames[num_input_setnames]) num_input_setnames++;
2301
2302     if (zebra_begin_read (zh))
2303         return -1;
2304     
2305     resultSetSort (zh, stream->mem, num_input_setnames, input_setnames,
2306                    output_setname, sort_sequence, &sort_status);
2307     
2308     zebra_end_read(zh);
2309     return sort_status;
2310 }
2311
2312 /* ---------------------------------------------------------------------------
2313   Get BFS for Zebra system (to make alternative storage methods)
2314 */
2315 struct BFiles_struct *zebra_get_bfs(ZebraHandle zh)
2316 {
2317     if (zh && zh->reg)
2318         return zh->reg->bfs;
2319     return 0;
2320 }
2321
2322
2323 /* ---------------------------------------------------------------------------
2324   Set limit for search/scan
2325 */
2326 ZEBRA_RES zebra_set_limit(ZebraHandle zh, int complement_flag, zint *ids)
2327 {
2328     ASSERTZH;
2329     zebra_limit_destroy(zh->m_limit);
2330     zh->m_limit = zebra_limit_create(complement_flag, ids);
2331     return ZEBRA_OK;
2332 }
2333
2334 /*
2335   Set Error code + addinfo
2336 */
2337 void zebra_setError(ZebraHandle zh, int code, const char *addinfo)
2338 {
2339     zh->errCode = code;
2340     nmem_reset(zh->nmem_error);
2341     zh->errString = addinfo ? nmem_strdup(zh->nmem_error, addinfo) : 0;
2342 }
2343
2344 void zebra_setError_zint(ZebraHandle zh, int code, zint i)
2345 {
2346     char vstr[60];
2347     sprintf(vstr, ZINT_FORMAT, i);
2348
2349     zh->errCode = code;
2350     nmem_reset(zh->nmem_error);
2351     zh->errString = nmem_strdup(zh->nmem_error, vstr);
2352 }
2353