浏览代码

AMBARI-5065 Rolling restart should also handle clients on the same machine as the restarting component (dsen)

Dmitry Sen 11 年之前
父节点
当前提交
37434acae1

+ 5 - 4
ambari-agent/src/main/python/ambari_agent/ActionQueue.py

@@ -28,7 +28,6 @@ import os
 from LiveStatus import LiveStatus
 from shell import shellRunner
 import PuppetExecutor
-import PythonExecutor
 from ActualConfigHandler import ActualConfigHandler
 from CommandStatusDict import CommandStatusDict
 from CustomServiceOrchestrator import CustomServiceOrchestrator
@@ -75,6 +74,7 @@ class ActionQueue(threading.Thread):
     self.config = config
     self.controller = controller
     self.sh = shellRunner()
+    self.configTags = {}
     self._stop = threading.Event()
     self.tmpdir = config.get('agent', 'prefix')
     self.customServiceOrchestrator = CustomServiceOrchestrator(config,
@@ -214,7 +214,7 @@ class ActionQueue(threading.Thread):
       roleResult['structuredOut'] = ''
     # let ambari know that configuration tags were applied
     if status == self.COMPLETED_STATUS:
-      configHandler = ActualConfigHandler(self.config)
+      configHandler = ActualConfigHandler(self.config, self.configTags)
       if command.has_key('configurationTags'):
         configHandler.write_actual(command['configurationTags'])
         roleResult['configurationTags'] = command['configurationTags']
@@ -226,7 +226,8 @@ class ActionQueue(threading.Thread):
         (command['roleCommand'] == self.ROLE_COMMAND_CUSTOM_COMMAND and \
         command['hostLevelParams'].has_key('custom_command') and \
         command['hostLevelParams']['custom_command'] == self.CUSTOM_COMMAND_RESTART)):
-        configHandler.copy_to_component(command['role'])
+        configHandler.write_actual_component(command['role'], command['configurationTags'])
+        configHandler.write_client_components(command['serviceName'], command['configurationTags'])
         roleResult['configurationTags'] = configHandler.read_actual_component(command['role'])
     self.commandStatuses.put_command_status(command, roleResult)
 
@@ -248,7 +249,7 @@ class ActionQueue(threading.Thread):
       command_format = self.determine_command_format_version(command)
 
       livestatus = LiveStatus(cluster, service, component,
-                              globalConfig, self.config)
+                              globalConfig, self.config, self.configTags)
       component_status = None
       if command_format == self.COMMAND_FORMAT_V2:
         # For custom services, responsibility to determine service status is

+ 28 - 16
ambari-agent/src/main/python/ambari_agent/ActualConfigHandler.py

@@ -21,15 +21,16 @@ limitations under the License.
 import json
 import logging
 import os
-import shutil
+import LiveStatus
 
 logger = logging.getLogger()
 
 class ActualConfigHandler:
   CONFIG_NAME = 'config.json'
 
-  def __init__(self, config):
-    self.config = config;
+  def __init__(self, config, configTags):
+    self.config = config
+    self.configTags = configTags
 
   def findRunDir(self):
     runDir = '/var/run/ambari-agent'
@@ -39,18 +40,28 @@ class ActualConfigHandler:
       runDir = '/tmp'
     return runDir
 
-  def write_actual(self, configTags):
-    runDir = self.findRunDir()
-    conf_file = open(os.path.join(runDir, self.CONFIG_NAME), 'w')
-    json.dump(configTags, conf_file)
-    conf_file.close()
+  def write_actual(self, tags):
+    self.write_file(self.CONFIG_NAME, tags)
+
+  def write_actual_component(self, component, tags):
+    self.configTags[component] = tags
+    filename = component + "_" + self.CONFIG_NAME
+    self.write_file(filename, tags)
 
-  def copy_to_component(self, componentName):
+  def write_client_components(self, serviceName, tags):
+    for comp in LiveStatus.LiveStatus.CLIENT_COMPONENTS:
+      if comp['serviceName'] == serviceName:
+        componentName = comp['componentName']
+        if componentName in self.configTags and \
+            tags != self.configTags[componentName]:
+          self.write_actual_component(componentName, tags)
+    pass
+
+  def write_file(self, filename, tags):
     runDir = self.findRunDir()
-    srcfile = os.path.join(runDir, self.CONFIG_NAME)
-    if os.path.isfile(srcfile):
-      dstfile = os.path.join(runDir, componentName + "_" + self.CONFIG_NAME)
-      shutil.copy(srcfile, dstfile)
+    conf_file = open(os.path.join(runDir, filename), 'w')
+    json.dump(tags, conf_file)
+    conf_file.close()
 
   def read_file(self, filename):
     runDir = self.findRunDir()
@@ -75,6 +86,7 @@ class ActualConfigHandler:
     return self.read_file(self.CONFIG_NAME)
 
   def read_actual_component(self, componentName):
-    return self.read_file(componentName + "_" + self.CONFIG_NAME)
-    
-
+    if componentName not in self.configTags.keys():
+      self.configTags[componentName] = \
+        self.read_file(componentName + "_" + self.CONFIG_NAME)
+    return self.configTags[componentName]

+ 4 - 2
ambari-agent/src/main/python/ambari_agent/LiveStatus.py

@@ -151,14 +151,16 @@ class LiveStatus:
   LIVE_STATUS = "STARTED"
   DEAD_STATUS = "INSTALLED"
 
-  def __init__(self, cluster, service, component, globalConfig, config):
+  def __init__(self, cluster, service, component, globalConfig, config,
+               configTags):
     self.cluster = cluster
     self.service = service
     self.component = component
     self.globalConfig = globalConfig
     versionsFileDir = config.get('agent', 'prefix')
     self.versionsHandler = StackVersionsFileHandler(versionsFileDir)
-    self.actualConfigHandler = ActualConfigHandler(config)
+    self.configTags = configTags
+    self.actualConfigHandler = ActualConfigHandler(config, configTags)
 
   def belongsToService(self, component):
     #TODO: Should also check belonging of server to cluster

+ 70 - 7
ambari-agent/src/test/python/ambari_agent/TestActualConfigHandler.py

@@ -23,7 +23,7 @@ from ambari_agent.AmbariConfig import AmbariConfig
 from ambari_agent.ActualConfigHandler import ActualConfigHandler
 import os
 import logging
-import json
+from mock.mock import patch
 
 class TestActualConfigHandler(TestCase):
 
@@ -33,9 +33,9 @@ class TestActualConfigHandler(TestCase):
     config = AmbariConfig().getConfig()
     tmpdir = tempfile.gettempdir()
     config.set('agent', 'prefix', tmpdir)
-    handler = ActualConfigHandler(config)
-    
+
     tags = { "global": "version1", "core-site": "version2" }
+    handler = ActualConfigHandler(config, tags)
     handler.write_actual(tags)
     output = handler.read_actual()
     self.assertEquals(tags, output)
@@ -45,7 +45,7 @@ class TestActualConfigHandler(TestCase):
     config = AmbariConfig().getConfig()
     tmpdir = tempfile.gettempdir()
     config.set('agent', 'prefix', tmpdir)
-    handler = ActualConfigHandler(config)
+    handler = ActualConfigHandler(config, {})
 
     conf_file = open(os.path.join(tmpdir, ActualConfigHandler.CONFIG_NAME), 'w')
     conf_file.write("")
@@ -59,14 +59,14 @@ class TestActualConfigHandler(TestCase):
     config = AmbariConfig().getConfig()
     tmpdir = tempfile.gettempdir()
     config.set('agent', 'prefix', tmpdir)
-    handler = ActualConfigHandler(config)
 
     tags1 = { "global": "version1", "core-site": "version2" }
+    handler = ActualConfigHandler(config, {})
     handler.write_actual(tags1)
-    handler.copy_to_component('FOO')
+    handler.write_actual_component('FOO', tags1)
 
     output1 = handler.read_actual_component('FOO')
-    output2 = handler.read_actual_component('GOO') 
+    output2 = handler.read_actual_component('GOO')
 
     self.assertEquals(tags1, output1)
     self.assertEquals(None, output2)
@@ -80,3 +80,66 @@ class TestActualConfigHandler(TestCase):
     self.assertEquals(tags1, output4)
     os.remove(os.path.join(tmpdir, "FOO_" + ActualConfigHandler.CONFIG_NAME))
     os.remove(os.path.join(tmpdir, ActualConfigHandler.CONFIG_NAME))
+
+  def test_write_actual_component_and_client_components(self):
+    config = AmbariConfig().getConfig()
+    tmpdir = tempfile.gettempdir()
+    config.set('agent', 'prefix', tmpdir)
+
+    tags1 = { "global": "version1", "core-site": "version2" }
+    tags2 = { "global": "version33", "core-site": "version33" }
+    handler = ActualConfigHandler(config, {})
+    handler.write_actual_component('HDFS_CLIENT', tags1)
+    handler.write_actual_component('HBASE_CLIENT', tags1)
+    self.assertEquals(tags1, handler.read_actual_component('HDFS_CLIENT'))
+    self.assertEquals(tags1, handler.read_actual_component('HBASE_CLIENT'))
+    handler.write_actual_component('DATANODE', tags2)
+    self.assertEquals(tags2, handler.read_actual_component('DATANODE'))
+    self.assertEquals(tags1, handler.read_actual_component('HDFS_CLIENT'))
+    handler.write_client_components('HDFS', tags2)
+    self.assertEquals(tags2, handler.read_actual_component('HDFS_CLIENT'))
+
+    os.remove(os.path.join(tmpdir, "DATANODE_" + ActualConfigHandler.CONFIG_NAME))
+    os.remove(os.path.join(tmpdir, "HBASE_CLIENT_" + ActualConfigHandler.CONFIG_NAME))
+    os.remove(os.path.join(tmpdir, "HDFS_CLIENT_" + ActualConfigHandler.CONFIG_NAME))
+
+  @patch.object(ActualConfigHandler, "write_file")
+  def test_write_client_components(self, write_file_mock):
+    config = AmbariConfig().getConfig()
+    tmpdir = tempfile.gettempdir()
+    config.set('agent', 'prefix', tmpdir)
+
+    tags0 = {"global": "version0", "core-site": "version0"}
+    tags1 = {"global": "version1", "core-site": "version2"}
+    tags2 = {"global": "version33", "core-site": "version33"}
+    configTags = {'HDFS_CLIENT': tags0, 'HBASE_CLIENT': tags1}
+    handler = ActualConfigHandler(config, configTags)
+    self.assertEquals(tags0, handler.read_actual_component('HDFS_CLIENT'))
+    self.assertEquals(tags1, handler.read_actual_component('HBASE_CLIENT'))
+    handler.write_client_components('HDFS', tags2)
+    self.assertEquals(tags2, handler.read_actual_component('HDFS_CLIENT'))
+    self.assertEquals(tags1, handler.read_actual_component('HBASE_CLIENT'))
+    self.assertTrue(write_file_mock.called)
+    self.assertEqual(1, write_file_mock.call_count)
+
+  @patch.object(ActualConfigHandler, "write_file")
+  @patch.object(ActualConfigHandler, "read_file")
+  def test_read_actual_component_inmemory(self, read_file_mock, write_file_mock):
+    config = AmbariConfig().getConfig()
+    tmpdir = tempfile.gettempdir()
+    config.set('agent', 'prefix', tmpdir)
+
+    tags1 = { "global": "version1", "core-site": "version2" }
+    read_file_mock.return_value = tags1
+
+    handler = ActualConfigHandler(config, {})
+
+    handler.write_actual_component('NAMENODE', tags1)
+    self.assertTrue(write_file_mock.called)
+    self.assertEquals(tags1, handler.read_actual_component('NAMENODE'))
+    self.assertFalse(read_file_mock.called)
+    self.assertEquals(tags1, handler.read_actual_component('DATANODE'))
+    self.assertTrue(read_file_mock.called)
+    self.assertEquals(1, read_file_mock.call_count)
+    self.assertEquals(tags1, handler.read_actual_component('DATANODE'))
+    self.assertEquals(1, read_file_mock.call_count)

+ 7 - 8
ambari-agent/src/test/python/ambari_agent/TestLiveStatus.py

@@ -21,10 +21,9 @@ limitations under the License.
 from unittest import TestCase
 from ambari_agent.LiveStatus import LiveStatus
 from ambari_agent.AmbariConfig import AmbariConfig
-import socket
 import os, sys, StringIO
 from ambari_agent import ActualConfigHandler
-from mock.mock import patch, MagicMock, call
+from mock.mock import patch
 import pprint
 from ambari_agent import StatusCheck
 
@@ -46,7 +45,7 @@ class TestLiveStatus(TestCase):
     for component in LiveStatus.COMPONENTS:
       config = AmbariConfig().getConfig()
       config.set('agent', 'prefix', "ambari_agent" + os.sep + "dummy_files")
-      livestatus = LiveStatus('', component['serviceName'], component['componentName'], {}, config)
+      livestatus = LiveStatus('', component['serviceName'], component['componentName'], {}, config, {})
       livestatus.versionsHandler.versionsFilePath = "ambari_agent" + os.sep + "dummy_files" + os.sep + "dummy_current_stack"
       result = livestatus.build()
       print "LiveStatus of {0}: {1}".format(component['serviceName'], str(result))
@@ -57,23 +56,23 @@ class TestLiveStatus(TestCase):
 
     # Test build status for CLIENT component (in LiveStatus.CLIENT_COMPONENTS)
     read_actual_component_mock.return_value = "some tags"
-    livestatus = LiveStatus('c1', 'HDFS', 'HDFS_CLIENT', { }, config)
+    livestatus = LiveStatus('c1', 'HDFS', 'HDFS_CLIENT', { }, config, {})
     result = livestatus.build()
     self.assertTrue(len(result) > 0, 'Livestatus should not be empty')
     self.assertTrue(result.has_key('configurationTags'))
     # Test build status with forsed_component_status
     ## Alive
-    livestatus = LiveStatus('c1', 'HDFS', 'HDFS_CLIENT', { }, config)
+    livestatus = LiveStatus('c1', 'HDFS', 'HDFS_CLIENT', { }, config, {})
     result = livestatus.build(forsed_component_status = LiveStatus.LIVE_STATUS)
     self.assertTrue(len(result) > 0, 'Livestatus should not be empty')
     self.assertTrue(result['status'], LiveStatus.LIVE_STATUS)
     ## Dead
-    livestatus = LiveStatus('c1', 'HDFS', 'HDFS_CLIENT', { }, config)
+    livestatus = LiveStatus('c1', 'HDFS', 'HDFS_CLIENT', { }, config, {})
     result = livestatus.build(forsed_component_status = LiveStatus.DEAD_STATUS)
     self.assertTrue(len(result) > 0, 'Livestatus should not be empty')
     self.assertTrue(result['status'], LiveStatus.DEAD_STATUS)
 
-    livestatus = LiveStatus('c1', 'TEZ', 'TEZ_CLIENT', { }, config)
+    livestatus = LiveStatus('c1', 'TEZ', 'TEZ_CLIENT', { }, config, {})
     result = livestatus.build(forsed_component_status = LiveStatus.LIVE_STATUS)
     self.assertTrue(len(result) > 0, 'Livestatus should not be empty')
     self.assertTrue(result['status'], LiveStatus.LIVE_STATUS)
@@ -89,7 +88,7 @@ class TestLiveStatus(TestCase):
     config = AmbariConfig().getConfig()
     config.set('agent', 'prefix', "ambari_agent" + os.sep + "dummy_files")
     livestatus = LiveStatus('', 'SOME_UNKNOWN_SERVICE',
-                            'SOME_UNKNOWN_COMPONENT', {}, config)
+                            'SOME_UNKNOWN_COMPONENT', {}, config, {})
     livestatus.versionsHandler.versionsFilePath = "ambari_agent" + \
                       os.sep + "dummy_files" + os.sep + "dummy_current_stack"
     result = livestatus.build(forsed_component_status = "STARTED")