Add total connections. Enable connection count for now.
[pazpar2-moved-to-github.git] / src / connection.c
1 /* This file is part of Pazpar2.
2    Copyright (C) 2006-2011 Index Data
3
4 Pazpar2 is free software; you can redistribute it and/or modify it under
5 the terms of the GNU General Public License as published by the Free
6 Software Foundation; either version 2, or (at your option) any later
7 version.
8
9 Pazpar2 is distributed in the hope that it will be useful, but WITHOUT ANY
10 WARRANTY; without even the implied warranty of MERCHANTABILITY or
11 FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
12 for more details.
13
14 You should have received a copy of the GNU General Public License
15 along with this program; if not, write to the Free Software
16 Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA  02110-1301  USA
17
18 */
19
20 /** \file connection.c
21     \brief Z39.50 connection (low-level client)
22 */
23
24 #if HAVE_CONFIG_H
25 #include <config.h>
26 #endif
27
28 #include <stdlib.h>
29 #include <stdio.h>
30 #include <string.h>
31 #if HAVE_SYS_TIME_H
32 #include <sys/time.h>
33 #endif
34 #if HAVE_UNISTD_H
35 #include <unistd.h>
36 #endif
37
38 #include <signal.h>
39 #include <assert.h>
40
41 #include <yaz/log.h>
42 #include <yaz/comstack.h>
43 #include <yaz/tcpip.h>
44 #include "connection.h"
45 #include "session.h"
46 #include "host.h"
47 #include "client.h"
48 #include "settings.h"
49
50 /* connection counting (1) , disable connection counting (0) */
51 #if 1
52 static YAZ_MUTEX g_mutex = 0;
53 static int no_connections = 0;
54 static int total_no_connections = 0;
55 static void connection_use(int delta)
56 {
57     int result;
58     if (!g_mutex)
59         yaz_mutex_create(&g_mutex);
60     yaz_mutex_enter(g_mutex);
61     no_connections += delta;
62     result = no_connection;
63     if (delta > 0)
64         total_no_connections += delta;
65     yaz_mutex_leave(g_mutex);
66     if (delta == 0)
67             return result;
68     yaz_log(YLOG_LOG, "%s connections=%d", delta > 0 ? "INC" : "DEC",
69             no_connections);
70     return result;
71 }
72
73 int connections_count() {
74     return connection_use(0);
75 }
76
77
78 #else
79 #define connection_use(x)
80 #define connections_count(x) 0
81 #define connections_count_total(x) 0
82 #endif
83
84
85 /** \brief Represents a physical, reusable  connection to a remote Z39.50 host
86  */
87 struct connection {
88     IOCHAN iochan;
89     ZOOM_connection link;
90     struct host *host;
91     struct client *client;
92     char *zproxy;
93     enum {
94         Conn_Resolving,
95         Conn_Connecting,
96         Conn_Open,
97         Conn_Dead
98     } state;
99     int operation_timeout;
100     int session_timeout;
101     struct connection *next; // next for same host or next in free list
102 };
103
104 static int connection_connect(struct connection *con, iochan_man_t iochan_man);
105
106 static int connection_is_idle(struct connection *co)
107 {
108     ZOOM_connection link = co->link;
109     int event;
110
111     if (co->state != Conn_Open || !link)
112         return 0;
113
114     if (!ZOOM_connection_is_idle(link))
115         return 0;
116     event = ZOOM_connection_peek_event(link);
117     if (event == ZOOM_EVENT_NONE)
118         return 1;
119     else
120         return 0;
121 }
122
123 ZOOM_connection connection_get_link(struct connection *co)
124 {
125     return co->link;
126 }
127
128 static void remove_connection_from_host(struct connection *con)
129 {
130     struct connection **conp = &con->host->connections;
131     assert(con);
132     while (*conp)
133     {
134         if (*conp == con)
135         {
136             *conp = (*conp)->next;
137             break;
138         }
139         conp = &(*conp)->next;
140     }
141     yaz_cond_broadcast(con->host->cond_ready);
142 }
143
144 // Close connection and recycle structure
145 static void connection_destroy(struct connection *co)
146 {
147     if (co->link)
148     {
149         ZOOM_connection_destroy(co->link);
150         iochan_destroy(co->iochan);
151     }
152     yaz_log(YLOG_DEBUG, "%p Connection destroy %s", co, co->host->hostport);
153
154     if (co->client)
155     {
156         client_disconnect(co->client);
157     }
158
159     xfree(co->zproxy);
160     xfree(co);
161     connection_use(-1);
162 }
163
164 // Creates a new connection for client, associated with the host of 
165 // client's database
166 static struct connection *connection_create(struct client *cl,
167                                             int operation_timeout,
168                                             int session_timeout,
169                                             iochan_man_t iochan_man)
170 {
171     struct connection *co;
172     struct host *host = client_get_host(cl);
173
174     co = xmalloc(sizeof(*co));
175     co->host = host;
176
177     co->client = cl;
178     co->zproxy = 0;
179     client_set_connection(cl, co);
180     co->link = 0;
181     co->state = Conn_Resolving;
182     co->operation_timeout = operation_timeout;
183     co->session_timeout = session_timeout;
184     if (host->ipport)
185         connection_connect(co, iochan_man);
186
187     yaz_mutex_enter(host->mutex);
188     co->next = co->host->connections;
189     co->host->connections = co;
190     yaz_mutex_leave(host->mutex);
191
192     connection_use(1);
193     return co;
194 }
195
196 static void non_block_events(struct connection *co)
197 {
198     int got_records = 0;
199     IOCHAN iochan = co->iochan;
200     ZOOM_connection link = co->link;
201     while (1)
202     {
203         struct client *cl = co->client;
204         int ev;
205         int r = ZOOM_event_nonblock(1, &link);
206         if (!r)
207             break;
208         if (!cl)
209             continue;
210         ev = ZOOM_connection_last_event(link);
211         
212 #if 1
213         yaz_log(YLOG_DEBUG, "%p Connection ZOOM_EVENT_%s", co, ZOOM_get_event_str(ev));
214 #endif
215         switch (ev) 
216         {
217         case ZOOM_EVENT_END:
218             {
219                 const char *error, *addinfo;
220                 int err;
221                 if ((err = ZOOM_connection_error(link, &error, &addinfo)))
222                 {
223                     yaz_log(YLOG_LOG, "Error %s from %s",
224                             error, client_get_url(cl));
225                     client_set_diagnostic(cl, err);
226                     client_set_state(cl, Client_Error);
227                 }
228                 else
229                 {
230                     iochan_settimeout(iochan, co->session_timeout);
231                     client_set_state(cl, Client_Idle);
232                 }
233                 yaz_cond_broadcast(co->host->cond_ready);
234             }
235             break;
236         case ZOOM_EVENT_SEND_DATA:
237             break;
238         case ZOOM_EVENT_RECV_DATA:
239             break;
240         case ZOOM_EVENT_UNKNOWN:
241             break;
242         case ZOOM_EVENT_SEND_APDU:
243             client_set_state(co->client, Client_Working);
244             iochan_settimeout(iochan, co->operation_timeout);
245             break;
246         case ZOOM_EVENT_RECV_APDU:
247             break;
248         case ZOOM_EVENT_CONNECT:
249             yaz_log(YLOG_LOG, "Connected to %s", client_get_url(cl));
250             co->state = Conn_Open;
251             break;
252         case ZOOM_EVENT_RECV_SEARCH:
253             client_search_response(cl);
254             break;
255         case ZOOM_EVENT_RECV_RECORD:
256             client_record_response(cl);
257             got_records = 1;
258             break;
259         default:
260             yaz_log(YLOG_LOG, "Unhandled event (%d) from %s",
261                     ev, client_get_url(cl));
262         }
263     }
264     if (got_records)
265     {
266         struct client *cl = co->client;
267         if (cl)
268         {
269             client_check_preferred_watch(cl);
270             client_got_records(cl);
271         }
272     }
273 }
274
275 void connection_continue(struct connection *co)
276 {
277     int r = ZOOM_connection_exec_task(co->link);
278     if (!r)
279         yaz_log(YLOG_WARN, "No task was executed for connection");
280     iochan_setflags(co->iochan, ZOOM_connection_get_mask(co->link));
281     iochan_setfd(co->iochan, ZOOM_connection_get_socket(co->link));
282 }
283
284 static void connection_handler(IOCHAN iochan, int event)
285 {
286     struct connection *co = iochan_getdata(iochan);
287     struct client *cl;
288     struct host *host = co->host;
289
290     yaz_mutex_enter(host->mutex);
291     cl = co->client;
292     if (!cl) 
293     {
294         /* no client associated with it.. We are probably getting
295            a closed connection from the target.. Or, perhaps, an unexpected
296            package.. We will just close the connection */
297         yaz_log(YLOG_LOG, "timeout connection %p event=%d", co, event);
298         remove_connection_from_host(co);
299         yaz_mutex_leave(host->mutex);
300         connection_destroy(co);
301     }
302     else if (event & EVENT_TIMEOUT)
303     {
304         if (co->state == Conn_Connecting)
305         {
306             yaz_log(YLOG_WARN, "%p connect timeout %s", co, client_get_url(cl));
307
308             client_set_state(cl, Client_Error);
309             remove_connection_from_host(co);
310             yaz_mutex_leave(host->mutex);
311             connection_destroy(co);
312         }
313         else
314         {
315             yaz_log(YLOG_LOG,  "%p Connection idle timeout %s", co, client_get_url(cl));
316             remove_connection_from_host(co);
317             yaz_mutex_leave(host->mutex);
318             connection_destroy(co);
319         }
320     }
321     else
322     {
323         yaz_mutex_leave(host->mutex);
324
325         client_lock(cl);
326         non_block_events(co);
327
328         ZOOM_connection_fire_event_socket(co->link, event);
329         
330         non_block_events(co);
331         client_unlock(cl);
332
333         if (co->link)
334         {
335             iochan_setflags(iochan, ZOOM_connection_get_mask(co->link));
336             iochan_setfd(iochan, ZOOM_connection_get_socket(co->link));
337         }
338     }
339 }
340
341
342 // Disassociate connection from client
343 static void connection_release(struct connection *co)
344 {
345     struct client *cl = co->client;
346
347     if (!cl)
348         return;
349     client_set_connection(cl, 0);
350     co->client = 0;
351 }
352
353 void connect_resolver_host(struct host *host, iochan_man_t iochan_man)
354 {
355     struct connection *con;
356
357 start:
358     yaz_mutex_enter(host->mutex);
359     con = host->connections;
360     while (con)
361     {
362         if (con->state == Conn_Resolving)
363         {
364             if (!host->ipport) /* unresolved */
365             {
366                 remove_connection_from_host(con);
367                 yaz_mutex_leave(host->mutex);
368                 connection_destroy(con);
369                 goto start;
370                 /* start all over .. at some point it will be NULL */
371             }
372             else if (!con->client)
373             {
374                 remove_connection_from_host(con);
375                 yaz_mutex_leave(host->mutex);
376                 connection_destroy(con);
377                 /* start all over .. at some point it will be NULL */
378                 goto start;
379             }
380             else
381             {
382                 yaz_mutex_leave(host->mutex);
383                 connection_connect(con, iochan_man);
384                 client_start_search(con->client);
385                 goto start;
386             }
387         }
388         else
389         {
390             yaz_log(YLOG_LOG, "connect_resolver_host: state=%d", con->state);
391             con = con->next;
392         }
393     }
394     yaz_mutex_leave(host->mutex);
395 }
396
397 static struct host *connection_get_host(struct connection *con)
398 {
399     return con->host;
400 }
401
402 static int connection_connect(struct connection *con, iochan_man_t iochan_man)
403 {
404     ZOOM_connection link = 0;
405     struct host *host = connection_get_host(con);
406     ZOOM_options zoptions = ZOOM_options_create();
407     const char *auth;
408     const char *charset;
409     const char *sru;
410     const char *sru_version = 0;
411
412     struct session_database *sdb = client_get_database(con->client);
413     const char *zproxy = session_setting_oneval(sdb, PZ_ZPROXY);
414     const char *apdulog = session_setting_oneval(sdb, PZ_APDULOG);
415
416     assert(host->ipport);
417     assert(con);
418
419     ZOOM_options_set(zoptions, "async", "1");
420     ZOOM_options_set(zoptions, "implementationName", PACKAGE_NAME);
421     ZOOM_options_set(zoptions, "implementationVersion", VERSION);
422         
423     if ((charset = session_setting_oneval(sdb, PZ_NEGOTIATION_CHARSET)))
424         ZOOM_options_set(zoptions, "charset", charset);
425     
426     if (zproxy && *zproxy)
427     {
428         con->zproxy = xstrdup(zproxy);
429         ZOOM_options_set(zoptions, "proxy", zproxy);
430     }
431     if (apdulog && *apdulog)
432         ZOOM_options_set(zoptions, "apdulog", apdulog);
433
434     if ((auth = session_setting_oneval(sdb, PZ_AUTHENTICATION)))
435         ZOOM_options_set(zoptions, "user", auth);
436     if ((sru = session_setting_oneval(sdb, PZ_SRU)) && *sru)
437         ZOOM_options_set(zoptions, "sru", sru);
438     if ((sru_version = session_setting_oneval(sdb, PZ_SRU_VERSION)) 
439         && *sru_version)
440         ZOOM_options_set(zoptions, "sru_version", sru_version);
441     if (!(link = ZOOM_connection_create(zoptions)))
442     {
443         yaz_log(YLOG_FATAL|YLOG_ERRNO, "Failed to create ZOOM Connection");
444         ZOOM_options_destroy(zoptions);
445         return -1;
446     }
447
448     if (sru && *sru)
449     {
450         char http_hostport[512];
451         strcpy(http_hostport, "http://");
452         strcat(http_hostport, host->hostport);
453         ZOOM_connection_connect(link, http_hostport, 0);
454     }
455     else if (zproxy && *zproxy)
456         ZOOM_connection_connect(link, host->hostport, 0);        
457     else
458         ZOOM_connection_connect(link, host->ipport, 0);
459     
460     con->link = link;
461     con->iochan = iochan_create(-1, connection_handler, 0, "connection_socket");
462     con->state = Conn_Connecting;
463     iochan_settimeout(con->iochan, con->operation_timeout);
464     iochan_setdata(con->iochan, con);
465     iochan_add(iochan_man, con->iochan);
466
467     /* this fragment is bad DRY: from client_prep_connection */
468     client_set_state(con->client, Client_Connecting);
469     ZOOM_options_destroy(zoptions);
470     return 0;
471 }
472
473 // Ensure that client has a connection associated
474 int client_prep_connection(struct client *cl,
475                            int operation_timeout, int session_timeout,
476                            iochan_man_t iochan_man,
477                            const struct timeval *abstime)
478 {
479     struct connection *co;
480     struct host *host = client_get_host(cl);
481     struct session_database *sdb = client_get_database(cl);
482     const char *zproxy = session_setting_oneval(sdb, PZ_ZPROXY);
483
484     if (zproxy && zproxy[0] == '\0')
485         zproxy = 0;
486
487     if (!host)
488         return 0;
489
490     co = client_get_connection(cl);
491
492     yaz_log(YLOG_DEBUG, "Client prep %s", client_get_url(cl));
493
494     if (!co)
495     {
496         int max_connections = 0;
497         int reuse_connections = 1;
498         const char *v = session_setting_oneval(client_get_database(cl),
499                                                PZ_MAX_CONNECTIONS);
500         if (v && *v)
501             max_connections = atoi(v);
502
503         v = session_setting_oneval(client_get_database(cl),
504                 PZ_REUSE_CONNECTIONS);
505         if (v && *v)
506             reuse_connections = atoi(v);
507
508         // See if someone else has an idle connection
509         // We should look at timestamps here to select the longest-idle connection
510         yaz_mutex_enter(host->mutex);
511         while (1)
512         {
513             int num_connections = 0;
514             for (co = host->connections; co; co = co->next)
515                 num_connections++;
516             if (reuse_connections) {
517                 for (co = host->connections; co; co = co->next)
518                 {
519                     if (connection_is_idle(co) &&
520                         (!co->client || client_get_state(co->client) == Client_Idle) &&
521                         !strcmp(ZOOM_connection_option_get(co->link, "user"),
522                                 session_setting_oneval(client_get_database(cl),
523                                                        PZ_AUTHENTICATION)))
524                     {
525                         if (zproxy == 0 && co->zproxy == 0)
526                             break;
527                         if (zproxy && co->zproxy && !strcmp(zproxy, co->zproxy))
528                             break;
529                     }
530                 }
531                 if (co)
532                 {
533                     yaz_log(YLOG_LOG, "num_connections = %d (reusing)", num_connections);
534                     break;
535                 }
536             }
537             if (max_connections <= 0 || num_connections < max_connections)
538             {
539                 yaz_log(YLOG_LOG, "num_connections = %d (new); max = %d",
540                         num_connections, max_connections);
541                 break;
542             }
543             yaz_log(YLOG_LOG, "num_connections = %d (waiting) max = %d",
544                     num_connections, max_connections);
545             if (yaz_cond_wait(host->cond_ready, host->mutex, abstime))
546             {
547                 yaz_log(YLOG_LOG, "out of connections %s", client_get_url(cl));
548                 client_set_state(cl, Client_Error);
549                 yaz_mutex_leave(host->mutex);
550                 return 0;
551             }
552         }
553         if (co)
554         {
555             yaz_log(YLOG_LOG,  "%p Connection reuse. state: %d", co, co->state);
556             connection_release(co);
557             client_set_connection(cl, co);
558             co->client = cl;
559             /* ensure that connection is only assigned to this client
560                by marking the client non Idle */
561             client_set_state(cl, Client_Working);
562             yaz_mutex_leave(host->mutex);
563             co->operation_timeout = operation_timeout;
564             co->session_timeout = session_timeout;
565             /* tells ZOOM to reconnect if necessary. Disabled becuase
566                the ZOOM_connection_connect flushes the task queue */
567             ZOOM_connection_connect(co->link, 0, 0);
568         }
569         else
570         {
571             yaz_mutex_leave(host->mutex);
572             co = connection_create(cl, operation_timeout, session_timeout,
573                                    iochan_man);
574         }
575     }
576
577     if (co && co->link)
578         return 1;
579     else
580         return 0;
581 }
582
583 /*
584  * Local variables:
585  * c-basic-offset: 4
586  * c-file-style: "Stroustrup"
587  * indent-tabs-mode: nil
588  * End:
589  * vim: shiftwidth=4 tabstop=8 expandtab
590  */
591