Forráskód Böngészése

AMBARI-13652. Stop-and-Start Upgrade: Handle Atlas and Kafka in all upgrade paths and downgrades (Dmytro Grinenko via ncole)

Nate Cole 9 éve
szülő
commit
32e64a3fd8

+ 10 - 1
ambari-server/src/main/resources/common-services/ATLAS/0.1.0.2.3/package/scripts/atlas_client.py

@@ -31,11 +31,20 @@ class AtlasClient(Script):
   def get_stack_to_component(self):
     return {"HDP": "atlas-client"}
 
+  # ToDo: currently hdp-select doesn't contain atlas-client, uncomment this block when
+  # ToDo: atlas-client will be available
+  # def pre_upgrade_restart(self, env, upgrade_type=None):
+  #   import params
+  #   env.set_params(params)
+  #
+  #   if params.version and compare_versions(format_hdp_stack_version(params.version), '2.3.0.0') >= 0:
+  #     conf_select.select(params.stack_name, "atlas", params.version)
+  #     hdp_select.select("atlas-client", params.version)
+
   def install(self, env):
     self.install_packages(env)
     self.configure(env)
 
-
   def configure(self, env):
     import params
     env.set_params(params)

+ 13 - 8
ambari-server/src/main/resources/common-services/ATLAS/0.1.0.2.3/package/scripts/metadata_server.py

@@ -17,8 +17,11 @@ limitations under the License.
 
 """
 from metadata import metadata
+from resource_management.libraries.functions import conf_select
+from resource_management.libraries.functions import hdp_select
 from resource_management import Execute, check_process_status, Script
 from resource_management.libraries.functions import format
+from resource_management.libraries.functions.version import compare_versions, format_hdp_stack_version
 from resource_management.libraries.functions.security_commons import build_expectations, \
   get_params_from_filesystem, validate_security_config_properties, \
   FILE_TYPE_PROPERTIES
@@ -36,12 +39,15 @@ class MetadataServer(Script):
     env.set_params(params)
     metadata()
 
-  # def pre_rolling_restart(self, env):
-  #   import params
-  #   env.set_params(params)
-  #   upgrade.prestart(env, "metadata-server")
-  #
-  def start(self, env, rolling_restart=False):
+  def pre_upgrade_restart(self, env, upgrade_type=None):
+    import params
+    env.set_params(params)
+
+    if params.version and compare_versions(format_hdp_stack_version(params.version), '2.3.0.0') >= 0:
+      # conf_select.select(params.stack_name, "atlas", params.version)
+      hdp_select.select("atlas-server", params.version)
+
+  def start(self, env, upgrade_type=None):
     import params
     env.set_params(params)
     self.configure(env)
@@ -53,7 +59,7 @@ class MetadataServer(Script):
             not_if=no_op_test
     )
 
-  def stop(self, env, rolling_restart=False):
+  def stop(self, env, upgrade_type=None):
     import params
     env.set_params(params)
     daemon_cmd = format('source {params.conf_dir}/atlas-env.sh; {params.metadata_stop_script}')
@@ -62,7 +68,6 @@ class MetadataServer(Script):
     )
     Execute (format("rm -f {params.pid_file}"))
 
-
   def status(self, env):
     import status_params
     env.set_params(status_params)

+ 5 - 0
ambari-server/src/main/resources/common-services/ATLAS/0.1.0.2.3/package/scripts/params.py

@@ -31,6 +31,11 @@ config = Script.get_config()
 # security enabled
 security_enabled = status_params.security_enabled
 
+stack_name = default("/hostLevelParams/stack_name", None)
+
+# New Cluster Stack Version that is defined during the RESTART of a Stack Upgrade
+version = default("/commandParams/version", None)
+
 # hdp version
 stack_version_unformatted = str(config['hostLevelParams']['stack_version'])
 hdp_stack_version = format_hdp_stack_version(stack_version_unformatted)

+ 3 - 3
ambari-server/src/main/resources/common-services/KAFKA/0.8.1.2.2/package/scripts/kafka_broker.py

@@ -37,12 +37,12 @@ class KafkaBroker(Script):
     env.set_params(params)
     kafka()
 
-  def pre_rolling_restart(self, env):
+  def pre_upgrade_restart(self, env, upgrade_type=None):
     import params
     env.set_params(params)
     upgrade.prestart(env, "kafka-broker")
 
-  def start(self, env, rolling_restart=False):
+  def start(self, env, upgrade_type=None):
     import params
     env.set_params(params)
     self.configure(env)
@@ -55,7 +55,7 @@ class KafkaBroker(Script):
             not_if=no_op_test
     )
 
-  def stop(self, env, rolling_restart=False):
+  def stop(self, env, upgrade_type=None):
     import params
     env.set_params(params)
     daemon_cmd = format('source {params.conf_dir}/kafka-env.sh; {params.kafka_bin} stop')

+ 6 - 0
ambari-server/src/main/resources/stacks/HDP/2.2/upgrades/nonrolling-upgrade-2.3.xml

@@ -479,6 +479,12 @@
         <task xsi:type="configure" id="hdp_2_3_0_0_nimbus_remove_deprecated_ranger_properties"/>
       </execute-stage>
 
+      <!-- KAFKA  -->
+
+      <execute-stage service="KAFKA" component="KAFKA_BROKER" title="Apply config changes for Kafka">
+        <task xsi:type="configure" id ="hdp_2_3_0_0_kafka_broker_listeners"/>
+      </execute-stage>
+
     </group>
 
     <!-- Now, restart all of the services. -->

+ 12 - 0
ambari-server/src/main/resources/stacks/HDP/2.3/upgrades/config-upgrade.xml

@@ -815,6 +815,18 @@
         </changes>
       </component>
     </service>
+
+
+    <service name="KAFKA">
+      <component name="KAFKA_BROKER">
+        <changes>
+          <definition xsi:type="configure" id="hdp_2_3_0_0_kafka_broker_listeners">
+            <type>kafka-broker</type>
+            <set key="listeners" value="PLAINTEXT://localhost:6667"/>
+          </definition>
+        </changes>
+      </component>
+    </service>
   </services>
 
 </upgrade-config-changes>

+ 21 - 0
ambari-server/src/main/resources/stacks/HDP/2.3/upgrades/nonrolling-upgrade-2.3.xml

@@ -58,6 +58,10 @@
       <skippable>true</skippable>
       <service-check>false</service-check>
 
+      <service name="ATLAS">
+        <component>ATLAS_SERVER</component>
+      </service>
+
       <service name="FLUME">
         <component>FLUME_HANDLER</component>
       </service>
@@ -392,6 +396,14 @@
       </service>
     </group>
 
+    <group xsi:type="restart" name="ATLAS" title="Atlas">
+      <service-check>false</service-check>
+      <skippable>true</skippable>
+      <service name="ATLAS">
+        <component>ATLAS_SERVER</component>
+      </service>
+    </group>
+
     <!-- Upgrade Oozie DB only on Upgrade direction, and always create a new ShareLib. -->
     <group name="Upgrade Oozie" title="Upgrade Oozie Database">
       <direction>UPGRADE</direction>
@@ -888,5 +900,14 @@
         </upgrade>
       </component>
     </service>
+
+    <service name="ATLAS">
+      <component name="ATLAS_SERVER">
+        <upgrade>
+          <task xsi:type="restart-task"/>
+        </upgrade>
+      </component>
+
+    </service>
   </processing>
 </upgrade>

+ 4 - 4
ambari-server/src/test/python/stacks/2.2/KAFKA/test_kafka_broker.py

@@ -100,7 +100,7 @@ class TestKafkaBroker(RMFTestCase):
     self.assertTrue(islink_mock.called)
     self.assertTrue(realpath_mock.called)
 
-  def test_pre_rolling_restart(self):
+  def test_pre_upgrade_restart(self):
     config_file = self.get_src_folder()+"/test/python/stacks/2.2/configs/default.json"
     with open(config_file, "r") as f:
       json_content = json.load(f)
@@ -108,7 +108,7 @@ class TestKafkaBroker(RMFTestCase):
     json_content['commandParams']['version'] = version
     self.executeScript(self.COMMON_SERVICES_PACKAGE_DIR + "/scripts/kafka_broker.py",
                        classname = "KafkaBroker",
-                       command = "pre_rolling_restart",
+                       command = "pre_upgrade_restart",
                        config_dict = json_content,
                        hdp_stack_version = self.STACK_VERSION,
                        target = RMFTestCase.TARGET_COMMON_SERVICES)
@@ -117,7 +117,7 @@ class TestKafkaBroker(RMFTestCase):
     self.assertNoMoreResources()
 
   @patch("resource_management.core.shell.call")
-  def test_pre_rolling_restart_23(self, call_mock):
+  def test_pre_upgrade_restart_23(self, call_mock):
     config_file = self.get_src_folder()+"/test/python/stacks/2.2/configs/default.json"
     with open(config_file, "r") as f:
       json_content = json.load(f)
@@ -127,7 +127,7 @@ class TestKafkaBroker(RMFTestCase):
     mocks_dict = {}
     self.executeScript(self.COMMON_SERVICES_PACKAGE_DIR + "/scripts/kafka_broker.py",
                        classname = "KafkaBroker",
-                       command = "pre_rolling_restart",
+                       command = "pre_upgrade_restart",
                        config_dict = json_content,
                        hdp_stack_version = self.STACK_VERSION,
                        target = RMFTestCase.TARGET_COMMON_SERVICES,