Fixes for hit estimates. Added zebra_set_approx_limit.
[idzebra-moved-to-github.git] / index / zebraapi.c
1 /* $Id: zebraapi.c,v 1.175 2005-06-09 10:39:53 adam Exp $
2    Copyright (C) 1995-2005
3    Index Data ApS
4
5 This file is part of the Zebra server.
6
7 Zebra is free software; you can redistribute it and/or modify it under
8 the terms of the GNU General Public License as published by the Free
9 Software Foundation; either version 2, or (at your option) any later
10 version.
11
12 Zebra is distributed in the hope that it will be useful, but WITHOUT ANY
13 WARRANTY; without even the implied warranty of MERCHANTABILITY or
14 FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
15 for more details.
16
17 You should have received a copy of the GNU General Public License
18 along with Zebra; see the file LICENSE.zebra.  If not, write to the
19 Free Software Foundation, 59 Temple Place - Suite 330, Boston, MA
20 02111-1307, USA.
21 */
22
23 #include <assert.h>
24 #include <stdio.h>
25 #include <limits.h>
26 #ifdef WIN32
27 #include <io.h>
28 #include <process.h>
29 #include <direct.h>
30 #else
31 #include <unistd.h>
32 #endif
33
34 #include <yaz/diagbib1.h>
35 #include <yaz/pquery.h>
36 #include <yaz/sortspec.h>
37 #include "index.h"
38 #include <charmap.h>
39 #include <idzebra/api.h>
40
41 /* simple asserts to validate the most essential input args */
42 #define ASSERTZH assert(zh && zh->service)
43 #define ASSERTZHRES assert(zh && zh->service && zh->res)
44 #define ASSERTZS assert(zs)
45
46 static int log_level = 0;
47 static int log_level_initialized = 0;
48
49 static Res zebra_open_res (ZebraHandle zh);
50 static void zebra_close_res (ZebraHandle zh);
51
52 static void zebra_chdir (ZebraService zs)
53 {
54     const char *dir ;
55     ASSERTZS;
56     yaz_log(log_level, "zebra_chdir");
57     dir = res_get (zs->global_res, "chdir");
58     if (!dir)
59         return;
60     yaz_log (YLOG_DEBUG, "chdir %s", dir);
61 #ifdef WIN32
62     _chdir(dir);
63 #else
64     chdir (dir);
65 #endif
66 }
67
68 static void zebra_flush_reg (ZebraHandle zh)
69 {
70     ASSERTZH;
71     yaz_log(log_level, "zebra_flush_reg");
72     zebra_clearError(zh);
73     zebraExplain_flush (zh->reg->zei, zh);
74     
75     extract_flushWriteKeys (zh, 1 /* final */);
76     zebra_index_merge (zh );
77 }
78
79 static struct zebra_register *zebra_register_open (ZebraService zs, 
80                                                    const char *name,
81                                                    int rw, int useshadow,
82                                                    Res res,
83                                                    const char *reg_path);
84 static void zebra_register_close (ZebraService zs, struct zebra_register *reg);
85
86 ZebraHandle zebra_open (ZebraService zs)
87 {
88     ZebraHandle zh;
89     const char *default_encoding;
90     if (!log_level_initialized)
91     {
92         log_level = yaz_log_module_level("zebraapi");
93         log_level_initialized = 1;
94     }
95
96     yaz_log(log_level, "zebra_open");
97
98     if (!zs)
99         return 0;
100
101     zh = (ZebraHandle) xmalloc(sizeof(*zh));
102     yaz_log (YLOG_DEBUG, "zebra_open zs=%p returns %p", zs, zh);
103
104     zh->service = zs;
105     zh->reg = 0;          /* no register attached yet */
106     zh->sets = 0;
107     zh->destroyed = 0;
108     zh->errCode = 0;
109     zh->errString = 0;
110     zh->res = 0; 
111     zh->user_perm = 0;
112
113     zh->reg_name = xstrdup ("");
114     zh->path_reg = 0;
115     zh->num_basenames = 0;
116     zh->basenames = 0;
117
118     zh->approx_limit = 1000000000;
119     zh->trans_no = 0;
120     zh->trans_w_no = 0;
121
122     zh->lock_normal = 0;
123     zh->lock_shadow = 0;
124
125     zh->shadow_enable = 1;
126
127     default_encoding = res_get_def(zs->global_res, "encoding", "ISO-8859-1");
128
129     zh->iconv_to_utf8 =
130         yaz_iconv_open ("UTF-8", default_encoding);
131     if (zh->iconv_to_utf8 == 0)
132         yaz_log (YLOG_WARN, "iconv: %s to UTF-8 unsupported",
133            default_encoding);
134     zh->iconv_from_utf8 =
135         yaz_iconv_open (default_encoding, "UTF-8");
136     if (zh->iconv_to_utf8 == 0)
137         yaz_log (YLOG_WARN, "iconv: UTF-8 to %s unsupported",
138            default_encoding);
139
140     zh->record_encoding = 0;
141
142     zebra_mutex_cond_lock (&zs->session_lock);
143
144     zh->next = zs->sessions;
145     zs->sessions = zh;
146
147     zebra_mutex_cond_unlock (&zs->session_lock);
148
149     zh->store_data_buf = 0;
150
151     zh->m_limit = zebra_limit_create(1, 0);
152
153     zh->nmem_error = nmem_create();
154
155     return zh;
156 }
157
158 ZebraService zebra_start (const char *configName)
159 {
160     return zebra_start_res(configName, 0, 0);
161 }
162
163 ZebraService zebra_start_res (const char *configName, Res def_res, Res over_res)
164 {
165     Res res;
166
167     if (!log_level_initialized)
168     {
169         log_level = yaz_log_module_level("zebraapi");
170         log_level_initialized = 1;
171     }
172
173     yaz_log(YLOG_LOG, "zebra_start %s %s",configName, ZEBRAVER);
174     assert(configName);
175
176     if ((res = res_open (configName, def_res, over_res)))
177     {
178         const char *passwd_plain = 0;
179         const char *passwd_encrypt = 0;
180         ZebraService zh = xmalloc(sizeof(*zh));
181
182         yaz_log (YLOG_DEBUG, "Read resources `%s'", configName);
183         
184         zh->global_res = res;
185         zh->configName = xstrdup(configName);
186         zh->sessions = 0;
187         
188         zebra_chdir (zh);
189         
190         zebra_mutex_cond_init (&zh->session_lock);
191         passwd_plain = res_get (zh->global_res, "passwd");
192         passwd_encrypt = res_get (zh->global_res, "passwd.c");
193
194         if (!passwd_plain && !passwd_encrypt)
195             zh->passwd_db = NULL;
196         else
197         {
198             zh->passwd_db = passwd_db_open();
199             if (!zh->passwd_db)
200                 yaz_log (YLOG_WARN|YLOG_ERRNO, "passwd_db_open failed");
201             else
202             {
203                 if (passwd_plain)
204                     passwd_db_file_plain(zh->passwd_db, passwd_plain);
205                 if (passwd_encrypt)
206                     passwd_db_file_crypt(zh->passwd_db, passwd_encrypt);
207             }
208         }
209         zh->path_root = res_get (zh->global_res, "root");
210         zh->nmem = nmem_create();
211         zh->record_classes = recTypeClass_create (zh->global_res, zh->nmem);
212         return zh;
213     }
214     return 0;
215 }
216
217 void zebra_filter_info(ZebraService zs, void *cd,
218                         void (*cb)(void *cd, const char *name))
219 {
220     ASSERTZS;
221     assert(cb);
222     recTypeClass_info(zs->record_classes, cd, cb);
223 }
224
225 void zebra_pidfname(ZebraService zs, char *path)
226 {
227     ASSERTZS;
228     zebra_lock_prefix (zs->global_res, path);
229     strcat(path, "zebrasrv.pid");
230 }
231
232 Dict dict_open_res (BFiles bfs, const char *name, int cache, int rw,
233                     int compact_flag, Res res)
234 {
235     int page_size = 4096;
236     char resource_str[200];
237     const char *v;
238     sprintf (resource_str, "dict.%.100s.pagesize", name);
239     assert(bfs);
240     assert(name);
241
242     v = res_get(res, resource_str);
243     if (v)
244     {
245         page_size = atoi(v);
246         yaz_log(YLOG_LOG, "Using custom dictionary page size %d for %s",
247                 page_size, name);
248     }
249     return dict_open(bfs, name, cache, rw, compact_flag, page_size);
250 }
251
252
253 static
254 struct zebra_register *zebra_register_open (ZebraService zs, const char *name,
255                                             int rw, int useshadow, Res res,
256                                             const char *reg_path)
257 {
258     struct zebra_register *reg;
259     int record_compression = REC_COMPRESS_NONE;
260     const char *recordCompression = 0;
261     const char *profilePath;
262     char cwd[1024];
263
264     ASSERTZS;
265     
266     reg = xmalloc(sizeof(*reg));
267
268     assert (name);
269     reg->name = xstrdup (name);
270
271     reg->seqno = 0;
272     reg->last_val = 0;
273
274     assert (res);
275
276     yaz_log (YLOG_DEBUG, "zebra_register_open rw=%d useshadow=%d p=%p n=%s rp=%s",
277              rw, useshadow, reg, name, reg_path ? reg_path : "(none)");
278     
279     reg->dh = data1_createx (DATA1_FLAG_XML);
280     if (!reg->dh)
281     {
282         xfree(reg);
283         return 0;
284     }
285     reg->bfs = bfs_create (res_get (res, "register"), reg_path);
286     if (!reg->bfs)
287     {
288         data1_destroy(reg->dh);
289         xfree(reg);
290         return 0;
291     }
292     if (useshadow)
293     {
294         if (bf_cache (reg->bfs, res_get (res, "shadow")) == ZEBRA_FAIL)
295         {
296             bfs_destroy(reg->bfs);
297             data1_destroy(reg->dh);
298             xfree(reg);
299             return 0;
300         }
301     }
302
303     getcwd(cwd, sizeof(cwd)-1);
304     profilePath = res_get_def(res, "profilePath", DEFAULT_PROFILE_PATH);
305     yaz_log(YLOG_DEBUG, "profilePath=%s cwd=%s", profilePath, cwd);
306
307     data1_set_tabpath (reg->dh, profilePath);
308     data1_set_tabroot (reg->dh, reg_path);
309     reg->recTypes = recTypes_init (zs->record_classes, reg->dh);
310
311     reg->zebra_maps = zebra_maps_open (res, reg_path);
312     reg->rank_classes = NULL;
313
314     reg->key_buf = 0;
315
316     reg->keys.buf_max = 0;
317     reg->keys.buf = 0;
318     reg->keys.codec_handle = iscz1_start();
319
320     reg->sortKeys.buf = 0;
321     reg->sortKeys.buf_max = 0;
322
323     reg->records = 0;
324     reg->dict = 0;
325     reg->sortIdx = 0;
326     reg->isams = 0;
327     reg->matchDict = 0;
328     reg->isamc = 0;
329     reg->isamb = 0;
330     reg->zei = 0;
331     reg->matchDict = 0;
332     reg->key_file_no = 0;
333     reg->ptr_i = 0;
334     
335     zebraRankInstall (reg, rank1_class);
336     zebraRankInstall (reg, rankzv_class);
337
338     recordCompression = res_get_def (res, "recordCompression", "none");
339     if (!strcmp (recordCompression, "none"))
340         record_compression = REC_COMPRESS_NONE;
341     if (!strcmp (recordCompression, "bzip2"))
342         record_compression = REC_COMPRESS_BZIP2;
343
344     if (!(reg->records = rec_open (reg->bfs, rw, record_compression)))
345     {
346         yaz_log (YLOG_WARN, "rec_open failed");
347         return 0;
348     }
349     if (rw)
350     {
351         reg->matchDict = dict_open_res (reg->bfs, GMATCH_DICT, 20, 1, 0, res);
352     }
353     if (!(reg->dict = dict_open_res (reg->bfs, FNAME_DICT, 40, rw, 0, res)))
354     {
355         yaz_log (YLOG_WARN, "dict_open failed");
356         return 0;
357     }
358     if (!(reg->sortIdx = sortIdx_open (reg->bfs, rw)))
359     {
360         yaz_log (YLOG_WARN, "sortIdx_open failed");
361         return 0;
362     }
363     if (res_get_match (res, "isam", "s", ISAM_DEFAULT))
364     {
365         struct ISAMS_M_s isams_m;
366         if (!(reg->isams = isams_open (reg->bfs, FNAME_ISAMS, rw,
367                                       key_isams_m(res, &isams_m))))
368         {
369             yaz_log (YLOG_WARN, "isams_open failed");
370             return 0;
371         }
372     }
373     if (res_get_match (res, "isam", "c", ISAM_DEFAULT))
374     {
375         struct ISAMC_M_s isamc_m;
376         if (!(reg->isamc = isamc_open (reg->bfs, FNAME_ISAMC,
377                                     rw, key_isamc_m(res, &isamc_m))))
378         {
379             yaz_log (YLOG_WARN, "isamc_open failed");
380             return 0;
381         }
382     }
383     if (res_get_match (res, "isam", "b", ISAM_DEFAULT))
384     {
385         struct ISAMC_M_s isamc_m;
386         
387         if (!(reg->isamb = isamb_open (reg->bfs, "isamb",
388                                        rw, key_isamc_m(res, &isamc_m), 0)))
389         {
390             yaz_log (YLOG_WARN, "isamb_open failed");
391             return 0;
392         }
393     }
394     if (res_get_match (res, "isam", "bc", ISAM_DEFAULT))
395     {
396         struct ISAMC_M_s isamc_m;
397         
398         if (!(reg->isamb = isamb_open (reg->bfs, "isamb",
399                                        rw, key_isamc_m(res, &isamc_m), 1)))
400         {
401             yaz_log (YLOG_WARN, "isamb_open failed");
402             return 0;
403         }
404     }
405     if (res_get_match (res, "isam", "null", ISAM_DEFAULT))
406     {
407         struct ISAMC_M_s isamc_m;
408         
409         if (!(reg->isamb = isamb_open (reg->bfs, "isamb",
410                                        rw, key_isamc_m(res, &isamc_m), -1)))
411         {
412             yaz_log (YLOG_WARN, "isamb_open failed");
413             return 0;
414         }
415     }
416     reg->zei = zebraExplain_open (reg->records, reg->dh,
417                                   res, rw, reg,
418                                   explain_extract);
419     if (!reg->zei)
420     {
421         yaz_log (YLOG_WARN, "Cannot obtain EXPLAIN information");
422         return 0;
423     }
424     reg->active = 2;
425     yaz_log (YLOG_DEBUG, "zebra_register_open ok p=%p", reg);
426     return reg;
427 }
428
429 ZEBRA_RES zebra_admin_shutdown (ZebraHandle zh)
430 {
431     ASSERTZH;
432     yaz_log(log_level, "zebra_admin_shutdown");
433     zebra_clearError(zh);
434
435     zebra_mutex_cond_lock (&zh->service->session_lock);
436     zh->service->stop_flag = 1;
437     zebra_mutex_cond_unlock (&zh->service->session_lock);
438     return ZEBRA_OK;
439 }
440
441 ZEBRA_RES zebra_admin_start (ZebraHandle zh)
442 {
443     ZebraService zs;
444     ASSERTZH;
445     yaz_log(log_level, "zebra_admin_start");
446     zebra_clearError(zh);
447     zs = zh->service;
448     zebra_mutex_cond_lock (&zs->session_lock);
449     zebra_mutex_cond_unlock (&zs->session_lock);
450     return ZEBRA_OK;
451 }
452
453 static void zebra_register_close (ZebraService zs, struct zebra_register *reg)
454 {
455     ASSERTZS;
456     assert(reg);
457     yaz_log(YLOG_DEBUG, "zebra_register_close p=%p", reg);
458     reg->stop_flag = 0;
459     zebra_chdir (zs);
460     if (reg->records)
461     {
462         zebraExplain_close (reg->zei);
463         dict_close (reg->dict);
464         if (reg->matchDict)
465             dict_close (reg->matchDict);
466         sortIdx_close (reg->sortIdx);
467         if (reg->isams)
468             isams_close (reg->isams);
469         if (reg->isamc)
470             isamc_close (reg->isamc);
471         if (reg->isamb)
472             isamb_close (reg->isamb);
473         rec_close (&reg->records);
474     }
475
476     recTypes_destroy (reg->recTypes);
477     zebra_maps_close (reg->zebra_maps);
478     zebraRankDestroy (reg);
479     bfs_destroy (reg->bfs);
480     data1_destroy (reg->dh);
481
482     xfree(reg->sortKeys.buf);
483     xfree(reg->keys.buf);
484     if (reg->keys.codec_handle)
485         iscz1_stop(reg->keys.codec_handle);
486     xfree(reg->key_buf);
487     xfree(reg->name);
488     xfree(reg);
489 }
490
491 ZEBRA_RES zebra_stop(ZebraService zs)
492 {
493     if (!zs)
494         return ZEBRA_OK;
495     yaz_log (log_level, "zebra_stop");
496
497     while (zs->sessions)
498     {
499         zebra_close (zs->sessions);
500     }
501         
502     zebra_mutex_cond_destroy (&zs->session_lock);
503
504     if (zs->passwd_db)
505         passwd_db_close (zs->passwd_db);
506
507     recTypeClass_destroy(zs->record_classes);
508     nmem_destroy(zs->nmem);
509     res_close (zs->global_res);
510     xfree(zs->configName);
511     xfree(zs);
512     return ZEBRA_OK;
513 }
514
515 ZEBRA_RES zebra_close (ZebraHandle zh)
516 {
517     ZebraService zs;
518     struct zebra_session **sp;
519     int i;
520
521     yaz_log(log_level, "zebra_close");
522     if (!zh)
523         return ZEBRA_OK;
524     ASSERTZH;
525     zh->errCode = 0;
526     
527     zs = zh->service;
528     yaz_log (YLOG_DEBUG, "zebra_close zh=%p", zh);
529     resultSetDestroy (zh, -1, 0, 0);
530
531     if (zh->reg)
532         zebra_register_close (zh->service, zh->reg);
533     zebra_close_res (zh);
534
535     xfree(zh->record_encoding);
536
537     for (i = 0; i < zh->num_basenames; i++)
538         xfree(zh->basenames[i]);
539     xfree(zh->basenames);
540
541     if (zh->iconv_to_utf8 != 0)
542         yaz_iconv_close (zh->iconv_to_utf8);
543     if (zh->iconv_from_utf8 != 0)
544         yaz_iconv_close (zh->iconv_from_utf8);
545
546     zebra_mutex_cond_lock (&zs->session_lock);
547     zebra_lock_destroy (zh->lock_normal);
548     zebra_lock_destroy (zh->lock_shadow);
549     sp = &zs->sessions;
550     while (1)
551     {
552         assert (*sp);
553         if (*sp == zh)
554         {
555             *sp = (*sp)->next;
556             break;
557         }
558         sp = &(*sp)->next;
559     }
560     zebra_mutex_cond_unlock (&zs->session_lock);
561     xfree(zh->reg_name);
562     xfree(zh->user_perm);
563     zh->service = 0; /* more likely to trigger an assert */
564
565     zebra_limit_destroy(zh->m_limit);
566
567     nmem_destroy(zh->nmem_error);
568
569     xfree(zh->path_reg);
570     xfree(zh);
571     return ZEBRA_OK;
572 }
573
574 struct map_baseinfo {
575     ZebraHandle zh;
576     NMEM mem;
577     int num_bases;
578     char **basenames;
579     int new_num_bases;
580     char **new_basenames;
581     int new_num_max;
582 };
583
584 static Res zebra_open_res (ZebraHandle zh)
585 {
586     Res res = 0;
587     char fname[512];
588     ASSERTZH;
589     zh->errCode = 0;
590
591     if (zh->path_reg)
592     {
593         sprintf (fname, "%.200s/zebra.cfg", zh->path_reg);
594         res = res_open (fname, zh->service->global_res, 0);
595         if (!res)
596             res = zh->service->global_res;
597     }
598     else if (*zh->reg_name == 0)
599     {
600         res = zh->service->global_res;
601     }
602     else
603     {
604         yaz_log (YLOG_WARN, "no register root specified");
605         return 0;  /* no path for register - fail! */
606     }
607     return res;
608 }
609
610 static void zebra_close_res (ZebraHandle zh)
611 {
612     ASSERTZH;
613     zh->errCode = 0;
614     if (zh->res != zh->service->global_res)
615         res_close (zh->res);
616     zh->res = 0;
617 }
618
619 static void zebra_select_register (ZebraHandle zh, const char *new_reg)
620 {
621     ASSERTZH;
622     zh->errCode = 0;
623     if (zh->res && strcmp (zh->reg_name, new_reg) == 0)
624         return;
625     if (!zh->res)
626     {
627         assert (zh->reg == 0);
628         assert (*zh->reg_name == 0);
629     }
630     else
631     {
632         if (zh->reg)
633         {
634             resultSetInvalidate (zh);
635             zebra_register_close (zh->service, zh->reg);
636             zh->reg = 0;
637         }
638         zebra_close_res(zh);
639     }
640     xfree(zh->reg_name);
641     zh->reg_name = xstrdup (new_reg);
642
643     xfree(zh->path_reg);
644     zh->path_reg = 0;
645     if (zh->service->path_root)
646     {
647         zh->path_reg = xmalloc(strlen(zh->service->path_root) + 
648                                 strlen(zh->reg_name) + 3);
649         strcpy (zh->path_reg, zh->service->path_root);
650         if (*zh->reg_name)
651         {
652             strcat (zh->path_reg, "/");
653             strcat (zh->path_reg, zh->reg_name);
654         }
655     }
656     zh->res = zebra_open_res (zh);
657     
658     if (zh->lock_normal)
659         zebra_lock_destroy (zh->lock_normal);
660     zh->lock_normal = 0;
661
662     if (zh->lock_shadow)
663         zebra_lock_destroy (zh->lock_shadow);
664     zh->lock_shadow = 0;
665
666     if (zh->res)
667     {
668         char fname[512];
669         const char *lock_area  =res_get (zh->res, "lockDir");
670         
671         if (!lock_area && zh->path_reg)
672             res_set (zh->res, "lockDir", zh->path_reg);
673         sprintf (fname, "norm.%s.LCK", zh->reg_name);
674         zh->lock_normal =
675             zebra_lock_create (res_get(zh->res, "lockDir"), fname, 0);
676         
677         sprintf (fname, "shadow.%s.LCK", zh->reg_name);
678         zh->lock_shadow =
679             zebra_lock_create (res_get(zh->res, "lockDir"), fname, 0);
680
681         if (!zh->lock_normal || !zh->lock_shadow)
682         {
683             if (zh->lock_normal)
684             {
685                 zebra_lock_destroy(zh->lock_normal);
686                 zh->lock_normal = 0;
687             }
688             if (zh->lock_shadow)
689             {
690                 zebra_lock_destroy(zh->lock_shadow);
691                 zh->lock_shadow = 0;
692             }
693             zebra_close_res(zh);
694         }
695     }
696 }
697
698 void map_basenames_func (void *vp, const char *name, const char *value)
699 {
700     struct map_baseinfo *p = (struct map_baseinfo *) vp;
701     int i, no;
702     char fromdb[128], todb[8][128];
703
704     assert(value);
705     assert(name);
706     assert(vp);
707     
708     no =
709         sscanf (value, "%127s %127s %127s %127s %127s %127s %127s %127s %127s",
710                 fromdb, todb[0], todb[1], todb[2], todb[3], todb[4],
711                 todb[5], todb[6], todb[7]);
712     if (no < 2)
713         return ;
714     no--;
715     for (i = 0; i<p->num_bases; i++)
716         if (p->basenames[i] && !STRCASECMP (p->basenames[i], fromdb))
717         {
718             p->basenames[i] = 0;
719             for (i = 0; i < no; i++)
720             {
721                 if (p->new_num_bases == p->new_num_max)
722                     return;
723                 p->new_basenames[(p->new_num_bases)++] = 
724                     nmem_strdup (p->mem, todb[i]);
725             }
726             return;
727         }
728 }
729
730 int zebra_select_default_database(ZebraHandle zh)
731 {
732     if (!zh->res)
733     {
734         /* no database has been selected - so we select based on
735            resource setting (including group)
736         */
737         const char *group = res_get(zh->service->global_res, "group");
738         const char *v = res_get_prefix(zh->service->global_res,
739                                        "database", group, "Default");
740         return zebra_select_database(zh, v);
741     }
742     return 0;
743 }
744
745 void map_basenames (ZebraHandle zh, ODR stream,
746                     int *num_bases, char ***basenames)
747 {
748     struct map_baseinfo info;
749     struct map_baseinfo *p = &info;
750     int i;
751     ASSERTZH;
752     yaz_log(log_level, "map_basenames ");
753     assert(stream);
754
755     info.zh = zh;
756
757     info.num_bases = *num_bases;
758     info.basenames = *basenames;
759     info.new_num_max = 128;
760     info.new_num_bases = 0;
761     info.new_basenames = (char **)
762         odr_malloc (stream, sizeof(*info.new_basenames) * info.new_num_max);
763     info.mem = stream->mem;
764
765     res_trav (zh->service->global_res, "mapdb", &info, map_basenames_func);
766     
767     for (i = 0; i<p->num_bases; i++)
768         if (p->basenames[i] && p->new_num_bases < p->new_num_max)
769         {
770             p->new_basenames[(p->new_num_bases)++] = 
771                 nmem_strdup (p->mem, p->basenames[i]);
772         }
773     *num_bases = info.new_num_bases;
774     *basenames = info.new_basenames;
775     for (i = 0; i<*num_bases; i++)
776         yaz_log (YLOG_DEBUG, "base %s", (*basenames)[i]);
777 }
778
779 ZEBRA_RES zebra_select_database (ZebraHandle zh, const char *basename)
780 {
781     ASSERTZH;
782     yaz_log(log_level, "zebra_select_database %s",basename);
783     assert(basename);
784     return zebra_select_databases (zh, 1, &basename);
785 }
786
787 ZEBRA_RES zebra_select_databases (ZebraHandle zh, int num_bases,
788                                   const char **basenames)
789 {
790     int i;
791     const char *cp;
792     int len = 0;
793     char *new_reg = 0;
794     ASSERTZH;
795     assert(basenames);
796
797     yaz_log(log_level, "zebra_select_databases n=%d [0]=%s",
798                     num_bases,basenames[0]);
799     zh->errCode = 0;
800     
801     if (num_bases < 1)
802     {
803         zh->errCode = YAZ_BIB1_COMBI_OF_SPECIFIED_DATABASES_UNSUPP;
804         return ZEBRA_FAIL;
805     }
806     for (i = 0; i < zh->num_basenames; i++)
807         xfree(zh->basenames[i]);
808     xfree(zh->basenames);
809     
810     zh->num_basenames = num_bases;
811     zh->basenames = xmalloc(zh->num_basenames * sizeof(*zh->basenames));
812     for (i = 0; i < zh->num_basenames; i++)
813         zh->basenames[i] = xstrdup (basenames[i]);
814
815     cp = strrchr(basenames[0], '/');
816     if (cp)
817     {
818         len = cp - basenames[0];
819         new_reg = xmalloc(len + 1);
820         memcpy (new_reg, basenames[0], len);
821         new_reg[len] = '\0';
822     }
823     else
824         new_reg = xstrdup ("");
825     for (i = 1; i<num_bases; i++)
826     {
827         const char *cp1;
828
829         cp1 = strrchr (basenames[i], '/');
830         if (cp)
831         {
832             if (!cp1)
833             {
834                 zh->errCode = YAZ_BIB1_COMBI_OF_SPECIFIED_DATABASES_UNSUPP;
835                 return -1;
836             }
837             if (len != cp1 - basenames[i] ||
838                 memcmp (basenames[i], new_reg, len))
839             {
840                 zh->errCode = YAZ_BIB1_COMBI_OF_SPECIFIED_DATABASES_UNSUPP;
841                 return -1;
842             }
843         }
844         else
845         {
846             if (cp1)
847             {
848                 zh->errCode = YAZ_BIB1_COMBI_OF_SPECIFIED_DATABASES_UNSUPP;
849                 return ZEBRA_FAIL;
850             }
851         }
852     }
853     zebra_select_register (zh, new_reg);
854     xfree(new_reg);
855     if (!zh->res)
856     {
857         zh->errCode = YAZ_BIB1_DATABASE_UNAVAILABLE;
858         return ZEBRA_FAIL;
859     }
860     if (!zh->lock_normal || !zh->lock_shadow)
861     {
862         zh->errCode = YAZ_BIB1_TEMPORARY_SYSTEM_ERROR;
863         return ZEBRA_FAIL;
864     }
865     return ZEBRA_OK;
866 }
867
868 ZEBRA_RES zebra_set_approx_limit(ZebraHandle zh, zint approx_limit)
869 {
870     zh->approx_limit = approx_limit;
871     return ZEBRA_OK;
872 }
873
874 ZEBRA_RES zebra_search_RPN(ZebraHandle zh, ODR o, Z_RPNQuery *query,
875                            const char *setname, zint *hits)
876 {
877     ZEBRA_RES r;
878     ASSERTZH;
879     assert(o);
880     assert(query);
881     assert(hits);
882     assert(setname);
883     yaz_log(log_level, "zebra_search_rpn");
884     zebra_clearError(zh);
885     zh->hits = 0;
886     *hits = 0;
887
888     if (zebra_begin_read(zh) == ZEBRA_FAIL)
889         return ZEBRA_FAIL;
890
891     r = resultSetAddRPN(zh, odr_extract_mem(o), query, 
892                         zh->num_basenames, zh->basenames, setname);
893     zebra_end_read(zh);
894     *hits = zh->hits;
895     return r;
896 }
897
898 ZEBRA_RES zebra_records_retrieve(ZebraHandle zh, ODR stream,
899                                  const char *setname,
900                                  Z_RecordComposition *comp,
901                                  oid_value input_format, int num_recs,
902                                  ZebraRetrievalRecord *recs)
903 {
904     ZebraMetaRecord *poset;
905     int i;
906     ZEBRA_RES ret = ZEBRA_OK;
907     zint *pos_array;
908     ASSERTZH;
909     assert(stream);
910     assert(setname);
911     assert(recs);
912     assert(num_recs>0);
913
914     yaz_log(log_level, "zebra_records_retrieve n=%d", num_recs);
915
916     zebra_clearError(zh);
917     if (!zh->res)
918     {
919         zebra_setError(zh, YAZ_BIB1_SPECIFIED_RESULT_SET_DOES_NOT_EXIST,
920                        setname);
921         return ZEBRA_FAIL;
922     }
923     
924     if (zebra_begin_read (zh) == ZEBRA_FAIL)
925         return ZEBRA_FAIL;
926
927     pos_array = (zint *) xmalloc(num_recs * sizeof(*pos_array));
928     for (i = 0; i<num_recs; i++)
929         pos_array[i] = recs[i].position;
930     poset = zebra_meta_records_create(zh, setname, num_recs, pos_array);
931     if (!poset)
932     {
933         yaz_log (YLOG_DEBUG, "zebraPosSetCreate error");
934         zebra_setError(zh, YAZ_BIB1_SPECIFIED_RESULT_SET_DOES_NOT_EXIST,
935                        setname);
936         ret = ZEBRA_FAIL;
937     }
938     else
939     {
940         for (i = 0; i<num_recs; i++)
941         {
942             if (poset[i].term)
943             {
944                 recs[i].errCode = 0;
945                 recs[i].format = VAL_SUTRS;
946                 recs[i].len = strlen(poset[i].term);
947                 recs[i].buf = poset[i].term;
948                 recs[i].base = poset[i].db;
949             }
950             else if (poset[i].sysno)
951             {
952                 char *buf;
953                 int len;
954                 zebra_snippets *hit_snippet = zebra_snippets_create();
955
956                 zebra_snippets_hit_vector(zh, setname, poset[i].sysno, 
957                                           hit_snippet);
958
959                 recs[i].errCode =
960                     zebra_record_fetch(zh, poset[i].sysno, poset[i].score,
961                                        hit_snippet,
962                                        stream, input_format, comp,
963                                        &recs[i].format, &buf, &len,
964                                        &recs[i].base, &recs[i].errString);
965                 
966                 recs[i].len = len;
967                 if (len > 0)
968                 {
969                     recs[i].buf = (char*) odr_malloc(stream, len);
970                     memcpy(recs[i].buf, buf, len);
971                 }
972                 else
973                     recs[i].buf = buf;
974                 recs[i].score = poset[i].score;
975                 recs[i].sysno = poset[i].sysno;
976                 zebra_snippets_destroy(hit_snippet);
977             }
978             else
979             {
980                 if (ret == ZEBRA_OK) /* only need to set it once */
981                     zebra_setError_zint(zh,
982                                         YAZ_BIB1_PRESENT_REQUEST_OUT_OF_RANGE,
983                                         pos_array[i]);
984                 ret = ZEBRA_FAIL;
985                 break;
986             }
987         }
988         zebra_meta_records_destroy(zh, poset, num_recs);
989     }
990     zebra_end_read (zh);
991     xfree(pos_array);
992     return ret;
993 }
994
995 ZEBRA_RES zebra_scan_PQF(ZebraHandle zh, ODR stream, const char *query,
996                          int *position,
997                          int *num_entries, ZebraScanEntry **entries,
998                          int *is_partial)
999 {
1000     YAZ_PQF_Parser pqf_parser = yaz_pqf_create ();
1001     Z_AttributesPlusTerm *zapt;
1002     int *attributeSet;
1003     ZEBRA_RES res;
1004     
1005     if (!(zapt = yaz_pqf_scan(pqf_parser, stream, &attributeSet, query)))
1006     {
1007         res = ZEBRA_FAIL;
1008         zh->errCode = YAZ_BIB1_SCAN_MALFORMED_SCAN;
1009     }
1010     else
1011         res = zebra_scan(zh, stream, zapt, VAL_BIB1,
1012                          position, num_entries, entries, is_partial);
1013     yaz_pqf_destroy (pqf_parser);
1014     return res;
1015 }
1016
1017 ZEBRA_RES zebra_scan(ZebraHandle zh, ODR stream, Z_AttributesPlusTerm *zapt,
1018                      oid_value attributeset,
1019                      int *position,
1020                      int *num_entries, ZebraScanEntry **entries,
1021                      int *is_partial)
1022 {
1023     ZEBRA_RES res;
1024     ASSERTZH;
1025     assert(stream);
1026     assert(zapt);
1027     assert(position);
1028     assert(num_entries);
1029     assert(is_partial);
1030     assert(entries);
1031     yaz_log(log_level, "zebra_scan");
1032     zebra_clearError(zh);
1033     if (zebra_begin_read (zh) == ZEBRA_FAIL)
1034     {
1035         *entries = 0;
1036         *num_entries = 0;
1037         return ZEBRA_FAIL;
1038     }
1039     res = rpn_scan (zh, stream, zapt, attributeset,
1040                     zh->num_basenames, zh->basenames, position,
1041                     num_entries, entries, is_partial, 0, 0);
1042     zebra_end_read (zh);
1043     return res;
1044 }
1045
1046 ZEBRA_RES zebra_sort (ZebraHandle zh, ODR stream,
1047                       int num_input_setnames, const char **input_setnames,
1048                       const char *output_setname,
1049                       Z_SortKeySpecList *sort_sequence,
1050                       int *sort_status)
1051 {
1052     ZEBRA_RES res;
1053     ASSERTZH;
1054     assert(stream);
1055     assert(num_input_setnames>0);
1056     assert(input_setnames);
1057     assert(sort_sequence);
1058     assert(sort_status);
1059     yaz_log(log_level, "zebra_sort");
1060     zebra_clearError(zh);
1061     if (zebra_begin_read(zh) == ZEBRA_FAIL)
1062         return ZEBRA_FAIL;
1063     res = resultSetSort(zh, stream->mem, num_input_setnames, input_setnames,
1064                         output_setname, sort_sequence, sort_status);
1065     zebra_end_read(zh);
1066     return res;
1067 }
1068
1069 int zebra_deleteResultSet(ZebraHandle zh, int function,
1070                           int num_setnames, char **setnames,
1071                           int *statuses)
1072 {
1073     int i, status;
1074     ASSERTZH;
1075     assert(statuses);
1076     yaz_log(log_level, "zebra_deleteResultSet n=%d",num_setnames);
1077     zebra_clearError(zh);;
1078     if (zebra_begin_read(zh))
1079         return Z_DeleteStatus_systemProblemAtTarget;
1080     switch (function)
1081     {
1082     case Z_DeleteResultSetRequest_list:
1083         assert(num_setnames>0);
1084         assert(setnames);
1085         resultSetDestroy (zh, num_setnames, setnames, statuses);
1086         break;
1087     case Z_DeleteResultSetRequest_all:
1088         resultSetDestroy (zh, -1, 0, statuses);
1089         break;
1090     }
1091     zebra_end_read (zh);
1092     status = Z_DeleteStatus_success;
1093     for (i = 0; i<num_setnames; i++)
1094         if (statuses[i] == Z_DeleteStatus_resultSetDidNotExist)
1095             status = statuses[i];
1096     return status;
1097 }
1098
1099 int zebra_errCode (ZebraHandle zh)
1100 {
1101     if (zh)
1102     {
1103         yaz_log(log_level, "zebra_errCode: %d",zh->errCode);
1104         return zh->errCode;
1105     }
1106     yaz_log(log_level, "zebra_errCode: o");
1107     return 0; 
1108 }
1109
1110 const char *zebra_errString (ZebraHandle zh)
1111 {
1112     const char *e = 0;
1113     if (zh)
1114         e= diagbib1_str (zh->errCode);
1115     yaz_log(log_level, "zebra_errString: %s",e);
1116     return e;
1117 }
1118
1119 char *zebra_errAdd (ZebraHandle zh)
1120 {
1121     char *a = 0;
1122     if (zh)
1123         a= zh->errString;
1124     yaz_log(log_level, "zebra_errAdd: %s",a);
1125     return a;
1126 }
1127
1128 void zebra_clearError(ZebraHandle zh)
1129 {
1130     if (zh)
1131     {
1132         zh->errCode = 0;
1133         zh->errString = 0;
1134     }
1135 }
1136
1137 ZEBRA_RES zebra_auth (ZebraHandle zh, const char *user, const char *pass)
1138 {
1139     const char *p;
1140     char u[40];
1141     ZebraService zs;
1142
1143     ASSERTZH;
1144     zebra_clearError(zh);
1145     zs= zh->service;
1146     
1147     sprintf(u, "perm.%.30s", user ? user : "anonymous");
1148     p = res_get(zs->global_res, u);
1149     xfree(zh->user_perm);
1150     zh->user_perm = xstrdup(p ? p : "r");
1151
1152     /* users that don't require a password .. */
1153     if (zh->user_perm && strchr(zh->user_perm, 'a'))
1154         return ZEBRA_OK;
1155     
1156     if (!zs->passwd_db || !passwd_db_auth (zs->passwd_db, user, pass))
1157         return ZEBRA_OK;
1158     return ZEBRA_FAIL;
1159 }
1160
1161 ZEBRA_RES zebra_admin_import_begin (ZebraHandle zh, const char *database,
1162                                const char *record_type)
1163 {
1164     ASSERTZH;
1165     yaz_log(log_level, "zebra_admin_import_begin db=%s rt=%s", 
1166                      database, record_type);
1167     zebra_clearError(zh);
1168     if (zebra_select_database(zh, database) == ZEBRA_FAIL)
1169         return ZEBRA_FAIL;
1170     return zebra_begin_trans(zh, 1);
1171 }
1172
1173 ZEBRA_RES zebra_admin_import_end (ZebraHandle zh)
1174 {
1175     ASSERTZH;
1176     yaz_log(log_level, "zebra_admin_import_end");
1177     zebra_clearError(zh);
1178     return zebra_end_trans(zh);
1179 }
1180
1181 ZEBRA_RES zebra_admin_import_segment (ZebraHandle zh, Z_Segment *segment)
1182 {
1183     ZEBRA_RES res = ZEBRA_OK;
1184     SYSNO sysno;
1185     int i;
1186     ASSERTZH;
1187     yaz_log(log_level, "zebra_admin_import_segment");
1188     zebra_clearError(zh);
1189     for (i = 0; i<segment->num_segmentRecords; i++)
1190     {
1191         Z_NamePlusRecord *npr = segment->segmentRecords[i];
1192
1193         if (npr->which == Z_NamePlusRecord_intermediateFragment)
1194         {
1195             Z_FragmentSyntax *fragment = npr->u.intermediateFragment;
1196             if (fragment->which == Z_FragmentSyntax_notExternallyTagged)
1197             {
1198                 Odr_oct *oct = fragment->u.notExternallyTagged;
1199                 sysno = 0;
1200                 
1201                 if ( zebra_update_record(zh, 
1202                                          0, /* record Type */
1203                                          &sysno,
1204                                          0, /* match */
1205                                          0, /* fname */
1206                                          oct->buf, oct->len,
1207                                          0) == ZEBRA_FAIL)
1208                     res = ZEBRA_FAIL;
1209             }
1210         }
1211     }
1212     return res;
1213 }
1214
1215 ZEBRA_RES zebra_admin_exchange_record (ZebraHandle zh,
1216                                        const char *rec_buf,
1217                                        size_t rec_len,
1218                                        const char *recid_buf, size_t recid_len,
1219                                        int action)
1220     /* 1 = insert. Fail it already exists */
1221     /* 2 = replace. Fail it does not exist */
1222     /* 3 = delete. Fail if does not exist */
1223     /* 4 = update. Insert/replace */
1224 {
1225     ZEBRA_RES res;
1226     SYSNO sysno = 0;
1227     char *rinfo = 0;
1228     char recid_z[256];
1229     ASSERTZH;
1230     assert(action>0 && action <=4);
1231     assert(rec_buf);
1232
1233     yaz_log(log_level, "zebra_admin_exchange_record ac=%d", action);
1234     zebra_clearError(zh);
1235
1236     if (!recid_buf || recid_len <= 0 || recid_len >= sizeof(recid_z))
1237         return ZEBRA_FAIL;
1238
1239     memcpy (recid_z, recid_buf, recid_len);
1240     recid_z[recid_len] = 0;
1241
1242     if (zebra_begin_trans(zh, 1) == ZEBRA_FAIL)
1243         return ZEBRA_FAIL;
1244
1245     rinfo = dict_lookup (zh->reg->matchDict, recid_z);
1246     if (rinfo)
1247     {
1248         if (action == 1)  /* fail if insert */
1249         {
1250              zebra_end_trans(zh);
1251              return ZEBRA_FAIL;
1252         }
1253
1254         memcpy (&sysno, rinfo+1, sizeof(sysno));
1255     }
1256     else
1257     {
1258         if (action == 2 || action == 3) /* fail if delete or update */
1259         {
1260             zebra_end_trans(zh);
1261             return ZEBRA_FAIL;
1262         }
1263         action = 1;  /* make it an insert (if it's an update).. */
1264     }
1265     res = buffer_extract_record (zh, rec_buf, rec_len,
1266                                  action == 3 ? 1 : 0 /* delete flag */,
1267                                  0, /* test mode */
1268                                  0, /* recordType */
1269                                  &sysno, 
1270                                  0, /* match */
1271                                  0, /* fname */
1272                            0, /* force update */
1273                                  1  /* allow update */
1274         );
1275     if (action == 1)
1276     {
1277         dict_insert (zh->reg->matchDict, recid_z, sizeof(sysno), &sysno);
1278     }
1279     else if (action == 3)
1280     {
1281         dict_delete (zh->reg->matchDict, recid_z);
1282     }
1283     zebra_end_trans(zh);
1284     return res;
1285 }
1286
1287 int delete_w_handle(const char *info, void *handle)
1288 {
1289     ZebraHandle zh = (ZebraHandle) handle;
1290     ISAM_P pos;
1291     ASSERTZH;
1292
1293     if (*info == sizeof(pos))
1294     {
1295         memcpy (&pos, info+1, sizeof(pos));
1296         isamb_unlink(zh->reg->isamb, pos);
1297     }
1298     return 0;
1299 }
1300
1301 static int delete_SU_handle(void *handle, int ord)
1302 {
1303     ZebraHandle zh = (ZebraHandle) handle;
1304     char ord_buf[20];
1305     int ord_len;
1306
1307     ord_len = key_SU_encode (ord, ord_buf);
1308     ord_buf[ord_len] = '\0';
1309
1310     assert (zh->reg->isamb);
1311     dict_delete_subtree(zh->reg->dict, ord_buf,
1312                         zh, delete_w_handle);
1313     return 0;
1314 }
1315
1316 ZEBRA_RES zebra_drop_database(ZebraHandle zh, const char *db)
1317 {
1318     ZEBRA_RES ret = ZEBRA_OK;
1319     ASSERTZH;
1320     yaz_log(log_level, "zebra_drop_database %s", db);
1321     zebra_clearError(zh);
1322
1323     if (zebra_select_database (zh, db) == ZEBRA_FAIL)
1324         return ZEBRA_FAIL;
1325     if (zebra_begin_trans (zh, 1) == ZEBRA_FAIL)
1326         return ZEBRA_FAIL;
1327     if (zh->reg->isamb)
1328     {
1329         zebraExplain_curDatabase (zh->reg->zei, db);
1330         
1331         zebraExplain_trav_ord(zh->reg->zei, zh, delete_SU_handle);
1332         zebraExplain_removeDatabase(zh->reg->zei, zh);
1333     }
1334     else
1335     {
1336         yaz_log(YLOG_WARN, "drop database only supported for isam:b");
1337         ret = ZEBRA_FAIL;
1338     }
1339     zebra_end_trans (zh);
1340     return ret;
1341 }
1342
1343 ZEBRA_RES zebra_create_database (ZebraHandle zh, const char *db)
1344 {
1345     ASSERTZH;
1346     yaz_log(log_level, "zebra_create_database %s", db);
1347     assert(db);
1348     zebra_clearError(zh);
1349
1350     if (zebra_select_database (zh, db) == ZEBRA_FAIL)
1351         return ZEBRA_FAIL;
1352     if (zebra_begin_trans (zh, 1))
1353         return ZEBRA_FAIL;
1354
1355     /* announce database */
1356     if (zebraExplain_newDatabase (zh->reg->zei, db, 0 
1357                                   /* explainDatabase */))
1358     {
1359         zebra_end_trans (zh);
1360         zebra_setError(zh, YAZ_BIB1_ES_IMMEDIATE_EXECUTION_FAILED, db);
1361         return ZEBRA_FAIL;
1362     }
1363     return zebra_end_trans (zh);
1364 }
1365
1366 int zebra_string_norm (ZebraHandle zh, unsigned reg_id,
1367                        const char *input_str, int input_len,
1368                        char *output_str, int output_len)
1369 {
1370     WRBUF wrbuf;
1371     ASSERTZH;
1372     assert(input_str);
1373     assert(output_str);
1374     yaz_log(log_level, "zebra_string_norm ");
1375     zebra_clearError(zh);
1376     if (!zh->reg->zebra_maps)
1377         return -1;
1378     wrbuf = zebra_replace(zh->reg->zebra_maps, reg_id, "",
1379                           input_str, input_len);
1380     if (!wrbuf)
1381         return -2;
1382     if (wrbuf_len(wrbuf) >= output_len)
1383         return -3;
1384     if (wrbuf_len(wrbuf))
1385         memcpy (output_str, wrbuf_buf(wrbuf), wrbuf_len(wrbuf));
1386     output_str[wrbuf_len(wrbuf)] = '\0';
1387     return wrbuf_len(wrbuf);
1388 }
1389
1390 static void zebra_set_state (ZebraHandle zh, int val, int seqno)
1391 {
1392     char state_fname[256];
1393     char *fname;
1394     long p = getpid();
1395     FILE *f;
1396     ASSERTZH;
1397     yaz_log(log_level, "zebra_set_state v=%d seq=%d", val, seqno);
1398     zebra_clearError(zh);
1399
1400     sprintf (state_fname, "state.%s.LCK", zh->reg_name);
1401     fname = zebra_mk_fname (res_get(zh->res, "lockDir"), state_fname);
1402     f = fopen (fname, "w");
1403
1404     yaz_log (YLOG_DEBUG, "zebra_set_state: %c %d %ld", val, seqno, p);
1405     fprintf (f, "%c %d %ld\n", val, seqno, p);
1406     fclose (f);
1407     xfree(fname);
1408 }
1409
1410 static void zebra_get_state (ZebraHandle zh, char *val, int *seqno)
1411 {
1412     char state_fname[256];
1413     char *fname;
1414     FILE *f;
1415
1416     ASSERTZH;
1417     yaz_log(log_level, "zebra_get_state ");
1418     zebra_clearError(zh);
1419     sprintf (state_fname, "state.%s.LCK", zh->reg_name);
1420     fname = zebra_mk_fname (res_get(zh->res, "lockDir"), state_fname);
1421     f = fopen (fname, "r");
1422     *val = 'o';
1423     *seqno = 0;
1424
1425     if (f)
1426     {
1427         fscanf (f, "%c %d", val, seqno);
1428         fclose (f);
1429     }
1430     xfree(fname);
1431 }
1432
1433 ZEBRA_RES zebra_begin_read (ZebraHandle zh)
1434 {
1435     return zebra_begin_trans(zh, 0);
1436 }
1437
1438 ZEBRA_RES zebra_end_read (ZebraHandle zh)
1439 {
1440     return zebra_end_trans(zh);
1441 }
1442
1443 static void read_res_for_transaction(ZebraHandle zh)
1444 {
1445     const char *group = res_get(zh->res, "group");
1446     const char *v;
1447     /* FIXME - do we still use groups ?? */
1448     
1449     zh->m_group = group;
1450     v = res_get_prefix(zh->res, "followLinks", group, "1");
1451     zh->m_follow_links = atoi(v);
1452
1453     zh->m_record_id = res_get_prefix(zh->res, "recordId", group, 0);
1454     zh->m_record_type = res_get_prefix(zh->res, "recordType", group, 0);
1455
1456     v = res_get_prefix(zh->res, "storeKeys", group, "1");
1457     zh->m_store_keys = atoi(v);
1458
1459     v = res_get_prefix(zh->res, "storeData", group, "1");
1460     zh->m_store_data = atoi(v);
1461
1462     v = res_get_prefix(zh->res, "explainDatabase", group, "0");
1463     zh->m_explain_database = atoi(v);
1464
1465     v = res_get_prefix(zh->res, "openRW", group, "1");
1466     zh->m_flag_rw = atoi(v);
1467
1468     v = res_get_prefix(zh->res, "fileVerboseLimit", group, "100000");
1469     zh->m_file_verbose_limit = atoi(v);
1470 }
1471
1472 ZEBRA_RES zebra_begin_trans(ZebraHandle zh, int rw)
1473 {
1474     ASSERTZH;
1475     zebra_select_default_database(zh);
1476     if (!zh->res)
1477     {
1478         zebra_setError(zh, YAZ_BIB1_TEMPORARY_SYSTEM_ERROR,
1479                        "zebra_begin_trans: no database selected");
1480         return ZEBRA_FAIL;
1481     }
1482     ASSERTZHRES;
1483     yaz_log(log_level, "zebra_begin_trans rw=%d",rw);
1484
1485     if (zh->user_perm)
1486     {
1487         if (rw && !strchr(zh->user_perm, 'w'))
1488         {
1489             zebra_setError(
1490                 zh,
1491                 YAZ_BIB1_ES_PERMISSION_DENIED_ON_ES_CANNOT_MODIFY_OR_DELETE,
1492                 0);
1493             return ZEBRA_FAIL;
1494         }
1495     }
1496
1497     assert (zh->res);
1498     if (rw)
1499     {
1500         int pass;
1501         int seqno = 0;
1502         char val = '?';
1503         const char *rval = 0;
1504         
1505         (zh->trans_no++);
1506         if (zh->trans_w_no)
1507         {
1508             read_res_for_transaction(zh);
1509             return 0;
1510         }
1511         if (zh->trans_no != 1)
1512         {
1513             zebra_setError(zh, YAZ_BIB1_TEMPORARY_SYSTEM_ERROR,
1514                            "zebra_begin_trans: no write trans within read");
1515             return ZEBRA_FAIL;
1516         }
1517         if (zh->reg)
1518         {
1519             resultSetInvalidate (zh);
1520             zebra_register_close (zh->service, zh->reg);
1521         }
1522         zh->trans_w_no = zh->trans_no;
1523
1524         zebra_clearError(zh);
1525         
1526         zh->records_inserted = 0;
1527         zh->records_updated = 0;
1528         zh->records_deleted = 0;
1529         zh->records_processed = 0;
1530         
1531 #if HAVE_SYS_TIMES_H
1532         times (&zh->tms1);
1533 #endif
1534         /* lock */
1535         if (zh->shadow_enable)
1536             rval = res_get (zh->res, "shadow");
1537         
1538         for (pass = 0; pass < 2; pass++)
1539         {
1540             if (rval)
1541             {
1542                 zebra_lock_r (zh->lock_normal);
1543                 zebra_lock_w (zh->lock_shadow);
1544             }
1545             else
1546             {
1547                 zebra_lock_w (zh->lock_normal);
1548                 zebra_lock_w (zh->lock_shadow);
1549             }
1550             
1551             zebra_get_state (zh, &val, &seqno);
1552             if (val == 'c')
1553             {
1554                 yaz_log (YLOG_WARN, "previous transaction didn't finish commit");
1555                 zebra_unlock (zh->lock_shadow);
1556                 zebra_unlock (zh->lock_normal);
1557                 zebra_commit (zh);
1558                 continue;
1559             }
1560             else if (val == 'd')
1561             {
1562                 if (rval)
1563                 {
1564                     BFiles bfs = bfs_create (res_get (zh->res, "shadow"),
1565                                              zh->path_reg);
1566                     yaz_log (YLOG_WARN, "previous transaction didn't reach commit");
1567                     bf_commitClean (bfs, rval);
1568                     bfs_destroy (bfs);
1569             }
1570                 else
1571                 {
1572                     yaz_log (YLOG_WARN, "your previous transaction didn't finish");
1573                 }
1574             }
1575             break;
1576         }
1577         if (pass == 2)
1578         {
1579             yaz_log (YLOG_FATAL, "zebra_begin_trans couldn't finish commit");
1580             abort();
1581             return ZEBRA_FAIL;
1582         }
1583         zebra_set_state (zh, 'd', seqno);
1584         
1585         zh->reg = zebra_register_open (zh->service, zh->reg_name,
1586                                        1, rval ? 1 : 0, zh->res,
1587                                        zh->path_reg);
1588         if (zh->reg)
1589             zh->reg->seqno = seqno;
1590         else
1591         {
1592             zebra_set_state (zh, 'o', seqno);
1593             
1594             zebra_unlock (zh->lock_shadow);
1595             zebra_unlock (zh->lock_normal);
1596
1597             zh->trans_no--;
1598             zh->trans_w_no = 0;
1599
1600             zebra_setError(zh, YAZ_BIB1_TEMPORARY_SYSTEM_ERROR,
1601                            "zebra_begin_trans: cannot open register");
1602             yaz_log(YLOG_FATAL, "%s", zh->errString);
1603             return ZEBRA_FAIL;
1604         }
1605     }
1606     else
1607     {
1608         int dirty = 0;
1609         char val;
1610         int seqno;
1611         
1612         (zh->trans_no)++;
1613         
1614         if (zh->trans_no != 1)
1615         {
1616             zebra_flush_reg (zh);
1617             return ZEBRA_OK;
1618         }
1619         zebra_clearError(zh);
1620 #if HAVE_SYS_TIMES_H
1621         times (&zh->tms1);
1622 #endif
1623         if (!zh->res)
1624         {
1625             (zh->trans_no)--;
1626             zh->errCode = YAZ_BIB1_DATABASE_UNAVAILABLE;
1627             return ZEBRA_FAIL;
1628         }
1629         if (!zh->lock_normal || !zh->lock_shadow)
1630         {
1631             (zh->trans_no)--;
1632             zh->errCode = YAZ_BIB1_TEMPORARY_SYSTEM_ERROR;
1633             return ZEBRA_FAIL;
1634         }
1635         zebra_get_state (zh, &val, &seqno);
1636         if (val == 'd')
1637             val = 'o';
1638         
1639         if (!zh->reg)
1640             dirty = 1;
1641         else if (seqno != zh->reg->seqno)
1642         {
1643             yaz_log (YLOG_DEBUG, "reopen seqno cur/old %d/%d",
1644                      seqno, zh->reg->seqno);
1645             dirty = 1;
1646         }
1647         else if (zh->reg->last_val != val)
1648         {
1649             yaz_log (YLOG_DEBUG, "reopen last cur/old %d/%d",
1650                      val, zh->reg->last_val);
1651             dirty = 1;
1652         }
1653         if (!dirty)
1654             return ZEBRA_OK;
1655         
1656         if (val == 'c')
1657             zebra_lock_r (zh->lock_shadow);
1658         else
1659             zebra_lock_r (zh->lock_normal);
1660         
1661         if (zh->reg)
1662         {
1663             resultSetInvalidate (zh);
1664             zebra_register_close (zh->service, zh->reg);
1665         }
1666         zh->reg = zebra_register_open (zh->service, zh->reg_name,
1667                                        0, val == 'c' ? 1 : 0,
1668                                        zh->res, zh->path_reg);
1669         if (!zh->reg)
1670         {
1671             zebra_unlock (zh->lock_normal);
1672             zebra_unlock (zh->lock_shadow);
1673             zh->trans_no--;
1674             zh->errCode = YAZ_BIB1_DATABASE_UNAVAILABLE;
1675             return ZEBRA_FAIL;
1676         }
1677         zh->reg->last_val = val;
1678         zh->reg->seqno = seqno;
1679     }
1680     read_res_for_transaction(zh);
1681     return ZEBRA_OK;
1682 }
1683
1684 ZEBRA_RES zebra_end_trans (ZebraHandle zh)
1685 {
1686     ZebraTransactionStatus dummy;
1687     ASSERTZH;
1688     yaz_log(log_level, "zebra_end_trans");
1689     return zebra_end_transaction(zh, &dummy);
1690 }
1691
1692 ZEBRA_RES zebra_end_transaction (ZebraHandle zh, ZebraTransactionStatus *status)
1693 {
1694     char val;
1695     int seqno;
1696     const char *rval;
1697
1698     ASSERTZH;
1699     assert(status);
1700     yaz_log(log_level, "zebra_end_transaction");
1701
1702     status->processed = 0;
1703     status->inserted  = 0;
1704     status->updated   = 0;
1705     status->deleted   = 0;
1706     status->utime     = 0;
1707     status->stime     = 0;
1708
1709     if (!zh->res || !zh->reg)
1710     {
1711         zebra_setError(zh, YAZ_BIB1_TEMPORARY_SYSTEM_ERROR,
1712                        "zebra_end_trans: no open transaction");
1713         return ZEBRA_FAIL;
1714     }
1715     if (zh->trans_no != zh->trans_w_no)
1716     {
1717         zh->trans_no--;
1718         if (zh->trans_no != 0)
1719             return ZEBRA_OK;
1720
1721         /* release read lock */
1722
1723         zebra_unlock (zh->lock_normal);
1724         zebra_unlock (zh->lock_shadow);
1725     }
1726     else
1727     {   /* release write lock */
1728         zh->trans_no--;
1729         zh->trans_w_no = 0;
1730         
1731         yaz_log (YLOG_DEBUG, "zebra_end_trans");
1732         rval = res_get (zh->res, "shadow");
1733         
1734         zebraExplain_runNumberIncrement (zh->reg->zei, 1);
1735         
1736         zebra_flush_reg (zh);
1737         
1738         resultSetInvalidate (zh);
1739
1740         zebra_register_close (zh->service, zh->reg);
1741         zh->reg = 0;
1742         
1743         yaz_log (YLOG_LOG, "Records: "ZINT_FORMAT" i/u/d "
1744                         ZINT_FORMAT"/"ZINT_FORMAT"/"ZINT_FORMAT, 
1745                  zh->records_processed, zh->records_inserted,
1746                  zh->records_updated, zh->records_deleted);
1747         
1748         status->processed = (int) zh->records_processed;
1749         status->inserted = (int) zh->records_inserted;
1750         status->updated = (int) zh->records_updated;
1751         status->deleted = (int) zh->records_deleted;
1752         
1753         zebra_get_state (zh, &val, &seqno);
1754         if (val != 'd')
1755         {
1756             BFiles bfs = bfs_create (rval, zh->path_reg);
1757             yaz_log (YLOG_DEBUG, "deleting shadow val=%c", val);
1758             bf_commitClean (bfs, rval);
1759             bfs_destroy (bfs);
1760         }
1761         if (!rval)
1762             seqno++;
1763         zebra_set_state (zh, 'o', seqno);
1764         
1765         zebra_unlock (zh->lock_shadow);
1766         zebra_unlock (zh->lock_normal);
1767         
1768     }
1769 #if HAVE_SYS_TIMES_H
1770     times (&zh->tms2);
1771     yaz_log (log_level, "user/system: %ld/%ld",
1772           (long) (zh->tms2.tms_utime - zh->tms1.tms_utime),
1773           (long) (zh->tms2.tms_stime - zh->tms1.tms_stime));
1774     
1775     status->utime = (long) (zh->tms2.tms_utime - zh->tms1.tms_utime);
1776     status->stime = (long) (zh->tms2.tms_stime - zh->tms1.tms_stime);
1777 #endif
1778     return ZEBRA_OK;
1779 }
1780
1781 int zebra_repository_update (ZebraHandle zh, const char *path)
1782 {
1783     ASSERTZH;
1784     assert(path);
1785     zebra_clearError(zh);
1786     yaz_log (log_level, "updating %s", path);
1787     repositoryUpdate (zh, path);
1788     return zh->errCode;
1789 }
1790
1791 int zebra_repository_delete (ZebraHandle zh, const char *path)
1792 {
1793     ASSERTZH;
1794     assert(path);
1795     zebra_clearError(zh);
1796     yaz_log (log_level, "deleting %s", path);
1797     repositoryDelete (zh, path);
1798     return zh->errCode;
1799 }
1800
1801 int zebra_repository_show (ZebraHandle zh, const char *path)
1802 {
1803     ASSERTZH;
1804     assert(path);
1805     yaz_log(log_level, "zebra_repository_show");
1806     zebra_clearError(zh);
1807     repositoryShow (zh, path);
1808     return zh->errCode;
1809 }
1810
1811 static int zebra_commit_ex(ZebraHandle zh, int clean_only)
1812 {
1813     int seqno;
1814     char val;
1815     const char *rval;
1816     BFiles bfs;
1817     ASSERTZH;
1818     zebra_clearError(zh);
1819
1820     zebra_select_default_database(zh);
1821     if (!zh->res)
1822     {
1823         zh->errCode = YAZ_BIB1_DATABASE_UNAVAILABLE;
1824         return -1;
1825     }
1826     rval = res_get (zh->res, "shadow");    
1827     if (!rval)
1828     {
1829         yaz_log (YLOG_WARN, "Cannot perform commit - No shadow area defined");
1830         return 0;
1831     }
1832
1833     zebra_lock_w (zh->lock_normal);
1834     zebra_lock_r (zh->lock_shadow);
1835
1836     bfs = bfs_create (res_get (zh->res, "register"), zh->path_reg);
1837
1838     zebra_get_state (zh, &val, &seqno);
1839
1840     if (rval && *rval)
1841         bf_cache (bfs, rval);
1842     if (bf_commitExists (bfs))
1843     {
1844         if (clean_only)
1845             zebra_set_state (zh, 'd', seqno);
1846         else
1847         {
1848             zebra_set_state (zh, 'c', seqno);
1849             
1850             yaz_log (YLOG_DEBUG, "commit start");
1851             bf_commitExec (bfs);
1852 #ifndef WIN32
1853             sync ();
1854 #endif
1855         }
1856         yaz_log (YLOG_DEBUG, "commit clean");
1857         bf_commitClean (bfs, rval);
1858         seqno++;
1859         zebra_set_state (zh, 'o', seqno);
1860     }
1861     else
1862     {
1863         yaz_log (log_level, "nothing to commit");
1864     }
1865     bfs_destroy (bfs);
1866
1867     zebra_unlock (zh->lock_shadow);
1868     zebra_unlock (zh->lock_normal);
1869     return 0;
1870 }
1871
1872 ZEBRA_RES zebra_clean(ZebraHandle zh)
1873 {
1874     ASSERTZH;
1875     yaz_log(log_level, "zebra_clean");
1876     return zebra_commit_ex(zh, 1);
1877 }
1878
1879 ZEBRA_RES zebra_commit(ZebraHandle zh)
1880 {
1881     ASSERTZH;
1882     yaz_log(log_level, "zebra_commit");
1883     return zebra_commit_ex(zh, 0);
1884 }
1885
1886 ZEBRA_RES zebra_init(ZebraHandle zh)
1887 {
1888     const char *rval;
1889     BFiles bfs = 0;
1890     ASSERTZH;
1891     yaz_log(log_level, "zebra_init");
1892     zebra_clearError(zh);
1893
1894     zebra_select_default_database(zh);
1895     if (!zh->res)
1896     {
1897         zebra_setError(zh, YAZ_BIB1_TEMPORARY_SYSTEM_ERROR,
1898                        "cannot select default database");
1899         return ZEBRA_FAIL;
1900     }
1901     rval = res_get (zh->res, "shadow");
1902
1903     bfs = bfs_create (res_get (zh->res, "register"), zh->path_reg);
1904     if (!bfs)
1905     {
1906         zebra_setError(zh, YAZ_BIB1_TEMPORARY_SYSTEM_ERROR, "bfs_create");
1907         return ZEBRA_FAIL;
1908     }
1909     if (rval && *rval)
1910         bf_cache (bfs, rval);
1911     
1912     bf_reset (bfs);
1913     bfs_destroy (bfs);
1914     zebra_set_state (zh, 'o', 0);
1915     return ZEBRA_OK;
1916 }
1917
1918 ZEBRA_RES zebra_compact(ZebraHandle zh)
1919 {
1920     BFiles bfs;
1921     ASSERTZH;
1922     yaz_log(log_level, "zebra_compact");
1923     zebra_clearError(zh);
1924     if (!zh->res)
1925     {
1926         zh->errCode = YAZ_BIB1_DATABASE_UNAVAILABLE;
1927         return ZEBRA_FAIL;
1928     }
1929     bfs = bfs_create (res_get (zh->res, "register"), zh->path_reg);
1930     inv_compact (bfs);
1931     bfs_destroy (bfs);
1932     return ZEBRA_OK;
1933 }
1934
1935 void zebra_result(ZebraHandle zh, int *code, char **addinfo)
1936 {
1937     ASSERTZH;
1938     yaz_log(log_level, "zebra_result");
1939     *code = zh->errCode;
1940     *addinfo = zh->errString;
1941 }
1942
1943 void zebra_shadow_enable(ZebraHandle zh, int value)
1944 {
1945     ASSERTZH;
1946     yaz_log(log_level, "zebra_shadow_enable");
1947     zebra_clearError(zh);
1948     zh->shadow_enable = value;
1949 }
1950
1951 ZEBRA_RES zebra_octet_term_encoding(ZebraHandle zh, const char *encoding)
1952 {
1953     ASSERTZH;
1954     assert(encoding);
1955     yaz_log(log_level, "zebra_octet_term_encoding");
1956     zebra_clearError(zh);
1957
1958     if (zh->iconv_to_utf8 != 0)
1959         yaz_iconv_close(zh->iconv_to_utf8);
1960     if (zh->iconv_from_utf8 != 0)
1961         yaz_iconv_close(zh->iconv_from_utf8);
1962     
1963     zh->iconv_to_utf8 =
1964         yaz_iconv_open ("UTF-8", encoding);
1965     if (zh->iconv_to_utf8 == 0)
1966         yaz_log (YLOG_WARN, "iconv: %s to UTF-8 unsupported", encoding);
1967     zh->iconv_from_utf8 =
1968         yaz_iconv_open (encoding, "UTF-8");
1969     if (zh->iconv_to_utf8 == 0)
1970         yaz_log (YLOG_WARN, "iconv: UTF-8 to %s unsupported", encoding);
1971
1972     return ZEBRA_OK;
1973 }
1974
1975 ZEBRA_RES zebra_record_encoding (ZebraHandle zh, const char *encoding)
1976 {
1977     ASSERTZH;
1978     yaz_log(log_level, "zebra_record_encoding");
1979     zebra_clearError(zh);
1980     xfree(zh->record_encoding);
1981     zh->record_encoding = 0;
1982     if (encoding)
1983         zh->record_encoding = xstrdup (encoding);
1984     return ZEBRA_OK;
1985 }
1986
1987 void zebra_set_resource(ZebraHandle zh, const char *name, const char *value)
1988 {
1989     ASSERTZH;
1990     assert(name);
1991     assert(value);
1992     yaz_log(log_level, "zebra_set_resource %s:%s", name, value);
1993     zebra_clearError(zh);
1994     res_set(zh->res, name, value);
1995 }
1996
1997 const char *zebra_get_resource(ZebraHandle zh,
1998                                const char *name, const char *defaultvalue)
1999 {
2000     const char *v;
2001     ASSERTZH;
2002     assert(name);
2003     assert(defaultvalue);
2004     v = res_get_def (zh->res, name, (char *)defaultvalue);
2005     zebra_clearError(zh);
2006     yaz_log(log_level, "zebra_get_resource %s:%s", name, v);
2007     return v;
2008 }
2009
2010 /* moved from zebra_api_ext.c by pop */
2011 /* FIXME: Should this really be public??? -Heikki */
2012
2013 int zebra_trans_no (ZebraHandle zh)
2014 {
2015     ASSERTZH;
2016     yaz_log(log_level, "zebra_trans_no");
2017     return zh->trans_no;
2018 }
2019
2020 int zebra_get_shadow_enable (ZebraHandle zh)
2021 {
2022     ASSERTZH;
2023     yaz_log(log_level, "zebra_get_shadow_enable");
2024     return zh->shadow_enable;
2025 }
2026
2027 void zebra_set_shadow_enable (ZebraHandle zh, int value)
2028 {
2029     ASSERTZH;
2030     yaz_log(log_level, "zebra_set_shadow_enable %d",value);
2031     zh->shadow_enable = value;
2032 }
2033
2034 /* Used by Perl API.. Added the record buffer dup to zebra_records_retrieve
2035    so that it's identicical to the original api_records_retrieve */
2036 void api_records_retrieve (ZebraHandle zh, ODR stream,
2037                            const char *setname, Z_RecordComposition *comp,
2038                            oid_value input_format, int num_recs,
2039                            ZebraRetrievalRecord *recs)
2040 {
2041     zebra_records_retrieve(zh, stream, setname, comp, input_format,
2042                            num_recs, recs);
2043 }
2044
2045 /* ---------------------------------------------------------------------------
2046   Record insert(=update), delete 
2047
2048   If sysno is provided, then it's used to identify the record.
2049   If not, and match_criteria is provided, then sysno is guessed
2050   If not, and a record is provided, then sysno is got from there
2051 NOTE: Now returns 0 at success and updates sysno, which is an int*
2052   20-jun-2003 Heikki
2053 */
2054
2055 int zebra_add_record(ZebraHandle zh,
2056                      const char *buf, int buf_size)
2057 {
2058     SYSNO sysno = 0;
2059     return zebra_update_record(zh, 0, &sysno, 0, 0, buf, buf_size, 0);
2060 }
2061
2062 ZEBRA_RES zebra_insert_record (ZebraHandle zh, 
2063                                const char *recordType,
2064                                SYSNO *sysno, const char *match,
2065                                const char *fname,
2066                                const char *buf, int buf_size, int force_update)
2067 {
2068     ZEBRA_RES res;
2069     ASSERTZH;
2070     assert(sysno);
2071     assert(buf);
2072     yaz_log(log_level, "zebra_insert_record sysno=" ZINT_FORMAT, *sysno);
2073
2074     if (buf_size < 1)
2075         buf_size = strlen(buf);
2076
2077     if (zebra_begin_trans(zh, 1) == ZEBRA_FAIL)
2078         return ZEBRA_FAIL;
2079     res = buffer_extract_record (zh, buf, buf_size, 
2080                                  0, /* delete_flag  */
2081                                  0, /* test_mode */
2082                                  recordType,
2083                                  sysno,   
2084                                  match, fname,
2085                                  0, 
2086                                  0); /* allow_update */
2087     zebra_end_trans(zh); 
2088     return res; 
2089 }
2090
2091 ZEBRA_RES zebra_update_record (ZebraHandle zh, 
2092                                const char *recordType,
2093                                SYSNO* sysno, const char *match,
2094                                const char *fname,
2095                                const char *buf, int buf_size,
2096                                int force_update)
2097 {
2098     ZEBRA_RES res;
2099     ASSERTZH;
2100     assert(sysno);
2101     assert(buf);
2102
2103     yaz_log(log_level, "zebra_update_record sysno=" ZINT_FORMAT, *sysno);
2104
2105     if (buf_size < 1) buf_size = strlen(buf);
2106
2107     if (zebra_begin_trans(zh, 1) == ZEBRA_FAIL)
2108         return ZEBRA_FAIL;
2109     res = buffer_extract_record (zh, buf, buf_size, 
2110                                  0, /* delete_flag */
2111                                  0, /* test_mode */
2112                                  recordType,
2113                                  sysno,   
2114                                  match, fname,
2115                                  force_update, 
2116                                  1); /* allow_update */
2117     zebra_end_trans(zh); 
2118     return res; 
2119 }
2120
2121 ZEBRA_RES zebra_delete_record (ZebraHandle zh, 
2122                                const char *recordType,
2123                                SYSNO *sysno, const char *match,
2124                                const char *fname,
2125                                const char *buf, int buf_size,
2126                                int force_update) 
2127 {
2128     ZEBRA_RES res;
2129     ASSERTZH;
2130     assert(sysno);
2131     assert(buf);
2132     yaz_log(log_level, "zebra_delete_record sysno=" ZINT_FORMAT, *sysno);
2133
2134     if (buf_size < 1) buf_size = strlen(buf);
2135
2136     if (zebra_begin_trans(zh, 1) == ZEBRA_FAIL)
2137         return ZEBRA_FAIL;
2138     res = buffer_extract_record (zh, buf, buf_size,
2139                                  1, /* delete_flag */
2140                                  0, /* test_mode */
2141                                  recordType,
2142                                  sysno,
2143                                  match,fname,
2144                                  force_update,
2145                                  1); /* allow_update */
2146     zebra_end_trans(zh);
2147     return res;
2148 }
2149
2150 /* ---------------------------------------------------------------------------
2151   Searching 
2152 */
2153
2154 ZEBRA_RES zebra_search_PQF(ZebraHandle zh, const char *pqf_query,
2155                            const char *setname, zint *hits)
2156 {
2157     zint lhits = 0;
2158     ZEBRA_RES res = ZEBRA_OK;
2159     Z_RPNQuery *query;
2160     ODR odr = odr_createmem(ODR_ENCODE);
2161     ASSERTZH;
2162     assert(pqf_query);
2163     assert(setname);
2164
2165     yaz_log(log_level, "zebra_search_PQF s=%s q=%s", setname, pqf_query);
2166     
2167     query = p_query_rpn (odr, PROTO_Z3950, pqf_query);
2168     
2169     if (!query)
2170     {
2171         yaz_log (YLOG_WARN, "bad query %s\n", pqf_query);
2172         zh->errCode = YAZ_BIB1_MALFORMED_QUERY;
2173         res = ZEBRA_FAIL;
2174     }
2175     else
2176         res = zebra_search_RPN(zh, odr, query, setname, &lhits);
2177     
2178     odr_destroy(odr);
2179
2180     yaz_log(log_level, "Hits: " ZINT_FORMAT, lhits);
2181
2182     if (hits)
2183         *hits = lhits;
2184
2185     return res;
2186 }
2187
2188 /* ---------------------------------------------------------------------------
2189   Sort - a simplified interface, with optional read locks.
2190 */
2191 int zebra_sort_by_specstr (ZebraHandle zh, ODR stream,
2192                            const char *sort_spec,
2193                            const char *output_setname,
2194                            const char **input_setnames) 
2195 {
2196     int num_input_setnames = 0;
2197     int sort_status = 0;
2198     Z_SortKeySpecList *sort_sequence;
2199     ASSERTZH;
2200     assert(stream);
2201     assert(sort_spec);
2202     assert(output_setname);
2203     assert(input_setnames);
2204     sort_sequence = yaz_sort_spec (stream, sort_spec);
2205     yaz_log(log_level, "sort (FIXME) ");
2206     if (!sort_sequence)
2207     {
2208         yaz_log(YLOG_WARN, "invalid sort specs '%s'", sort_spec);
2209         zh->errCode = YAZ_BIB1_CANNOT_SORT_ACCORDING_TO_SEQUENCE;
2210         return -1;
2211     }
2212     
2213     /* we can do this, since the perl typemap code for char** will 
2214        put a NULL at the end of list */
2215     while (input_setnames[num_input_setnames]) num_input_setnames++;
2216
2217     if (zebra_begin_read (zh))
2218         return -1;
2219     
2220     resultSetSort (zh, stream->mem, num_input_setnames, input_setnames,
2221                    output_setname, sort_sequence, &sort_status);
2222     
2223     zebra_end_read(zh);
2224     return sort_status;
2225 }
2226
2227 /* ---------------------------------------------------------------------------
2228   Get BFS for Zebra system (to make alternative storage methods)
2229 */
2230 struct BFiles_struct *zebra_get_bfs(ZebraHandle zh)
2231 {
2232     if (zh && zh->reg)
2233         return zh->reg->bfs;
2234     return 0;
2235 }
2236
2237
2238 /* ---------------------------------------------------------------------------
2239   Set limit for search/scan
2240 */
2241 ZEBRA_RES zebra_set_limit(ZebraHandle zh, int complement_flag, zint *ids)
2242 {
2243     ASSERTZH;
2244     zebra_limit_destroy(zh->m_limit);
2245     zh->m_limit = zebra_limit_create(complement_flag, ids);
2246     return ZEBRA_OK;
2247 }
2248
2249 /*
2250   Set Error code + addinfo
2251 */
2252 void zebra_setError(ZebraHandle zh, int code, const char *addinfo)
2253 {
2254     zh->errCode = code;
2255     nmem_reset(zh->nmem_error);
2256     zh->errString = addinfo ? nmem_strdup(zh->nmem_error, addinfo) : 0;
2257 }
2258
2259 void zebra_setError_zint(ZebraHandle zh, int code, zint i)
2260 {
2261     char vstr[60];
2262     sprintf(vstr, ZINT_FORMAT, i);
2263
2264     zh->errCode = code;
2265     nmem_reset(zh->nmem_error);
2266     zh->errString = nmem_strdup(zh->nmem_error, vstr);
2267 }
2268