HTTP free buffers handled by http_server_t
authorAdam Dickmeiss <adam@indexdata.dk>
Wed, 10 Feb 2010 13:54:30 +0000 (14:54 +0100)
committerAdam Dickmeiss <adam@indexdata.dk>
Wed, 10 Feb 2010 13:54:30 +0000 (14:54 +0100)
Two non-thread safe lists now handled by http_server_t.

src/http.c
src/http.h
src/pazpar2.c
src/pazpar2_config.c
src/pazpar2_config.h

index 05198d6..1e73f7c 100644 (file)
@@ -61,6 +61,7 @@ typedef int socklen_t;
 #include <yaz/yaz-util.h>
 #include <yaz/comstack.h>
 #include <yaz/nmem.h>
+#include <yaz/mutex.h>
 
 #include "pazpar2.h"
 #include "http.h"
@@ -72,13 +73,33 @@ typedef int socklen_t;
 #define strcasecmp _stricmp
 #endif
 
+struct http_buf
+{
+#define HTTP_BUF_SIZE 4096
+    char buf[4096];
+    int offset;
+    int len;
+    struct http_buf *next;
+};
+
+
 static void proxy_io(IOCHAN i, int event);
-static struct http_channel *http_create(const char *addr,
+static struct http_channel *http_create(http_server_t http_server,
+                                        const char *addr,
                                         struct conf_server *server);
 static void http_destroy(IOCHAN i);
+static http_server_t http_server_create(void);
+static void http_server_incref(http_server_t hs);
 
-static struct http_buf *http_buf_freelist = 0;        /* thread pr */
-static struct http_channel *http_channel_freelist = 0; /* thread pr */
+struct http_server
+{
+    struct http_buf *http_buf_freelist;
+    struct http_channel *http_channel_freelist;
+    YAZ_MUTEX mutex;
+    int listener_socket;
+    int ref_count;
+    struct sockaddr_in *proxy_addr;
+};
 
 struct http_channel_observer_s {
     void *data;
@@ -98,16 +119,18 @@ const char *http_lookup_header(struct http_header *header,
     return 0;
 }
 
-static struct http_buf *http_buf_create(void)
+static struct http_buf *http_buf_create(http_server_t hs)
 {
-    struct http_buf *r;
+    struct http_buf *r = 0;
 
-    if (http_buf_freelist)
+    yaz_mutex_enter(hs->mutex);
+    if (hs->http_buf_freelist)
     {
-        r = http_buf_freelist;
-        http_buf_freelist = http_buf_freelist->next;
+        r = hs->http_buf_freelist;
+        hs->http_buf_freelist = hs->http_buf_freelist->next;
     }
-    else
+    yaz_mutex_leave(hs->mutex);
+    if (!r)
         r = xmalloc(sizeof(struct http_buf));
     r->offset = 0;
     r->len = 0;
@@ -115,24 +138,26 @@ static struct http_buf *http_buf_create(void)
     return r;
 }
 
-static void http_buf_destroy(struct http_buf *b)
+static void http_buf_destroy(http_server_t hs, struct http_buf *b)
 {
-    b->next = http_buf_freelist;
-    http_buf_freelist = b;
+    yaz_mutex_enter(hs->mutex);
+    b->next = hs->http_buf_freelist;
+    hs->http_buf_freelist = b;
+    yaz_mutex_leave(hs->mutex);
 }
 
-static void http_buf_destroy_queue(struct http_buf *b)
+static void http_buf_destroy_queue(http_server_t hs, struct http_buf *b)
 {
     struct http_buf *p;
     while (b)
     {
         p = b->next;
-        http_buf_destroy(b);
+        http_buf_destroy(hs, b);
         b = p;
     }
 }
 
-static struct http_buf *http_buf_bybuf(char *b, int len)
+static struct http_buf *http_buf_bybuf(http_server_t hs, char *b, int len)
 {
     struct http_buf *res = 0;
     struct http_buf **p = &res;
@@ -142,7 +167,7 @@ static struct http_buf *http_buf_bybuf(char *b, int len)
         int tocopy = len;
         if (tocopy > HTTP_BUF_SIZE)
             tocopy = HTTP_BUF_SIZE;
-        *p = http_buf_create();
+        *p = http_buf_create(hs);
         memcpy((*p)->buf, b, tocopy);
         (*p)->len = tocopy;
         len -= tocopy;
@@ -160,10 +185,10 @@ static void http_buf_enqueue(struct http_buf **queue, struct http_buf *b)
     *queue = b;
 }
 
-static struct http_buf *http_buf_bywrbuf(WRBUF wrbuf)
+static struct http_buf *http_buf_bywrbuf(http_server_t hs, WRBUF wrbuf)
 {
     // Heavens to Betsy (buf)!
-    return http_buf_bybuf(wrbuf_buf(wrbuf), wrbuf_len(wrbuf));
+    return http_buf_bybuf(hs, wrbuf_buf(wrbuf), wrbuf_len(wrbuf));
 }
 
 // Non-destructively collapse chain of buffers into a string (max *len)
@@ -192,7 +217,8 @@ static int http_buf_size(struct http_buf *b)
 }
 
 // Ddestructively munch up to len  from head of queue.
-static int http_buf_read(struct http_buf **b, char *buf, int len)
+static int http_buf_read(http_server_t hs,
+                         struct http_buf **b, char *buf, int len)
 {
     int rd = 0;
     while ((*b) && rd < len)
@@ -211,7 +237,7 @@ static int http_buf_read(struct http_buf **b, char *buf, int len)
         else
         {
             struct http_buf *n = (*b)->next;
-            http_buf_destroy(*b);
+            http_buf_destroy(hs, *b);
             *b = n;
         }
     }
@@ -466,7 +492,7 @@ struct http_request *http_parse_request(struct http_channel *c,
     char *start = nmem_malloc(c->nmem, len+1);
     char *buf = start;
 
-    if (http_buf_read(queue, buf, len) < len)
+    if (http_buf_read(c->http_server, queue, buf, len) < len)
     {
         yaz_log(YLOG_WARN, "http_buf_read < len (%d)", len);
         return 0;
@@ -631,7 +657,7 @@ static struct http_buf *http_serialize_response(struct http_channel *c,
     if (r->payload)
         wrbuf_puts(c->wrbuf, r->payload);
 
-    return http_buf_bywrbuf(c->wrbuf);
+    return http_buf_bywrbuf(c->http_server, c->wrbuf);
 }
 
 // Serialize a HTTP request
@@ -658,14 +684,14 @@ static struct http_buf *http_serialize_request(struct http_request *r)
     yaz_log(YLOG_LOG, "WRITING TO PROXY:\n%s\n----",
             wrbuf_cstr(c->wrbuf));
 #endif
-    return http_buf_bywrbuf(c->wrbuf);
+    return http_buf_bywrbuf(c->http_server, c->wrbuf);
 }
 
 
 static int http_weshouldproxy(struct http_request *rq)
 {
     struct http_channel *c = rq->channel;
-    if (c->server->proxy_addr && !strstr(rq->path, "search.pz2"))
+    if (c->server->http_server->proxy_addr && !strstr(rq->path, "search.pz2"))
         return 1;
     return 0;
 }
@@ -754,8 +780,9 @@ static int http_proxy(struct http_request *rq)
                         &one, sizeof(one)) < 0)
             abort();
         enable_nonblock(sock);
-        if (connect(sock, (struct sockaddr *) c->server->proxy_addr, 
-                    sizeof(*c->server->proxy_addr)) < 0)
+        if (connect(sock, (struct sockaddr *)
+                    c->server->http_server->proxy_addr, 
+                    sizeof(*c->server->http_server->proxy_addr)) < 0)
         {
             if (!is_inprogress()) 
             {
@@ -855,16 +882,16 @@ static void http_io(IOCHAN i, int event)
         struct http_buf *htbuf;
 
         case EVENT_INPUT:
-            htbuf = http_buf_create();
+            htbuf = http_buf_create(hc->http_server);
             res = recv(iochan_getfd(i), htbuf->buf, HTTP_BUF_SIZE -1, 0);
             if (res == -1 && errno == EAGAIN)
             {
-                http_buf_destroy(htbuf);
+                http_buf_destroy(hc->http_server, htbuf);
                 return;
             }
             if (res <= 0)
             {
-                http_buf_destroy(htbuf);
+                http_buf_destroy(hc->http_server, htbuf);
                 http_destroy(i);
                 return;
             }
@@ -918,7 +945,7 @@ static void http_io(IOCHAN i, int event)
                 if (res == wb->len)
                 {
                     hc->oqueue = hc->oqueue->next;
-                    http_buf_destroy(wb);
+                    http_buf_destroy(hc->http_server, wb);
                 }
                 else
                 {
@@ -961,7 +988,7 @@ static void proxy_io(IOCHAN pi, int event)
         struct http_buf *htbuf;
 
         case EVENT_INPUT:
-            htbuf = http_buf_create();
+            htbuf = http_buf_create(hc->http_server);
             res = recv(iochan_getfd(pi), htbuf->buf, HTTP_BUF_SIZE -1, 0);
             if (res == 0 || (res < 0 && !is_inprogress()))
             {
@@ -969,7 +996,7 @@ static void proxy_io(IOCHAN pi, int event)
                 {
                     yaz_log(YLOG_WARN, "Proxy read came up short");
                     // Close channel and alert client HTTP channel that we're gone
-                    http_buf_destroy(htbuf);
+                    http_buf_destroy(hc->http_server, htbuf);
 #ifdef WIN32
                     closesocket(iochan_getfd(pi));
 #else
@@ -1011,7 +1038,7 @@ static void proxy_io(IOCHAN pi, int event)
             if (res == htbuf->len)
             { 
                 struct http_buf *np = htbuf->next;
-                http_buf_destroy(htbuf);
+                http_buf_destroy(hc->http_server, htbuf);
                 pc->oqueue = np;
             }
             else
@@ -1037,6 +1064,7 @@ static void http_destroy_observers(struct http_channel *c);
 static void http_destroy(IOCHAN i)
 {
     struct http_channel *s = iochan_getdata(i);
+    http_server_t http_server;
 
     if (s->proxy)
     {
@@ -1049,15 +1077,23 @@ static void http_destroy(IOCHAN i)
 #endif
             iochan_destroy(s->proxy->iochan);
         }
-        http_buf_destroy_queue(s->proxy->oqueue);
+        http_buf_destroy_queue(s->http_server, s->proxy->oqueue);
         xfree(s->proxy);
     }
-    http_buf_destroy_queue(s->iqueue);
-    http_buf_destroy_queue(s->oqueue);
+    http_buf_destroy_queue(s->http_server, s->iqueue);
+    http_buf_destroy_queue(s->http_server, s->oqueue);
     http_fire_observers(s);
     http_destroy_observers(s);
-    s->next = http_channel_freelist;
-    http_channel_freelist = s;
+
+    http_server = s->http_server; /* save it for destroy (decref) */
+
+    yaz_mutex_enter(s->http_server->mutex);
+    s->next = s->http_server->http_channel_freelist;
+    s->http_server->http_channel_freelist = s;
+    yaz_mutex_leave(s->http_server->mutex);
+
+    http_server_destroy(http_server);
+
 #ifdef WIN32
     closesocket(iochan_getfd(i));
 #else
@@ -1066,14 +1102,20 @@ static void http_destroy(IOCHAN i)
     iochan_destroy(i);
 }
 
-static struct http_channel *http_create(const char *addr,
+static struct http_channel *http_create(http_server_t hs,
+                                        const char *addr,
                                         struct conf_server *server)
 {
-    struct http_channel *r = http_channel_freelist;
+    struct http_channel *r;
+
+    yaz_mutex_enter(hs->mutex);
+    r = hs->http_channel_freelist;
+    if (r)
+        hs->http_channel_freelist = r->next;
+    yaz_mutex_leave(hs->mutex);
 
     if (r)
     {
-        http_channel_freelist = r->next;
         nmem_reset(r->nmem);
         wrbuf_rewind(r->wrbuf);
     }
@@ -1083,6 +1125,8 @@ static struct http_channel *http_create(const char *addr,
         r->nmem = nmem_create();
         r->wrbuf = wrbuf_alloc();
     }
+    http_server_incref(hs);
+    r->http_server = hs;
     r->server = server;
     r->proxy = 0;
     r->iochan = 0;
@@ -1124,7 +1168,7 @@ static void http_accept(IOCHAN i, int event)
     yaz_log(YLOG_DEBUG, "New command connection");
     c = iochan_create(s, http_io, EVENT_INPUT | EVENT_EXCEPT);
     
-    ch = http_create(inet_ntoa(addr.sin_addr), server);
+    ch = http_create(server->http_server, inet_ntoa(addr.sin_addr), server);
     ch->iochan = c;
     iochan_setdata(c, ch);
 
@@ -1141,6 +1185,7 @@ int http_init(const char *addr, struct conf_server *server)
     int one = 1;
     const char *pp;
     short port;
+    http_server_t http_server;
 
     yaz_log(YLOG_LOG, "HTTP listener %s", addr);
 
@@ -1193,7 +1238,10 @@ int http_init(const char *addr, struct conf_server *server)
         return 1;
     }
 
-    server->listener_socket = l;
+    http_server = http_server_create();
+    server->http_server = http_server;
+
+    http_server->listener_socket = l;
 
     c = iochan_create(l, http_accept, EVENT_INPUT | EVENT_EXCEPT);
     iochan_setdata(c, server);
@@ -1204,12 +1252,12 @@ int http_init(const char *addr, struct conf_server *server)
 void http_close_server(struct conf_server *server)
 {
     /* break the event_loop (select) by closing down the HTTP listener sock */
-    if (server->listener_socket)
+    if (server->http_server->listener_socket)
     {
 #ifdef WIN32
-        closesocket(server->listener_socket);
+        closesocket(server->http_server->listener_socket);
 #else
-        close(server->listener_socket);
+        close(server->http_server->listener_socket);
 #endif
     }
 }
@@ -1242,10 +1290,11 @@ void http_set_proxyaddr(const char *host, struct conf_server *server)
     }
     wrbuf_destroy(w);
 
-    server->proxy_addr = xmalloc(sizeof(struct sockaddr_in));
-    server->proxy_addr->sin_family = he->h_addrtype;
-    memcpy(&server->proxy_addr->sin_addr.s_addr, he->h_addr_list[0], he->h_length);
-    server->proxy_addr->sin_port = htons(port);
+    server->http_server->proxy_addr = xmalloc(sizeof(struct sockaddr_in));
+    server->http_server->proxy_addr->sin_family = he->h_addrtype;
+    memcpy(&server->http_server->proxy_addr->sin_addr.s_addr,
+           he->h_addr_list[0], he->h_length);
+    server->http_server->proxy_addr->sin_port = htons(port);
 }
 
 static void http_fire_observers(struct http_channel *c)
@@ -1303,6 +1352,70 @@ void http_observer_set_data2(http_channel_observer_t obs, void *data2)
     obs->data2 = data2;
 }
 
+http_server_t http_server_create(void)
+{
+    http_server_t hs = xmalloc(sizeof(*hs));
+    hs->mutex = 0;
+    hs->proxy_addr = 0;
+    hs->ref_count = 1;
+    hs->http_buf_freelist = 0;
+    hs->http_channel_freelist = 0;
+    return hs;
+}
+
+void http_server_destroy(http_server_t hs)
+{
+    if (hs)
+    {
+        int r;
+
+        if (hs->mutex)
+        {
+            yaz_mutex_enter(hs->mutex);
+            r = --(hs->ref_count);
+            yaz_mutex_leave(hs->mutex);
+        }
+        else
+            r = --(hs->ref_count);
+        
+        if (r == 0)
+        {
+            struct http_buf *b = hs->http_buf_freelist;
+            struct http_channel *c = hs->http_channel_freelist;
+            while (b)
+            {
+                struct http_buf *b_next = b->next;
+                xfree(b);
+                b = b_next;
+            }
+            while (c)
+            {
+                struct http_channel *c_next = c->next;
+                xfree(c);
+                c = c_next;
+            }
+            xfree(hs->proxy_addr);
+            yaz_mutex_destroy(&hs->mutex);
+            xfree(hs);
+        }
+    }
+}
+
+void http_server_incref(http_server_t hs)
+{
+    assert(hs);
+    yaz_mutex_enter(hs->mutex);
+    (hs->ref_count)++;
+    yaz_mutex_leave(hs->mutex);
+}
+
+void http_mutex_init(struct conf_server *server)
+{
+    assert(server);
+
+    assert(server->http_server->mutex == 0);
+    yaz_mutex_create(&server->http_server->mutex);
+}
 
 /*
  * Local variables:
index 2a5d1fe..e075e0c 100644 (file)
@@ -22,17 +22,11 @@ Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA  02110-1301  USA
 
 #include "eventl.h"
 // Generic I/O buffer
-struct http_buf
-{
-#define HTTP_BUF_SIZE 4096
-    char buf[4096];
-    int offset;
-    int len;
-    struct http_buf *next;
-};
-
+struct http_buf;
 typedef struct http_channel_observer_s *http_channel_observer_t;
 
+typedef struct http_server *http_server_t;
+
 struct http_channel
 {
     IOCHAN iochan;
@@ -54,6 +48,7 @@ struct http_channel
     char addr[20]; // forwarded address
     http_channel_observer_t observers;
     struct conf_server *server;
+    http_server_t http_server;
 };
 
 struct http_proxy //  attached to iochan for proxy connection
@@ -101,6 +96,9 @@ struct http_response
     char *content_type;
 };
 
+void http_mutex_init(struct conf_server *server);
+void http_server_destroy(http_server_t hs);
+
 void http_set_proxyaddr(const char *url, struct conf_server *ser);
 int http_init(const char *addr, struct conf_server *ser);
 void http_close_server(struct conf_server *ser);
index 4905399..88b549d 100644 (file)
@@ -43,6 +43,8 @@ void child_handler(void *data)
     config_start_databases(config);
 
     pazpar2_event_loop();
+
+    config_destroy(config);
 }
 
 static void show_version(void)
index bcb415a..8943b20 100644 (file)
@@ -240,8 +240,6 @@ void service_destroy(struct conf_service *service)
 {
     if (service)
     {
-        yaz_log(YLOG_LOG, "service_destroy. p=%p cnt=%d", service,
-                service->ref_count);
         if (!pazpar2_decref(&service->ref_count, service->mutex))
         {
             pp2_charset_destroy(service->relevance_pct);
@@ -713,7 +711,6 @@ static struct conf_server *server_create(struct conf_config *config,
     server->proxy_host = 0;
     server->proxy_port = 0;
     server->myurl = 0;
-    server->proxy_addr = 0;
     server->service = 0;
     server->config = config;
     server->next = 0;
@@ -721,6 +718,7 @@ static struct conf_server *server_create(struct conf_config *config,
     server->sort_pct = 0;
     server->mergekey_pct = 0;
     server->server_settings = 0;
+    server->http_server = 0;
 
     if (server_id)
     {
@@ -1011,6 +1009,8 @@ void server_destroy(struct conf_server *server)
     pp2_charset_destroy(server->relevance_pct);
     pp2_charset_destroy(server->sort_pct);
     pp2_charset_destroy(server->mergekey_pct);
+    yaz_log(YLOG_LOG, "server_destroy server=%p", server);
+    http_server_destroy(server->http_server);
 }
 
 void config_destroy(struct conf_config *config)
@@ -1048,6 +1048,7 @@ void config_start_databases(struct conf_config *conf)
             assert(s->mutex == 0);
             yaz_mutex_create(&s->mutex);
         }
+        http_mutex_init(ser);
     }
 }
 
index 41d9431..1ae1ee4 100644 (file)
@@ -25,6 +25,7 @@ Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA  02110-1301  USA
 #include <yaz/nmem.h>
 #include <yaz/mutex.h>
 #include "charsets.h"
+#include "http.h"
 
 enum conf_metadata_type {
     Metadata_type_generic,    // Generic text field
@@ -134,8 +135,6 @@ struct conf_server
     char *proxy_host;
     int proxy_port;
     char *myurl;
-    struct sockaddr_in *proxy_addr;
-    int listener_socket;
     char *server_settings;
     char *server_id;
 
@@ -145,6 +144,7 @@ struct conf_server
     struct conf_service *service;
     struct conf_server *next;
     struct conf_config *config;
+    http_server_t http_server;
 };
 
 struct conf_targetprofiles