projects
/
metaproxy-moved-to-github.git
/ commitdiff
commit
grep
author
committer
pickaxe
?
search:
re
summary
|
shortlog
|
log
|
commit
| commitdiff |
tree
raw
|
patch
|
inline
| side by side (parent:
8951d04
)
frontend_net: dynamic thread pool MP-629
author
Adam Dickmeiss
<adam@indexdata.dk>
Mon, 28 Sep 2015 14:03:44 +0000
(16:03 +0200)
committer
Adam Dickmeiss
<adam@indexdata.dk>
Mon, 28 Sep 2015 14:03:44 +0000
(16:03 +0200)
etc/config1.xml
patch
|
blob
|
history
src/filter_frontend_net.cpp
patch
|
blob
|
history
src/thread_pool_observer.cpp
patch
|
blob
|
history
src/thread_pool_observer.hpp
patch
|
blob
|
history
xml/schema/filter_frontend_net.rnc
patch
|
blob
|
history
diff --git
a/etc/config1.xml
b/etc/config1.xml
index
b556b72
..
3040d04
100644
(file)
--- a/
etc/config1.xml
+++ b/
etc/config1.xml
@@
-4,7
+4,8
@@
<routes>
<route id="start">
<filter type="frontend_net">
<routes>
<route id="start">
<filter type="frontend_net">
- <threads>10</threads>
+ <threads>1</threads>
+ <max-threads>10</max-threads>
<port>127.0.0.2:9000</port>
<port>127.0.0.1:9000</port>
<message>FN</message>
<port>127.0.0.2:9000</port>
<port>127.0.0.1:9000</port>
<message>FN</message>
diff --git
a/src/filter_frontend_net.cpp
b/src/filter_frontend_net.cpp
index
6607a1e
..
3df0332
100644
(file)
--- a/
src/filter_frontend_net.cpp
+++ b/
src/filter_frontend_net.cpp
@@
-55,6
+55,7
@@
namespace metaproxy_1 {
friend class FrontendNet;
int m_no_threads;
friend class FrontendNet;
int m_no_threads;
+ int m_max_threads;
std::vector<Port> m_ports;
int m_listen_duration;
int m_session_timeout;
std::vector<Port> m_ports;
int m_listen_duration;
int m_session_timeout;
@@
-524,7
+525,7
@@
yf::FrontendNet::FrontendNet() : m_p(new Rep)
yf::FrontendNet::Rep::Rep()
{
yf::FrontendNet::Rep::Rep()
{
- m_no_threads = 5;
+ m_max_threads = m_no_threads = 5;
m_listen_duration = 0;
m_session_timeout = 300; // 5 minutes
m_connect_max = 0;
m_listen_duration = 0;
m_session_timeout = 300; // 5 minutes
m_connect_max = 0;
@@
-614,7
+615,8
@@
void yf::FrontendNet::process(mp::Package &package) const
tt = new My_Timer_Thread(&m_p->mySocketManager,
m_p->m_listen_duration);
tt = new My_Timer_Thread(&m_p->mySocketManager,
m_p->m_listen_duration);
- ThreadPoolSocketObserver tp(&m_p->mySocketManager, m_p->m_no_threads);
+ ThreadPoolSocketObserver tp(&m_p->mySocketManager, m_p->m_no_threads,
+ m_p->m_max_threads);
for (i = 0; i<m_p->m_ports.size(); i++)
{
for (i = 0; i<m_p->m_ports.size(); i++)
{
@@
-703,6
+705,15
@@
void yf::FrontendNet::configure(const xmlNode * ptr, bool test_only,
+ threads_str);
m_p->m_no_threads = threads;
}
+ threads_str);
m_p->m_no_threads = threads;
}
+ else if (!strcmp((const char *) ptr->name, "max-threads"))
+ {
+ std::string threads_str = mp::xml::get_text(ptr);
+ int threads = atoi(threads_str.c_str());
+ if (threads < 1)
+ throw yf::FilterException("Bad value for threads: "
+ + threads_str);
+ m_p->m_max_threads = threads;
+ }
else if (!strcmp((const char *) ptr->name, "timeout"))
{
std::string timeout_str = mp::xml::get_text(ptr);
else if (!strcmp((const char *) ptr->name, "timeout"))
{
std::string timeout_str = mp::xml::get_text(ptr);
diff --git
a/src/thread_pool_observer.cpp
b/src/thread_pool_observer.cpp
index
a52cd92
..
bf60e73
100644
(file)
--- a/
src/thread_pool_observer.cpp
+++ b/
src/thread_pool_observer.cpp
@@
-69,7
+69,9
@@
namespace metaproxy_1 {
std::deque<IThreadPoolMsg *> m_output;
bool m_stop_flag;
unsigned m_no_threads;
std::deque<IThreadPoolMsg *> m_output;
bool m_stop_flag;
unsigned m_no_threads;
- unsigned m_no_threads_waiting;
+ unsigned m_min_threads;
+ unsigned m_max_threads;
+ unsigned m_waiting_threads;
};
const unsigned int queue_size_per_thread = 64;
}
};
const unsigned int queue_size_per_thread = 64;
}
@@
-94,17
+96,19
@@
IThreadPoolMsg::~IThreadPoolMsg()
}
ThreadPoolSocketObserver::ThreadPoolSocketObserver(
}
ThreadPoolSocketObserver::ThreadPoolSocketObserver(
- yazpp_1::ISocketObservable *obs, int no_threads)
+ yazpp_1::ISocketObservable *obs,
+ unsigned min_threads, unsigned max_threads)
: m_p(new Rep(obs))
{
obs->addObserver(m_p->m_pipe.read_fd(), this);
obs->maskObserver(this, SOCKET_OBSERVE_READ);
m_p->m_stop_flag = false;
: m_p(new Rep(obs))
{
obs->addObserver(m_p->m_pipe.read_fd(), this);
obs->maskObserver(this, SOCKET_OBSERVE_READ);
m_p->m_stop_flag = false;
- m_p->m_no_threads = no_threads;
- m_p->m_no_threads_waiting = 0;
- int i;
- for (i = 0; i<no_threads; i++)
+ m_p->m_min_threads = m_p->m_no_threads = min_threads;
+ m_p->m_max_threads = max_threads;
+ m_p->m_waiting_threads = 0;
+ unsigned i;
+ for (i = 0; i < m_p->m_no_threads; i++)
{
Worker w(this);
m_p->m_thrds.add_thread(new boost::thread(w));
{
Worker w(this);
m_p->m_thrds.add_thread(new boost::thread(w));
@@
-119,7
+123,6
@@
ThreadPoolSocketObserver::~ThreadPoolSocketObserver()
m_p->m_cond_input_data.notify_all();
}
m_p->m_thrds.join_all();
m_p->m_cond_input_data.notify_all();
}
m_p->m_thrds.join_all();
-
m_p->m_socketObservable->deleteObserver(this);
}
m_p->m_socketObservable->deleteObserver(this);
}
@@
-148,15
+151,13
@@
void ThreadPoolSocketObserver::socketNotify(int event)
out = m_p->m_output.front();
m_p->m_output.pop_front();
}
out = m_p->m_output.front();
m_p->m_output.pop_front();
}
-
-
if (out)
{
std::ostringstream os;
{
boost::mutex::scoped_lock input_lock(m_p->m_mutex_input_data);
os << "tbusy/total " <<
if (out)
{
std::ostringstream os;
{
boost::mutex::scoped_lock input_lock(m_p->m_mutex_input_data);
os << "tbusy/total " <<
- m_p->m_no_threads - m_p->m_no_threads_waiting <<
+ m_p->m_no_threads - m_p->m_waiting_threads <<
"/" << m_p->m_no_threads
<< " queue in/out " << m_p->m_input.size() << "/"
<< m_p->m_output.size();
"/" << m_p->m_no_threads
<< " queue in/out " << m_p->m_input.size() << "/"
<< m_p->m_output.size();
@@
-168,7
+169,7
@@
void ThreadPoolSocketObserver::socketNotify(int event)
void ThreadPoolSocketObserver::get_thread_info(int &tbusy, int &total)
{
void ThreadPoolSocketObserver::get_thread_info(int &tbusy, int &total)
{
- tbusy = m_p->m_no_threads - m_p->m_no_threads_waiting;
+ tbusy = m_p->m_no_threads - m_p->m_waiting_threads;
total = m_p->m_no_threads;
}
total = m_p->m_no_threads;
}
@@
-179,10
+180,10
@@
void ThreadPoolSocketObserver::run(void *p)
IThreadPoolMsg *in = 0;
{
boost::mutex::scoped_lock input_lock(m_p->m_mutex_input_data);
IThreadPoolMsg *in = 0;
{
boost::mutex::scoped_lock input_lock(m_p->m_mutex_input_data);
- m_p->m_no_threads_waiting++;
+ m_p->m_waiting_threads++;
while (!m_p->m_stop_flag && m_p->m_input.size() == 0)
m_p->m_cond_input_data.wait(input_lock);
while (!m_p->m_stop_flag && m_p->m_input.size() == 0)
m_p->m_cond_input_data.wait(input_lock);
- m_p->m_no_threads_waiting--;
+ m_p->m_waiting_threads--;
if (m_p->m_stop_flag)
break;
if (m_p->m_stop_flag)
break;
@@
-232,7
+233,13
@@
void ThreadPoolSocketObserver::cleanup(IThreadPoolMsg *m, void *info)
void ThreadPoolSocketObserver::put(IThreadPoolMsg *m)
{
boost::mutex::scoped_lock input_lock(m_p->m_mutex_input_data);
void ThreadPoolSocketObserver::put(IThreadPoolMsg *m)
{
boost::mutex::scoped_lock input_lock(m_p->m_mutex_input_data);
-
+ if (m_p->m_waiting_threads == 0 &&
+ m_p->m_no_threads < m_p->m_max_threads)
+ {
+ m_p->m_no_threads++;
+ Worker w(this);
+ m_p->m_thrds.add_thread(new boost::thread(w));
+ }
while (m_p->m_input.size() >= m_p->m_no_threads * queue_size_per_thread)
m_p->m_cond_input_full.wait(input_lock);
m_p->m_input.push_back(m);
while (m_p->m_input.size() >= m_p->m_no_threads * queue_size_per_thread)
m_p->m_cond_input_full.wait(input_lock);
m_p->m_input.push_back(m);
diff --git
a/src/thread_pool_observer.hpp
b/src/thread_pool_observer.hpp
index
630b39b
..
88c3a0b
100644
(file)
--- a/
src/thread_pool_observer.hpp
+++ b/
src/thread_pool_observer.hpp
@@
-38,7
+38,7
@@
namespace metaproxy_1 {
class Worker;
public:
ThreadPoolSocketObserver(yazpp_1::ISocketObservable *obs,
class Worker;
public:
ThreadPoolSocketObserver(yazpp_1::ISocketObservable *obs,
- int no_threads);
+ unsigned min_threads, unsigned max_threads);
virtual ~ThreadPoolSocketObserver();
void put(IThreadPoolMsg *m);
void cleanup(IThreadPoolMsg *m, void *info);
virtual ~ThreadPoolSocketObserver();
void put(IThreadPoolMsg *m);
void cleanup(IThreadPoolMsg *m, void *info);
diff --git
a/xml/schema/filter_frontend_net.rnc
b/xml/schema/filter_frontend_net.rnc
index
1776c47
..
6f94f6b
100644
(file)
--- a/
xml/schema/filter_frontend_net.rnc
+++ b/
xml/schema/filter_frontend_net.rnc
@@
-7,6
+7,7
@@
filter_frontend_net =
attribute id { xsd:NCName }?,
attribute name { xsd:NCName }?,
element mp:threads { xsd:integer }?,
attribute id { xsd:NCName }?,
attribute name { xsd:NCName }?,
element mp:threads { xsd:integer }?,
+ element mp:max-threads { xsd:integer }?,
element mp:port {
attribute route { xsd:NCName }?,
attribute max_recv_bytes { xsd:integer }?,
element mp:port {
attribute route { xsd:NCName }?,
attribute max_recv_bytes { xsd:integer }?,