X-Git-Url: http://git.indexdata.com/?a=blobdiff_plain;f=src%2Fyaz-pdu-assoc.cpp;h=0462f5ae61be58c905e34e04d20dbabac8b43da6;hb=355215b6f2c914f2eb099dda88828b8f0c8cb001;hp=6056b2d81f2a02004ea798b7080560e4e1c70792;hpb=bf377ba45c8c1cbcf843fdecc6d5c68fda6bad18;p=yazpp-moved-to-github.git diff --git a/src/yaz-pdu-assoc.cpp b/src/yaz-pdu-assoc.cpp index 6056b2d..0462f5a 100644 --- a/src/yaz-pdu-assoc.cpp +++ b/src/yaz-pdu-assoc.cpp @@ -1,26 +1,78 @@ /* - * Copyright (c) 1998-1999, Index Data. + * Copyright (c) 1998-2000, Index Data. * See the file LICENSE for details. - * Sebastian Hammer, Adam Dickmeiss * * $Log: yaz-pdu-assoc.cpp,v $ - * Revision 1.1 1999-01-28 09:41:07 adam - * Initial revision + * Revision 1.17 2000-10-11 11:58:16 adam + * Moved header files to include/yaz++. Switched to libtool and automake. + * Configure script creates yaz++-config script. + * + * Revision 1.16 2000/09/22 09:54:11 heikki + * minor + * + * Revision 1.15 2000/09/21 21:43:20 adam + * Better high-level server API. + * + * Revision 1.14 2000/09/12 12:09:53 adam + * More work on high-level server. + * + * Revision 1.13 2000/09/08 10:23:42 adam + * Added skeleton of yaz-z-server. + * + * Revision 1.12 2000/09/06 14:23:45 adam + * WIN32 updates. + * + * Revision 1.11 2000/09/04 08:29:22 adam + * Fixed memory leak(s). Added re-use of associations, rather than + * re-init, when maximum number of targets are in use. + * + * Revision 1.10 2000/08/10 08:42:42 adam + * Fixes for {set,get}_APDU_log. + * + * Revision 1.9 1999/12/06 13:52:45 adam + * Modified for new location of YAZ header files. Experimental threaded + * operation. + * + * Revision 1.8 1999/04/28 13:04:03 adam + * Fixed setting of proxy otherInfo so that database(s) are removed. + * + * Revision 1.7 1999/04/21 12:09:01 adam + * Many improvements. Modified to proxy server to work with "sessions" + * based on cookies. + * + * Revision 1.6 1999/04/20 10:30:05 adam + * Implemented various stuff for client and proxy. Updated calls + * to ODR to reflect new name parameter. + *g + * Revision 1.5 1999/04/09 11:46:57 adam + * Added object Yaz_Z_Assoc. Much more functional client. + * + * Revision 1.4 1999/03/23 14:17:57 adam + * More work on timeout handling. Work on yaz-client. + * + * Revision 1.3 1999/02/02 14:01:20 adam + * First WIN32 port of YAZ++. + * + * Revision 1.2 1999/01/28 13:08:44 adam + * Yaz_PDU_Assoc better encapsulated. Memory leak fix in + * yaz-socket-manager.cc. + * + * Revision 1.1.1.1 1999/01/28 09:41:07 adam + * First implementation of YAZ++. * */ #include -#include +#include -#include -#include +#include +#include -Yaz_PDU_Assoc::Yaz_PDU_Assoc(IYazSocketObservable *socketObservable, - COMSTACK cs) +Yaz_PDU_Assoc::Yaz_PDU_Assoc(IYazSocketObservable *socketObservable) { m_state = Closed; - m_cs = cs; + m_cs = 0; m_socketObservable = socketObservable; m_PDU_Observer = 0; m_queue_out = 0; @@ -29,47 +81,21 @@ Yaz_PDU_Assoc::Yaz_PDU_Assoc(IYazSocketObservable *socketObservable, m_children = 0; m_parent = 0; m_next = 0; + m_destroyed = 0; + m_idleTime = 0; + m_log = LOG_DEBUG; } IYaz_PDU_Observable *Yaz_PDU_Assoc::clone() { - Yaz_PDU_Assoc *copy = new Yaz_PDU_Assoc(m_socketObservable, 0); + Yaz_PDU_Assoc *copy = new Yaz_PDU_Assoc(m_socketObservable); return copy; } -Yaz_PDU_Assoc::~Yaz_PDU_Assoc() -{ - Yaz_PDU_Assoc **c; - close(); - - logf (LOG_LOG, "m_children=%p m_parent=%p", m_children, - m_parent); - // delete from parent's child list (if any) - if (m_parent) - { - c = &m_parent->m_children; - while (*c != this) - { - assert (*c); - c = &(*c)->m_next; - } - *c = (*c)->m_next; - } - // delete all children ... - c = &m_children; - while (*c) - { - Yaz_PDU_Assoc *here = *c; - *c = (*c)->m_next; - here->m_parent = 0; - delete here; - } -} - void Yaz_PDU_Assoc::socketNotify(int event) { - logf (LOG_LOG, "socketNotify p=%p event = %d", this, event); - if (m_state == Connected) + logf (m_log, "Yaz_PDU_Assoc::socketNotify p=%p event = %d", this, event); + if (0 /* m_state == Connected */) { m_state = Ready; m_socketObservable->maskObserver(this, YAZ_SOCKET_OBSERVE_READ| @@ -84,7 +110,11 @@ void Yaz_PDU_Assoc::socketNotify(int event) close(); m_PDU_Observer->failNotify(); } - else + else if (event & YAZ_SOCKET_OBSERVE_TIMEOUT) + { + m_PDU_Observer->timeoutNotify(); + } + else { m_state = Ready; m_socketObservable->maskObserver(this, YAZ_SOCKET_OBSERVE_READ| @@ -95,7 +125,6 @@ void Yaz_PDU_Assoc::socketNotify(int event) } else if (m_state == Listen) { - logf (LOG_LOG, "handler_listen %d", event); if (event & YAZ_SOCKET_OBSERVE_READ) { int res; @@ -105,36 +134,32 @@ void Yaz_PDU_Assoc::socketNotify(int event) return; if (res < 0) { - logf(LOG_FATAL, "cs_listen failed"); + logf(LOG_FATAL|LOG_ERRNO, "cs_listen failed"); return; } if (!(new_line = cs_accept(m_cs))) return; - - Yaz_PDU_Assoc *assoc = new Yaz_PDU_Assoc (m_socketObservable, - new_line); - assoc->m_parent = this; - assoc->m_next = m_children; - m_children = assoc; - - assoc->m_PDU_Observer = m_PDU_Observer->clone(assoc); - assoc->m_state = Ready; - assoc->m_socketObservable->addObserver(cs_fileno(new_line), assoc); - assoc->m_socketObservable->maskObserver(assoc, - YAZ_SOCKET_OBSERVE_READ| - YAZ_SOCKET_OBSERVE_EXCEPT); + /* 1. create socket-manager + 2. create pdu-assoc + 3. create top-level object + setup observer for child fileid in pdu-assoc + 4. start thread + */ + int fd = cs_fileno(new_line); + logf (m_log, "accept ok fd = %d", fd); + cs_fileno(new_line) = -1; + cs_close (new_line); + childNotify(fd); } } else if (m_state == Ready) { if (event & YAZ_SOCKET_OBSERVE_WRITE) { - logf (LOG_LOG, "socketNotify write"); flush_PDU(); } if (event & YAZ_SOCKET_OBSERVE_READ) { - logf (LOG_LOG, "socketNotify read"); do { int res = cs_get (m_cs, &m_input_buf, &m_input_len); @@ -142,13 +167,24 @@ void Yaz_PDU_Assoc::socketNotify(int event) return; else if (res <= 0) { - logf (LOG_LOG, "Connection closed by server"); + logf (m_log, "Connection closed by peer"); close(); m_PDU_Observer->failNotify(); return; } + // lock it, so we know if recv_PDU deletes it. + int destroyed = 0; + m_destroyed = &destroyed; + m_PDU_Observer->recv_PDU(m_input_buf, res); - } while (cs_more (m_cs)); + m_destroyed = 0; + if (destroyed) // it really was destroyed, return now. + return; + } while (m_cs && cs_more (m_cs)); + } + if (event & YAZ_SOCKET_OBSERVE_TIMEOUT) + { + m_PDU_Observer->timeoutNotify(); } } } @@ -159,22 +195,50 @@ void Yaz_PDU_Assoc::close() m_state = Closed; if (m_cs) { - logf (LOG_LOG, "Yaz_PDU_Assoc::close fd=%d", cs_fileno(m_cs)); + logf (m_log, "Yaz_PDU_Assoc::close fd=%d", cs_fileno(m_cs)); cs_close (m_cs); } m_cs = 0; - PDU_Queue **q = &m_queue_out; - while (*q) + while (m_queue_out) { - PDU_Queue *q_this = *q; - *q = (*q)->m_next; + PDU_Queue *q_this = m_queue_out; + m_queue_out = m_queue_out->m_next; delete q_this; } - free (m_input_buf); + xfree (m_input_buf); m_input_buf = 0; m_input_len = 0; } +void Yaz_PDU_Assoc::destroy() +{ + close(); + if (m_destroyed) + *m_destroyed = 1; + Yaz_PDU_Assoc **c; + + // delete from parent's child list (if any) + if (m_parent) + { + c = &m_parent->m_children; + while (*c != this) + { + assert (*c); + c = &(*c)->m_next; + } + *c = (*c)->m_next; + } + // delete all children ... + c = &m_children; + while (*c) + { + Yaz_PDU_Assoc *here = *c; + *c = (*c)->m_next; + here->m_parent = 0; + delete here; + } +} + Yaz_PDU_Assoc::PDU_Queue::PDU_Queue(const char *buf, int len) { m_buf = (char *) malloc (len); @@ -191,10 +255,13 @@ Yaz_PDU_Assoc::PDU_Queue::~PDU_Queue() int Yaz_PDU_Assoc::flush_PDU() { int r; - - logf (LOG_LOG, "flush_PDU fd=%d", cs_fileno(m_cs)); + + logf (m_log, "Yaz_PDU_Assoc::flush_PDU"); if (m_state != Ready) + { + logf (m_log, "YAZ_PDU_Assoc::flush_PDU, not ready"); return 1; + } PDU_Queue *q = m_queue_out; if (!q) { @@ -214,13 +281,13 @@ int Yaz_PDU_Assoc::flush_PDU() m_socketObservable->maskObserver(this, YAZ_SOCKET_OBSERVE_READ| YAZ_SOCKET_OBSERVE_EXCEPT| YAZ_SOCKET_OBSERVE_WRITE); - logf (LOG_LOG, "put %d bytes (incomplete write)", q->m_len); + logf (m_log, "Yaz_PDU_Assoc::flush_PDU put %d bytes (incomplete)", + q->m_len); return r; } - logf (LOG_LOG, "put %d bytes fd=%d", q->m_len, cs_fileno(m_cs)); + logf (m_log, "Yaz_PDU_Assoc::flush_PDU put %d bytes", q->m_len); // whole packet sent... delete this and proceed to next ... m_queue_out = q->m_next; - logf (LOG_LOG, "m_queue_out = %p", m_queue_out); delete q; // don't select on write if queue is empty ... if (!m_queue_out) @@ -231,27 +298,23 @@ int Yaz_PDU_Assoc::flush_PDU() int Yaz_PDU_Assoc::send_PDU(const char *buf, int len) { + logf (m_log, "Yaz_PDU_Assoc::send_PDU"); PDU_Queue **pq = &m_queue_out; int is_idle = (*pq ? 0 : 1); - logf (LOG_LOG, "send_PDU, m_queue_out=%p fd=%d", m_queue_out, - cs_fileno(m_cs)); if (!m_cs) { - logf (LOG_LOG, "send_PDU failed, m_cs == 0"); - return 0; + logf (m_log, "Yaz_PDU_Assoc::send_PDU failed, m_cs == 0"); + return -1; } while (*pq) pq = &(*pq)->m_next; *pq = new PDU_Queue(buf, len); if (is_idle) - { return flush_PDU (); - } else - { - logf (LOG_LOG, "cannot send_PDU fd=%d", cs_fileno(m_cs)); - } + logf (m_log, "Yaz_PDU_Assoc::cannot send_PDU fd=%d", + cs_fileno(m_cs)); return 0; } @@ -260,8 +323,7 @@ COMSTACK Yaz_PDU_Assoc::comstack() if (!m_cs) { CS_TYPE cs_type = tcpip_type; - int protocol = PROTO_Z3950; - m_cs = cs_create (cs_type, 0, protocol); + m_cs = cs_create (cs_type, 0, PROTO_Z3950); } return m_cs; } @@ -273,6 +335,7 @@ void Yaz_PDU_Assoc::listen(IYaz_PDU_Observer *observer, void *ap; COMSTACK cs = comstack(); + logf (m_log, "Yaz_PDU_Assoc::listen %s", addr); m_PDU_Observer = observer; if (!cs) return; @@ -284,32 +347,126 @@ void Yaz_PDU_Assoc::listen(IYaz_PDU_Observer *observer, m_socketObservable->addObserver(cs_fileno(cs), this); m_socketObservable->maskObserver(this, YAZ_SOCKET_OBSERVE_READ| YAZ_SOCKET_OBSERVE_EXCEPT); + logf (m_log, "Yaz_PDU_Assoc::listen ok fd=%d", cs_fileno(cs)); m_state = Listen; } +void Yaz_PDU_Assoc::idleTime(int idleTime) +{ + m_idleTime = idleTime; + logf (m_log, "Yaz_PDU_Assoc::idleTime(%d)", idleTime); + m_socketObservable->timeoutObserver(this, m_idleTime); +} + void Yaz_PDU_Assoc::connect(IYaz_PDU_Observer *observer, const char *addr) { - logf (LOG_LOG, "Yaz_PDU_Assoc::connect"); + logf (m_log, "Yaz_PDU_Assoc::connect %s", addr); close(); m_PDU_Observer = observer; COMSTACK cs = comstack(); void *ap = cs_straddr (cs, addr); if (!ap) - return; - int res = cs_connect (cs, ap); - if (res < 0) { - logf (LOG_DEBUG, "Yaz_PDU_Assoc::connect failed"); - close (); - return; + logf (m_log, "cs_straddr failed"); + return; } + int res = cs_connect (cs, ap); + logf (m_log, "Yaz_PDU_Assoc::connect fd=%d res=%d", cs_fileno(cs), res); m_socketObservable->addObserver(cs_fileno(cs), this); m_socketObservable->maskObserver(this, YAZ_SOCKET_OBSERVE_READ| - YAZ_SOCKET_OBSERVE_EXCEPT| - YAZ_SOCKET_OBSERVE_WRITE); - if (res == 1) - m_state = Connecting; - else - m_state = Connected; + YAZ_SOCKET_OBSERVE_EXCEPT| + YAZ_SOCKET_OBSERVE_WRITE); + m_state = Connecting; +} + +void Yaz_PDU_Assoc::socket(IYaz_PDU_Observer *observer, int fd) +{ + close(); + m_PDU_Observer = observer; + if (fd >= 0) + { + CS_TYPE cs_type = tcpip_type; + m_cs = cs_createbysocket(fd, cs_type, 0, PROTO_Z3950); + m_state = Ready; + m_socketObservable->addObserver(fd, this); + m_socketObservable->maskObserver(this, + YAZ_SOCKET_OBSERVE_READ| + YAZ_SOCKET_OBSERVE_EXCEPT); + m_socketObservable->timeoutObserver(this, m_idleTime); + } +} + +#if 1 + // 1 = single-threaded + // 0 = multi-threaded + +// Single-threaded... Only useful for non-blocking handlers +void Yaz_PDU_Assoc::childNotify(int fd) +{ + // Clone PDU Observable (keep socket manager) + IYaz_PDU_Observable *new_observable = clone(); + + // Clone PDU Observer + IYaz_PDU_Observer *observer = m_PDU_Observer->clone(new_observable); + + // Attach new socket to it + new_observable->socket(observer, fd); +} +#else + +#include + +#ifdef WIN32 +#include +#else +#include +#endif + +#ifdef WIN32 +void __cdecl +#else +void * +#endif + events(void *p) +{ + Yaz_SocketManager *s = (Yaz_SocketManager *) p; + + logf (LOG_LOG, "thread started"); + while (s->processEvent() > 0) + ; + logf (LOG_LOG, "thread finished"); +#ifdef WIN32 +#else + return 0; +#endif +} + +void Yaz_PDU_Assoc::childNotify(int fd) +{ + Yaz_SocketManager *socket_observable = new Yaz_SocketManager; + Yaz_PDU_Assoc *new_observable = new Yaz_PDU_Assoc (socket_observable); + + /// Clone PDU Observer + IYaz_PDU_Observer *observer = m_PDU_Observer->clone(new_observable); + + /// Attach new socket to it + new_observable->socket(observer, fd); + +#ifdef WIN32 + long t_id; + t_id = _beginthread (events, 0, socket_observable); + if (t_id == -1) + { + logf (LOG_FATAL|LOG_ERRNO, "_beginthread failed"); + exit (1); + } +#else + pthread_t type; + + int id = pthread_create (&type, 0, events, socket_observable); + logf (LOG_LOG, "pthread_create returned id=%d", id); +#endif } +// Threads end +#endif