Work on bug #1192: block=1, session_watch problems. The http_channel
authorAdam Dickmeiss <adam@indexdata.dk>
Fri, 15 Jun 2007 19:35:17 +0000 (19:35 +0000)
committerAdam Dickmeiss <adam@indexdata.dk>
Fri, 15 Jun 2007 19:35:17 +0000 (19:35 +0000)
now includes an observer interface so that objects using it
can be notified when it's destroyed. This is combined with the
watch mechanism. Tests shows that this change fixes first problem
as described in bug #1192. Second problem is also fixed. If a watch
is already registered on show records, subsequent requests will not
block. The third and fourth problems are not solved by this. While
the show raw (pass through) does not uses session watch, it is still
fixed to use the http_channel observer. This commit also fixes two
leaks.

src/client.c
src/client.h
src/http.c
src/http.h
src/http_command.c
src/logic.c
src/pazpar2.h

index adfdcf9..d546c72 100644 (file)
@@ -1,4 +1,4 @@
-/* $Id: client.c,v 1.10 2007-06-15 06:55:16 adam Exp $
+/* $Id: client.c,v 1.11 2007-06-15 19:35:17 adam Exp $
    Copyright (c) 2006-2007, Index Data.
 
 This file is part of Pazpar2.
@@ -162,12 +162,12 @@ void client_set_requestid(struct client *cl, int id)
     cl->requestid = id;
 }
 
-int client_show_raw(struct client *cl, int position,
-                    const char *syntax, const char *esn,
-                    void *data,
-                    void (*error_handler)(void *data, const char *addinfo),
-                    void (*record_handler)(void *data, const char *buf,
-                                           size_t sz))
+int client_show_raw_begin(struct client *cl, int position,
+                          const char *syntax, const char *esn,
+                          void *data,
+                          void (*error_handler)(void *data, const char *addinfo),
+                          void (*record_handler)(void *data, const char *buf,
+                                                 size_t sz))
 {
     if (cl->show_raw)
         return -1;
@@ -189,13 +189,18 @@ int client_show_raw(struct client *cl, int position,
     return 0;
 }
 
+void client_show_raw_reset(struct client *cl)
+{
+    xfree(cl->show_raw);
+    cl->show_raw = 0;
+}
+
 static void client_show_raw_error(struct client *cl, const char *addinfo)
 {
     if (cl->show_raw)
     {
         cl->show_raw->error_handler(cl->show_raw->data, addinfo);
-        xfree(cl->show_raw);
-        cl->show_raw = 0;
+        client_show_raw_reset(cl);
     }
 }
 
@@ -204,8 +209,7 @@ static void client_show_raw_cancel(struct client *cl)
     if (cl->show_raw)
     {
         cl->show_raw->error_handler(cl->show_raw->data, "cancel");
-        xfree(cl->show_raw);
-        cl->show_raw = 0;
+        client_show_raw_reset(cl);
     }
 }
 
@@ -217,7 +221,7 @@ void client_send_raw_present(struct client *cl)
 
     assert(cl->show_raw);
 
-    yaz_log(YLOG_LOG, "Trying to present %d record(s) from %d",
+    yaz_log(YLOG_DEBUG, "Trying to present %d record(s) from %d",
             toget, start);
 
     a->u.presentRequest->resultSetStartPoint = &start;
@@ -430,11 +434,12 @@ static void ingest_raw_records(struct client *cl, Z_Records *r)
     }
 
     xmlDocDumpMemory(doc, &buf_out, &len_out);
+    xmlFreeDoc(doc);
 
     cl->show_raw->record_handler(cl->show_raw->data,
                                  (const char *) buf_out, len_out);
     
-    xmlFreeDoc(doc);
+    xmlFree(buf_out);
     xfree(cl->show_raw);
     cl->show_raw = 0;
 }
@@ -706,6 +711,8 @@ void client_destroy(struct client *c)
         if (cc)
             cc->next = c->next;
     }
+    xfree(c->pquery);
+
     if (c->connection)
         connection_release(c->connection);
     c->next = client_freelist;
index f01e980..f9d5b64 100644 (file)
@@ -1,4 +1,4 @@
-/* $Id: client.h,v 1.2 2007-06-15 06:45:39 adam Exp $
+/* $Id: client.h,v 1.3 2007-06-15 19:35:17 adam Exp $
    Copyright (c) 2006-2007, Index Data.
 
 This file is part of Pazpar2.
@@ -43,12 +43,13 @@ enum client_state
     Client_Stopped
 };
 
-int client_show_raw(struct client *cl, int position,
-                    const char *syntax, const char *esn,
-                    void *data,
-                    void (*error_handler)(void *data, const char *addinfo),
-                    void (*record_handler)(void *data, const char *buf,
-                                           size_t sz));
+int client_show_raw_begin(struct client *cl, int position,
+                          const char *syntax, const char *esn,
+                          void *data,
+                          void (*error_handler)(void *data, const char *addinfo),
+                          void (*record_handler)(void *data, const char *buf,
+                                                 size_t sz));
+void client_show_raw_reset(struct client *cl);
 
 const char *client_get_state_str(struct client *cl);
 enum client_state client_get_state(struct client *cl);
index 8184be7..a9a9054 100644 (file)
@@ -1,4 +1,4 @@
-/* $Id: http.c,v 1.33 2007-06-04 14:44:22 adam Exp $
+/* $Id: http.c,v 1.34 2007-06-15 19:35:17 adam Exp $
    Copyright (c) 2006-2007, Index Data.
 
 This file is part of Pazpar2.
@@ -63,6 +63,13 @@ static char myurl[256] = "";
 static struct http_buf *http_buf_freelist = 0;
 static struct http_channel *http_channel_freelist = 0;
 
+struct http_channel_observer_s {
+    void *data;
+    void (*destroy)(void *data, struct http_channel *chan);
+    struct http_channel_observer_s *next;
+    struct http_channel *chan;
+};
+
 static struct http_buf *http_buf_create()
 {
     struct http_buf *r;
@@ -701,12 +708,9 @@ static void http_io(IOCHAN i, int event)
                 http_destroy(i);
                 return;
             }
-            if (res > 0)
-            {
-                htbuf->buf[res] = '\0';
-                htbuf->len = res;
-                http_buf_enqueue(&hc->iqueue, htbuf);
-            }
+            htbuf->buf[res] = '\0';
+            htbuf->len = res;
+            http_buf_enqueue(&hc->iqueue, htbuf);
 
             if (hc->state == Http_Busy)
                 return;
@@ -909,6 +913,9 @@ static void proxy_io(IOCHAN pi, int event)
     }
 }
 
+static void http_fire_observers(struct http_channel *c);
+static void http_destroy_observers(struct http_channel *c);
+
 // Cleanup channel
 static void http_destroy(IOCHAN i)
 {
@@ -924,6 +931,10 @@ static void http_destroy(IOCHAN i)
         http_buf_destroy_queue(s->proxy->oqueue);
         xfree(s->proxy);
     }
+    http_buf_destroy_queue(s->iqueue);
+    http_buf_destroy_queue(s->oqueue);
+    http_fire_observers(s);
+    http_destroy_observers(s);
     s->next = http_channel_freelist;
     http_channel_freelist = s;
     close(iochan_getfd(i));
@@ -958,6 +969,7 @@ static struct http_channel *http_create(const char *addr)
         exit(1);
     }
     strcpy(r->addr, addr);
+    r->observers = 0;
     return r;
 }
 
@@ -1091,6 +1103,55 @@ void http_set_proxyaddr(char *host, char *base_url)
     proxy_addr->sin_port = htons(port);
 }
 
+static void http_fire_observers(struct http_channel *c)
+{
+    http_channel_observer_t p = c->observers;
+    while (p)
+    {
+        p->destroy(p->data, c);
+        p = p->next;
+    }
+}
+
+static void http_destroy_observers(struct http_channel *c)
+{
+    while (c->observers)
+    {
+        http_channel_observer_t obs = c->observers;
+        c->observers = obs->next;
+        xfree(obs);
+    }
+}
+
+http_channel_observer_t http_add_observer(struct http_channel *c, void *data,
+                                          http_channel_destroy_t des)
+{
+    http_channel_observer_t obs = xmalloc(sizeof(*obs));
+    obs->chan = c;
+    obs->data = data;
+    obs->destroy= des;
+    obs->next = c->observers;
+    c->observers = obs;
+    return obs;
+}
+
+void http_remove_observer(http_channel_observer_t obs)
+{
+    struct http_channel *c = obs->chan;
+    http_channel_observer_t found, *p = &c->observers;
+    while (*p != obs)
+        p = &(*p)->next;
+    found = *p;
+    assert(found);
+    *p = (*p)->next;
+    xfree(found);
+}
+
+struct http_channel *http_channel_observer_chan(http_channel_observer_t obs)
+{
+    return obs->chan;
+}
+
 /*
  * Local variables:
  * c-basic-offset: 4
index 8a6ac1d..34ef41a 100644 (file)
@@ -1,4 +1,4 @@
-/* $Id: http.h,v 1.8 2007-04-15 00:35:57 quinn Exp $
+/* $Id: http.h,v 1.9 2007-06-15 19:35:17 adam Exp $
    Copyright (c) 2006-2007, Index Data.
 
 This file is part of Pazpar2.
@@ -32,6 +32,8 @@ struct http_buf
     struct http_buf *next;
 };
 
+typedef struct http_channel_observer_s *http_channel_observer_t;
+
 struct http_channel
 {
     IOCHAN iochan;
@@ -50,6 +52,7 @@ struct http_channel
     struct http_response *response;
     struct http_channel *next; // for freelist
     char addr[20]; // forwarded address
+    http_channel_observer_t observers;
 };
 
 struct http_proxy //  attached to iochan for proxy connection
@@ -108,6 +111,15 @@ struct http_response *http_create_response(struct http_channel *c);
 void http_send_response(struct http_channel *c);
 void urlencode(const char *i, char *o);
 
+typedef void (*http_channel_destroy_t)(void *data, struct http_channel *c);
+
+http_channel_observer_t http_add_observer(struct http_channel *c, void *data,
+                                          http_channel_destroy_t);
+
+void http_remove_observer(http_channel_observer_t obs);
+struct http_channel *http_channel_observer_chan(http_channel_observer_t obs);
+#endif
+
 /*
  * Local variables:
  * c-basic-offset: 4
@@ -115,4 +127,3 @@ void urlencode(const char *i, char *o);
  * End:
  * vim: shiftwidth=4 tabstop=8 expandtab
  */
-#endif
index ffc9b81..9c56046 100644 (file)
@@ -1,4 +1,4 @@
-/* $Id: http_command.c,v 1.53 2007-06-15 06:45:39 adam Exp $
+/* $Id: http_command.c,v 1.54 2007-06-15 19:35:17 adam Exp $
    Copyright (c) 2006-2007, Index Data.
 
 This file is part of Pazpar2.
@@ -20,7 +20,7 @@ Free Software Foundation, 59 Temple Place - Suite 330, Boston, MA
  */
 
 /*
- * $Id: http_command.c,v 1.53 2007-06-15 06:45:39 adam Exp $
+ * $Id: http_command.c,v 1.54 2007-06-15 19:35:17 adam Exp $
  */
 
 #include <stdio.h>
@@ -489,22 +489,34 @@ static void write_subrecord(struct record *r, WRBUF w,
 
 static void show_raw_record_error(void *data, const char *addinfo)
 {
-    struct http_channel *c = (struct http_channel *) data;
+    http_channel_observer_t obs = data;
+    struct http_channel *c = http_channel_observer_chan(obs);
     struct http_response *rs = c->response;
 
+    http_remove_observer(obs);
+
     error(rs, PAZPAR2_NOT_IMPLEMENTED, addinfo);
 }
 
 static void show_raw_record_ok(void *data, const char *buf, size_t sz)
 {
-    struct http_channel *c = (struct http_channel *) data;
+    http_channel_observer_t obs = data;
+    struct http_channel *c = http_channel_observer_chan(obs);
     struct http_response *rs = c->response;
 
+    http_remove_observer(obs);
+
     wrbuf_write(c->wrbuf, buf, sz);
     rs->payload = nmem_strdup(c->nmem, wrbuf_cstr(c->wrbuf));
     http_send_response(c);
 }
 
+void show_raw_reset(void *data, struct http_channel *c)
+{
+    struct client *client = data;
+    client_show_raw_reset(client);
+}
+
 static void cmd_record(struct http_channel *c)
 {
     struct http_response *rs = c->response;
@@ -547,13 +559,19 @@ static void cmd_record(struct http_channel *c)
             error(rs, PAZPAR2_RECORD_FAIL, "no record at offset given");
             return;
         }
-        if (client_show_raw(r->client, r->position, syntax, esn, 
-                            c /* data */,
-                            show_raw_record_error,
-                            show_raw_record_ok))
+        else
         {
-            error(rs, PAZPAR2_RECORD_FAIL, "invalid parameters");
-            return;
+            http_channel_observer_t obs =
+                http_add_observer(c, r->client, show_raw_reset);
+            if (client_show_raw_begin(r->client, r->position, syntax, esn, 
+                                      obs /* data */,
+                                      show_raw_record_error,
+                                      show_raw_record_ok))
+            {
+                http_remove_observer(obs);
+                error(rs, PAZPAR2_RECORD_FAIL, "invalid parameters");
+                return;
+            }
         }
     }
     else
@@ -660,9 +678,14 @@ static void cmd_show(struct http_channel *c)
     {
         if (status && (!s->psession->reclist || !s->psession->reclist->num_records))
         {
-            session_set_watch(s->psession, SESSION_WATCH_RECORDS, show_records_ready, c);
-            yaz_log(YLOG_DEBUG, "Blocking on cmd_show");
-            return;
+            // if there is already a watch/block. we do not block this one
+            if (session_set_watch(s->psession,
+                                  SESSION_WATCH_RECORDS,
+                                  show_records_ready, c, c) == 0)
+            {
+                yaz_log(YLOG_DEBUG, "Blocking on cmd_show");
+                return;
+            }
         }
     }
 
index 05527bc..827d947 100644 (file)
@@ -1,4 +1,4 @@
-/* $Id: logic.c,v 1.44 2007-06-15 06:45:39 adam Exp $
+/* $Id: logic.c,v 1.45 2007-06-15 19:35:17 adam Exp $
    Copyright (c) 2006-2007, Index Data.
 
 This file is part of Pazpar2.
@@ -384,21 +384,39 @@ static int prepare_session_database(struct session *se,
     return 0;
 }
 
+// called if watch should be removed because http_channel is to be destroyed
+static void session_watch_cancel(void *data, struct http_channel *c)
+{
+    struct session_watchentry *ent = data;
+
+    ent->fun = 0;
+    ent->data = 0;
+    ent->obs = 0;
+}
 
-void session_set_watch(struct session *s, int what, 
-                       session_watchfun fun, void *data)
+// set watch. Returns 0=OK, -1 if watch is already set
+int session_set_watch(struct session *s, int what, 
+                      session_watchfun fun, void *data,
+                      struct http_channel *chan)
 {
+    if (s->watchlist[what].fun)
+        return -1;
     s->watchlist[what].fun = fun;
     s->watchlist[what].data = data;
+    s->watchlist[what].obs = http_add_observer(chan, &s->watchlist[what],
+                                               session_watch_cancel);
+    return 0;
 }
 
 void session_alert_watch(struct session *s, int what)
 {
     if (!s->watchlist[what].fun)
         return;
+    http_remove_observer(s->watchlist[what].obs);
     (*s->watchlist[what].fun)(s->watchlist[what].data);
     s->watchlist[what].fun = 0;
     s->watchlist[what].data = 0;
+    s->watchlist[what].obs = 0;
 }
 
 //callback for grep_databases
index 0776874..58de6e9 100644 (file)
@@ -1,4 +1,4 @@
-/* $Id: pazpar2.h,v 1.41 2007-06-15 06:45:39 adam Exp $
+/* $Id: pazpar2.h,v 1.42 2007-06-15 19:35:17 adam Exp $
    Copyright (c) 2006-2007, Index Data.
 
 This file is part of Pazpar2.
@@ -39,6 +39,7 @@ Free Software Foundation, 59 Temple Place - Suite 330, Boston, MA
 #include "eventl.h"
 #include "config.h"
 #include "parameters.h"
+#include "http.h"
 
 struct record;
 struct client;
@@ -116,6 +117,12 @@ struct named_termlist
     struct termlist *termlist;
 };
 
+struct session_watchentry {
+    void *data;
+    http_channel_observer_t obs;
+    session_watchfun fun;
+};
+
 // End-user session
 struct session {
     struct session_database *databases;  // All databases, settings overriden
@@ -128,10 +135,7 @@ struct session {
     struct named_termlist termlists[SESSION_MAX_TERMLISTS];
     struct relevance *relevance;
     struct reclist *reclist;
-    struct {
-        void *data;
-        session_watchfun fun;
-    } watchlist[SESSION_WATCH_MAX + 1];
+    struct session_watchentry watchlist[SESSION_WATCH_MAX + 1];
     int expected_maxrecs;
     int total_hits;
     int total_records;
@@ -174,14 +178,13 @@ struct record_cluster **show(struct session *s, struct reclist_sortparms *sp, in
         int *num, int *total, int *sumhits, NMEM nmem_show);
 struct record_cluster *show_single(struct session *s, int id);
 struct termlist_score **termlist(struct session *s, const char *name, int *num);
-void session_set_watch(struct session *s, int what, session_watchfun fun, void *data);
+int session_set_watch(struct session *s, int what, session_watchfun fun, void *data, struct http_channel *c);
 int session_active_clients(struct session *s);
 void session_apply_setting(struct session *se, char *dbname, char *setting, char *value);
 char *session_setting_oneval(struct session_database *db, int offset);
 
 void start_http_listener(void);
 void start_proxy(void);
-//void start_zproxy(void);
 
 void pazpar2_add_channel(IOCHAN c);
 void pazpar2_event_loop(void);