
AMBARI-8917. Rolling Upgrade - prepare function to copy tarballs based on new HDP version (alejandro)

Alejandro Fernandez
commit e6b4e2fbbc

+ 3 - 2
ambari-common/src/main/python/resource_management/libraries/functions/dynamic_variable_interpretation.py

@@ -114,9 +114,10 @@ def _copy_files(source_and_dest_pairs, file_owner, group_owner, kinit_if_needed)
   return return_value
 
 
-def copy_tarballs_to_hdfs(tarball_prefix, component_user, file_owner, group_owner):
+def copy_tarballs_to_hdfs(tarball_prefix, hdp_select_component_name, component_user, file_owner, group_owner):
   """
   :param tarball_prefix: Prefix of the tarball must be one of tez, hive, mr, pig
+  :param hdp_select_component_name: Component name whose "hdp-select status" output is used to determine the HDP version
   :param component_user: User that will execute the Hadoop commands
   :param file_owner: Owner of the files copied to HDFS (typically hdfs account)
   :param group_owner: Group owner of the files copied to HDFS (typically hadoop group)
@@ -145,7 +146,7 @@ def copy_tarballs_to_hdfs(tarball_prefix, component_user, file_owner, group_owne
   tmpfile = tempfile.NamedTemporaryFile()
   out = None
   with open(tmpfile.name, 'r+') as file:
-    get_hdp_version_cmd = '/usr/bin/hdp-select status > %s' % tmpfile.name
+    get_hdp_version_cmd = '/usr/bin/hdp-select status %s > %s' % (hdp_select_component_name, tmpfile.name)
     code, stdoutdata = shell.call(get_hdp_version_cmd)
     out = file.read()
   pass
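
The practical effect of the new parameter is that the version is read for one specific component rather than for the stack as a whole. Below is a rough, hypothetical sketch (not the actual Ambari helper) of how such output might be turned into a version string, assuming output of the form "hive-server2 - 2.2.0.0-2041":

import re
import subprocess

def get_component_version(hdp_select_component_name):
  # Assumed output format, e.g. "hive-server2 - 2.2.0.0-2041"; the real
  # Ambari code instead parses a temporary file written via shell.call.
  output = subprocess.check_output(
      ["/usr/bin/hdp-select", "status", hdp_select_component_name])
  match = re.search(r"(\d+\.\d+\.\d+\.\d+-\d+)", output.decode("utf-8"))
  return match.group(1) if match else None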

+ 1 - 1
ambari-server/src/main/python/ambari-server.py

@@ -1506,7 +1506,7 @@ def check_database_name_property(args, upgrade=False):
     return -1
 
   version = get_ambari_version(properties)
-  if upgrade and compare_versions(version, "1.7.0") >= 0:
+  if upgrade and compare_versions(version, "1.7.0") == 0:
 
     # This code exists for historic reasons in which property names changed from Ambari 1.6.1 to 1.7.0
     persistence_type = properties[PERSISTENCE_TYPE_PROPERTY]
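
The practical difference between ">= 0" and "== 0" is that the 1.6.1-to-1.7.0 property rename now runs only when the recorded Ambari version is exactly 1.7.0, not for every later release. A hypothetical stand-in for the helper, assuming cmp()-style -1/0/1 semantics (the real compare_versions may differ):

from distutils.version import LooseVersion

def compare_versions(left, right):
  # Assumed semantics: -1 if left < right, 0 if equal, 1 if left > right.
  return (LooseVersion(left) > LooseVersion(right)) - (LooseVersion(left) < LooseVersion(right))

assert compare_versions("1.7.0", "1.7.0") == 0  # rename branch taken
assert compare_versions("2.0.0", "1.7.0") == 1  # rename branch now skipped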

+ 17 - 15
ambari-server/src/main/resources/common-services/HDFS/2.1.0.2.0/package/scripts/namenode_upgrade.py

@@ -23,6 +23,7 @@ from resource_management.core.resources.system import Execute
 from resource_management.libraries.functions.format import format
 from resource_management.core.shell import call
 from resource_management.core.exceptions import Fail
+from resource_management.libraries.functions.decorator import retry
 
 
 class SAFEMODE:
@@ -33,28 +34,29 @@ class SAFEMODE:
 safemode_to_instruction = {SAFEMODE.ON: "enter",
                            SAFEMODE.OFF: "leave"}
 
-
-def reach_safemode_state(secure_user, safemode_state, in_ha):
+@retry(times=3, sleep_time=6, err_class=Fail)
+def reach_safemode_state(user, safemode_state, in_ha):
   """
   Enter or leave safemode for the Namenode.
-  @param secure_user: user to perform action as
+  @param user: user to perform action as
   @param safemode_state: ON or OFF
   @param in_ha: bool indicating if Namenode High Availability is enabled
   @:return True if successful, false otherwise.
   """
-  Logger.info("Prepare to leave safemode")
+  Logger.info("Prepare to transition into safemode state %s" % safemode_state)
   import params
 
   hostname = params.hostname
   grep = format("Safe mode is {safemode_state} in {hostname}") if in_ha else format("Safe mode is {safemode_state}")
-  safemode_check = format("su - {secure_user} -c 'hdfs dfsadmin -safemode get | grep \"{grep}\"'")
+  safemode_check = format("su - {user} -c 'hdfs dfsadmin -safemode get | grep \"{grep}\"'")
   code, out = call(safemode_check)
   Logger.info("Command: %s\nCode: %d." % (safemode_check, code))
   if code != 0:
     command = "hdfs dfsadmin -safemode %s" % (safemode_to_instruction[safemode_state])
     Execute(command,
-            user=secure_user,
-            logoutput=True)
+            user=user,
+            logoutput=True,
+            path=[params.hadoop_bin_dir])
 
     code, out = call(safemode_check)
     Logger.info("Command: %s\nCode: %d. Out: %s" % (safemode_check, code, out))
@@ -73,23 +75,23 @@ def prepare_rolling_upgrade():
   Logger.info("Executing Rolling Upgrade prepare")
   import params
 
-  secure_user = params.hdfs_principal_name if params.security_enabled else params.hdfs_user
+  user = params.hdfs_principal_name if params.security_enabled else params.hdfs_user
 
   if params.security_enabled:
     Execute(format("{kinit_path_local} -kt {hdfs_user_keytab} {hdfs_principal_name}"),
-            user=secure_user)
+            user=user)
 
-  safemode_transition_successful = reach_safemode_state(secure_user, SAFEMODE.OFF, True)
+  safemode_transition_successful = reach_safemode_state(user, SAFEMODE.OFF, True)
   if not safemode_transition_successful:
     raise Fail("Could not leave safemode")
 
   prepare = "hdfs dfsadmin -rollingUpgrade prepare"
   query = "hdfs dfsadmin -rollingUpgrade query"
   Execute(prepare,
-          user=secure_user,
+          user=user,
           logoutput=True)
   Execute(query,
-          user=secure_user,
+          user=user,
           logoutput=True)
 
 
@@ -100,12 +102,12 @@ def finalize_rolling_upgrade():
   Logger.info("Executing Rolling Upgrade finalize")
   import params
 
-  secure_user = params.hdfs_principal_name if params.security_enabled else params.hdfs_user
+  user = params.hdfs_principal_name if params.security_enabled else params.hdfs_user
   finalize_cmd = "hdfs dfsadmin -rollingUpgrade finalize"
   Execute(finalize_cmd,
-          user=secure_user,
+          user=user,
           logoutput=True)
 
-  safemode_transition_successful = reach_safemode_state(secure_user, SAFEMODE.OFF, True)
+  safemode_transition_successful = reach_safemode_state(user, SAFEMODE.OFF, True)
   if not safemode_transition_successful:
     Logger.warning("Could not leave safemode")
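
The newly imported @retry decorator re-runs reach_safemode_state a few times before letting a Fail propagate, which smooths over a NameNode that is still coming up. A hypothetical sketch of a decorator with this signature; the real implementation in resource_management.libraries.functions.decorator may differ:

import time
from functools import wraps

def retry(times=3, sleep_time=1, err_class=Exception):
  def decorator(function):
    @wraps(function)
    def wrapper(*args, **kwargs):
      attempts = times
      while attempts > 1:
        attempts -= 1
        try:
          return function(*args, **kwargs)
        except err_class:
          time.sleep(sleep_time)
      # final attempt: let any error propagate to the caller
      return function(*args, **kwargs)
    return wrapper
  return decorator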

+ 4 - 2
ambari-server/src/main/resources/common-services/HIVE/0.12.0.2.0/package/scripts/hive_server.py

@@ -53,8 +53,8 @@ class HiveServer(Script):
     self.configure(env) # FOR SECURITY
 
     # This function is needed in HDP 2.2, but it is safe to call in earlier versions.
-    copy_tarballs_to_hdfs('mapreduce', params.tez_user, params.hdfs_user, params.user_group)
-    copy_tarballs_to_hdfs('tez', params.tez_user, params.hdfs_user, params.user_group)
+    copy_tarballs_to_hdfs('mapreduce', 'hive-server2', params.tez_user, params.hdfs_user, params.user_group)
+    copy_tarballs_to_hdfs('tez', 'hive-server2', params.tez_user, params.hdfs_user, params.user_group)
 
     hive_service( 'hiveserver2', action = 'start',
       rolling_restart=rolling_restart )
@@ -88,6 +88,8 @@ class HiveServer(Script):
 
     if params.version and compare_versions(format_hdp_stack_version(params.version), '2.2.0.0') >= 0:
       Execute(format("hdp-select set hive-server2 {version}"))
+      copy_tarballs_to_hdfs('mapreduce', 'hive-server2', params.tez_user, params.hdfs_user, params.user_group)
+      copy_tarballs_to_hdfs('tez', 'hive-server2', params.tez_user, params.hdfs_user, params.user_group)
 
 
   def security_status(self, env):

+ 4 - 4
ambari-server/src/main/resources/common-services/HIVE/0.12.0.2.0/package/scripts/webhcat.py

@@ -81,10 +81,10 @@ def webhcat():
 
   # TODO, these checks that are specific to HDP 2.2 and greater should really be in a script specific to that stack.
   if params.hdp_stack_version != "" and compare_versions(params.hdp_stack_version, "2.2.0.0") >= 0:
-    copy_tarballs_to_hdfs('hive', params.webhcat_user, params.hdfs_user, params.user_group)
-    copy_tarballs_to_hdfs('pig', params.webhcat_user, params.hdfs_user, params.user_group)
-    copy_tarballs_to_hdfs('hadoop-streaming', params.webhcat_user, params.hdfs_user, params.user_group)
-    copy_tarballs_to_hdfs('sqoop', params.webhcat_user, params.hdfs_user, params.user_group)
+    copy_tarballs_to_hdfs('hive', 'hive-webhcat', params.webhcat_user, params.hdfs_user, params.user_group)
+    copy_tarballs_to_hdfs('pig', 'hive-webhcat', params.webhcat_user, params.hdfs_user, params.user_group)
+    copy_tarballs_to_hdfs('hadoop-streaming', 'hive-webhcat', params.webhcat_user, params.hdfs_user, params.user_group)
+    copy_tarballs_to_hdfs('sqoop', 'hive-webhcat', params.webhcat_user, params.hdfs_user, params.user_group)
   else:
     CopyFromLocal(params.hadoop_streeming_jars,
                   owner=params.webhcat_user,

+ 1 - 1
ambari-server/src/main/resources/common-services/PIG/0.12.0.2.0/package/scripts/service_check.py

@@ -81,7 +81,7 @@ class PigServiceCheck(Script):
       )
 
       # Check for Pig-on-Tez
-      copy_tarballs_to_hdfs('tez', params.smokeuser, params.hdfs_user, params.user_group)
+      copy_tarballs_to_hdfs('tez', 'hadoop-client', params.smokeuser, params.hdfs_user, params.user_group)
 
       if params.security_enabled:
         kinit_cmd = format("{kinit_path_local} -kt {smoke_user_keytab} {smokeuser};")

+ 2 - 2
ambari-server/src/main/resources/common-services/YARN/2.1.0.2.0/package/scripts/historyserver.py

@@ -50,13 +50,13 @@ class HistoryServer(Script):
 
     if params.version and compare_versions(format_hdp_stack_version(params.version), '2.2.0.0') >= 0:
       Execute(format("hdp-select set hadoop-mapreduce-historyserver {version}"))
-      copy_tarballs_to_hdfs('mapreduce', params.mapred_user, params.hdfs_user, params.user_group)
+      copy_tarballs_to_hdfs('mapreduce', 'hadoop-mapreduce-historyserver', params.mapred_user, params.hdfs_user, params.user_group)
 
   def start(self, env, rolling_restart=False):
     import params
     env.set_params(params)
     self.configure(env) # FOR SECURITY
-    copy_tarballs_to_hdfs('mapreduce', params.mapred_user, params.hdfs_user, params.user_group)
+    copy_tarballs_to_hdfs('mapreduce', 'hadoop-mapreduce-historyserver', params.mapred_user, params.hdfs_user, params.user_group)
     service('historyserver', action='start', serviceName='mapreduce')
 
     self.save_component_version_to_structured_out(params.stack_name)

+ 2 - 2
ambari-server/src/test/python/TestAmbariServer.py

@@ -3434,7 +3434,7 @@ MIIFHjCCAwYCCQDpHKOBI+Lt0zANBgkqhkiG9w0BAQUFADBRMQswCQYDVQQGEwJV
     p = MagicMock()
     get_ambari_properties_mock.reset_mock()
     get_ambari_properties_mock.return_value = p
-    p.__getitem__.side_effect = ["something", "something", KeyError("test exception")]
+    p.__getitem__.side_effect = ["something", KeyError("test exception")]
     fail = False
 
     try:
@@ -3445,7 +3445,7 @@ MIIFHjCCAwYCCQDpHKOBI+Lt0zANBgkqhkiG9w0BAQUFADBRMQswCQYDVQQGEwJV
 
     # test if some drivers are available in resources, and symlink available too
     p.reset_mock()
-    p.__getitem__.side_effect = ["something", "something", "resources"]
+    p.__getitem__.side_effect = ["something", "resources"]
     lexists_mock.return_value = True
     isfile_mock.side_effect = [True, False, False]
     ambari_server.upgrade(args)
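
The shortened side_effect lists track the stricter version check above: with "== 0", the upgrade path presumably reads one fewer property, so the mocked __getitem__ needs one fewer queued value. For reference, this is how a MagicMock consumes a side_effect list (standard mock behaviour, illustration only):

from mock import MagicMock

p = MagicMock()
p.__getitem__.side_effect = ["something", KeyError("test exception")]
assert p["first"] == "something"  # values are returned in order
try:
  p["second"]                     # a queued exception instance is raised
except KeyError:
  pass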

+ 3 - 0
ambari-server/src/test/python/stacks/2.0.6/HIVE/test_hive_server.py

@@ -22,6 +22,7 @@ import subprocess
 
 from mock.mock import MagicMock, patch
 from resource_management.core import shell
+from resource_management.libraries.functions import dynamic_variable_interpretation
 from stacks.utils.RMFTestCase import *
 
 class TestHiveServer(RMFTestCase):
@@ -43,6 +44,7 @@ class TestHiveServer(RMFTestCase):
   @patch.object(shell, "call", new=MagicMock(return_value=(0, '')))
   @patch.object(subprocess,"Popen")
   @patch("socket.socket")
+  @patch.object(dynamic_variable_interpretation, "copy_tarballs_to_hdfs", new=MagicMock())
   def test_start_default(self, socket_mock, popen_mock):
     s = socket_mock.return_value
     
@@ -75,6 +77,7 @@ class TestHiveServer(RMFTestCase):
     self.assertTrue(s.close.called)
 
   @patch("socket.socket")
+  @patch.object(dynamic_variable_interpretation, "copy_tarballs_to_hdfs", new=MagicMock())
   def test_stop_default(self, socket_mock):
     self.executeScript(self.COMMON_SERVICES_PACKAGE_DIR + "/scripts/hive_server.py",
                        classname = "HiveServer",
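
The new @patch.object decorators stub out copy_tarballs_to_hdfs so these tests do not exercise the real tarball copy. Because new=MagicMock() is passed, patch fixes the replacement object and does not inject an extra argument into the test method; a minimal, self-contained illustration using os.path.exists as a stand-in target:

import os
from mock import MagicMock, patch

@patch.object(os.path, "exists", new=MagicMock(return_value=True))
def check():
  # os.path.exists is replaced only while check() runs
  return os.path.exists("/definitely/not/there")

assert check() is True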