X-Git-Url: http://git.indexdata.com/?a=blobdiff_plain;f=src%2Fyaz-pdu-assoc.cpp;h=0462f5ae61be58c905e34e04d20dbabac8b43da6;hb=355215b6f2c914f2eb099dda88828b8f0c8cb001;hp=e0c0c9df45b82d9b487a9b2a095779341f290f0f;hpb=a8063bff74c6d48f7a0aec23c266894777e3134b;p=yazpp-moved-to-github.git diff --git a/src/yaz-pdu-assoc.cpp b/src/yaz-pdu-assoc.cpp index e0c0c9d..0462f5a 100644 --- a/src/yaz-pdu-assoc.cpp +++ b/src/yaz-pdu-assoc.cpp @@ -1,10 +1,53 @@ /* - * Copyright (c) 1998-1999, Index Data. + * Copyright (c) 1998-2000, Index Data. * See the file LICENSE for details. - * Sebastian Hammer, Adam Dickmeiss * * $Log: yaz-pdu-assoc.cpp,v $ - * Revision 1.4 1999-03-23 14:17:57 adam + * Revision 1.17 2000-10-11 11:58:16 adam + * Moved header files to include/yaz++. Switched to libtool and automake. + * Configure script creates yaz++-config script. + * + * Revision 1.16 2000/09/22 09:54:11 heikki + * minor + * + * Revision 1.15 2000/09/21 21:43:20 adam + * Better high-level server API. + * + * Revision 1.14 2000/09/12 12:09:53 adam + * More work on high-level server. + * + * Revision 1.13 2000/09/08 10:23:42 adam + * Added skeleton of yaz-z-server. + * + * Revision 1.12 2000/09/06 14:23:45 adam + * WIN32 updates. + * + * Revision 1.11 2000/09/04 08:29:22 adam + * Fixed memory leak(s). Added re-use of associations, rather than + * re-init, when maximum number of targets are in use. + * + * Revision 1.10 2000/08/10 08:42:42 adam + * Fixes for {set,get}_APDU_log. + * + * Revision 1.9 1999/12/06 13:52:45 adam + * Modified for new location of YAZ header files. Experimental threaded + * operation. + * + * Revision 1.8 1999/04/28 13:04:03 adam + * Fixed setting of proxy otherInfo so that database(s) are removed. + * + * Revision 1.7 1999/04/21 12:09:01 adam + * Many improvements. Modified to proxy server to work with "sessions" + * based on cookies. + * + * Revision 1.6 1999/04/20 10:30:05 adam + * Implemented various stuff for client and proxy. Updated calls + * to ODR to reflect new name parameter. + *g + * Revision 1.5 1999/04/09 11:46:57 adam + * Added object Yaz_Z_Assoc. Much more functional client. + * + * Revision 1.4 1999/03/23 14:17:57 adam * More work on timeout handling. Work on yaz-client. * * Revision 1.3 1999/02/02 14:01:20 adam @@ -21,16 +64,15 @@ #include -#include +#include -#include -#include +#include +#include -Yaz_PDU_Assoc::Yaz_PDU_Assoc(IYazSocketObservable *socketObservable, - COMSTACK cs) +Yaz_PDU_Assoc::Yaz_PDU_Assoc(IYazSocketObservable *socketObservable) { m_state = Closed; - m_cs = cs; + m_cs = 0; m_socketObservable = socketObservable; m_PDU_Observer = 0; m_queue_out = 0; @@ -40,23 +82,20 @@ Yaz_PDU_Assoc::Yaz_PDU_Assoc(IYazSocketObservable *socketObservable, m_parent = 0; m_next = 0; m_destroyed = 0; + m_idleTime = 0; + m_log = LOG_DEBUG; } IYaz_PDU_Observable *Yaz_PDU_Assoc::clone() { - Yaz_PDU_Assoc *copy = new Yaz_PDU_Assoc(m_socketObservable, 0); + Yaz_PDU_Assoc *copy = new Yaz_PDU_Assoc(m_socketObservable); return copy; } -Yaz_PDU_Assoc::~Yaz_PDU_Assoc() -{ - destroy(); -} - void Yaz_PDU_Assoc::socketNotify(int event) { - logf (LOG_LOG, "socketNotify p=%p event = %d", this, event); - if (m_state == Connected) + logf (m_log, "Yaz_PDU_Assoc::socketNotify p=%p event = %d", this, event); + if (0 /* m_state == Connected */) { m_state = Ready; m_socketObservable->maskObserver(this, YAZ_SOCKET_OBSERVE_READ| @@ -71,7 +110,11 @@ void Yaz_PDU_Assoc::socketNotify(int event) close(); m_PDU_Observer->failNotify(); } - else + else if (event & YAZ_SOCKET_OBSERVE_TIMEOUT) + { + m_PDU_Observer->timeoutNotify(); + } + else { m_state = Ready; m_socketObservable->maskObserver(this, YAZ_SOCKET_OBSERVE_READ| @@ -91,26 +134,22 @@ void Yaz_PDU_Assoc::socketNotify(int event) return; if (res < 0) { - logf(LOG_FATAL, "cs_listen failed"); + logf(LOG_FATAL|LOG_ERRNO, "cs_listen failed"); return; } if (!(new_line = cs_accept(m_cs))) return; - - Yaz_PDU_Assoc *assoc = new Yaz_PDU_Assoc (m_socketObservable, - new_line); - assoc->m_parent = this; - assoc->m_next = m_children; - m_children = assoc; - - assoc->m_PDU_Observer = m_PDU_Observer->clone(assoc); - assoc->m_state = Ready; - assoc->m_socketObservable->addObserver(cs_fileno(new_line), assoc); - assoc->m_socketObservable->maskObserver(assoc, - YAZ_SOCKET_OBSERVE_READ| - YAZ_SOCKET_OBSERVE_EXCEPT); - if (m_idleTime) - assoc->m_socketObservable->timeoutObserver(assoc, m_idleTime); + /* 1. create socket-manager + 2. create pdu-assoc + 3. create top-level object + setup observer for child fileid in pdu-assoc + 4. start thread + */ + int fd = cs_fileno(new_line); + logf (m_log, "accept ok fd = %d", fd); + cs_fileno(new_line) = -1; + cs_close (new_line); + childNotify(fd); } } else if (m_state == Ready) @@ -128,7 +167,7 @@ void Yaz_PDU_Assoc::socketNotify(int event) return; else if (res <= 0) { - logf (LOG_LOG, "Connection closed by client"); + logf (m_log, "Connection closed by peer"); close(); m_PDU_Observer->failNotify(); return; @@ -138,6 +177,7 @@ void Yaz_PDU_Assoc::socketNotify(int event) m_destroyed = &destroyed; m_PDU_Observer->recv_PDU(m_input_buf, res); + m_destroyed = 0; if (destroyed) // it really was destroyed, return now. return; } while (m_cs && cs_more (m_cs)); @@ -155,7 +195,7 @@ void Yaz_PDU_Assoc::close() m_state = Closed; if (m_cs) { - logf (LOG_LOG, "Yaz_PDU_Assoc::close fd=%d", cs_fileno(m_cs)); + logf (m_log, "Yaz_PDU_Assoc::close fd=%d", cs_fileno(m_cs)); cs_close (m_cs); } m_cs = 0; @@ -165,7 +205,7 @@ void Yaz_PDU_Assoc::close() m_queue_out = m_queue_out->m_next; delete q_this; } -// free (m_input_buf); + xfree (m_input_buf); m_input_buf = 0; m_input_len = 0; } @@ -215,9 +255,13 @@ Yaz_PDU_Assoc::PDU_Queue::~PDU_Queue() int Yaz_PDU_Assoc::flush_PDU() { int r; - + + logf (m_log, "Yaz_PDU_Assoc::flush_PDU"); if (m_state != Ready) + { + logf (m_log, "YAZ_PDU_Assoc::flush_PDU, not ready"); return 1; + } PDU_Queue *q = m_queue_out; if (!q) { @@ -237,13 +281,13 @@ int Yaz_PDU_Assoc::flush_PDU() m_socketObservable->maskObserver(this, YAZ_SOCKET_OBSERVE_READ| YAZ_SOCKET_OBSERVE_EXCEPT| YAZ_SOCKET_OBSERVE_WRITE); - logf (LOG_LOG, "put %d bytes (incomplete write)", q->m_len); + logf (m_log, "Yaz_PDU_Assoc::flush_PDU put %d bytes (incomplete)", + q->m_len); return r; } - logf (LOG_LOG, "put %d bytes fd=%d", q->m_len, cs_fileno(m_cs)); + logf (m_log, "Yaz_PDU_Assoc::flush_PDU put %d bytes", q->m_len); // whole packet sent... delete this and proceed to next ... m_queue_out = q->m_next; - logf (LOG_LOG, "m_queue_out = %p", m_queue_out); delete q; // don't select on write if queue is empty ... if (!m_queue_out) @@ -254,26 +298,23 @@ int Yaz_PDU_Assoc::flush_PDU() int Yaz_PDU_Assoc::send_PDU(const char *buf, int len) { - logf (LOG_LOG, "send_PDU"); + logf (m_log, "Yaz_PDU_Assoc::send_PDU"); PDU_Queue **pq = &m_queue_out; int is_idle = (*pq ? 0 : 1); if (!m_cs) { - logf (LOG_LOG, "send_PDU failed, m_cs == 0"); - return 0; + logf (m_log, "Yaz_PDU_Assoc::send_PDU failed, m_cs == 0"); + return -1; } while (*pq) pq = &(*pq)->m_next; *pq = new PDU_Queue(buf, len); if (is_idle) - { return flush_PDU (); - } else - { - logf (LOG_LOG, "cannot send_PDU fd=%d", cs_fileno(m_cs)); - } + logf (m_log, "Yaz_PDU_Assoc::cannot send_PDU fd=%d", + cs_fileno(m_cs)); return 0; } @@ -282,8 +323,7 @@ COMSTACK Yaz_PDU_Assoc::comstack() if (!m_cs) { CS_TYPE cs_type = tcpip_type; - int protocol = PROTO_Z3950; - m_cs = cs_create (cs_type, 0, protocol); + m_cs = cs_create (cs_type, 0, PROTO_Z3950); } return m_cs; } @@ -295,7 +335,7 @@ void Yaz_PDU_Assoc::listen(IYaz_PDU_Observer *observer, void *ap; COMSTACK cs = comstack(); - logf (LOG_LOG, "Yaz_PDU_Assoc::listen %s", addr); + logf (m_log, "Yaz_PDU_Assoc::listen %s", addr); m_PDU_Observer = observer; if (!cs) return; @@ -307,43 +347,126 @@ void Yaz_PDU_Assoc::listen(IYaz_PDU_Observer *observer, m_socketObservable->addObserver(cs_fileno(cs), this); m_socketObservable->maskObserver(this, YAZ_SOCKET_OBSERVE_READ| YAZ_SOCKET_OBSERVE_EXCEPT); + logf (m_log, "Yaz_PDU_Assoc::listen ok fd=%d", cs_fileno(cs)); m_state = Listen; } void Yaz_PDU_Assoc::idleTime(int idleTime) { m_idleTime = idleTime; + logf (m_log, "Yaz_PDU_Assoc::idleTime(%d)", idleTime); + m_socketObservable->timeoutObserver(this, m_idleTime); } void Yaz_PDU_Assoc::connect(IYaz_PDU_Observer *observer, const char *addr) { - logf (LOG_LOG, "Yaz_PDU_Assoc::connect %s", addr); + logf (m_log, "Yaz_PDU_Assoc::connect %s", addr); close(); m_PDU_Observer = observer; COMSTACK cs = comstack(); void *ap = cs_straddr (cs, addr); if (!ap) { - logf (LOG_LOG, "cs_straddr failed"); + logf (m_log, "cs_straddr failed"); return; } int res = cs_connect (cs, ap); - if (res < 0) + logf (m_log, "Yaz_PDU_Assoc::connect fd=%d res=%d", cs_fileno(cs), res); + m_socketObservable->addObserver(cs_fileno(cs), this); + m_socketObservable->maskObserver(this, YAZ_SOCKET_OBSERVE_READ| + YAZ_SOCKET_OBSERVE_EXCEPT| + YAZ_SOCKET_OBSERVE_WRITE); + m_state = Connecting; +} + +void Yaz_PDU_Assoc::socket(IYaz_PDU_Observer *observer, int fd) +{ + close(); + m_PDU_Observer = observer; + if (fd >= 0) { - logf (LOG_DEBUG, "Yaz_PDU_Assoc::connect failed"); - close (); + CS_TYPE cs_type = tcpip_type; + m_cs = cs_createbysocket(fd, cs_type, 0, PROTO_Z3950); + m_state = Ready; + m_socketObservable->addObserver(fd, this); + m_socketObservable->maskObserver(this, + YAZ_SOCKET_OBSERVE_READ| + YAZ_SOCKET_OBSERVE_EXCEPT); + m_socketObservable->timeoutObserver(this, m_idleTime); } - else +} + +#if 1 + // 1 = single-threaded + // 0 = multi-threaded + +// Single-threaded... Only useful for non-blocking handlers +void Yaz_PDU_Assoc::childNotify(int fd) +{ + // Clone PDU Observable (keep socket manager) + IYaz_PDU_Observable *new_observable = clone(); + + // Clone PDU Observer + IYaz_PDU_Observer *observer = m_PDU_Observer->clone(new_observable); + + // Attach new socket to it + new_observable->socket(observer, fd); +} +#else + +#include + +#ifdef WIN32 +#include +#else +#include +#endif + +#ifdef WIN32 +void __cdecl +#else +void * +#endif + events(void *p) +{ + Yaz_SocketManager *s = (Yaz_SocketManager *) p; + + logf (LOG_LOG, "thread started"); + while (s->processEvent() > 0) + ; + logf (LOG_LOG, "thread finished"); +#ifdef WIN32 +#else + return 0; +#endif +} + +void Yaz_PDU_Assoc::childNotify(int fd) +{ + Yaz_SocketManager *socket_observable = new Yaz_SocketManager; + Yaz_PDU_Assoc *new_observable = new Yaz_PDU_Assoc (socket_observable); + + /// Clone PDU Observer + IYaz_PDU_Observer *observer = m_PDU_Observer->clone(new_observable); + + /// Attach new socket to it + new_observable->socket(observer, fd); + +#ifdef WIN32 + long t_id; + t_id = _beginthread (events, 0, socket_observable); + if (t_id == -1) { - logf (LOG_LOG, "Yaz_PDU_Assoc::connect fd=%d", cs_fileno(cs)); - m_socketObservable->addObserver(cs_fileno(cs), this); - m_socketObservable->maskObserver(this, YAZ_SOCKET_OBSERVE_READ| - YAZ_SOCKET_OBSERVE_EXCEPT| - YAZ_SOCKET_OBSERVE_WRITE); - if (res == 1) - m_state = Connecting; - else - m_state = Connected; + logf (LOG_FATAL|LOG_ERRNO, "_beginthread failed"); + exit (1); } +#else + pthread_t type; + + int id = pthread_create (&type, 0, events, socket_observable); + logf (LOG_LOG, "pthread_create returned id=%d", id); +#endif } +// Threads end +#endif