|
@@ -22,10 +22,12 @@ Ambari Agent
|
|
|
|
|
|
__all__ = ["checked_call", "call"]
|
|
__all__ = ["checked_call", "call"]
|
|
|
|
|
|
-import subprocess
|
|
|
|
import pipes
|
|
import pipes
|
|
-from subprocess import TimeoutExpired
|
|
|
|
|
|
+import subprocess
|
|
|
|
+import threading
|
|
|
|
+from multiprocessing import Queue
|
|
from exceptions import Fail
|
|
from exceptions import Fail
|
|
|
|
+from exceptions import ExecuteTimeoutException
|
|
from resource_management.core.logger import Logger
|
|
from resource_management.core.logger import Logger
|
|
|
|
|
|
def checked_call(command, logoutput=False,
|
|
def checked_call(command, logoutput=False,
|
|
@@ -35,8 +37,7 @@ def checked_call(command, logoutput=False,
|
|
def call(command, logoutput=False,
|
|
def call(command, logoutput=False,
|
|
cwd=None, env=None, preexec_fn=None, user=None, wait_for_finish=True, timeout=None):
|
|
cwd=None, env=None, preexec_fn=None, user=None, wait_for_finish=True, timeout=None):
|
|
return _call(command, logoutput, False, cwd, env, preexec_fn, user, wait_for_finish, timeout)
|
|
return _call(command, logoutput, False, cwd, env, preexec_fn, user, wait_for_finish, timeout)
|
|
-
|
|
|
|
-
|
|
|
|
|
|
+
|
|
def _call(command, logoutput=False, throw_on_failure=True,
|
|
def _call(command, logoutput=False, throw_on_failure=True,
|
|
cwd=None, env=None, preexec_fn=None, user=None, wait_for_finish=True, timeout=None):
|
|
cwd=None, env=None, preexec_fn=None, user=None, wait_for_finish=True, timeout=None):
|
|
"""
|
|
"""
|
|
@@ -65,13 +66,20 @@ def _call(command, logoutput=False, throw_on_failure=True,
|
|
if not wait_for_finish:
|
|
if not wait_for_finish:
|
|
return None, None
|
|
return None, None
|
|
|
|
|
|
-
|
|
|
|
- try:
|
|
|
|
- out = proc.communicate(timeout=timeout)[0].strip('\n')
|
|
|
|
- except TimeoutExpired as ex:
|
|
|
|
- proc.terminate()
|
|
|
|
- raise ex
|
|
|
|
|
|
+ if timeout:
|
|
|
|
+ q = Queue()
|
|
|
|
+ t = threading.Timer( timeout, on_timeout, [proc, q] )
|
|
|
|
+ t.start()
|
|
|
|
|
|
|
|
+ out = proc.communicate()[0].strip('\n')
|
|
|
|
+
|
|
|
|
+ if timeout:
|
|
|
|
+ if q.empty():
|
|
|
|
+ t.cancel()
|
|
|
|
+ # timeout occurred
|
|
|
|
+ else:
|
|
|
|
+ raise ExecuteTimeoutException()
|
|
|
|
+
|
|
code = proc.returncode
|
|
code = proc.returncode
|
|
|
|
|
|
if logoutput and out:
|
|
if logoutput and out:
|
|
@@ -81,4 +89,12 @@ def _call(command, logoutput=False, throw_on_failure=True,
|
|
err_msg = ("Execution of '%s' returned %d. %s") % (command[-1], code, out)
|
|
err_msg = ("Execution of '%s' returned %d. %s") % (command[-1], code, out)
|
|
raise Fail(err_msg)
|
|
raise Fail(err_msg)
|
|
|
|
|
|
- return code, out
|
|
|
|
|
|
+ return code, out
|
|
|
|
+
|
|
|
|
+def on_timeout(proc, q):
|
|
|
|
+ q.put(True)
|
|
|
|
+ if proc.poll() == None:
|
|
|
|
+ try:
|
|
|
|
+ proc.terminate()
|
|
|
|
+ except:
|
|
|
|
+ pass
|