Przeglądaj źródła

AMBARI-4006. Custom Action: Add agent support and support for Python based custom action scripts (dsen)

Dmitry Sen 11 lat temu
rodzic
commit
56f7f8744f

+ 3 - 0
ambari-agent/conf/unix/ambari-agent.ini

@@ -34,6 +34,9 @@ facter_home=/usr/lib/ambari-agent/lib/facter-1.6.10
 # How many seconds will pass before running puppet is terminated on timeout
 # How many seconds will pass before running puppet is terminated on timeout
 timeout_seconds = 600
 timeout_seconds = 600
 
 
+[python]
+custom_actions_dir = /var/lib/ambari-agent/resources
+
 [command]
 [command]
 maxretries=2
 maxretries=2
 sleepBetweenRetries=1
 sleepBetweenRetries=1

+ 12 - 0
ambari-agent/pom.xml

@@ -343,6 +343,18 @@
                 </source>
                 </source>
               </sources>
               </sources>
             </mapping>
             </mapping>
+            <mapping>
+              <!-- custom actions root-->
+              <directory>/var/lib/ambari-agent/resources</directory>
+              <filemode>755</filemode>
+              <username>root</username>
+              <groupname>root</groupname>
+              <sources>
+                <source>
+                  <location>../ambari-server/src/main/resources/custom_actions</location>
+                </source>
+              </sources>
+            </mapping>
           </mappings>
           </mappings>
         </configuration>
         </configuration>
       </plugin>
       </plugin>

+ 5 - 0
ambari-agent/src/main/python/ambari_agent/ActionQueue.py

@@ -143,6 +143,7 @@ class ActionQueue(threading.Thread):
     in_progress_status.update({
     in_progress_status.update({
       'tmpout': self.tmpdir + os.sep + 'output-' + str(taskId) + '.txt',
       'tmpout': self.tmpdir + os.sep + 'output-' + str(taskId) + '.txt',
       'tmperr': self.tmpdir + os.sep + 'errors-' + str(taskId) + '.txt',
       'tmperr': self.tmpdir + os.sep + 'errors-' + str(taskId) + '.txt',
+      'structuredOut' : self.tmpdir + os.sep + 'structured-out-' + str(taskId) + '.json',
       'status': self.IN_PROGRESS_STATUS
       'status': self.IN_PROGRESS_STATUS
     })
     })
     self.commandStatuses.put_command_status(command, in_progress_status)
     self.commandStatuses.put_command_status(command, in_progress_status)
@@ -176,6 +177,10 @@ class ActionQueue(threading.Thread):
     if roleResult['stderr'] == '':
     if roleResult['stderr'] == '':
       roleResult['stderr'] = 'None'
       roleResult['stderr'] = 'None'
 
 
+    if 'structuredOut' in commandresult:
+      roleResult['structuredOut'] = str(commandresult['structuredOut'])
+    else:
+      roleResult['structuredOut'] = ''
     # let ambari know that configuration tags were applied
     # let ambari know that configuration tags were applied
     if status == self.COMPLETED_STATUS:
     if status == self.COMPLETED_STATUS:
       configHandler = ActualConfigHandler(self.config)
       configHandler = ActualConfigHandler(self.config)

+ 3 - 0
ambari-agent/src/main/python/ambari_agent/AmbariConfig.py

@@ -44,6 +44,9 @@ puppet_home=/root/workspace/puppet-install/puppet-2.7.9
 facter_home=/root/workspace/puppet-install/facter-1.6.10
 facter_home=/root/workspace/puppet-install/facter-1.6.10
 timeout_seconds = 600
 timeout_seconds = 600
 
 
+[python]
+custom_actions_dir = /var/lib/ambari-agent/resources
+
 [command]
 [command]
 maxretries=2
 maxretries=2
 sleepBetweenRetries=1
 sleepBetweenRetries=1

+ 5 - 0
ambari-agent/src/main/python/ambari_agent/CommandStatusDict.py

@@ -18,6 +18,7 @@ See the License for the specific language governing permissions and
 limitations under the License.
 limitations under the License.
 '''
 '''
 
 
+import json
 import logging
 import logging
 import threading
 import threading
 from Grep import Grep
 from Grep import Grep
@@ -96,16 +97,20 @@ class CommandStatusDict():
     try:
     try:
       tmpout = open(report['tmpout'], 'r').read()
       tmpout = open(report['tmpout'], 'r').read()
       tmperr = open(report['tmperr'], 'r').read()
       tmperr = open(report['tmperr'], 'r').read()
+      with open(report['structuredOut'], 'r') as fp:
+        tmpstructuredout = json.load(fp)
     except Exception, err:
     except Exception, err:
       logger.warn(err)
       logger.warn(err)
       tmpout = '...'
       tmpout = '...'
       tmperr = '...'
       tmperr = '...'
+      tmpstructuredout = ''
     grep = Grep()
     grep = Grep()
     output = grep.tail(tmpout, Grep.OUTPUT_LAST_LINES)
     output = grep.tail(tmpout, Grep.OUTPUT_LAST_LINES)
     inprogress = self.generate_report_template(command)
     inprogress = self.generate_report_template(command)
     inprogress.update({
     inprogress.update({
       'stdout': grep.filterMarkup(output),
       'stdout': grep.filterMarkup(output),
       'stderr': tmperr,
       'stderr': tmperr,
+      'structuredOut': tmpstructuredout,
       'exitCode': 777,
       'exitCode': 777,
       'status': ActionQueue.IN_PROGRESS_STATUS,
       'status': ActionQueue.IN_PROGRESS_STATUS,
     })
     })

+ 17 - 7
ambari-agent/src/main/python/ambari_agent/CustomServiceOrchestrator.py

@@ -39,6 +39,7 @@ class CustomServiceOrchestrator():
   """
   """
 
 
   SCRIPT_TYPE_PYTHON = "PYTHON"
   SCRIPT_TYPE_PYTHON = "PYTHON"
+  CUSTOM_ACTION_COMMAND = 'ACTIONEXECUTE'
 
 
   def __init__(self, config):
   def __init__(self, config):
     self.config = config
     self.config = config
@@ -50,21 +51,29 @@ class CustomServiceOrchestrator():
   def runCommand(self, command, tmpoutfile, tmperrfile):
   def runCommand(self, command, tmpoutfile, tmperrfile):
     try:
     try:
       component_name = command['role']
       component_name = command['role']
-      stack_name = command['hostLevelParams']['stack_name']
-      stack_version = command['hostLevelParams']['stack_version']
       script_type = command['commandParams']['script_type']
       script_type = command['commandParams']['script_type']
       script = command['commandParams']['script']
       script = command['commandParams']['script']
       command_name = command['roleCommand']
       command_name = command['roleCommand']
       timeout = int(command['commandParams']['command_timeout'])
       timeout = int(command['commandParams']['command_timeout'])
-      metadata_folder = command['commandParams']['service_metadata_folder']
-      base_dir = self.file_cache.get_service_base_dir(
+      task_id = command['taskId']
+      if command_name == self.CUSTOM_ACTION_COMMAND:
+        base_dir = self.config.get('python', 'custom_actions_dir')
+        script_path = os.path.join(base_dir, script)
+      else:
+        stack_name = command['hostLevelParams']['stack_name']
+        stack_version = command['hostLevelParams']['stack_version']
+        metadata_folder = command['commandParams']['service_metadata_folder']
+        base_dir = self.file_cache.get_service_base_dir(
           stack_name, stack_version, metadata_folder, component_name)
           stack_name, stack_version, metadata_folder, component_name)
-      script_path = self.resolve_script_path(base_dir, script, script_type)
+        script_path = self.resolve_script_path(base_dir, script, script_type)
+
+      tmpstrucoutfile = os.path.join(self.tmp_dir, "structured-out-{0}.json".
+        format(task_id))
       if script_type.upper() == self.SCRIPT_TYPE_PYTHON:
       if script_type.upper() == self.SCRIPT_TYPE_PYTHON:
         json_path = self.dump_command_to_json(command)
         json_path = self.dump_command_to_json(command)
         script_params = [command_name, json_path, base_dir]
         script_params = [command_name, json_path, base_dir]
-        ret = self.python_executor.run_file(
-          script_path, script_params, tmpoutfile, tmperrfile, timeout)
+        ret = self.python_executor.run_file(script_path, script_params,
+          tmpoutfile, tmperrfile, timeout, tmpstrucoutfile)
       else:
       else:
         message = "Unknown script type {0}".format(script_type)
         message = "Unknown script type {0}".format(script_type)
         raise AgentException(message)
         raise AgentException(message)
@@ -76,6 +85,7 @@ class CustomServiceOrchestrator():
       ret = {
       ret = {
         'stdout' : message,
         'stdout' : message,
         'stderr' : message,
         'stderr' : message,
+        'structuredOut' : message,
         'exitcode': 1,
         'exitcode': 1,
       }
       }
     return ret
     return ret

+ 23 - 4
ambari-agent/src/main/python/ambari_agent/PythonExecutor.py

@@ -17,7 +17,9 @@ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 See the License for the specific language governing permissions and
 See the License for the specific language governing permissions and
 limitations under the License.
 limitations under the License.
 '''
 '''
+import json
 import logging
 import logging
+import os
 import subprocess
 import subprocess
 import pprint
 import pprint
 import threading
 import threading
@@ -45,7 +47,8 @@ class PythonExecutor:
     self.config = config
     self.config = config
     pass
     pass
 
 
-  def run_file(self, script, script_params, tmpoutfile, tmperrfile, timeout):
+  def run_file(self, script, script_params, tmpoutfile, tmperrfile, timeout,
+               tmpstructedoutfile):
     """
     """
     Executes the specified python file in a separate subprocess.
     Executes the specified python file in a separate subprocess.
     Method returns only when the subprocess is finished.
     Method returns only when the subprocess is finished.
@@ -55,6 +58,7 @@ class PythonExecutor:
     """
     """
     tmpout =  open(tmpoutfile, 'w')
     tmpout =  open(tmpoutfile, 'w')
     tmperr =  open(tmperrfile, 'w')
     tmperr =  open(tmperrfile, 'w')
+    script_params += [tmpstructedoutfile]
     pythonCommand = self.python_command(script, script_params)
     pythonCommand = self.python_command(script, script_params)
     logger.info("Running command " + pprint.pformat(pythonCommand))
     logger.info("Running command " + pprint.pformat(pythonCommand))
     process = self.launch_python_subprocess(pythonCommand, tmpout, tmperr)
     process = self.launch_python_subprocess(pythonCommand, tmpout, tmperr)
@@ -72,10 +76,24 @@ class PythonExecutor:
     returncode = process.returncode
     returncode = process.returncode
     out = open(tmpoutfile, 'r').read()
     out = open(tmpoutfile, 'r').read()
     error = open(tmperrfile, 'r').read()
     error = open(tmperrfile, 'r').read()
+
+    try:
+      with open(tmpstructedoutfile, 'r') as fp:
+        structured_out = json.load(fp)
+    except Exception:
+      if os.path.exists(tmpstructedoutfile):
+        errMsg = 'Unable to read structured output from ' + tmpstructedoutfile
+        structured_out = {
+          'msg' : errMsg
+        }
+        logger.warn(structured_out)
+      else:
+        structured_out = ''
+
     if self.python_process_has_been_killed:
     if self.python_process_has_been_killed:
       error = str(error) + "\n Python script has been killed due to timeout"
       error = str(error) + "\n Python script has been killed due to timeout"
       returncode = 999
       returncode = 999
-    result = self.condenseOutput(out, error, returncode)
+    result = self.condenseOutput(out, error, returncode, structured_out)
     logger.info("Result: %s" % result)
     logger.info("Result: %s" % result)
     return result
     return result
 
 
@@ -97,12 +115,13 @@ class PythonExecutor:
     python_command = [python_binary, script] + script_params
     python_command = [python_binary, script] + script_params
     return python_command
     return python_command
 
 
-  def condenseOutput(self, stdout, stderr, retcode):
+  def condenseOutput(self, stdout, stderr, retcode, structured_out):
     grep = self.grep
     grep = self.grep
     result = {
     result = {
       "exitcode": retcode,
       "exitcode": retcode,
       "stdout"  : grep.tail(stdout, grep.OUTPUT_LAST_LINES),
       "stdout"  : grep.tail(stdout, grep.OUTPUT_LAST_LINES),
-      "stderr"  : grep.tail(stderr, grep.OUTPUT_LAST_LINES)
+      "stderr"  : grep.tail(stderr, grep.OUTPUT_LAST_LINES),
+      "structuredOut" : structured_out
     }
     }
     return result
     return result
 
 

+ 19 - 8
ambari-agent/src/main/python/resource_management/libraries/script/script.py

@@ -36,6 +36,16 @@ class Script():
   even different Script instances can not be used from different threads at
   even different Script instances can not be used from different threads at
   the same time
   the same time
   """
   """
+  structuredOut = {}
+
+  def put_structured_out(self, sout):
+    Script.structuredOut.update(sout)
+    try:
+      structuredOut = json.dumps(Script.structuredOut)
+      with open(self.stroutfile, 'w') as fp:
+        json.dump(structuredOut, fp)
+    except IOError:
+      Script.structuredOut.update({"errMsg" : "Unable to write to " + self.stroutfile})
 
 
   def execute(self):
   def execute(self):
     """
     """
@@ -55,13 +65,14 @@ class Script():
     logger.addHandler(cherr)
     logger.addHandler(cherr)
     logger.addHandler(chout)
     logger.addHandler(chout)
     # parse arguments
     # parse arguments
-    if len(sys.argv) < 1+3:
-      logger.error("Script expects at least 3 arguments")
+    if len(sys.argv) < 5:
+      logger.error("Script expects at least 4 arguments")
       sys.exit(1)
       sys.exit(1)
-    command_type = str.lower(sys.argv[1])
+    command_name = str.lower(sys.argv[1])
     # parse command parameters
     # parse command parameters
     command_data_file = sys.argv[2]
     command_data_file = sys.argv[2]
     basedir = sys.argv[3]
     basedir = sys.argv[3]
+    self.stroutfile = sys.argv[4]
     try:
     try:
       with open(command_data_file, "r") as f:
       with open(command_data_file, "r") as f:
         pass
         pass
@@ -71,17 +82,17 @@ class Script():
       sys.exit(1)
       sys.exit(1)
     # Run class method mentioned by a command type
     # Run class method mentioned by a command type
     self_methods = dir(self)
     self_methods = dir(self)
-    if not command_type in self_methods:
-      logger.error("Script {0} has not method '{1}'".format(sys.argv[0], command_type))
+    if not command_name in self_methods:
+      logger.error("Script {0} has not method '{1}'".format(sys.argv[0], command_name))
       sys.exit(1)
       sys.exit(1)
-    method = getattr(self, command_type)
+    method = getattr(self, command_name)
     try:
     try:
       with Environment(basedir) as env:
       with Environment(basedir) as env:
         method(env)
         method(env)
     except Fail:
     except Fail:
-      logger.exception("Got exception while executing method '{0}':".format(command_type))
+      logger.exception("Got exception while executing method '{0}':".format(command_name))
       sys.exit(1)
       sys.exit(1)
-      
+
   @staticmethod
   @staticmethod
   def get_config():
   def get_config():
     """
     """

+ 8 - 2
ambari-agent/src/test/python/ambari_agent/TestActionQueue.py

@@ -195,10 +195,10 @@ class TestActionQueue(TestCase):
     self.assertTrue(print_exc_mock.called)
     self.assertTrue(print_exc_mock.called)
 
 
 
 
-
+  @patch("json.load")
   @patch("__builtin__.open")
   @patch("__builtin__.open")
   @patch.object(ActionQueue, "status_update_callback")
   @patch.object(ActionQueue, "status_update_callback")
-  def test_execute_command(self, status_update_callback_mock, open_mock):
+  def test_execute_command(self, status_update_callback_mock, open_mock, json_load_mock):
     # Make file read calls visible
     # Make file read calls visible
     def open_side_effect(file, mode):
     def open_side_effect(file, mode):
       if mode == 'r':
       if mode == 'r':
@@ -208,6 +208,7 @@ class TestActionQueue(TestCase):
       else:
       else:
         return self.original_open(file, mode)
         return self.original_open(file, mode)
     open_mock.side_effect = open_side_effect
     open_mock.side_effect = open_side_effect
+    json_load_mock.return_value = ''
 
 
     config = AmbariConfig().getConfig()
     config = AmbariConfig().getConfig()
     tempdir = tempfile.gettempdir()
     tempdir = tempfile.gettempdir()
@@ -217,6 +218,7 @@ class TestActionQueue(TestCase):
     puppet_execution_result_dict = {
     puppet_execution_result_dict = {
       'stdout': 'out',
       'stdout': 'out',
       'stderr': 'stderr',
       'stderr': 'stderr',
+      'structuredOut' : ''
       }
       }
     def side_effect(command, tmpoutfile, tmperrfile):
     def side_effect(command, tmpoutfile, tmperrfile):
       unfreeze_flag.wait()
       unfreeze_flag.wait()
@@ -244,6 +246,7 @@ class TestActionQueue(TestCase):
     expected = {'status': 'IN_PROGRESS',
     expected = {'status': 'IN_PROGRESS',
                 'stderr': 'Read from {0}/errors-3.txt'.format(tempdir),
                 'stderr': 'Read from {0}/errors-3.txt'.format(tempdir),
                 'stdout': 'Read from {0}/output-3.txt'.format(tempdir),
                 'stdout': 'Read from {0}/output-3.txt'.format(tempdir),
+                'structuredOut' : '',
                 'clusterName': u'cc',
                 'clusterName': u'cc',
                 'roleCommand': u'INSTALL',
                 'roleCommand': u'INSTALL',
                 'serviceName': u'HDFS',
                 'serviceName': u'HDFS',
@@ -270,6 +273,7 @@ class TestActionQueue(TestCase):
                 'role': u'DATANODE',
                 'role': u'DATANODE',
                 'actionId': '1-1',
                 'actionId': '1-1',
                 'taskId': 3,
                 'taskId': 3,
+                'structuredOut' : '',
                 'exitCode': 0}
                 'exitCode': 0}
     self.assertEqual(len(report['reports']), 1)
     self.assertEqual(len(report['reports']), 1)
     self.assertEqual(report['reports'][0], expected)
     self.assertEqual(report['reports'][0], expected)
@@ -307,6 +311,7 @@ class TestActionQueue(TestCase):
                 'role': u'DATANODE',
                 'role': u'DATANODE',
                 'actionId': '1-1',
                 'actionId': '1-1',
                 'taskId': 3,
                 'taskId': 3,
+                'structuredOut' : '',
                 'exitCode': 13}
                 'exitCode': 13}
     self.assertEqual(len(report['reports']), 1)
     self.assertEqual(len(report['reports']), 1)
     self.assertEqual(report['reports'][0], expected)
     self.assertEqual(report['reports'][0], expected)
@@ -338,6 +343,7 @@ class TestActionQueue(TestCase):
                 'role': 'role',
                 'role': 'role',
                 'actionId': 17,
                 'actionId': 17,
                 'taskId': 'taskId',
                 'taskId': 'taskId',
+                'structuredOut' : '',
                 'exitCode': 0}
                 'exitCode': 0}
     self.assertEqual(len(report['reports']), 1)
     self.assertEqual(len(report['reports']), 1)
     self.assertEqual(report['reports'][0], expected)
     self.assertEqual(report['reports'][0], expected)

+ 2 - 0
ambari-agent/src/test/python/ambari_agent/TestCommandStatusDict.py

@@ -107,12 +107,14 @@ class TestCommandStatusDict(TestCase):
                    {'status': 'COMPLETE', 'taskId': 4},
                    {'status': 'COMPLETE', 'taskId': 4},
                    {'status': 'IN_PROGRESS', 'stderr': '...',
                    {'status': 'IN_PROGRESS', 'stderr': '...',
                     'stdout': '...', 'clusterName': u'cc',
                     'stdout': '...', 'clusterName': u'cc',
+                    'structuredOut' : '',
                     'roleCommand': u'INSTALL', 'serviceName': u'HDFS',
                     'roleCommand': u'INSTALL', 'serviceName': u'HDFS',
                     'role': u'DATANODE', 'actionId': '1-1', 'taskId': 5,
                     'role': u'DATANODE', 'actionId': '1-1', 'taskId': 5,
                     'exitCode': 777},
                     'exitCode': 777},
                    {'status': 'IN_PROGRESS',
                    {'status': 'IN_PROGRESS',
                     'stderr': '...',
                     'stderr': '...',
                     'stdout': '...',
                     'stdout': '...',
+                    'structuredOut' : '',
                     'clusterName': u'cc',
                     'clusterName': u'cc',
                     'roleCommand': u'INSTALL',
                     'roleCommand': u'INSTALL',
                     'serviceName': u'HDFS',
                     'serviceName': u'HDFS',

+ 28 - 0
ambari-agent/src/test/python/ambari_agent/TestCustomServiceOrchestrator.py

@@ -50,6 +50,8 @@ class TestCustomServiceOrchestrator(TestCase):
     self.config.add_section('agent')
     self.config.add_section('agent')
     self.config.set('agent', 'prefix', tmpdir)
     self.config.set('agent', 'prefix', tmpdir)
     self.config.set('agent', 'cache_dir', "/cachedir")
     self.config.set('agent', 'cache_dir', "/cachedir")
+    self.config.add_section('python')
+    self.config.set('python', 'custom_actions_dir', tmpdir)
 
 
 
 
   @patch("hostname.public_hostname")
   @patch("hostname.public_hostname")
@@ -116,6 +118,7 @@ class TestCustomServiceOrchestrator(TestCase):
         'command_timeout': '600',
         'command_timeout': '600',
         'service_metadata_folder' : 'HBASE'
         'service_metadata_folder' : 'HBASE'
       },
       },
+      'taskId' : '3',
       'roleCommand': 'INSTALL'
       'roleCommand': 'INSTALL'
     }
     }
     get_service_base_dir_mock.return_value = "/basedir/"
     get_service_base_dir_mock.return_value = "/basedir/"
@@ -140,6 +143,31 @@ class TestCustomServiceOrchestrator(TestCase):
     self.assertTrue("Unknown script type" in ret['stdout'])
     self.assertTrue("Unknown script type" in ret['stdout'])
     pass
     pass
 
 
+  @patch.object(CustomServiceOrchestrator, "dump_command_to_json")
+  @patch.object(PythonExecutor, "run_file")
+  def test_runCommand(self, run_file_mock, dump_command_to_json_mock):
+    _, script = tempfile.mkstemp()
+    command = {
+      'role' : 'any',
+      'commandParams': {
+        'script_type': 'PYTHON',
+        'script': 'some_custom_action.py',
+        'command_timeout': '600',
+      },
+      'taskId' : '3',
+      'roleCommand': 'ACTIONEXECUTE'
+    }
+
+    orchestrator = CustomServiceOrchestrator(self.config)
+    # normal run case
+    run_file_mock.return_value = {
+      'stdout' : 'sss',
+      'stderr' : 'eee',
+      'exitcode': 0,
+      }
+    ret = orchestrator.runCommand(command, "out.txt", "err.txt")
+    self.assertEqual(ret['exitcode'], 0)
+    self.assertTrue(run_file_mock.called)
 
 
   def tearDown(self):
   def tearDown(self):
     # enable stdout
     # enable stdout

+ 7 - 4
ambari-agent/src/test/python/ambari_agent/TestPythonExecutor.py

@@ -42,6 +42,7 @@ class TestPythonExecutor(TestCase):
     executor = PythonExecutor("/tmp", AmbariConfig().getConfig())
     executor = PythonExecutor("/tmp", AmbariConfig().getConfig())
     _, tmpoutfile = tempfile.mkstemp()
     _, tmpoutfile = tempfile.mkstemp()
     _, tmperrfile = tempfile.mkstemp()
     _, tmperrfile = tempfile.mkstemp()
+    _, tmpstrucout = tempfile.mkstemp()
     PYTHON_TIMEOUT_SECONDS = 0.1
     PYTHON_TIMEOUT_SECONDS = 0.1
     kill_process_with_children_mock.side_effect = lambda pid : subproc_mock.terminate()
     kill_process_with_children_mock.side_effect = lambda pid : subproc_mock.terminate()
 
 
@@ -54,8 +55,8 @@ class TestPythonExecutor(TestCase):
     runShellKillPgrp_method.side_effect = lambda python : python.terminate()
     runShellKillPgrp_method.side_effect = lambda python : python.terminate()
     executor.runShellKillPgrp = runShellKillPgrp_method
     executor.runShellKillPgrp = runShellKillPgrp_method
     subproc_mock.returncode = None
     subproc_mock.returncode = None
-    thread = Thread(target =  executor.run_file, args = ("fake_puppetFile", ["arg1", "arg2"],
-                                                    tmpoutfile, tmperrfile, PYTHON_TIMEOUT_SECONDS))
+    thread = Thread(target =  executor.run_file, args = ("fake_puppetFile",
+      ["arg1", "arg2"], tmpoutfile, tmperrfile, PYTHON_TIMEOUT_SECONDS, tmpstrucout))
     thread.start()
     thread.start()
     time.sleep(0.1)
     time.sleep(0.1)
     subproc_mock.finished_event.wait()
     subproc_mock.finished_event.wait()
@@ -96,6 +97,7 @@ class TestPythonExecutor(TestCase):
     executor = PythonExecutor("/tmp", AmbariConfig().getConfig())
     executor = PythonExecutor("/tmp", AmbariConfig().getConfig())
     _, tmpoutfile = tempfile.mkstemp()
     _, tmpoutfile = tempfile.mkstemp()
     _, tmperrfile = tempfile.mkstemp()
     _, tmperrfile = tempfile.mkstemp()
+    _, tmpstroutfile = tempfile.mkstemp()
     PYTHON_TIMEOUT_SECONDS =  5
     PYTHON_TIMEOUT_SECONDS =  5
 
 
     def launch_python_subprocess_method(command, tmpout, tmperr):
     def launch_python_subprocess_method(command, tmpout, tmperr):
@@ -108,8 +110,9 @@ class TestPythonExecutor(TestCase):
     executor.runShellKillPgrp = runShellKillPgrp_method
     executor.runShellKillPgrp = runShellKillPgrp_method
     subproc_mock.returncode = 0
     subproc_mock.returncode = 0
     subproc_mock.should_finish_event.set()
     subproc_mock.should_finish_event.set()
-    result = executor.run_file("file", ["arg1", "arg2"], tmpoutfile, tmperrfile, PYTHON_TIMEOUT_SECONDS)
-    self.assertEquals(result, {'exitcode': 0, 'stderr': 'Dummy err', 'stdout': 'Dummy output'})
+    result = executor.run_file("file", ["arg1", "arg2"], tmpoutfile, tmperrfile, PYTHON_TIMEOUT_SECONDS, tmpstroutfile)
+    self.assertEquals(result, {'exitcode': 0, 'stderr': 'Dummy err', 'stdout': 'Dummy output',
+                               'structuredOut': {'msg': 'Unable to read structured output from ' + tmpstroutfile}})
 
 
 
 
   def test_is_successfull(self):
   def test_is_successfull(self):

+ 19 - 0
ambari-agent/src/test/python/resource_management/TestScript.py

@@ -82,6 +82,25 @@ class TestScript(TestCase):
     resource_dump = pprint.pformat(env.resource_list)
     resource_dump = pprint.pformat(env.resource_list)
     self.assertEqual(resource_dump, "[Package['hbase'], Package['yet-another-package']]")
     self.assertEqual(resource_dump, "[Package['hbase'], Package['yet-another-package']]")
 
 
+  @patch("__builtin__.open")
+  def test_structured_out(self, open_mock):
+    script = Script()
+    script.stroutfile = ''
+
+    self.assertEqual(Script.structuredOut, {})
+
+    script.put_structured_out({"1": "1"})
+    self.assertEqual(Script.structuredOut, {"1": "1"})
+    self.assertTrue(open_mock.called)
+
+    script.put_structured_out({"2": "2"})
+    self.assertEqual(open_mock.call_count, 2)
+    self.assertEqual(Script.structuredOut, {"1": "1", "2": "2"})
+
+    #Overriding
+    script.put_structured_out({"1": "3"})
+    self.assertEqual(open_mock.call_count, 3)
+    self.assertEqual(Script.structuredOut, {"1": "3", "2": "2"})
 
 
   def tearDown(self):
   def tearDown(self):
     # enable stdout
     # enable stdout

+ 21 - 18
ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ActionScheduler.java

@@ -27,6 +27,7 @@ import java.util.concurrent.TimeUnit;
 
 
 import org.apache.ambari.server.AmbariException;
 import org.apache.ambari.server.AmbariException;
 import org.apache.ambari.server.Role;
 import org.apache.ambari.server.Role;
+import org.apache.ambari.server.RoleCommand;
 import org.apache.ambari.server.ServiceComponentNotFoundException;
 import org.apache.ambari.server.ServiceComponentNotFoundException;
 import org.apache.ambari.server.agent.ActionQueue;
 import org.apache.ambari.server.agent.ActionQueue;
 import org.apache.ambari.server.agent.CommandReport;
 import org.apache.ambari.server.agent.CommandReport;
@@ -420,24 +421,26 @@ class ActionScheduler implements Runnable {
     String roleStr = cmd.getRole().toString();
     String roleStr = cmd.getRole().toString();
     String hostname = cmd.getHostname();
     String hostname = cmd.getHostname();
     if (s.getStartTime(hostname, roleStr) < 0) {
     if (s.getStartTime(hostname, roleStr) < 0) {
-      try {
-        Cluster c = fsmObject.getCluster(s.getClusterName());
-        Service svc = c.getService(cmd.getServiceName());
-        ServiceComponent svcComp = svc.getServiceComponent(roleStr);
-        ServiceComponentHost svcCompHost =
-            svcComp.getServiceComponentHost(hostname);
-        svcCompHost.handleEvent(s.getFsmEvent(hostname, roleStr).getEvent());
-      } catch (ServiceComponentNotFoundException scnex) {
-        LOG.info("Not a service component, assuming its an action", scnex);
-      } catch (InvalidStateTransitionException e) {
-        LOG.info(
-            "Transition failed for host: " + hostname + ", role: "
-                + roleStr, e);
-        throw e;
-      } catch (AmbariException e) {
-        LOG.warn("Exception in fsm: " + hostname + ", role: " + roleStr,
-            e);
-        throw e;
+      if (RoleCommand.ACTIONEXECUTE != cmd.getRoleCommand()) {
+        try {
+          Cluster c = fsmObject.getCluster(s.getClusterName());
+          Service svc = c.getService(cmd.getServiceName());
+          ServiceComponent svcComp = svc.getServiceComponent(roleStr);
+          ServiceComponentHost svcCompHost =
+              svcComp.getServiceComponentHost(hostname);
+          svcCompHost.handleEvent(s.getFsmEvent(hostname, roleStr).getEvent());
+        } catch (ServiceComponentNotFoundException scnex) {
+          LOG.info("Not a service component, assuming its an action", scnex);
+        } catch (InvalidStateTransitionException e) {
+          LOG.info(
+              "Transition failed for host: " + hostname + ", role: "
+                  + roleStr, e);
+          throw e;
+        } catch (AmbariException e) {
+          LOG.warn("Exception in fsm: " + hostname + ", role: " + roleStr,
+              e);
+          throw e;
+        }
       }
       }
       s.setStartTime(hostname,roleStr, now);
       s.setStartTime(hostname,roleStr, now);
       s.setHostRoleStatus(hostname, roleStr, HostRoleStatus.QUEUED);
       s.setHostRoleStatus(hostname, roleStr, HostRoleStatus.QUEUED);

+ 1 - 1
ambari-server/src/main/java/org/apache/ambari/server/agent/HeartBeatHandler.java

@@ -193,7 +193,7 @@ public class HeartBeatHandler {
     List<CommandReport> reports = heartbeat.getReports();
     List<CommandReport> reports = heartbeat.getReports();
     for (CommandReport report : reports) {
     for (CommandReport report : reports) {
       LOG.debug("Received command report: " + report);
       LOG.debug("Received command report: " + report);
-      if (RoleCommand.ACTIONEXECUTE.equals(report.getRoleCommand())) {
+      if (RoleCommand.ACTIONEXECUTE.toString().equals(report.getRoleCommand())) {
         continue;
         continue;
       }
       }
 
 

+ 12 - 5
ambari-server/src/main/java/org/apache/ambari/server/controller/AmbariActionExecutionHelper.java

@@ -41,10 +41,9 @@ import org.apache.ambari.server.utils.StageUtils;
 import org.slf4j.Logger;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.slf4j.LoggerFactory;
 
 
-import java.util.HashSet;
-import java.util.Map;
-import java.util.Set;
-import java.util.TreeMap;
+import java.util.*;
+
+import static org.apache.ambari.server.agent.ExecutionCommand.KeyNames.*;
 
 
 /**
 /**
  * Helper class containing logic to process custom action execution requests
  * Helper class containing logic to process custom action execution requests
@@ -58,6 +57,8 @@ public class AmbariActionExecutionHelper {
   private ActionManager actionManager;
   private ActionManager actionManager;
   private AmbariMetaInfo ambariMetaInfo;
   private AmbariMetaInfo ambariMetaInfo;
 
 
+  private static final String TYPE_PYTHON = "PYTHON";
+
   public AmbariActionExecutionHelper(ActionMetadata actionMetadata, Clusters clusters,
   public AmbariActionExecutionHelper(ActionMetadata actionMetadata, Clusters clusters,
                                      AmbariManagementControllerImpl amcImpl) {
                                      AmbariManagementControllerImpl amcImpl) {
     this.amcImpl = amcImpl;
     this.amcImpl = amcImpl;
@@ -272,6 +273,12 @@ public class AmbariActionExecutionHelper {
         configTags = amcImpl.findConfigurationTagsWithOverrides(cluster, hostName);
         configTags = amcImpl.findConfigurationTagsWithOverrides(cluster, hostName);
       }
       }
 
 
+      Map<String, String> commandParams = actionContext.getParameters();
+      commandParams.put(COMMAND_TIMEOUT, actionContext.getTimeout().toString());
+      commandParams.put(SCRIPT, actionName + ".py");
+      commandParams.put(SCRIPT_TYPE, TYPE_PYTHON);
+      commandParams.put(SCHEMA_VERSION, AmbariMetaInfo.SCHEMA_VERSION_2);
+
       ExecutionCommand execCmd = stage.getExecutionCommandWrapper(hostName,
       ExecutionCommand execCmd = stage.getExecutionCommandWrapper(hostName,
           actionContext.getActionName()).getExecutionCommand();
           actionContext.getActionName()).getExecutionCommand();
 
 
@@ -282,7 +289,7 @@ public class AmbariActionExecutionHelper {
       execCmd.setConfigurations(configurations);
       execCmd.setConfigurations(configurations);
       execCmd.setConfigurationTags(configTags);
       execCmd.setConfigurationTags(configTags);
       execCmd.setHostLevelParams(hostLevelParams);
       execCmd.setHostLevelParams(hostLevelParams);
-      execCmd.setCommandParams(actionContext.getParameters());
+      execCmd.setCommandParams(commandParams);
       execCmd.setServiceName(serviceName);
       execCmd.setServiceName(serviceName);
       execCmd.setComponentName(componentName);
       execCmd.setComponentName(componentName);
 
 

+ 63 - 0
ambari-server/src/main/resources/custom_actions/hdfs_rebalance.py

@@ -0,0 +1,63 @@
+#!/usr/bin/env python2.6
+"""
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+
+Ambari Agent
+
+"""
+
+from resource_management import *
+import os
+import json
+
+
+class HdfsRebalance(Script):
+  def actionexecute(self, env):
+
+    config = Script.get_config()
+
+    hdfs_user = config['configurations']['global']['hdfs_user']
+    conf_dir = config['configurations']['global']['hadoop_conf_dir']
+
+    security_enabled = config['configurations']['global']['security_enabled']
+    kinit_path_local = functions.get_kinit_path(
+      [default('kinit_path_local'), "/usr/bin", "/usr/kerberos/bin", "/usr/sbin"])
+
+    threshold = config['commandParams']['threshold']
+    principal = config['commandParams']['principal']
+    keytab = config['commandParams']['keytab']
+
+    if security_enabled:
+      Execute(format("{kinit_path_local}  -kt {keytab} {principal}"))
+
+    ExecuteHadoop(format('balancer -threshold {threshold}'),
+      user=hdfs_user,
+      conf_dir=conf_dir
+    )
+
+    structured_output_example = {
+      'user' : hdfs_user,
+      'conf_dir' : conf_dir,
+      'principal' : principal,
+      'keytab' : keytab
+      }
+
+    self.put_structured_out(structured_output_example)
+
+if __name__ == "__main__":
+
+  HdfsRebalance().execute()