Browse source code

AMBARI-10083 - Ambari Agent Alerts Prevents Binding to the Ping Port Listener On Startup (jonathanhurley)

Jonathan Hurley 10 years ago
parent commit fcb18659be

+ 45 - 34
ambari-agent/src/main/python/ambari_agent/AlertSchedulerHandler.py

@@ -26,6 +26,8 @@ import logging
 import os
 import sys
 import time
+import atexit
+
 from apscheduler.scheduler import Scheduler
 from alerts.collector import AlertCollector
 from alerts.metric_alert import MetricAlert
@@ -35,7 +37,6 @@ from alerts.web_alert import WebAlert
 
 logger = logging.getLogger()
 
-
 class AlertSchedulerHandler():
   FILENAME = 'definitions.json'
   TYPE_PORT = 'PORT'
@@ -49,7 +50,6 @@ class AlertSchedulerHandler():
     'standalone': False
   }
 
-
   def __init__(self, cachedir, stacks_dir, common_services_dir, host_scripts_dir,
       cluster_configuration, config, in_minutes=True):
 
@@ -71,6 +71,16 @@ class AlertSchedulerHandler():
     self.__in_minutes = in_minutes
     self.config = config
 
+    # register python exit handler
+    atexit.register(self.exit_handler)
+
+
+  def exit_handler(self):
+    """
+    Exit handler
+    """
+    self.stop()
+
 
   def update_definitions(self, heartbeat):
     """
@@ -116,12 +126,12 @@ class AlertSchedulerHandler():
       self.__scheduler = Scheduler(AlertSchedulerHandler.APS_CONFIG)
 
     alert_callables = self.__load_definitions()
-      
+
     # schedule each definition
     for _callable in alert_callables:
       self.schedule_definition(_callable)
-      
-    logger.debug("[AlertScheduler] Starting {0}; currently running: {1}".format(
+
+    logger.info("[AlertScheduler] Starting {0}; currently running: {1}".format(
       str(self.__scheduler), str(self.__scheduler.running)))
 
     self.__scheduler.start()
@@ -132,22 +142,23 @@ class AlertSchedulerHandler():
       self.__scheduler.shutdown(wait=False)
       self.__scheduler = Scheduler(AlertSchedulerHandler.APS_CONFIG)
 
+    logger.info("[AlertScheduler] Stopped the alert scheduler.")
 
   def reschedule(self):
     """
-    Removes jobs that are scheduled where their UUID no longer is valid. 
+    Removes jobs that are scheduled where their UUID no longer is valid.
     Schedules jobs where the definition UUID is not currently scheduled.
     """
     jobs_scheduled = 0
     jobs_removed = 0
-    
+
     definitions = self.__load_definitions()
     scheduled_jobs = self.__scheduler.get_jobs()
-    
+
     # for every scheduled job, see if its UUID is still valid
     for scheduled_job in scheduled_jobs:
       uuid_valid = False
-      
+
       for definition in definitions:
         definition_uuid = definition.get_uuid()
         if scheduled_job.name == definition_uuid:
@@ -160,7 +171,7 @@ class AlertSchedulerHandler():
         logger.info("[AlertScheduler] Unscheduling {0}".format(scheduled_job.name))
         self._collector.remove_by_uuid(scheduled_job.name)
         self.__scheduler.unschedule_job(scheduled_job)
-      
+
     # for every definition, determine if there is a scheduled job
     for definition in definitions:
       definition_scheduled = False
@@ -169,12 +180,12 @@ class AlertSchedulerHandler():
         if definition_uuid == scheduled_job.name:
           definition_scheduled = True
           break
-      
+
       # if no jobs are found with the definitions UUID, schedule it
       if definition_scheduled == False:
         jobs_scheduled += 1
         self.schedule_definition(definition)
-  
+
     logger.info("[AlertScheduler] Reschedule Summary: {0} rescheduled, {1} unscheduled".format(
         str(jobs_scheduled), str(jobs_removed)))
 
@@ -209,7 +220,7 @@ class AlertSchedulerHandler():
   def collector(self):
     """ gets the collector for reporting to the server """
     return self._collector
-  
+
 
   def __load_definitions(self):
     """
@@ -218,7 +229,7 @@ class AlertSchedulerHandler():
     :return:
     """
     definitions = []
-    
+
     all_commands = None
     alerts_definitions_path = os.path.join(self.cachedir, self.FILENAME)
     try:
@@ -227,21 +238,21 @@ class AlertSchedulerHandler():
     except:
       logger.warning('[AlertScheduler] {0} not found or invalid. No alerts will be scheduled until registration occurs.'.format(alerts_definitions_path))
       return definitions
-    
+
     for command_json in all_commands:
       clusterName = '' if not 'clusterName' in command_json else command_json['clusterName']
       hostName = '' if not 'hostName' in command_json else command_json['hostName']
 
       for definition in command_json['alertDefinitions']:
         alert = self.__json_to_callable(clusterName, hostName, definition)
-        
+
         if alert is None:
           continue
-          
+
         alert.set_helpers(self._collector, self._cluster_configuration)
 
         definitions.append(alert)
-      
+
     return definitions
 
 
@@ -255,7 +266,7 @@ class AlertSchedulerHandler():
 
     if logger.isEnabledFor(logging.DEBUG):
       logger.debug("[AlertScheduler] Creating job type {0} with {1}".format(source_type, str(json_definition)))
-    
+
     alert = None
 
     if source_type == AlertSchedulerHandler.TYPE_METRIC:
@@ -289,7 +300,7 @@ class AlertSchedulerHandler():
       logger.info("[AlertScheduler] The alert {0} with UUID {1} is disabled and will not be scheduled".format(
           definition.get_name(),definition.get_uuid()))
       return
-    
+
     job = None
 
     if self.__in_minutes:
@@ -298,15 +309,15 @@ class AlertSchedulerHandler():
     else:
       job = self.__scheduler.add_interval_job(self.__make_function(definition),
         seconds=definition.interval())
-    
-    # although the documentation states that Job(kwargs) takes a name 
+
+    # although the documentation states that Job(kwargs) takes a name
     # key/value pair, it does not actually set the name; do it manually
     if job is not None:
       job.name = definition.get_uuid()
-      
+
     logger.info("[AlertScheduler] Scheduling {0} with UUID {1}".format(
       definition.get_name(), definition.get_uuid()))
-  
+
 
   def get_job_count(self):
     """
@@ -315,10 +326,10 @@ class AlertSchedulerHandler():
     """
     if self.__scheduler is None:
       return 0
-    
+
     return len(self.__scheduler.get_jobs())
 
-  
+
   def execute_alert(self, execution_commands):
     """
     Executes an alert immediately, ignoring any scheduled jobs. The existing
@@ -331,18 +342,18 @@ class AlertSchedulerHandler():
     for execution_command in execution_commands:
       try:
         alert_definition = execution_command['alertDefinition']
-        
+
         clusterName = '' if not 'clusterName' in execution_command else execution_command['clusterName']
-        hostName = '' if not 'hostName' in execution_command else execution_command['hostName']      
-        
+        hostName = '' if not 'hostName' in execution_command else execution_command['hostName']
+
         alert = self.__json_to_callable(clusterName, hostName, alert_definition)
-  
+
         if alert is None:
           continue
-  
+
         logger.info("[AlertScheduler] Executing on-demand alert {0} ({1})".format(alert.get_name(),
             alert.get_uuid()))
-        
+
         alert.set_helpers(self._collector, self._cluster_configuration)
         alert.collect()
       except:
@@ -357,7 +368,7 @@ def main():
     logger.setLevel(logging.DEBUG)
   except TypeError:
     logger.setLevel(12)
-    
+
   ch = logging.StreamHandler()
   ch.setLevel(logger.level)
   logger.addHandler(ch)
@@ -376,7 +387,7 @@ def main():
   print str(ash.collector().alerts())
 
   ash.stop()
-    
+
 if __name__ == "__main__":
   main()
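Note: the key change in this file is the atexit hook. Registering exit_handler in __init__ guarantees that a normal interpreter shutdown stops the APScheduler instance, so its threads release whatever they hold (such as the ping port listener) before the process goes away. A minimal, self-contained sketch of the pattern follows; ToyScheduler and Handler are illustrative stand-ins, not Ambari classes:

import atexit

class ToyScheduler(object):
  # stand-in for apscheduler.scheduler.Scheduler; only start/shutdown matter here
  def __init__(self):
    self.running = False

  def start(self):
    self.running = True

  def shutdown(self, wait=False):
    self.running = False

class Handler(object):
  def __init__(self):
    self.scheduler = ToyScheduler()
    # as in the patch: stop the scheduler when the interpreter exits normally
    atexit.register(self.exit_handler)

  def exit_handler(self):
    self.stop()

  def stop(self):
    if self.scheduler.running:
      self.scheduler.shutdown(wait=False)
      print("scheduler stopped")

h = Handler()
h.scheduler.start()
# no explicit h.stop() call: the atexit hook fires on normal interpreter exit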

+ 3 - 14
ambari-agent/src/main/python/ambari_agent/Controller.py

@@ -102,7 +102,6 @@ class Controller(threading.Thread):
 
   def __del__(self):
     logger.info("Server connection disconnected.")
-    pass
 
   def registerWithServer(self):
     """
@@ -151,7 +150,6 @@ class Controller(threading.Thread):
         if 'statusCommands' in ret.keys():
           logger.info("Got status commands on registration.")
           self.addToStatusQueue(ret['statusCommands'])
-          pass
         else:
           self.hasMappedComponents = False
 
@@ -170,7 +168,7 @@ class Controller(threading.Thread):
         logger.error("Unable to connect to: " + self.registerUrl, exc_info=True)
         """ Sleeping for {0} seconds and then retrying again """.format(delay)
         time.sleep(delay)
-      pass
+
     return ret
 
   def cancelCommandInQueue(self, commands):
@@ -180,8 +178,6 @@ class Controller(threading.Thread):
         self.actionQueue.cancel(commands)
       except Exception, err:
         logger.error("Exception occurred on commands cancel: %s", err.message)
-        pass
-    pass
 
   def addToQueue(self, commands):
     """Add to the queue for running the commands """
@@ -192,7 +188,6 @@ class Controller(threading.Thread):
     else:
       """Only add to the queue if not empty list """
       self.actionQueue.put(commands)
-    pass
 
   def addToStatusQueue(self, commands):
     if not commands:
@@ -201,7 +196,6 @@ class Controller(threading.Thread):
       if not LiveStatus.SERVICES:
         self.updateComponents(commands[0]['clusterName'])
       self.actionQueue.put_status(commands)
-    pass
 
   # For testing purposes
   DEBUG_HEARTBEAT_RETRIES = 0
@@ -223,7 +217,6 @@ class Controller(threading.Thread):
         if not retry:
           data = json.dumps(
               self.heartbeat.build(self.responseId, int(hb_interval), self.hasMappedComponents))
-          pass
         else:
           self.DEBUG_HEARTBEAT_RETRIES += 1
 
@@ -333,7 +326,6 @@ class Controller(threading.Thread):
         # Stop loop when stop event received
         logger.info("Stop event received")
         self.DEBUG_STOP_HEARTBEATING=True
-    pass
 
   def run(self):
     self.actionQueue = ActionQueue(self.config, controller=self)
@@ -350,8 +342,6 @@ class Controller(threading.Thread):
       if not self.repeatRegistration:
         break
 
-    pass
-
   def registerAndHeartbeat(self):
     registerResponse = self.registerWithServer()
     message = registerResponse['response']
@@ -373,8 +363,8 @@ class Controller(threading.Thread):
       self.heartbeatWithServer()
 
   def restartAgent(self):
-    os._exit(AGENT_AUTO_RESTART_EXIT_CODE)
-    pass
+    sys.exit(AGENT_AUTO_RESTART_EXIT_CODE)
+
 
   def sendRequest(self, url, data):
     response = None
@@ -411,7 +401,6 @@ class Controller(threading.Thread):
     logger.debug("LiveStatus.SERVICES" + str(LiveStatus.SERVICES))
     logger.debug("LiveStatus.CLIENT_COMPONENTS" + str(LiveStatus.CLIENT_COMPONENTS))
     logger.debug("LiveStatus.COMPONENTS" + str(LiveStatus.COMPONENTS))
-    pass
 
 def main(argv=None):
   # Allow Ctrl-C
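Note: the restartAgent change is the heart of this file. os._exit() terminates the process immediately, skipping atexit handlers, finally blocks, and stream flushing, so the alert scheduler's exit handler registered above would never run and the ping port listener could be left bound. sys.exit() instead raises SystemExit, which unwinds normally and lets the cleanup fire. A small self-contained demonstration of the difference; the HARD_EXIT environment switch is purely illustrative:

import atexit
import os
import sys

def cleanup():
  # runs on sys.exit(); silently skipped on os._exit()
  print("releasing resources (e.g. closing listening sockets)")

atexit.register(cleanup)

if os.environ.get("HARD_EXIT"):
  os._exit(1)  # immediate termination, no cleanup of any kind
else:
  sys.exit(0)  # raises SystemExit; atexit handlers fire on the way out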

+ 1 - 5
ambari-agent/src/main/python/ambari_agent/ProcessHelper.py

@@ -35,7 +35,6 @@ pidfile = os.path.join(piddir, "ambari-agent.pid")
 
 
 def _clean():
-
   logger.info("Removing pid file")
   try:
     os.unlink(pidfile)
@@ -54,14 +53,11 @@ def _clean():
 
 
 def stopAgent():
-
   _clean()
-  os._exit(0)
-  pass
+  sys.exit(0)
 
 
 def restartAgent():
-
   _clean()
 
   executable = sys.executable

+ 4 - 4
ambari-agent/src/main/python/ambari_agent/main.py

@@ -158,7 +158,7 @@ def stop_agent():
     time.sleep(5)
     if os.path.exists(ProcessHelper.pidfile):
       raise Exception("PID file still exists.")
-    os._exit(0)
+    sys.exit(0)
   except Exception, err:
     if pid == -1:
       print ("Agent process is not running")
@@ -166,7 +166,7 @@ def stop_agent():
       res = runner.run([AMBARI_SUDO_BINARY, 'kill', '-9', str(pid)])
       if res['exitCode'] != 0:
         raise Exception("Error while performing agent stop. " + res['error'] + res['output'])
-    os._exit(1)
+    sys.exit(1)
 
 def reset_agent(options):
   try:
@@ -191,9 +191,9 @@ def reset_agent(options):
         os.rmdir(os.path.join(root, name))
   except Exception, err:
     print("A problem occurred while trying to reset the agent: " + str(err))
-    os._exit(1)
+    sys.exit(1)
 
-  os._exit(0)
+  sys.exit(0)
 
 # event - event, that will be passed to Controller and NetUtil to make able to interrupt loops form outside process
 # we need this for windows os, where no sigterm available

+ 19 - 13
ambari-agent/src/test/python/ambari_agent/TestMain.py

@@ -25,8 +25,8 @@ import signal
 import os
 import socket
 import tempfile
-import platform
 import ConfigParser
+
 from ambari_commons import OSCheck
 from only_for_platform import only_for_platform, get_platform, PLATFORM_WINDOWS, PLATFORM_LINUX
 from mock.mock import MagicMock, patch, ANY, Mock
@@ -62,16 +62,16 @@ class TestMain(unittest.TestCase):
 
   @only_for_platform(PLATFORM_LINUX)
   @patch("ambari_agent.HeartbeatHandlers.HeartbeatStopHandlersLinux")
-  @patch("os._exit")
+  @patch("sys.exit")
   @patch("os.getpid")
   @patch.object(ProcessHelper, "stopAgent")
-  def test_signal_handler(self, stopAgent_mock, os_getpid_mock, os_exit_mock, heartbeat_handler_mock):
+  def test_signal_handler(self, stopAgent_mock, os_getpid_mock, sys_exit_mock, heartbeat_handler_mock):
     # testing exit of children
     main.agentPid = 4444
     os_getpid_mock.return_value = 5555
     HeartbeatHandlers.signal_handler("signum", "frame")
     heartbeat_handler_mock.set_stop.assert_called()
-    os_exit_mock.reset_mock()
+    sys_exit_mock.reset_mock()
 
     # testing exit of main process
     os_getpid_mock.return_value = main.agentPid
@@ -202,9 +202,9 @@ class TestMain(unittest.TestCase):
   @only_for_platform(PLATFORM_LINUX)
   @patch("time.sleep")
   @patch.object(shellRunner,"run")
-  @patch("os._exit")
+  @patch("sys.exit")
   @patch("os.path.exists")
-  def test_daemonize_and_stop(self, exists_mock, _exit_mock, kill_mock, sleep_mock):
+  def test_daemonize_and_stop(self, exists_mock, sys_exit_mock, kill_mock, sleep_mock):
     oldpid = ProcessHelper.pidfile
     pid = str(os.getpid())
     _, tmpoutfile = tempfile.mkstemp()
@@ -220,11 +220,11 @@ class TestMain(unittest.TestCase):
     exists_mock.return_value = False
     main.stop_agent()
     kill_mock.assert_called_with(['ambari-sudo.sh', 'kill', '-15', pid])
-    _exit_mock.assert_called_with(0)
+    sys_exit_mock.assert_called_with(0)
 
     # Restore
     kill_mock.reset_mock()
-    _exit_mock.reset_mock()
+    sys_exit_mock.reset_mock()
     kill_mock.return_value = {'exitCode': 0, 'output': 'out', 'error': 'err'}
 
     # Testing exit when failed to remove pid file
@@ -232,7 +232,7 @@ class TestMain(unittest.TestCase):
     main.stop_agent()
     kill_mock.assert_any_call(['ambari-sudo.sh', 'kill', '-15', pid])
     kill_mock.assert_any_call(['ambari-sudo.sh', 'kill', '-9', pid])
-    _exit_mock.assert_called_with(1)
+    sys_exit_mock.assert_called_with(1)
 
     # Restore
     ProcessHelper.pidfile = oldpid
@@ -242,10 +242,10 @@ class TestMain(unittest.TestCase):
   @patch("os.path.join")
   @patch('__builtin__.open')
   @patch.object(ConfigParser, "ConfigParser")
-  @patch("os._exit")
+  @patch("sys.exit")
   @patch("os.walk")
   @patch("os.remove")
-  def test_reset(self, os_remove_mock, os_walk_mock, os_exit_mock, config_parser_mock, open_mock, os_path_join_mock, os_rmdir_mock):
+  def test_reset(self, os_remove_mock, os_walk_mock, sys_exit_mock, config_parser_mock, open_mock, os_path_join_mock, os_rmdir_mock):
     # Agent config update
     config_mock = MagicMock()
     os_walk_mock.return_value = [('/', ('',), ('file1.txt', 'file2.txt'))]
@@ -256,14 +256,18 @@ class TestMain(unittest.TestCase):
     self.assertEqual(config_mock.set.call_count, 1)
     self.assertEqual(os_remove_mock.call_count, 2)
 
+    self.assertTrue(sys_exit_mock.called)
+
   @patch("os.rmdir")
   @patch("os.path.join")
   @patch('__builtin__.open')
   @patch.object(ConfigParser, "ConfigParser")
-  @patch("os._exit")
+  @patch("sys.exit")
   @patch("os.walk")
   @patch("os.remove")
-  def test_reset_invalid_path(self, os_remove_mock, os_walk_mock, os_exit_mock, config_parser_mock, open_mock, os_path_join_mock, os_rmdir_mock):
+  def test_reset_invalid_path(self, os_remove_mock, os_walk_mock, sys_exit_mock,
+      config_parser_mock, open_mock, os_path_join_mock, os_rmdir_mock):
+
     # Agent config file cannot be accessed
     config_mock = MagicMock()
     os_walk_mock.return_value = [('/', ('',), ('file1.txt', 'file2.txt'))]
@@ -276,6 +280,8 @@ class TestMain(unittest.TestCase):
     except:
       self.assertTrue(True)
 
+    self.assertTrue(sys_exit_mock.called)
+
 
   @patch.object(OSCheck, "os_distribution", new = MagicMock(return_value = os_distro_value))
   @patch.object(socket, "gethostbyname")
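Note: because the agent now calls sys.exit() instead of os._exit(), the tests patch "sys.exit" so assertions can inspect the exit code without actually terminating the test runner. A minimal sketch of the technique; stop() here is a hypothetical function under test, not agent code, and the Ambari tree imports patch as mock.mock rather than plain mock:

import sys
import unittest
from mock import patch

def stop():
  sys.exit(0)  # hypothetical function under test

class TestStop(unittest.TestCase):
  @patch("sys.exit")
  def test_stop_exits_cleanly(self, sys_exit_mock):
    stop()  # the patched sys.exit records the call instead of exiting
    sys_exit_mock.assert_called_with(0)

if __name__ == "__main__":
  unittest.main()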

+ 27 - 27
ambari-server/src/main/resources/common-services/HIVE/0.12.0.2.0/package/alerts/alert_hive_metastore.py

@@ -20,12 +20,13 @@ limitations under the License.
 
 import socket
 import time
+
 from resource_management.libraries.functions import format
 from resource_management.libraries.functions import get_kinit_path
 from resource_management.core.resources import Execute
 
-OK_MESSAGE = "Metastore OK - %.4f response"
-CRITICAL_MESSAGE = "Connection to metastore failed on host {0}"
+OK_MESSAGE = "Metastore OK - Hive command took {0:.3f}s"
+CRITICAL_MESSAGE = "Metastore on {0} failed ({1})"
 
 SECURITY_ENABLED_KEY = '{{cluster-env/security_enabled}}'
 SMOKEUSER_KEYTAB_KEY = '{{cluster-env/smokeuser_keytab}}'
@@ -72,18 +73,20 @@ def execute(parameters=None, host_name=None):
 
   result_code = None
 
-  if security_enabled:
-    smokeuser_keytab = SMOKEUSER_KEYTAB_DEFAULT
-    if SMOKEUSER_KEYTAB_KEY in parameters:
-      smokeuser_keytab = parameters[SMOKEUSER_KEYTAB_KEY]
-    kinit_path_local = get_kinit_path()
-    kinitcmd=format("{kinit_path_local} -kt {smokeuser_keytab} {smokeuser}; ")
-    Execute(kinitcmd,
-            user=smokeuser,
-            path=["/bin/", "/usr/bin/", "/usr/lib/hive/bin/", "/usr/sbin/"],
-    )
-
   try:
+    if security_enabled:
+      smokeuser_keytab = SMOKEUSER_KEYTAB_DEFAULT
+
+      if SMOKEUSER_KEYTAB_KEY in parameters:
+        smokeuser_keytab = parameters[SMOKEUSER_KEYTAB_KEY]
+
+      kinit_path_local = get_kinit_path()
+      kinitcmd=format("{kinit_path_local} -kt {smokeuser_keytab} {smokeuser}; ")
+
+      Execute(kinitcmd, user=smokeuser,
+        path=["/bin/", "/usr/bin/", "/usr/lib/hive/bin/", "/usr/sbin/"],
+        timeout=10)
+
     if host_name is None:
       host_name = socket.getfqdn()
 
@@ -92,24 +95,21 @@ def execute(parameters=None, host_name=None):
         metastore_uri = uri
 
     cmd = format("hive --hiveconf hive.metastore.uris={metastore_uri} -e 'show databases;'")
+
     start_time = time.time()
+
     try:
-      Execute(cmd,
-              user=smokeuser,
-              path=["/bin/", "/usr/bin/", "/usr/lib/hive/bin/", "/usr/sbin/"],
-              timeout=240
-      )
-      is_metastore_ok = True
-    except:
-      is_metastore_ok = False
-
-    if is_metastore_ok == True:
-      result_code = 'OK'
+      Execute(cmd, user=smokeuser,
+        path=["/bin/", "/usr/bin/", "/usr/lib/hive/bin/", "/usr/sbin/"],
+        timeout=30 )
+
       total_time = time.time() - start_time
-      label = OK_MESSAGE % (total_time)
-    else:
+
+      result_code = 'OK'
+      label = OK_MESSAGE.format(total_time)
+    except Exception, exception:
       result_code = 'CRITICAL'
-      label = CRITICAL_MESSAGE.format(host_name)
+      label = CRITICAL_MESSAGE.format(host_name, exception.message)
 
   except Exception, e:
     label = str(e)
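Note: besides moving the kinit call inside the outer try block and tightening the timeouts (240s down to 30s for the hive query, 10s for kinit), the patch switches the labels from %-interpolation to str.format and folds the exception text into the CRITICAL message. A quick illustration of the two templates with made-up values:

OK_MESSAGE = "Metastore OK - Hive command took {0:.3f}s"
CRITICAL_MESSAGE = "Metastore on {0} failed ({1})"

print(OK_MESSAGE.format(1.23456))
# -> Metastore OK - Hive command took 1.235s

print(CRITICAL_MESSAGE.format("example-host", "timed out"))
# -> Metastore on example-host failed (timed out)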

+ 4 - 2
ambari-server/src/main/resources/common-services/HIVE/0.12.0.2.0/package/alerts/alert_hive_thrift_port.py

@@ -110,8 +110,10 @@ def execute(parameters=None, host_name=None):
 
     start_time = time.time()
     try:
-      hive_check.check_thrift_port_sasl(host_name, port, hive_server2_authentication,
-                                        hive_server_principal, kinitcmd, smokeuser, transport_mode = transport_mode)
+      hive_check.check_thrift_port_sasl(host_name, port,
+        hive_server2_authentication, hive_server_principal, kinitcmd, smokeuser,
+        transport_mode = transport_mode)
+
       is_thrift_port_ok = True
     except:
       is_thrift_port_ok = False