|
@@ -20,7 +20,6 @@ Ambari Agent
|
|
|
"""
|
|
|
|
|
|
from resource_management.libraries.script.script import Script
|
|
|
-from resource_management.libraries.resources.hdfs_resource import HdfsResource
|
|
|
from resource_management.libraries.functions import conf_select
|
|
|
from resource_management.libraries.functions import hdp_select
|
|
|
from resource_management.libraries.functions.check_process_status import check_process_status
|
|
@@ -29,13 +28,17 @@ from resource_management.libraries.functions.version import compare_versions, fo
|
|
|
from resource_management.libraries.functions.security_commons import build_expectations, \
|
|
|
cached_kinit_executor, get_params_from_filesystem, validate_security_config_properties, \
|
|
|
FILE_TYPE_XML
|
|
|
+from resource_management.libraries.functions.decorator import retry
|
|
|
from resource_management.core.resources.system import File, Execute
|
|
|
from resource_management.core.source import Template
|
|
|
from resource_management.core.logger import Logger
|
|
|
+from resource_management.core.exceptions import Fail
|
|
|
+from resource_management.libraries.providers.hdfs_resource import WebHDFSUtil
|
|
|
+from resource_management.libraries.providers.hdfs_resource import HdfsResourceProvider
|
|
|
+from resource_management import is_empty
|
|
|
+from resource_management import shell
|
|
|
|
|
|
|
|
|
-
|
|
|
-from install_jars import install_tez_jars
|
|
|
from yarn import yarn
|
|
|
from service import service
|
|
|
from ambari_commons import OSConst
|
|
@@ -113,6 +116,12 @@ class ResourcemanagerDefault(Resourcemanager):
|
|
|
self.configure(env) # FOR SECURITY
|
|
|
if params.has_ranger_admin and params.is_supported_yarn_ranger:
|
|
|
setup_ranger_yarn() #Ranger Yarn Plugin related calls
|
|
|
+
|
|
|
+ # wait for active-dir and done-dir to be created by ATS if needed
|
|
|
+ if params.has_ats:
|
|
|
+ Logger.info("Verifying DFS directories where ATS stores time line data for active and completed applications.")
|
|
|
+ self.wait_for_dfs_directories_created(params.entity_groupfs_store_dir, params.entity_groupfs_active_dir)
|
|
|
+
|
|
|
service('resourcemanager', action='start')
|
|
|
|
|
|
def status(self, env):
|
|
@@ -217,5 +226,53 @@ class ResourcemanagerDefault(Resourcemanager):
|
|
|
pass
|
|
|
|
|
|
|
|
|
+
|
|
|
+
|
|
|
  def wait_for_dfs_directories_created(self, *dirs):
    """
    Block until each of the given DFS directories exists, skipping any that are
    marked as ignored in the HDFS resource ignore file.

    Used at ResourceManager start to wait for the ATS active-dir / done-dir to
    be created before proceeding.

    :param dirs: one or more DFS directory paths to verify.
    :raises Fail: propagated from wait_for_dfs_directory_created once its
                  retries are exhausted and a directory still does not exist.
    """
    import params

    # Directories listed in the ignore file are not expected to exist; the
    # per-directory check skips them.
    ignored_dfs_dirs = HdfsResourceProvider.get_ignored_resources_list(params.hdfs_resource_ignore_file)

    if params.security_enabled:
      # Obtain a Kerberos ticket as the YARN user before touching HDFS.
      # NOTE: format() resolves {rm_kinit_cmd} from the caller's scope/params
      # (Ambari's frame-inspecting format helper).
      Execute(
        format("{rm_kinit_cmd}")
        , user=params.yarn_user
      )

    # Each directory is checked (and retried) independently.
    for dir_path in dirs:
      self.wait_for_dfs_directory_created(dir_path, ignored_dfs_dirs)
|
|
|
+
|
|
|
+
|
|
|
  # Retries up to 8 times with a constant 20s sleep (backoff_factor=1),
  # re-attempting whenever the check raises Fail below.
  @retry(times=8, sleep_time=20, backoff_factor=1, err_class=Fail)
  def wait_for_dfs_directory_created(self, dir_path, ignored_dfs_dirs):
    """
    Verify that a single DFS directory exists, raising Fail (and thereby
    triggering the @retry decorator) until it does.

    :param dir_path: DFS directory path to check; an "empty" value
                     (per is_empty) is silently skipped.
    :param ignored_dfs_dirs: paths from the HDFS resource ignore file that
                             should not be checked at all.
    :raises Fail: if the directory does not exist on this attempt.
    """
    import params


    if not is_empty(dir_path):
      # Normalize the path (e.g. strip any filesystem scheme/authority prefix
      # — presumably; behavior is defined by HdfsResourceProvider.parse_path).
      dir_path = HdfsResourceProvider.parse_path(dir_path)

      if dir_path in ignored_dfs_dirs:
        Logger.info("Skipping DFS directory '" + dir_path + "' as it's marked to be ignored.")
        return

      Logger.info("Verifying if DFS directory '" + dir_path + "' exists.")

      dir_exists = None

      if WebHDFSUtil.is_webhdfs_available(params.is_webhdfs_enabled, params.default_fs):
        # check with webhdfs is much faster than executing hdfs dfs -test
        util = WebHDFSUtil(params.hdfs_site, params.yarn_user, params.security_enabled)
        # 404 is an expected outcome (directory absent), so it must not be
        # treated as a command failure.
        list_status = util.run_command(dir_path, 'GETFILESTATUS', method='GET', ignore_status_codes=['404'], assertable_result=False)
        dir_exists = ('FileStatus' in list_status)
      else:
        # have to do time expensive hdfs dfs -d check.
        # NOTE: format() interpolates {hadoop_conf_dir} from the caller's
        # scope/params before the shell command runs as the YARN user.
        dfs_ret_code = shell.call(format("hdfs --config {hadoop_conf_dir} dfs -test -d " + dir_path), user=params.yarn_user)[0]
        dir_exists = not dfs_ret_code #dfs -test -d returns 0 in case the dir exists

      if not dir_exists:
        # Raising Fail here is the retry signal: @retry sleeps and re-invokes
        # this method until the directory appears or attempts run out.
        raise Fail("DFS directory '" + dir_path + "' does not exist !")
      else:
        Logger.info("DFS directory '" + dir_path + "' exists.")
|
|
|
+
|
|
|
# Script entry point: dispatch the Ambari command (start/stop/status/...) via
# the Script.execute() framework.
if __name__ == "__main__":
  Resourcemanager().execute()
|