|
@@ -17,26 +17,148 @@ limitations under the License.
|
|
|
|
|
|
"""
|
|
"""
|
|
import re
|
|
import re
|
|
|
|
+import os
|
|
|
|
|
|
from resource_management.core.logger import Logger
|
|
from resource_management.core.logger import Logger
|
|
from resource_management.core.resources.system import Execute
|
|
from resource_management.core.resources.system import Execute
|
|
-from resource_management.libraries.functions.format import format
|
|
|
|
-from resource_management.libraries.functions.default import default
|
|
|
|
from resource_management.core import shell
|
|
from resource_management.core import shell
|
|
-from resource_management.libraries.functions import Direction, SafeMode
|
|
|
|
|
|
+from resource_management.core.shell import as_user
|
|
from resource_management.core.exceptions import Fail
|
|
from resource_management.core.exceptions import Fail
|
|
|
|
+from resource_management.libraries.functions.format import format
|
|
|
|
+from resource_management.libraries.functions import get_unique_id_and_date
|
|
|
|
+from resource_management.libraries.functions import Direction, SafeMode
|
|
|
|
+
|
|
|
|
+from namenode_ha_state import NamenodeHAState
|
|
|
|
|
|
|
|
|
|
safemode_to_instruction = {SafeMode.ON: "enter",
|
|
safemode_to_instruction = {SafeMode.ON: "enter",
|
|
SafeMode.OFF: "leave"}
|
|
SafeMode.OFF: "leave"}
|
|
|
|
|
|
-def reach_safemode_state(user, safemode_state, in_ha):
|
|
|
|
|
|
+
|
|
|
|
def prepare_upgrade_check_for_previous_dir():
    """
    During a NonRolling (aka Express Upgrade), preparing the NameNode requires backing up some data.
    Check that there is no "previous" folder inside any NameNode Name Dir.

    The check is performed only when NameNode High Availability is enabled and
    this host is reported as the Active NameNode; otherwise nothing is inspected.

    :raises Fail: if at least one NameNode Name Dir contains a "previous" folder
    """
    import params

    # NOTE(review): the block nesting below was reconstructed from a listing whose
    # indentation was lost. Confirm that the directory check is meant to run only
    # on the Active NameNode of an HA pair.
    if not params.dfs_ha_enabled:
        return

    namenode_ha = NamenodeHAState()
    if not namenode_ha.is_active(params.hostname):
        return

    Logger.info("NameNode High Availability is enabled and this is the Active NameNode.")

    # dfs_name_dir is a comma-separated list. Trim each entry and drop empty ones so
    # that a value such as "/a, /b" does not make the second directory invisible.
    nn_name_dirs = [entry.strip() for entry in params.dfs_name_dir.split(',') if entry.strip()]

    problematic_previous_namenode_dirs = set()
    for nn_dir in nn_name_dirs:
        if not os.path.isdir(nn_dir):
            continue
        # Check for a previous folder, which is not allowed.
        previous_dir = os.path.join(nn_dir, "previous")
        if os.path.isdir(previous_dir):
            problematic_previous_namenode_dirs.add(previous_dir)

    if problematic_previous_namenode_dirs:
        # Sort so the reported list is stable from one attempt to the next.
        message = 'WARNING. The following NameNode Name Dir(s) have a "previous" folder from an older version.\n' \
                  'Please back it up first, and then delete it, OR Finalize (E.g., "hdfs dfsadmin -finalizeUpgrade").\n' \
                  'NameNode Name Dir(s): {0}\n' \
                  '***** Then, retry this step. *****'.format(", ".join(sorted(problematic_previous_namenode_dirs)))
        Logger.error(message)
        raise Fail(message)
|
|
|
|
+
|
|
|
|
def prepare_upgrade_enter_safe_mode(hdfs_binary):
    """
    During a NonRolling (aka Express Upgrade), preparing the NameNode requires first entering Safemode.

    Runs "<hdfs_binary> dfsadmin -safemode enter" as the HDFS user and pipes the
    output through grep so the command only succeeds when "Safe mode is ON" is reported.

    :param hdfs_binary: name/path of the HDFS binary to use
    :raises Fail: if the command cannot be run or does not report that Safe mode is ON
    """
    import params

    # NOTE: format() here is the resource_management helper that interpolates local
    # variables by name, so the local names below must match the placeholders.
    safe_mode_enter_cmd = format("{hdfs_binary} dfsadmin -safemode enter")
    safe_mode_enter_and_check_for_on = format("{safe_mode_enter_cmd} | grep 'Safe mode is ON'")
    try:
        # Safe to call if already in Safe Mode
        Logger.info("Enter SafeMode if not already in it.")
        # as_user() only builds a command string and never runs it, so the command
        # must be handed to an executor for Safemode to actually be entered.
        shell.checked_call(safe_mode_enter_and_check_for_on,
                           logoutput=True,
                           user=params.hdfs_user,
                           env={'PATH': params.hadoop_bin_dir})
    except Exception as e:
        message = format("Could not enter safemode. As the HDFS user, call this command: {safe_mode_enter_cmd}")
        # Keep the underlying cause in the log; the raised message stays unchanged.
        Logger.error("{0} Error: {1}".format(message, e))
        raise Fail(message)
|
|
|
|
+
|
|
|
|
def prepare_upgrade_save_namespace(hdfs_binary):
    """
    During a NonRolling (aka Express Upgrade), preparing the NameNode requires saving the namespace.

    Runs "<hdfs_binary> dfsadmin -saveNamespace" as the HDFS user.

    :param hdfs_binary: name/path of the HDFS binary to use
    :raises Fail: if the save-namespace command cannot be run successfully
    """
    import params

    # NOTE: format() here is the resource_management helper that interpolates local
    # variables by name, so the local names below must match the placeholders.
    save_namespace_cmd = format("{hdfs_binary} dfsadmin -saveNamespace")
    try:
        Logger.info("Checkpoint the current namespace.")
        # as_user() only builds a command string and never runs it, so the command
        # must be handed to an executor for the checkpoint to actually happen.
        shell.checked_call(save_namespace_cmd,
                           logoutput=True,
                           user=params.hdfs_user,
                           env={'PATH': params.hadoop_bin_dir})
    except Exception as e:
        # The original text read "Could save the NameSpace", which inverted the meaning.
        message = format("Could not save the NameSpace. As the HDFS user, call this command: {save_namespace_cmd}")
        # Keep the underlying cause in the log alongside the operator instructions.
        Logger.error("{0} Error: {1}".format(message, e))
        raise Fail(message)
|
|
|
|
+
|
|
|
|
def prepare_upgrade_backup_namenode_dir():
    """
    During a NonRolling (aka Express Upgrade), preparing the NameNode requires backing up the NameNode Name Dirs.

    For every configured NameNode Name Dir that has a "current" sub-folder, copy
    that folder into a fresh directory under /tmp/upgrades/<stack version>/.
    Failures are collected and reported through the logger; this function does
    not raise on a failed copy (best-effort backup).
    """
    import params

    failed_paths = []
    # dfs_name_dir is a comma-separated list. Trim entries and skip empty ones so that
    # whitespace after a comma does not produce a path that can never be found.
    nn_name_dirs = [entry.strip() for entry in params.dfs_name_dir.split(',') if entry.strip()]
    backup_destination_root_dir = "/tmp/upgrades/{0}".format(params.stack_version_unformatted)

    if nn_name_dirs:
        Logger.info("Backup the NameNode name directory's CURRENT folder.")

    for i, nn_dir in enumerate(nn_name_dirs, 1):
        namenode_current_image = os.path.join(nn_dir, "current")
        # The 1-based index keeps destinations distinct across name dirs.
        unique = get_unique_id_and_date() + "_" + str(i)
        # Note that /tmp may not be writeable.
        backup_current_folder = "{0}/namenode_{1}/".format(backup_destination_root_dir, unique)

        # Nothing to copy, or the destination already exists: leave it alone.
        if not os.path.isdir(namenode_current_image) or os.path.isdir(backup_current_folder):
            continue

        try:
            os.makedirs(backup_current_folder)
            Execute(('cp', '-ar', namenode_current_image, backup_current_folder),
                    sudo=True
            )
        except Exception as e:
            # Best-effort: remember the failure and keep going with the other dirs,
            # but record why it failed so the operator can act on it.
            Logger.warning("Backup of {0} to {1} failed: {2}".format(namenode_current_image,
                                                                     backup_current_folder, e))
            failed_paths.append(namenode_current_image)

    if failed_paths:
        Logger.error("Could not backup the NameNode Name Dir(s) to {0}, make sure that the destination path is "
                     "writeable and copy the directories on your own. Directories: {1}".format(backup_destination_root_dir,
                                                                                               ", ".join(failed_paths)))
|
|
|
|
+
|
|
|
|
def prepare_upgrade_finalize_previous_upgrades(hdfs_binary):
    """
    During a NonRolling (aka Express Upgrade), preparing the NameNode requires Finalizing any upgrades that are in progress.

    Runs "<hdfs_binary> dfsadmin -rollingUpgrade finalize" as the HDFS user and
    inspects its output. This step is best-effort: every problem is reported as
    a warning and nothing is raised to the caller.

    :param hdfs_binary: name/path of the HDFS binary to use
    """
    import params

    # NOTE: format() here is the resource_management helper that interpolates local
    # variables by name, so "hdfs_binary" must stay available under that name.
    finalize_command = format("{hdfs_binary} dfsadmin -rollingUpgrade finalize")
    try:
        Logger.info("Attempt to Finalize if there are any in-progress upgrades. "
                    "This will return 255 if no upgrades are in progress.")
        # NOTE(review): if checked_call raises on a non-zero exit code, the "255"
        # case described above ends up in the except branch below rather than in
        # the output check - confirm that this is intended.
        code, out = shell.checked_call(finalize_command, logoutput=True, user=params.hdfs_user)
        if not out:
            Logger.warning("Finalize command did not return any output.")
        else:
            expected_substring = "there is no rolling upgrade in progress"
            if expected_substring not in out.lower():
                Logger.warning('Finalize command did not contain substring: %s' % expected_substring)
    except Exception as e:
        # Deliberately non-fatal, but keep the cause visible in the log.
        Logger.warning("Ensure no upgrades are in progress. Finalize attempt reported: {0}".format(e))
|
|
|
|
+
|
|
|
|
+def reach_safemode_state(user, safemode_state, in_ha, hdfs_binary):
|
|
"""
|
|
"""
|
|
Enter or leave safemode for the Namenode.
|
|
Enter or leave safemode for the Namenode.
|
|
- @param user: user to perform action as
|
|
|
|
- @param safemode_state: Desired state of ON or OFF
|
|
|
|
- @param in_ha: bool indicating if Namenode High Availability is enabled
|
|
|
|
- @:return Returns a tuple of (transition success, original state). If no change is needed, the indicator of
|
|
|
|
|
|
+ :param user: user to perform action as
|
|
|
|
+ :param safemode_state: Desired state of ON or OFF
|
|
|
|
+ :param in_ha: bool indicating if Namenode High Availability is enabled
|
|
|
|
+ :param hdfs_binary: name/path of the HDFS binary to use
|
|
|
|
+ :return: Returns a tuple of (transition success, original state). If no change is needed, the indicator of
|
|
success will be True
|
|
success will be True
|
|
"""
|
|
"""
|
|
Logger.info("Prepare to transition into safemode state %s" % safemode_state)
|
|
Logger.info("Prepare to transition into safemode state %s" % safemode_state)
|
|
@@ -44,7 +166,7 @@ def reach_safemode_state(user, safemode_state, in_ha):
|
|
original_state = SafeMode.UNKNOWN
|
|
original_state = SafeMode.UNKNOWN
|
|
|
|
|
|
hostname = params.hostname
|
|
hostname = params.hostname
|
|
- safemode_check = format("hdfs dfsadmin -safemode get")
|
|
|
|
|
|
+ safemode_check = format("{hdfs_binary} dfsadmin -safemode get")
|
|
|
|
|
|
grep_pattern = format("Safe mode is {safemode_state} in {hostname}") if in_ha else format("Safe mode is {safemode_state}")
|
|
grep_pattern = format("Safe mode is {safemode_state} in {hostname}") if in_ha else format("Safe mode is {safemode_state}")
|
|
safemode_check_with_grep = format("hdfs dfsadmin -safemode get | grep '{grep_pattern}'")
|
|
safemode_check_with_grep = format("hdfs dfsadmin -safemode get | grep '{grep_pattern}'")
|
|
@@ -61,7 +183,7 @@ def reach_safemode_state(user, safemode_state, in_ha):
|
|
return (True, original_state)
|
|
return (True, original_state)
|
|
else:
|
|
else:
|
|
# Make a transition
|
|
# Make a transition
|
|
- command = "hdfs dfsadmin -safemode %s" % (safemode_to_instruction[safemode_state])
|
|
|
|
|
|
+ command = "{0} dfsadmin -safemode {1}".format(hdfs_binary, safemode_to_instruction[safemode_state])
|
|
Execute(command,
|
|
Execute(command,
|
|
user=user,
|
|
user=user,
|
|
logoutput=True,
|
|
logoutput=True,
|
|
@@ -74,7 +196,7 @@ def reach_safemode_state(user, safemode_state, in_ha):
|
|
return (False, original_state)
|
|
return (False, original_state)
|
|
|
|
|
|
|
|
|
|
-def prepare_rolling_upgrade():
|
|
|
|
|
|
+def prepare_rolling_upgrade(hdfs_binary):
|
|
"""
|
|
"""
|
|
Perform either an upgrade or a downgrade.
|
|
Perform either an upgrade or a downgrade.
|
|
|
|
|
|
@@ -83,6 +205,7 @@ def prepare_rolling_upgrade():
|
|
1. Leave safemode if the safemode status is not OFF
|
|
1. Leave safemode if the safemode status is not OFF
|
|
2. Execute a rolling upgrade "prepare"
|
|
2. Execute a rolling upgrade "prepare"
|
|
3. Execute a rolling upgrade "query"
|
|
3. Execute a rolling upgrade "query"
|
|
|
|
+ :param hdfs_binary: name/path of the HDFS binary to use
|
|
"""
|
|
"""
|
|
import params
|
|
import params
|
|
|
|
|
|
@@ -96,12 +219,12 @@ def prepare_rolling_upgrade():
|
|
|
|
|
|
|
|
|
|
if params.upgrade_direction == Direction.UPGRADE:
|
|
if params.upgrade_direction == Direction.UPGRADE:
|
|
- safemode_transition_successful, original_state = reach_safemode_state(params.hdfs_user, SafeMode.OFF, True)
|
|
|
|
|
|
+ safemode_transition_successful, original_state = reach_safemode_state(params.hdfs_user, SafeMode.OFF, True, hdfs_binary)
|
|
if not safemode_transition_successful:
|
|
if not safemode_transition_successful:
|
|
raise Fail("Could not transition to safemode state %s. Please check logs to make sure namenode is up." % str(SafeMode.OFF))
|
|
raise Fail("Could not transition to safemode state %s. Please check logs to make sure namenode is up." % str(SafeMode.OFF))
|
|
|
|
|
|
- prepare = "hdfs dfsadmin -rollingUpgrade prepare"
|
|
|
|
- query = "hdfs dfsadmin -rollingUpgrade query"
|
|
|
|
|
|
+ prepare = format("{hdfs_binary} dfsadmin -rollingUpgrade prepare")
|
|
|
|
+ query = format("{hdfs_binary} dfsadmin -rollingUpgrade query")
|
|
Execute(prepare,
|
|
Execute(prepare,
|
|
user=params.hdfs_user,
|
|
user=params.hdfs_user,
|
|
logoutput=True)
|
|
logoutput=True)
|
|
@@ -111,9 +234,11 @@ def prepare_rolling_upgrade():
|
|
elif params.upgrade_direction == Direction.DOWNGRADE:
|
|
elif params.upgrade_direction == Direction.DOWNGRADE:
|
|
pass
|
|
pass
|
|
|
|
|
|
-def finalize_rolling_upgrade():
|
|
|
|
|
|
+def finalize_upgrade(upgrade_type, hdfs_binary):
|
|
"""
|
|
"""
|
|
Finalize the Namenode upgrade, at which point it cannot be downgraded.
|
|
Finalize the Namenode upgrade, at which point it cannot be downgraded.
|
|
|
|
+ :param upgrade_type rolling or nonrolling
|
|
|
|
+ :param hdfs_binary: name/path of the HDFS binary to use
|
|
"""
|
|
"""
|
|
Logger.info("Executing Rolling Upgrade finalize")
|
|
Logger.info("Executing Rolling Upgrade finalize")
|
|
import params
|
|
import params
|
|
@@ -122,8 +247,15 @@ def finalize_rolling_upgrade():
|
|
kinit_command = format("{params.kinit_path_local} -kt {params.hdfs_user_keytab} {params.hdfs_principal_name}")
|
|
kinit_command = format("{params.kinit_path_local} -kt {params.hdfs_user_keytab} {params.hdfs_principal_name}")
|
|
Execute(kinit_command, user=params.hdfs_user, logoutput=True)
|
|
Execute(kinit_command, user=params.hdfs_user, logoutput=True)
|
|
|
|
|
|
- finalize_cmd = "hdfs dfsadmin -rollingUpgrade finalize"
|
|
|
|
- query_cmd = "hdfs dfsadmin -rollingUpgrade query"
|
|
|
|
|
|
+ finalize_cmd = ""
|
|
|
|
+ query_cmd = ""
|
|
|
|
+ if upgrade_type == "rolling":
|
|
|
|
+ finalize_cmd = format("{hdfs_binary} dfsadmin -rollingUpgrade finalize")
|
|
|
|
+ query_cmd = format("{hdfs_binary} dfsadmin -rollingUpgrade query")
|
|
|
|
+
|
|
|
|
+ elif upgrade_type == "nonrolling":
|
|
|
|
+ finalize_cmd = format("{hdfs_binary} dfsadmin -finalizeUpgrade")
|
|
|
|
+ query_cmd = format("{hdfs_binary} dfsadmin -rollingUpgrade query")
|
|
|
|
|
|
Execute(query_cmd,
|
|
Execute(query_cmd,
|
|
user=params.hdfs_user,
|
|
user=params.hdfs_user,
|