X-Git-Url: http://git.indexdata.com/?a=blobdiff_plain;f=src%2Fyaz-pdu-assoc.cpp;h=fafac448bf46e028a04e87457b0aaeff6aa2f56f;hb=6104ba97908b1292806a1242b65beb0edbf2314f;hp=5350092e8a4364b6ba3e3919fc49038ff231e9f9;hpb=97118338f9ac93e767e5589d449d3f9abacb3190;p=yazpp-moved-to-github.git diff --git a/src/yaz-pdu-assoc.cpp b/src/yaz-pdu-assoc.cpp index 5350092..fafac44 100644 --- a/src/yaz-pdu-assoc.cpp +++ b/src/yaz-pdu-assoc.cpp @@ -4,7 +4,34 @@ * Sebastian Hammer, Adam Dickmeiss * * $Log: yaz-pdu-assoc.cpp,v $ - * Revision 1.2 1999-01-28 13:08:44 adam + * Revision 1.10 2000-08-10 08:42:42 adam + * Fixes for {set,get}_APDU_log. + * + * Revision 1.9 1999/12/06 13:52:45 adam + * Modified for new location of YAZ header files. Experimental threaded + * operation. + * + * Revision 1.8 1999/04/28 13:04:03 adam + * Fixed setting of proxy otherInfo so that database(s) are removed. + * + * Revision 1.7 1999/04/21 12:09:01 adam + * Many improvements. Modified to proxy server to work with "sessions" + * based on cookies. + * + * Revision 1.6 1999/04/20 10:30:05 adam + * Implemented various stuff for client and proxy. Updated calls + * to ODR to reflect new name parameter. + * + * Revision 1.5 1999/04/09 11:46:57 adam + * Added object Yaz_Z_Assoc. Much more functional client. + * + * Revision 1.4 1999/03/23 14:17:57 adam + * More work on timeout handling. Work on yaz-client. + * + * Revision 1.3 1999/02/02 14:01:20 adam + * First WIN32 port of YAZ++. + * + * Revision 1.2 1999/01/28 13:08:44 adam * Yaz_PDU_Assoc better encapsulated. Memory leak fix in * yaz-socket-manager.cc. * @@ -17,14 +44,13 @@ #include -#include -#include +#include +#include -Yaz_PDU_Assoc::Yaz_PDU_Assoc(IYazSocketObservable *socketObservable, - COMSTACK cs) +Yaz_PDU_Assoc::Yaz_PDU_Assoc(IYazSocketObservable *socketObservable) { m_state = Closed; - m_cs = cs; + m_cs = 0; m_socketObservable = socketObservable; m_PDU_Observer = 0; m_queue_out = 0; @@ -34,11 +60,13 @@ Yaz_PDU_Assoc::Yaz_PDU_Assoc(IYazSocketObservable *socketObservable, m_parent = 0; m_next = 0; m_destroyed = 0; + m_idleTime = 0; + m_log = LOG_DEBUG; } IYaz_PDU_Observable *Yaz_PDU_Assoc::clone() { - Yaz_PDU_Assoc *copy = new Yaz_PDU_Assoc(m_socketObservable, 0); + Yaz_PDU_Assoc *copy = new Yaz_PDU_Assoc(m_socketObservable); return copy; } @@ -49,8 +77,8 @@ Yaz_PDU_Assoc::~Yaz_PDU_Assoc() void Yaz_PDU_Assoc::socketNotify(int event) { - logf (LOG_LOG, "socketNotify p=%p event = %d", this, event); - if (m_state == Connected) + logf (m_log, "Yaz_PDU_Assoc::socketNotify p=%p event = %d", this, event); + if (0 /* m_state == Connected */) { m_state = Ready; m_socketObservable->maskObserver(this, YAZ_SOCKET_OBSERVE_READ| @@ -65,7 +93,11 @@ void Yaz_PDU_Assoc::socketNotify(int event) close(); m_PDU_Observer->failNotify(); } - else + else if (event & YAZ_SOCKET_OBSERVE_TIMEOUT) + { + m_PDU_Observer->timeoutNotify(); + } + else { m_state = Ready; m_socketObservable->maskObserver(this, YAZ_SOCKET_OBSERVE_READ| @@ -91,18 +123,26 @@ void Yaz_PDU_Assoc::socketNotify(int event) if (!(new_line = cs_accept(m_cs))) return; - Yaz_PDU_Assoc *assoc = new Yaz_PDU_Assoc (m_socketObservable, - new_line); + /* 1. create socket-manager + 2. create pdu-assoc + 3. create top-level object + setup observer for child fileid in pdu-assoc + 4. start thread + */ + int fd = cs_fileno(new_line); + cs_fileno(new_line) = -1; + cs_close (new_line); /* potential problem ... */ +#if 1 + childNotify(fd); +#else + Yaz_PDU_Assoc *assoc = new Yaz_PDU_Assoc (m_socketObservable); assoc->m_parent = this; assoc->m_next = m_children; m_children = assoc; - + assoc->m_PDU_Observer = m_PDU_Observer->clone(assoc); - assoc->m_state = Ready; - assoc->m_socketObservable->addObserver(cs_fileno(new_line), assoc); - assoc->m_socketObservable->maskObserver(assoc, - YAZ_SOCKET_OBSERVE_READ| - YAZ_SOCKET_OBSERVE_EXCEPT); + socket(fd); +#endif } } else if (m_state == Ready) @@ -120,7 +160,7 @@ void Yaz_PDU_Assoc::socketNotify(int event) return; else if (res <= 0) { - logf (LOG_LOG, "Connection closed by client"); + logf (m_log, "Connection closed by peer"); close(); m_PDU_Observer->failNotify(); return; @@ -134,6 +174,10 @@ void Yaz_PDU_Assoc::socketNotify(int event) return; } while (m_cs && cs_more (m_cs)); } + if (event & YAZ_SOCKET_OBSERVE_TIMEOUT) + { + m_PDU_Observer->timeoutNotify(); + } } } @@ -143,18 +187,17 @@ void Yaz_PDU_Assoc::close() m_state = Closed; if (m_cs) { - logf (LOG_LOG, "Yaz_PDU_Assoc::close fd=%d", cs_fileno(m_cs)); + logf (m_log, "Yaz_PDU_Assoc::close fd=%d", cs_fileno(m_cs)); cs_close (m_cs); } m_cs = 0; - PDU_Queue **q = &m_queue_out; - while (*q) + while (m_queue_out) { - PDU_Queue *q_this = *q; - *q = (*q)->m_next; + PDU_Queue *q_this = m_queue_out; + m_queue_out = m_queue_out->m_next; delete q_this; } - free (m_input_buf); +// free (m_input_buf); m_input_buf = 0; m_input_len = 0; } @@ -204,9 +247,13 @@ Yaz_PDU_Assoc::PDU_Queue::~PDU_Queue() int Yaz_PDU_Assoc::flush_PDU() { int r; - + + logf (m_log, "Yaz_PDU_Assoc::flush_PDU"); if (m_state != Ready) + { + logf (m_log, "YAZ_PDU_Assoc::flush_PDU, not ready"); return 1; + } PDU_Queue *q = m_queue_out; if (!q) { @@ -226,13 +273,13 @@ int Yaz_PDU_Assoc::flush_PDU() m_socketObservable->maskObserver(this, YAZ_SOCKET_OBSERVE_READ| YAZ_SOCKET_OBSERVE_EXCEPT| YAZ_SOCKET_OBSERVE_WRITE); - logf (LOG_LOG, "put %d bytes (incomplete write)", q->m_len); + logf (m_log, "Yaz_PDU_Assoc::flush_PDU put %d bytes (incomplete)", + q->m_len); return r; } - logf (LOG_LOG, "put %d bytes fd=%d", q->m_len, cs_fileno(m_cs)); + logf (m_log, "Yaz_PDU_Assoc::flush_PDU put %d bytes", q->m_len); // whole packet sent... delete this and proceed to next ... m_queue_out = q->m_next; - logf (LOG_LOG, "m_queue_out = %p", m_queue_out); delete q; // don't select on write if queue is empty ... if (!m_queue_out) @@ -243,25 +290,23 @@ int Yaz_PDU_Assoc::flush_PDU() int Yaz_PDU_Assoc::send_PDU(const char *buf, int len) { + logf (m_log, "Yaz_PDU_Assoc::send_PDU"); PDU_Queue **pq = &m_queue_out; int is_idle = (*pq ? 0 : 1); if (!m_cs) { - logf (LOG_LOG, "send_PDU failed, m_cs == 0"); - return 0; + logf (m_log, "Yaz_PDU_Assoc::send_PDU failed, m_cs == 0"); + return -1; } while (*pq) pq = &(*pq)->m_next; *pq = new PDU_Queue(buf, len); if (is_idle) - { return flush_PDU (); - } else - { - logf (LOG_LOG, "cannot send_PDU fd=%d", cs_fileno(m_cs)); - } + logf (m_log, "Yaz_PDU_Assoc::cannot send_PDU fd=%d", + cs_fileno(m_cs)); return 0; } @@ -270,8 +315,7 @@ COMSTACK Yaz_PDU_Assoc::comstack() if (!m_cs) { CS_TYPE cs_type = tcpip_type; - int protocol = PROTO_Z3950; - m_cs = cs_create (cs_type, 0, protocol); + m_cs = cs_create (cs_type, 0, PROTO_Z3950); } return m_cs; } @@ -283,6 +327,7 @@ void Yaz_PDU_Assoc::listen(IYaz_PDU_Observer *observer, void *ap; COMSTACK cs = comstack(); + logf (m_log, "Yaz_PDU_Assoc::listen %s", addr); m_PDU_Observer = observer; if (!cs) return; @@ -297,32 +342,101 @@ void Yaz_PDU_Assoc::listen(IYaz_PDU_Observer *observer, m_state = Listen; } +void Yaz_PDU_Assoc::idleTime(int idleTime) +{ + m_idleTime = idleTime; + logf (m_log, "Yaz_PDU_Assoc::idleTime(%d)", idleTime); + m_socketObservable->timeoutObserver(this, m_idleTime); +} + void Yaz_PDU_Assoc::connect(IYaz_PDU_Observer *observer, const char *addr) { - logf (LOG_LOG, "Yaz_PDU_Assoc::connect"); + logf (m_log, "Yaz_PDU_Assoc::connect %s", addr); close(); m_PDU_Observer = observer; COMSTACK cs = comstack(); void *ap = cs_straddr (cs, addr); if (!ap) - return; - int res = cs_connect (cs, ap); - if (res < 0) { - logf (LOG_DEBUG, "Yaz_PDU_Assoc::connect failed"); - close (); + logf (m_log, "cs_straddr failed"); + return; } - else + int res = cs_connect (cs, ap); + logf (m_log, "Yaz_PDU_Assoc::connect fd=%d res=%d", cs_fileno(cs), res); + m_socketObservable->addObserver(cs_fileno(cs), this); + m_socketObservable->maskObserver(this, YAZ_SOCKET_OBSERVE_READ| + YAZ_SOCKET_OBSERVE_EXCEPT| + YAZ_SOCKET_OBSERVE_WRITE); + m_state = Connecting; +} + +void Yaz_PDU_Assoc::socket(IYaz_PDU_Observer *observer, int fd) +{ + close(); + m_PDU_Observer = observer; + if (fd >= 0) { - logf (LOG_LOG, "Yaz_PDU_Assoc::connect fd=%d", cs_fileno(cs)); - m_socketObservable->addObserver(cs_fileno(cs), this); - m_socketObservable->maskObserver(this, YAZ_SOCKET_OBSERVE_READ| - YAZ_SOCKET_OBSERVE_EXCEPT| - YAZ_SOCKET_OBSERVE_WRITE); - if (res == 1) - m_state = Connecting; - else - m_state = Connected; + CS_TYPE cs_type = tcpip_type; + m_cs = cs_createbysocket(fd, cs_type, 0, PROTO_Z3950); + m_state = Ready; + m_socketObservable->addObserver(fd, this); + m_socketObservable->maskObserver(this, + YAZ_SOCKET_OBSERVE_READ| + YAZ_SOCKET_OBSERVE_EXCEPT); + m_socketObservable->timeoutObserver(this, m_idleTime); } } + +#if 1 +void Yaz_PDU_Assoc::childNotify(int fd) +{ + /// Clone PDU Observable (keep socket manager) + IYaz_PDU_Observable *new_observable = clone(); + + /// Clone PDU Observer + IYaz_PDU_Observer *observer = m_PDU_Observer->clone(new_observable); + + /// Attach new socket to it + new_observable->socket(observer, fd); +} +#else + +#include +#include + +class thread_info { + Yaz_SocketManager *socketManager; + IYaz_PDU_Observable * + +}; + +static void *events(void *p) +{ + Yaz_SocketManager *s = (Yaz_SocketManager *) p; + + while (s->processEvent() > 0) + ; + return 0; +} + +void Yaz_PDU_Assoc::childNotify(int fd) +{ + Yaz_SocketManager *socket_observable = new Yaz_SocketManager; + IYaz_PDU_Observable *new_observable = clone(); + + m_socketObservable = socket_observable; + + /// Clone PDU Observer + IYaz_PDU_Observer *observer = m_PDU_Observer->clone(new_observable); + + /// Attach new socket to it + new_observable->socket(observer, fd); + + pthread_t type; + + int id = pthread_create (&type, 0, events, socket_observable); + logf (LOG_LOG, "pthread_create returned id=%d", id); +} +#endif +