projects
/
metaproxy-moved-to-github.git
/ commitdiff
commit
grep
author
committer
pickaxe
?
search:
re
summary
|
shortlog
|
log
|
commit
| commitdiff |
tree
raw
|
patch
|
inline
| side by side (parent:
09a3443
)
Allow stack-size to be set for ThreadPoolSocketObserver
mp-629
author
Adam Dickmeiss
<adam@indexdata.dk>
Mon, 28 Sep 2015 17:41:33 +0000
(19:41 +0200)
committer
Adam Dickmeiss
<adam@indexdata.dk>
Mon, 28 Sep 2015 17:41:33 +0000
(19:41 +0200)
src/filter_frontend_net.cpp
patch
|
blob
|
history
src/test_thread_pool_observer.cpp
patch
|
blob
|
history
src/thread_pool_observer.cpp
patch
|
blob
|
history
src/thread_pool_observer.hpp
patch
|
blob
|
history
xml/schema/filter_frontend_net.rnc
patch
|
blob
|
history
diff --git
a/src/filter_frontend_net.cpp
b/src/filter_frontend_net.cpp
index
3df0332
..
7d6b422
100644
(file)
--- a/
src/filter_frontend_net.cpp
+++ b/
src/filter_frontend_net.cpp
@@
-56,6
+56,7
@@
namespace metaproxy_1 {
int m_no_threads;
int m_max_threads;
int m_no_threads;
int m_max_threads;
+ int m_stack_size;
std::vector<Port> m_ports;
int m_listen_duration;
int m_session_timeout;
std::vector<Port> m_ports;
int m_listen_duration;
int m_session_timeout;
@@
-526,6
+527,7
@@
yf::FrontendNet::FrontendNet() : m_p(new Rep)
yf::FrontendNet::Rep::Rep()
{
m_max_threads = m_no_threads = 5;
yf::FrontendNet::Rep::Rep()
{
m_max_threads = m_no_threads = 5;
+ m_stack_size = 0;
m_listen_duration = 0;
m_session_timeout = 300; // 5 minutes
m_connect_max = 0;
m_listen_duration = 0;
m_session_timeout = 300; // 5 minutes
m_connect_max = 0;
@@
-616,7
+618,8
@@
void yf::FrontendNet::process(mp::Package &package) const
m_p->m_listen_duration);
ThreadPoolSocketObserver tp(&m_p->mySocketManager, m_p->m_no_threads,
m_p->m_listen_duration);
ThreadPoolSocketObserver tp(&m_p->mySocketManager, m_p->m_no_threads,
- m_p->m_max_threads);
+ m_p->m_max_threads,
+ m_p->m_stack_size);
for (i = 0; i<m_p->m_ports.size(); i++)
{
for (i = 0; i<m_p->m_ports.size(); i++)
{
@@
-710,10
+713,19
@@
void yf::FrontendNet::configure(const xmlNode * ptr, bool test_only,
std::string threads_str = mp::xml::get_text(ptr);
int threads = atoi(threads_str.c_str());
if (threads < 1)
std::string threads_str = mp::xml::get_text(ptr);
int threads = atoi(threads_str.c_str());
if (threads < 1)
- throw yf::FilterException("Bad value for threads: "
+ throw yf::FilterException("Bad value for max-threads: "
+ threads_str);
m_p->m_max_threads = threads;
}
+ threads_str);
m_p->m_max_threads = threads;
}
+ else if (!strcmp((const char *) ptr->name, "stack-size"))
+ {
+ std::string sz_str = mp::xml::get_text(ptr);
+ int sz = atoi(sz_str.c_str());
+ if (sz < 0)
+ throw yf::FilterException("Bad value for stack-size: "
+ + sz_str);
+ m_p->m_stack_size = sz * 1024;
+ }
else if (!strcmp((const char *) ptr->name, "timeout"))
{
std::string timeout_str = mp::xml::get_text(ptr);
else if (!strcmp((const char *) ptr->name, "timeout"))
{
std::string timeout_str = mp::xml::get_text(ptr);
diff --git
a/src/test_thread_pool_observer.cpp
b/src/test_thread_pool_observer.cpp
index
3b2fe9a
..
8df5b22
100644
(file)
--- a/
src/test_thread_pool_observer.cpp
+++ b/
src/test_thread_pool_observer.cpp
@@
-61,14
+61,9
@@
public:
mp::IThreadPoolMsg *My_Msg::handle()
{
mp::IThreadPoolMsg *My_Msg::handle()
{
- My_Msg *res = new My_Msg;
-
if (m_val == 7)
sleep(1);
if (m_val == 7)
sleep(1);
-
- res->m_val = m_val;
- res->m_timer = m_timer;
- return res;
+ return this;
}
bool My_Msg::cleanup(void *info)
}
bool My_Msg::cleanup(void *info)
@@
-80,6
+75,7
@@
void My_Msg::result(const char *t_info)
{
m_timer->m_sum += m_val;
m_timer->m_responses++;
{
m_timer->m_sum += m_val;
m_timer->m_responses++;
+ delete this;
}
My_Timer_Thread::My_Timer_Thread(ISocketObservable *obs,
}
My_Timer_Thread::My_Timer_Thread(ISocketObservable *obs,
@@
-97,26
+93,24
@@
My_Timer_Thread::My_Timer_Thread(ISocketObservable *obs,
void My_Timer_Thread::socketNotify(int event)
{
void My_Timer_Thread::socketNotify(int event)
{
- My_Msg *m = new My_Msg;
- m->m_val = m_requests++;
- m->m_timer = this;
- m_t->put(m);
-#if 0
- // prevent input queue from being filled up..
- // bug #1064: Test test_thread_pool_observer hangs
- // fortunately we don't need this hack. because put (ebove)
- // will block itself if needed
- if (m->m_val == 30)
+ if (m_requests == 30)
m_obs->deleteObserver(this);
m_obs->deleteObserver(this);
-#endif
+ else
+ {
+ My_Msg *m = new My_Msg;
+ m->m_val = m_requests++;
+ m->m_timer = this;
+ m_t->put(m);
+ }
}
BOOST_AUTO_TEST_CASE( thread_pool_observer1 )
{
SocketManager mySocketManager;
}
BOOST_AUTO_TEST_CASE( thread_pool_observer1 )
{
SocketManager mySocketManager;
- mp::ThreadPoolSocketObserver m(&mySocketManager, 3);
- My_Timer_Thread t(&mySocketManager, &m) ;
+ mp::ThreadPoolSocketObserver m(&mySocketManager, 3, 3, 16*1024);
+ My_Timer_Thread t(&mySocketManager, &m);
+
while (t.m_responses < 30 && mySocketManager.processEvent() > 0)
;
BOOST_CHECK_EQUAL(t.m_responses, 30);
while (t.m_responses < 30 && mySocketManager.processEvent() > 0)
;
BOOST_CHECK_EQUAL(t.m_responses, 30);
diff --git
a/src/thread_pool_observer.cpp
b/src/thread_pool_observer.cpp
index
bf60e73
..
f7d3e63
100644
(file)
--- a/
src/thread_pool_observer.cpp
+++ b/
src/thread_pool_observer.cpp
@@
-68,6
+68,7
@@
namespace metaproxy_1 {
std::deque<IThreadPoolMsg *> m_input;
std::deque<IThreadPoolMsg *> m_output;
bool m_stop_flag;
std::deque<IThreadPoolMsg *> m_input;
std::deque<IThreadPoolMsg *> m_output;
bool m_stop_flag;
+ unsigned m_stack_size;
unsigned m_no_threads;
unsigned m_min_threads;
unsigned m_max_threads;
unsigned m_no_threads;
unsigned m_min_threads;
unsigned m_max_threads;
@@
-97,7
+98,8
@@
IThreadPoolMsg::~IThreadPoolMsg()
ThreadPoolSocketObserver::ThreadPoolSocketObserver(
yazpp_1::ISocketObservable *obs,
ThreadPoolSocketObserver::ThreadPoolSocketObserver(
yazpp_1::ISocketObservable *obs,
- unsigned min_threads, unsigned max_threads)
+ unsigned min_threads, unsigned max_threads,
+ unsigned stack_size)
: m_p(new Rep(obs))
{
obs->addObserver(m_p->m_pipe.read_fd(), this);
: m_p(new Rep(obs))
{
obs->addObserver(m_p->m_pipe.read_fd(), this);
@@
-107,11
+109,18
@@
ThreadPoolSocketObserver::ThreadPoolSocketObserver(
m_p->m_min_threads = m_p->m_no_threads = min_threads;
m_p->m_max_threads = max_threads;
m_p->m_waiting_threads = 0;
m_p->m_min_threads = m_p->m_no_threads = min_threads;
m_p->m_max_threads = max_threads;
m_p->m_waiting_threads = 0;
+ m_p->m_stack_size = stack_size;
unsigned i;
for (i = 0; i < m_p->m_no_threads; i++)
{
Worker w(this);
unsigned i;
for (i = 0; i < m_p->m_no_threads; i++)
{
Worker w(this);
- m_p->m_thrds.add_thread(new boost::thread(w));
+ boost::thread::attributes attrs;
+ if (m_p->m_stack_size)
+ attrs.set_stack_size(m_p->m_stack_size);
+
+ boost::thread *x = new boost::thread(attrs, w);
+
+ m_p->m_thrds.add_thread(x);
}
}
}
}
@@
-238,7
+247,13
@@
void ThreadPoolSocketObserver::put(IThreadPoolMsg *m)
{
m_p->m_no_threads++;
Worker w(this);
{
m_p->m_no_threads++;
Worker w(this);
- m_p->m_thrds.add_thread(new boost::thread(w));
+
+ boost::thread::attributes attrs;
+ if (m_p->m_stack_size)
+ attrs.set_stack_size(m_p->m_stack_size);
+ boost::thread *x = new boost::thread(attrs, w);
+
+ m_p->m_thrds.add_thread(x);
}
while (m_p->m_input.size() >= m_p->m_no_threads * queue_size_per_thread)
m_p->m_cond_input_full.wait(input_lock);
}
while (m_p->m_input.size() >= m_p->m_no_threads * queue_size_per_thread)
m_p->m_cond_input_full.wait(input_lock);
diff --git
a/src/thread_pool_observer.hpp
b/src/thread_pool_observer.hpp
index
88c3a0b
..
8d59897
100644
(file)
--- a/
src/thread_pool_observer.hpp
+++ b/
src/thread_pool_observer.hpp
@@
-38,7
+38,8
@@
namespace metaproxy_1 {
class Worker;
public:
ThreadPoolSocketObserver(yazpp_1::ISocketObservable *obs,
class Worker;
public:
ThreadPoolSocketObserver(yazpp_1::ISocketObservable *obs,
- unsigned min_threads, unsigned max_threads);
+ unsigned min_threads, unsigned max_threads,
+ unsigned stack_size);
virtual ~ThreadPoolSocketObserver();
void put(IThreadPoolMsg *m);
void cleanup(IThreadPoolMsg *m, void *info);
virtual ~ThreadPoolSocketObserver();
void put(IThreadPoolMsg *m);
void cleanup(IThreadPoolMsg *m, void *info);
diff --git
a/xml/schema/filter_frontend_net.rnc
b/xml/schema/filter_frontend_net.rnc
index
6f94f6b
..
4e32ccd
100644
(file)
--- a/
xml/schema/filter_frontend_net.rnc
+++ b/
xml/schema/filter_frontend_net.rnc
@@
-8,6
+8,7
@@
filter_frontend_net =
attribute name { xsd:NCName }?,
element mp:threads { xsd:integer }?,
element mp:max-threads { xsd:integer }?,
attribute name { xsd:NCName }?,
element mp:threads { xsd:integer }?,
element mp:max-threads { xsd:integer }?,
+ element mp:stack-size { xsd:integer }?,
element mp:port {
attribute route { xsd:NCName }?,
attribute max_recv_bytes { xsd:integer }?,
element mp:port {
attribute route { xsd:NCName }?,
attribute max_recv_bytes { xsd:integer }?,