/* This file is part of Metaproxy.
- Copyright (C) 2005-2011 Index Data
+ Copyright (C) Index Data
Metaproxy is free software; you can redistribute it and/or modify it under
the terms of the GNU General Public License as published by the Free
#include <stdio.h>
#include <deque>
+#include <sstream>
#include <yazpp/socket-observer.h>
#include <yaz/log.h>
std::deque<IThreadPoolMsg *> m_input;
std::deque<IThreadPoolMsg *> m_output;
bool m_stop_flag;
+ unsigned m_stack_size;
unsigned m_no_threads;
+ unsigned m_min_threads;
+ unsigned m_max_threads;
+ unsigned m_waiting_threads;
};
const unsigned int queue_size_per_thread = 64;
}
}
ThreadPoolSocketObserver::ThreadPoolSocketObserver(
- yazpp_1::ISocketObservable *obs, int no_threads)
+ yazpp_1::ISocketObservable *obs,
+ unsigned min_threads, unsigned max_threads,
+ unsigned stack_size)
: m_p(new Rep(obs))
{
obs->addObserver(m_p->m_pipe.read_fd(), this);
obs->maskObserver(this, SOCKET_OBSERVE_READ);
m_p->m_stop_flag = false;
- m_p->m_no_threads = no_threads;
- int i;
- for (i = 0; i<no_threads; i++)
+ m_p->m_min_threads = m_p->m_no_threads = min_threads;
+ m_p->m_max_threads = max_threads;
+ m_p->m_waiting_threads = 0;
+ m_p->m_stack_size = stack_size;
+ unsigned i;
+ for (i = 0; i < m_p->m_no_threads; i++)
{
Worker w(this);
- m_p->m_thrds.add_thread(new boost::thread(w));
+ boost::thread::attributes attrs;
+ if (m_p->m_stack_size)
+ attrs.set_stack_size(m_p->m_stack_size);
+
+ boost::thread *x = new boost::thread(attrs, w);
+
+ m_p->m_thrds.add_thread(x);
}
}
m_p->m_cond_input_data.notify_all();
}
m_p->m_thrds.join_all();
-
m_p->m_socketObservable->deleteObserver(this);
}
m_p->m_output.pop_front();
}
if (out)
- out->result();
+ {
+ std::ostringstream os;
+ {
+ boost::mutex::scoped_lock input_lock(m_p->m_mutex_input_data);
+ os << "tbusy/total " <<
+ m_p->m_no_threads - m_p->m_waiting_threads <<
+ "/" << m_p->m_no_threads
+ << " queue in/out " << m_p->m_input.size() << "/"
+ << m_p->m_output.size();
+ }
+ out->result(os.str().c_str());
+ }
}
}
+void ThreadPoolSocketObserver::get_thread_info(int &tbusy, int &total)
+{
+ tbusy = m_p->m_no_threads - m_p->m_waiting_threads;
+ total = m_p->m_no_threads;
+}
+
void ThreadPoolSocketObserver::run(void *p)
{
while(1)
IThreadPoolMsg *in = 0;
{
boost::mutex::scoped_lock input_lock(m_p->m_mutex_input_data);
+ m_p->m_waiting_threads++;
while (!m_p->m_stop_flag && m_p->m_input.size() == 0)
m_p->m_cond_input_data.wait(input_lock);
+ m_p->m_waiting_threads--;
if (m_p->m_stop_flag)
break;
-
+
in = m_p->m_input.front();
- m_p->m_input.pop_front();
+ m_p->m_input.pop_front();
m_p->m_cond_input_full.notify_all();
}
IThreadPoolMsg *out = in->handle();
}
}
+void ThreadPoolSocketObserver::cleanup(IThreadPoolMsg *m, void *info)
+{
+ boost::mutex::scoped_lock input_lock(m_p->m_mutex_input_data);
+
+ std::deque<IThreadPoolMsg *>::iterator it = m_p->m_input.begin();
+ while (it != m_p->m_input.end())
+ {
+ if ((*it)->cleanup(info))
+ {
+ delete *it;
+ it = m_p->m_input.erase(it);
+ }
+ else
+ it++;
+ }
+}
+
void ThreadPoolSocketObserver::put(IThreadPoolMsg *m)
{
boost::mutex::scoped_lock input_lock(m_p->m_mutex_input_data);
+ if (m_p->m_waiting_threads == 0 &&
+ m_p->m_no_threads < m_p->m_max_threads)
+ {
+ m_p->m_no_threads++;
+ Worker w(this);
+
+ boost::thread::attributes attrs;
+ if (m_p->m_stack_size)
+ attrs.set_stack_size(m_p->m_stack_size);
+ boost::thread *x = new boost::thread(attrs, w);
+
+ m_p->m_thrds.add_thread(x);
+ }
while (m_p->m_input.size() >= m_p->m_no_threads * queue_size_per_thread)
m_p->m_cond_input_full.wait(input_lock);
m_p->m_input.push_back(m);
m_p->m_cond_input_data.notify_one();
}
+
/*
* Local variables:
* c-basic-offset: 4