Browse Source

Revert "AMBARI-5065 Rolling restart should also handle clients on the same machine as the restarting component (dsen)"

This reverts commit 45de7eff7c4587c72a8982ed0942f08105db7c3a.
Mahadev Konar 11 năm trước cách đây
mục cha
commit
b612522f94

+ 4 - 4
ambari-agent/src/main/python/ambari_agent/ActionQueue.py

@@ -28,6 +28,7 @@ import os
 from LiveStatus import LiveStatus
 from shell import shellRunner
 import PuppetExecutor
+import PythonExecutor
 from ActualConfigHandler import ActualConfigHandler
 from CommandStatusDict import CommandStatusDict
 from CustomServiceOrchestrator import CustomServiceOrchestrator
@@ -74,7 +75,6 @@ class ActionQueue(threading.Thread):
     self.config = config
     self.controller = controller
     self.sh = shellRunner()
-    self.configTags = {}
     self._stop = threading.Event()
     self.tmpdir = config.get('agent', 'prefix')
     self.customServiceOrchestrator = CustomServiceOrchestrator(config,
@@ -214,7 +214,7 @@ class ActionQueue(threading.Thread):
       roleResult['structuredOut'] = ''
     # let ambari know that configuration tags were applied
     if status == self.COMPLETED_STATUS:
-      configHandler = ActualConfigHandler(self.config, self.configTags)
+      configHandler = ActualConfigHandler(self.config)
       if command.has_key('configurationTags'):
         configHandler.write_actual(command['configurationTags'])
         roleResult['configurationTags'] = command['configurationTags']
@@ -226,7 +226,7 @@ class ActionQueue(threading.Thread):
         (command['roleCommand'] == self.ROLE_COMMAND_CUSTOM_COMMAND and \
         command['hostLevelParams'].has_key('custom_command') and \
         command['hostLevelParams']['custom_command'] == self.CUSTOM_COMMAND_RESTART)):
-        configHandler.write_actual_component(command['role'], command['configurationTags'])
+        configHandler.copy_to_component(command['role'])
         roleResult['configurationTags'] = configHandler.read_actual_component(command['role'])
     self.commandStatuses.put_command_status(command, roleResult)
 
@@ -248,7 +248,7 @@ class ActionQueue(threading.Thread):
       command_format = self.determine_command_format_version(command)
 
       livestatus = LiveStatus(cluster, service, component,
-                              globalConfig, self.config, self.configTags)
+                              globalConfig, self.config)
       component_status = None
       if command_format == self.COMMAND_FORMAT_V2:
         # For custom services, responsibility to determine service status is

+ 17 - 36
ambari-agent/src/main/python/ambari_agent/ActualConfigHandler.py

@@ -21,16 +21,15 @@ limitations under the License.
 import json
 import logging
 import os
-import LiveStatus
+import shutil
 
 logger = logging.getLogger()
 
 class ActualConfigHandler:
   CONFIG_NAME = 'config.json'
 
-  def __init__(self, config, configTags):
-    self.config = config
-    self.configTags = configTags
+  def __init__(self, config):
+    self.config = config;
 
   def findRunDir(self):
     runDir = '/var/run/ambari-agent'
@@ -40,36 +39,19 @@ class ActualConfigHandler:
       runDir = '/tmp'
     return runDir
 
-  def write_actual(self, tags):
-    self.write_file(self.CONFIG_NAME, tags)
-
-  def write_actual_component(self, component, tags):
-    self.configTags[component] = tags
-    filename = component + "_" + self.CONFIG_NAME
-    self.write_file(filename, tags)
-    self.write_client_components(tags)
-
-  def write_client_components(self, tags):
-    for comp in LiveStatus.LiveStatus.CLIENT_COMPONENTS:
-      componentName = comp['componentName']
-      if componentName in self.configTags.keys():
-        tags_updated = False
-        for config_name in tags.keys():
-          if config_name in self.configTags[componentName] and \
-            tags[config_name] != self.configTags[componentName][config_name]:
-            self.configTags[componentName][config_name] = tags[config_name]
-            tags_updated = True
-        if tags_updated:
-          filename = componentName + "_" + self.CONFIG_NAME
-          self.write_file(filename, self.configTags[componentName])
-    pass
-
-  def write_file(self, filename, tags):
+  def write_actual(self, configTags):
     runDir = self.findRunDir()
-    conf_file = open(os.path.join(runDir, filename), 'w')
-    json.dump(tags, conf_file)
+    conf_file = open(os.path.join(runDir, self.CONFIG_NAME), 'w')
+    json.dump(configTags, conf_file)
     conf_file.close()
 
+  def copy_to_component(self, componentName):
+    runDir = self.findRunDir()
+    srcfile = os.path.join(runDir, self.CONFIG_NAME)
+    if os.path.isfile(srcfile):
+      dstfile = os.path.join(runDir, componentName + "_" + self.CONFIG_NAME)
+      shutil.copy(srcfile, dstfile)
+
   def read_file(self, filename):
     runDir = self.findRunDir()
     fullname = os.path.join(runDir, filename)
@@ -92,8 +74,7 @@ class ActualConfigHandler:
   def read_actual(self):
     return self.read_file(self.CONFIG_NAME)
 
-  def read_actual_component(self, component):
-    if component not in self.configTags.keys():
-      self.configTags[component] = \
-        self.read_file(component + "_" + self.CONFIG_NAME)
-    return self.configTags[component]
+  def read_actual_component(self, componentName):
+    return self.read_file(componentName + "_" + self.CONFIG_NAME)
+    
+

+ 3 - 5
ambari-agent/src/main/python/ambari_agent/LiveStatus.py

@@ -151,16 +151,14 @@ class LiveStatus:
   LIVE_STATUS = "STARTED"
   DEAD_STATUS = "INSTALLED"
 
-  def __init__(self, cluster, service, component, globalConfig, config,
-               configTags):
+  def __init__(self, cluster, service, component, globalConfig, config):
     self.cluster = cluster
     self.service = service
     self.component = component
     self.globalConfig = globalConfig
     versionsFileDir = config.get('agent', 'prefix')
     self.versionsHandler = StackVersionsFileHandler(versionsFileDir)
-    self.configTags = configTags
-    self.actualConfigHandler = ActualConfigHandler(config, configTags)
+    self.actualConfigHandler = ActualConfigHandler(config)
 
   def belongsToService(self, component):
     #TODO: Should also check belonging of server to cluster
@@ -196,7 +194,7 @@ class LiveStatus:
                  "stackVersion": self.versionsHandler.
                  read_stack_version(self.component)
     }
-    active_config = self.actualConfigHandler.read_actual_component(self.component) #
+    active_config = self.actualConfigHandler.read_actual_component(self.component)
     if not active_config is None:
       livestatus['configurationTags'] = active_config
 

+ 6 - 67
ambari-agent/src/test/python/ambari_agent/TestActualConfigHandler.py

@@ -23,7 +23,7 @@ from ambari_agent.AmbariConfig import AmbariConfig
 from ambari_agent.ActualConfigHandler import ActualConfigHandler
 import os
 import logging
-from mock.mock import patch
+import json
 
 class TestActualConfigHandler(TestCase):
 
@@ -33,9 +33,9 @@ class TestActualConfigHandler(TestCase):
     config = AmbariConfig().getConfig()
     tmpdir = tempfile.gettempdir()
     config.set('agent', 'prefix', tmpdir)
-
+    handler = ActualConfigHandler(config)
+    
     tags = { "global": "version1", "core-site": "version2" }
-    handler = ActualConfigHandler(config, tags)
     handler.write_actual(tags)
     output = handler.read_actual()
     self.assertEquals(tags, output)
@@ -45,7 +45,7 @@ class TestActualConfigHandler(TestCase):
     config = AmbariConfig().getConfig()
     tmpdir = tempfile.gettempdir()
     config.set('agent', 'prefix', tmpdir)
-    handler = ActualConfigHandler(config, {})
+    handler = ActualConfigHandler(config)
 
     conf_file = open(os.path.join(tmpdir, ActualConfigHandler.CONFIG_NAME), 'w')
     conf_file.write("")
@@ -59,11 +59,11 @@ class TestActualConfigHandler(TestCase):
     config = AmbariConfig().getConfig()
     tmpdir = tempfile.gettempdir()
     config.set('agent', 'prefix', tmpdir)
+    handler = ActualConfigHandler(config)
 
     tags1 = { "global": "version1", "core-site": "version2" }
-    handler = ActualConfigHandler(config, {})
     handler.write_actual(tags1)
-    handler.write_actual_component('FOO', tags1)
+    handler.copy_to_component('FOO')
 
     output1 = handler.read_actual_component('FOO')
     output2 = handler.read_actual_component('GOO') 
@@ -80,64 +80,3 @@ class TestActualConfigHandler(TestCase):
     self.assertEquals(tags1, output4)
     os.remove(os.path.join(tmpdir, "FOO_" + ActualConfigHandler.CONFIG_NAME))
     os.remove(os.path.join(tmpdir, ActualConfigHandler.CONFIG_NAME))
-
-  def test_write_actual_component(self):
-    config = AmbariConfig().getConfig()
-    tmpdir = tempfile.gettempdir()
-    config.set('agent', 'prefix', tmpdir)
-
-    tags1 = { "global": "version1", "core-site": "version2" }
-    tags2 = { "global": "version33", "core-site": "version33" }
-    handler = ActualConfigHandler(config, {})
-    handler.write_actual_component('HDFS_CLIENT', tags1)
-    handler.write_actual_component('HBASE_CLIENT', tags1)
-    self.assertEquals(tags1, handler.read_actual_component('HDFS_CLIENT'))
-    self.assertEquals(tags1, handler.read_actual_component('HBASE_CLIENT'))
-    handler.write_actual_component('DATANODE', tags2)
-    self.assertEquals(tags2, handler.read_actual_component('DATANODE'))
-    self.assertEquals(tags2, handler.read_actual_component('HDFS_CLIENT'))
-
-    os.remove(os.path.join(tmpdir, "DATANODE_" + ActualConfigHandler.CONFIG_NAME))
-    os.remove(os.path.join(tmpdir, "HBASE_CLIENT_" + ActualConfigHandler.CONFIG_NAME))
-    os.remove(os.path.join(tmpdir, "HDFS_CLIENT_" + ActualConfigHandler.CONFIG_NAME))
-
-  @patch.object(ActualConfigHandler, "write_file")
-  def test_write_client_components(self, write_file_mock):
-    config = AmbariConfig().getConfig()
-    tmpdir = tempfile.gettempdir()
-    config.set('agent', 'prefix', tmpdir)
-
-    tags0 = {"global": "version0", "core-site": "version0"}
-    tags1 = {"global": "version1", "core-site": "version2"}
-    tags2 = {"global": "version33", "core-site": "version33"}
-    configTags = {'HDFS_CLIENT': tags0, 'HBASE_CLIENT': tags1}
-    handler = ActualConfigHandler(config, configTags)
-    self.assertEquals(tags0, handler.read_actual_component('HDFS_CLIENT'))
-    self.assertEquals(tags1, handler.read_actual_component('HBASE_CLIENT'))
-    handler.write_client_components(tags2)
-    self.assertEquals(tags2, handler.read_actual_component('HDFS_CLIENT'))
-    self.assertEquals(tags2, handler.read_actual_component('HBASE_CLIENT'))
-    self.assertTrue(write_file_mock.called)
-    self.assertEqual(2, write_file_mock.call_count)
-
-  @patch.object(ActualConfigHandler, "write_file")
-  @patch.object(ActualConfigHandler, "read_file")
-  def test_read_actual_component_inmemory(self, read_file_mock, write_file_mock):
-    config = AmbariConfig().getConfig()
-    tmpdir = tempfile.gettempdir()
-    config.set('agent', 'prefix', tmpdir)
-
-    tags1 = { "global": "version1", "core-site": "version2" }
-    read_file_mock.return_value = tags1
-
-    handler = ActualConfigHandler(config, {})
-
-    handler.write_actual_component('NAMENODE', tags1)
-    self.assertTrue(write_file_mock.called)
-    self.assertEquals(tags1, handler.read_actual_component('NAMENODE'))
-    self.assertFalse(read_file_mock.called)
-    self.assertEquals(tags1, handler.read_actual_component('DATANODE'))
-    self.assertTrue(read_file_mock.called)
-    self.assertEquals(1, read_file_mock.call_count)
-    self.assertEquals(tags1, handler.read_actual_component('DATANODE'))
-    self.assertEquals(1, read_file_mock.call_count)

+ 8 - 7
ambari-agent/src/test/python/ambari_agent/TestLiveStatus.py

@@ -21,9 +21,10 @@ limitations under the License.
 from unittest import TestCase
 from ambari_agent.LiveStatus import LiveStatus
 from ambari_agent.AmbariConfig import AmbariConfig
+import socket
 import os, sys, StringIO
 from ambari_agent import ActualConfigHandler
-from mock.mock import patch
+from mock.mock import patch, MagicMock, call
 import pprint
 from ambari_agent import StatusCheck
 
@@ -45,7 +46,7 @@ class TestLiveStatus(TestCase):
     for component in LiveStatus.COMPONENTS:
       config = AmbariConfig().getConfig()
       config.set('agent', 'prefix', "ambari_agent" + os.sep + "dummy_files")
-      livestatus = LiveStatus('', component['serviceName'], component['componentName'], {}, config, {})
+      livestatus = LiveStatus('', component['serviceName'], component['componentName'], {}, config)
       livestatus.versionsHandler.versionsFilePath = "ambari_agent" + os.sep + "dummy_files" + os.sep + "dummy_current_stack"
       result = livestatus.build()
       print "LiveStatus of {0}: {1}".format(component['serviceName'], str(result))
@@ -56,23 +57,23 @@ class TestLiveStatus(TestCase):
 
     # Test build status for CLIENT component (in LiveStatus.CLIENT_COMPONENTS)
     read_actual_component_mock.return_value = "some tags"
-    livestatus = LiveStatus('c1', 'HDFS', 'HDFS_CLIENT', { }, config, {})
+    livestatus = LiveStatus('c1', 'HDFS', 'HDFS_CLIENT', { }, config)
     result = livestatus.build()
     self.assertTrue(len(result) > 0, 'Livestatus should not be empty')
     self.assertTrue(result.has_key('configurationTags'))
     # Test build status with forsed_component_status
     ## Alive
-    livestatus = LiveStatus('c1', 'HDFS', 'HDFS_CLIENT', { }, config, {})
+    livestatus = LiveStatus('c1', 'HDFS', 'HDFS_CLIENT', { }, config)
     result = livestatus.build(forsed_component_status = LiveStatus.LIVE_STATUS)
     self.assertTrue(len(result) > 0, 'Livestatus should not be empty')
     self.assertTrue(result['status'], LiveStatus.LIVE_STATUS)
     ## Dead
-    livestatus = LiveStatus('c1', 'HDFS', 'HDFS_CLIENT', { }, config, {})
+    livestatus = LiveStatus('c1', 'HDFS', 'HDFS_CLIENT', { }, config)
     result = livestatus.build(forsed_component_status = LiveStatus.DEAD_STATUS)
     self.assertTrue(len(result) > 0, 'Livestatus should not be empty')
     self.assertTrue(result['status'], LiveStatus.DEAD_STATUS)
 
-    livestatus = LiveStatus('c1', 'TEZ', 'TEZ_CLIENT', { }, config, {})
+    livestatus = LiveStatus('c1', 'TEZ', 'TEZ_CLIENT', { }, config)
     result = livestatus.build(forsed_component_status = LiveStatus.LIVE_STATUS)
     self.assertTrue(len(result) > 0, 'Livestatus should not be empty')
     self.assertTrue(result['status'], LiveStatus.LIVE_STATUS)
@@ -88,7 +89,7 @@ class TestLiveStatus(TestCase):
     config = AmbariConfig().getConfig()
     config.set('agent', 'prefix', "ambari_agent" + os.sep + "dummy_files")
     livestatus = LiveStatus('', 'SOME_UNKNOWN_SERVICE',
-                            'SOME_UNKNOWN_COMPONENT', {}, config, {})
+                            'SOME_UNKNOWN_COMPONENT', {}, config)
     livestatus.versionsHandler.versionsFilePath = "ambari_agent" + \
                       os.sep + "dummy_files" + os.sep + "dummy_current_stack"
     result = livestatus.build(forsed_component_status = "STARTED")