Add reference counting for client
authorAdam Dickmeiss <adam@indexdata.dk>
Wed, 3 Mar 2010 14:19:31 +0000 (15:19 +0100)
committerAdam Dickmeiss <adam@indexdata.dk>
Wed, 3 Mar 2010 14:19:31 +0000 (15:19 +0100)
src/client.c
src/client.h
src/connection.c
src/session.c

index 08c006c..5975b69 100644 (file)
@@ -64,6 +64,7 @@ Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA  02110-1301  USA
 #include "connection.h"
 #include "settings.h"
 #include "relevance.h"
+#include "incref.h"
 
 /** \brief Represents client state for a connection to one search target */
 struct client {
@@ -81,6 +82,8 @@ struct client {
     struct show_raw *show_raw;
     struct client *next;     // next client in session or next in free list
     ZOOM_resultset resultset;
+    YAZ_MUTEX mutex;
+    int ref_count;
 };
 
 struct show_raw {
@@ -104,6 +107,17 @@ static const char *client_states[] = {
     "Client_Disconnected"
 };
 
+static void client_enter(struct client *cl)
+{
+    yaz_mutex_enter(cl->mutex);
+}
+
+static void client_leave(struct client *cl)
+{
+    yaz_mutex_leave(cl->mutex);
+}
+
+
 const char *client_get_state_str(struct client *cl)
 {
     return client_states[cl->state];
@@ -119,12 +133,14 @@ void client_set_state(struct client *cl, enum client_state st)
     cl->state = st;
     /* no need to check for all client being non-active if this one
        already is. Note that session_active_clients also LOCKS session */
+#if 0
     if (!client_is_active(cl) && cl->session)
     {
         int no_active = session_active_clients(cl->session);
         if (no_active == 0)
             session_alert_watch(cl->session, SESSION_WATCH_SHOW);
     }
+#endif
 }
 
 static void client_show_raw_error(struct client *cl, const char *addinfo);
@@ -388,31 +404,36 @@ static void ingest_raw_record(struct client *cl, ZOOM_record rec)
     client_show_raw_dequeue(cl);
 }
 
-void client_search_response(struct client *cl)
+static void search_response(struct client *cl)
 {
     struct connection *co = cl->connection;
     struct session *se = cl->session;
     ZOOM_connection link = connection_get_link(co);
     ZOOM_resultset resultset = cl->resultset;
     const char *error, *addinfo = 0;
-
+    
     if (ZOOM_connection_error(link, &error, &addinfo))
     {
         cl->hits = 0;
         client_set_state(cl, Client_Error);
         yaz_log(YLOG_WARN, "Search error %s (%s): %s",
-            error, addinfo, client_get_url(cl));
+                error, addinfo, client_get_url(cl));
     }
     else
     {
         cl->record_offset = cl->startrecs;
         cl->hits = ZOOM_resultset_size(resultset);
-        se->total_hits += cl->hits;
+        if (se)
+            se->total_hits += cl->hits;
     }
 }
 
+void client_search_response(struct client *cl)
+{
+    search_response(cl);
+}
 
-void client_record_response(struct client *cl)
+static void record_response(struct client *cl)
 {
     struct connection *co = cl->connection;
     ZOOM_connection link = connection_get_link(co);
@@ -450,7 +471,9 @@ void client_record_response(struct client *cl)
             if ((rec = ZOOM_resultset_record(resultset, offset)))
             {
                 cl->record_offset++;
-                if (ZOOM_record_error(rec, &msg, &addinfo, 0))
+                if (cl->session == 0)
+                    ;
+                else if (ZOOM_record_error(rec, &msg, &addinfo, 0))
                     yaz_log(YLOG_WARN, "Record error %s (%s): %s (rec #%d)",
                             error, addinfo, client_get_url(cl),
                             cl->record_offset);
@@ -479,7 +502,6 @@ void client_record_response(struct client *cl)
                     }
                     nmem_destroy(nmem);
                 }
-
             }
             else
             {
@@ -490,6 +512,11 @@ void client_record_response(struct client *cl)
     }
 }
 
+void client_record_response(struct client *cl)
+{
+    record_response(cl);
+}
+
 void client_start_search(struct client *cl)
 {
     struct session_database *sdb = client_get_database(cl);
@@ -582,35 +609,60 @@ struct client *client_create(void)
     r->show_raw = 0;
     r->resultset = 0;
     r->next = 0;
+    r->mutex = 0;
+    yaz_mutex_create(&r->mutex);
+    r->ref_count = 1;
+    
     return r;
 }
 
-void client_destroy(struct client *c)
+void client_incref(struct client *c)
 {
-    struct session *se = c->session;
-    if (c == se->clients)
-        se->clients = c->next;
-    else
+    pazpar2_incref(&c->ref_count, c->mutex);
+    yaz_log(YLOG_LOG, "client_incref %s %d", client_get_url(c), c->ref_count);
+}
+
+int client_destroy(struct client *c)
+{
+    if (c)
     {
-        struct client *cc;
-        for (cc = se->clients; cc && cc->next != c; cc = cc->next)
-            ;
-        if (cc)
-            cc->next = c->next;
+        yaz_log(YLOG_LOG, "client_destroy %s %d",
+                client_get_url(c), c->ref_count);
+        if (!pazpar2_decref(&c->ref_count, c->mutex))
+        {
+            c->next = 0;
+            xfree(c->pquery);
+            c->pquery = 0;
+            xfree(c->cqlquery);
+            c->cqlquery = 0;
+            c->hits = 12345678;
+
+#if 0            
+            if (c->connection)
+                connection_release(c->connection);
+#endif       
+            ZOOM_resultset_destroy(c->resultset);
+            yaz_mutex_destroy(&c->mutex);
+            xfree(c);
+            return 1;
+        }
     }
-    xfree(c->pquery);
-    xfree(c->cqlquery);
-
-    if (c->connection)
-        connection_release(c->connection);
-
-    ZOOM_resultset_destroy(c->resultset);
-    xfree(c);
+    return 0;
 }
 
 void client_set_connection(struct client *cl, struct connection *con)
 {
-    cl->connection = con;
+    if (con)
+    {
+        assert(cl->connection == 0);
+        cl->connection = con;
+        client_incref(cl);
+    }
+    else
+    {
+        cl->connection = con;
+        client_destroy(cl);
+    }
 }
 
 void client_disconnect(struct client *cl)
@@ -758,6 +810,26 @@ int client_parse_query(struct client *cl, const char *query)
     return 0;
 }
 
+
+void client_remove_from_session(struct client *c)
+{
+    struct session *se = c->session;
+    
+    assert(se);
+    if (se)
+    {
+        struct client **ccp = &se->clients;
+        
+        while (*ccp && *ccp != c)
+            ccp = &(*ccp)->next;
+        assert(*ccp == c);
+        *ccp = c->next;
+        
+        c->session = 0;
+        c->next = 0;
+    }
+}
+
 void client_set_session(struct client *cl, struct session *se)
 {
     cl->session = se;
index 31a4840..e03efac 100644 (file)
@@ -67,7 +67,7 @@ int client_is_our_response(struct client *cl);
 void client_continue(struct client *cl);
 
 struct client *client_create(void);
-void client_destroy(struct client *c);
+int client_destroy(struct client *c);
 
 void client_set_connection(struct client *cl, struct connection *con);
 void client_disconnect(struct client *cl);
@@ -89,6 +89,7 @@ struct host *client_get_host(struct client *cl);
 const char *client_get_url(struct client *cl);
 void client_set_maxrecs(struct client *cl, int v);
 void client_set_startrecs(struct client *cl, int v);
+void client_remove_from_session(struct client *c);
 
 #endif
 
index cbe26a1..196a397 100644 (file)
@@ -117,11 +117,12 @@ void connection_destroy(struct connection *co)
     }
     yaz_log(YLOG_DEBUG, "Connection destroy %s", co->host->hostport);
 
-    remove_connection_from_host(co);
     if (co->client)
     {
         client_disconnect(co->client);
     }
+
+    remove_connection_from_host(co);
     xfree(co->zproxy);
     xfree(co);
 }
index de9efd7..c41960d 100644 (file)
@@ -447,14 +447,25 @@ static void select_targets_callback(void *context, struct session_database *db)
     client_set_session(cl, se);
 }
 
+static void session_remove_clients(struct session *se)
+{
+    struct client *cl = se->clients;
+    while (cl)
+    {
+        struct client *cl_next = client_next_in_session(cl);
+        client_remove_from_session(cl);
+        client_destroy(cl);
+        cl = cl_next;
+    }
+    se->clients = 0;
+}
+
 // Associates a set of clients with a session;
 // Note: Session-databases represent databases with per-session 
 // setting overrides
 static int select_targets(struct session *se, const char *filter)
 {
-    while (se->clients)
-        client_destroy(se->clients);
-
+    session_remove_clients(se);
     return session_grep_databases(se, filter, select_targets_callback);
 }
 
@@ -634,8 +645,8 @@ void destroy_session(struct session *s)
 {
     struct session_database *sdb;
 
-    while (s->clients)
-        client_destroy(s->clients);
+    session_remove_clients(s);
+
     for (sdb = s->databases; sdb; sdb = sdb->next)
         session_database_destroy(sdb);
     normalize_cache_destroy(s->normalize_cache);
@@ -1098,27 +1109,27 @@ static int ingest_to_cluster(struct client *cl,
 int ingest_record(struct client *cl, const char *rec,
                   int record_no, NMEM nmem)
 {
-    struct session_database *sdb = client_get_database(cl);
     struct session *se = client_get_session(cl);
+    int ret;
+    struct session_database *sdb = client_get_database(cl);
     struct conf_service *service = se->service;
     xmlDoc *xdoc = normalize_record(sdb, service, rec, nmem);
     xmlNode *root;
     const char *mergekey_norm;
-    int ret;
-
+    
     if (!xdoc)
         return -1;
-
+    
     root = xmlDocGetRootElement(xdoc);
-
+    
     if (!check_record_filter(root, sdb))
     {
         yaz_log(YLOG_WARN, "Filtered out record no %d from %s", record_no,
-            sdb->database->url);
+                sdb->database->url);
         xmlFreeDoc(xdoc);
         return -1;
     }
-
+    
     mergekey_norm = get_mergekey(xdoc, cl, record_no, service, nmem);
     if (!mergekey_norm)
     {
@@ -1129,9 +1140,8 @@ int ingest_record(struct client *cl, const char *rec,
     session_enter(se);
     ret = ingest_to_cluster(cl, xdoc, root, record_no, mergekey_norm);
     session_leave(se);
-
+    
     xmlFreeDoc(xdoc);
-
     return ret;
 }