Remove incoming requests that can not be handled
authorAdam Dickmeiss <adam@indexdata.dk>
Tue, 17 Apr 2012 08:33:12 +0000 (10:33 +0200)
committerAdam Dickmeiss <adam@indexdata.dk>
Tue, 17 Apr 2012 08:33:12 +0000 (10:33 +0200)
Requests in queue for filter frontend_net are removed if client
closes connection for the session and request is not yet handled.

src/filter_frontend_net.cpp
src/test_thread_pool_observer.cpp
src/thread_pool_observer.cpp
src/thread_pool_observer.hpp

index a3ab5f7..b347ad2 100644 (file)
@@ -103,7 +103,7 @@ namespace metaproxy_1 {
         ~ThreadPoolPackage();
         IThreadPoolMsg *handle();
         void result(const char *t_info);
-        
+        bool cleanup(void *info);
     private:
         yaz_timing_t timer;
         mp::ZAssocChild *m_assoc_child;
@@ -155,6 +155,13 @@ mp::ThreadPoolPackage::~ThreadPoolPackage()
     delete m_package;
 }
 
+bool mp::ThreadPoolPackage::cleanup(void *info)
+{
+    mp::Session *ses = (mp::Session *) info;
+
+    return *ses == m_package->session();
+}
+
 void mp::ThreadPoolPackage::result(const char *t_info)
 {
     m_assoc_child->m_no_requests--;
@@ -284,7 +291,8 @@ void mp::ZAssocChild::failNotify()
     mp::ThreadPoolPackage *tp = new mp::ThreadPoolPackage(p, this,
                                                           m_msg_config);
     p->copy_route(*m_package);
-    m_thread_pool_observer->put(tp);  
+    m_thread_pool_observer->cleanup(tp, &m_session);
+    m_thread_pool_observer->put(tp);
 }
 
 void mp::ZAssocChild::timeoutNotify()
index 9700db1..89f8f04 100644 (file)
@@ -40,6 +40,7 @@ class My_Msg : public mp::IThreadPoolMsg {
 public:
     mp::IThreadPoolMsg *handle();
     void result(const char *t_info);
+    bool cleanup(void *info);
     int m_val;
     My_Timer_Thread *m_timer;
 };
@@ -70,6 +71,11 @@ mp::IThreadPoolMsg *My_Msg::handle()
     return res;
 }
 
+bool My_Msg::cleanup(void *info)
+{
+    return false;
+}
+
 void My_Msg::result(const char *t_info)
 {
     m_timer->m_sum += m_val;
index d630fd9..ce37235 100644 (file)
@@ -206,14 +206,30 @@ void ThreadPoolSocketObserver::run(void *p)
     }
 }
 
+void ThreadPoolSocketObserver::cleanup(IThreadPoolMsg *m, void *info)
+{
+    boost::mutex::scoped_lock input_lock(m_p->m_mutex_input_data);
+
+    std::deque<IThreadPoolMsg *>::iterator it = m_p->m_input.begin();
+    while (it != m_p->m_input.end())
+    {
+        if ((*it)->cleanup(info))
+            it = m_p->m_input.erase(it);
+        else
+            it++;
+    }
+}
+
 void ThreadPoolSocketObserver::put(IThreadPoolMsg *m)
 {
     boost::mutex::scoped_lock input_lock(m_p->m_mutex_input_data);
+
     while (m_p->m_input.size() >= m_p->m_no_threads * queue_size_per_thread)
         m_p->m_cond_input_full.wait(input_lock);
     m_p->m_input.push_back(m);
     m_p->m_cond_input_data.notify_one();
 }
+
 /*
  * Local variables:
  * c-basic-offset: 4
index 67aee0b..73ee78f 100644 (file)
@@ -30,6 +30,7 @@ namespace metaproxy_1 {
         virtual IThreadPoolMsg *handle() = 0;
         virtual void result(const char *info) = 0;
         virtual ~IThreadPoolMsg();
+        virtual bool cleanup(void *info) = 0;
     };
 
     class ThreadPoolSocketObserver : public yazpp_1::ISocketObserver {
@@ -40,6 +41,7 @@ namespace metaproxy_1 {
                                  int no_threads);
         virtual ~ThreadPoolSocketObserver();
         void put(IThreadPoolMsg *m);
+        void cleanup(IThreadPoolMsg *m, void *info);
         IThreadPoolMsg *get();
         void run(void *p);
     private: