|
@@ -31,13 +31,16 @@ ExitHelper().register(_shutdown_all)
|
|
|
|
|
|
|
|
|
|
class ThreadPool(object):
|
|
class ThreadPool(object):
|
|
- def __init__(self, core_threads=0, max_threads=20, keepalive=1):
|
|
|
|
|
|
+ def __init__(self, core_threads=0, max_threads=20, keepalive=1, context_injector=None, agent_config=None):
|
|
"""
|
|
"""
|
|
:param core_threads: maximum number of persistent threads in the pool
|
|
:param core_threads: maximum number of persistent threads in the pool
|
|
:param max_threads: maximum number of total threads in the pool
|
|
:param max_threads: maximum number of total threads in the pool
|
|
:param thread_class: callable that creates a Thread object
|
|
:param thread_class: callable that creates a Thread object
|
|
:param keepalive: seconds to keep non-core worker threads waiting
|
|
:param keepalive: seconds to keep non-core worker threads waiting
|
|
for new tasks
|
|
for new tasks
|
|
|
|
+
|
|
|
|
+ :type context_injector func
|
|
|
|
+ :type agent_config AmbariConfig.AmbariConfig
|
|
"""
|
|
"""
|
|
self.core_threads = core_threads
|
|
self.core_threads = core_threads
|
|
self.max_threads = max(max_threads, core_threads, 1)
|
|
self.max_threads = max(max_threads, core_threads, 1)
|
|
@@ -47,6 +50,9 @@ class ThreadPool(object):
|
|
self._threads = set()
|
|
self._threads = set()
|
|
self._shutdown = False
|
|
self._shutdown = False
|
|
|
|
|
|
|
|
+ self._job_context_injector = context_injector
|
|
|
|
+ self._agent_config = agent_config
|
|
|
|
+
|
|
_threadpools.add(ref(self))
|
|
_threadpools.add(ref(self))
|
|
logger.info('Started thread pool with %d core threads and %s maximum '
|
|
logger.info('Started thread pool with %d core threads and %s maximum '
|
|
'threads', core_threads, max_threads or 'unlimited')
|
|
'threads', core_threads, max_threads or 'unlimited')
|
|
@@ -73,6 +79,9 @@ class ThreadPool(object):
|
|
block = self.keepalive > 0
|
|
block = self.keepalive > 0
|
|
timeout = self.keepalive
|
|
timeout = self.keepalive
|
|
|
|
|
|
|
|
+ if self._job_context_injector is not None:
|
|
|
|
+ self._job_context_injector(self._agent_config)
|
|
|
|
+
|
|
while True:
|
|
while True:
|
|
try:
|
|
try:
|
|
func, args, kwargs = self._queue.get(block, timeout)
|
|
func, args, kwargs = self._queue.get(block, timeout)
|