|
@@ -20,7 +20,7 @@ limitations under the License.
|
|
|
import sys
|
|
|
import os
|
|
|
import json
|
|
|
-
|
|
|
+import tempfile
|
|
|
from resource_management import *
|
|
|
from resource_management.libraries.functions.security_commons import build_expectations, \
|
|
|
cached_kinit_executor, get_params_from_filesystem, validate_security_config_properties, \
|
|
@@ -30,6 +30,7 @@ from resource_management.libraries.functions.version import compare_versions, \
|
|
|
from resource_management.libraries.functions.format import format
|
|
|
from resource_management.libraries.functions.check_process_status import check_process_status
|
|
|
from resource_management.core.exceptions import Fail
|
|
|
+from resource_management.libraries.functions import get_klist_path
|
|
|
|
|
|
import namenode_upgrade
|
|
|
from hdfs_namenode import namenode
|
|
@@ -38,6 +39,17 @@ import hdfs_rebalance
|
|
|
from utils import failover_namenode
|
|
|
from setup_ranger_hdfs import setup_ranger_hdfs
|
|
|
|
|
|
+# hashlib is supplied as of Python 2.5 as the replacement interface for md5
|
|
|
+# and other secure hashes. In 2.6, md5 is deprecated. Import hashlib if
|
|
|
+# available, avoiding a deprecation warning under 2.6. Import md5 otherwise,
|
|
|
+# preserving 2.4 compatibility.
|
|
|
+try:
|
|
|
+ import hashlib
|
|
|
+ _md5 = hashlib.md5
|
|
|
+except ImportError:
|
|
|
+ import md5
|
|
|
+ _md5 = md5.new
|
|
|
+
|
|
|
class NameNode(Script):
|
|
|
|
|
|
def get_stack_to_component(self):
|
|
@@ -183,15 +195,34 @@ class NameNode(Script):
|
|
|
threshold = name_node_parameters['threshold']
|
|
|
_print("Starting balancer with threshold = %s\n" % threshold)
|
|
|
|
|
|
+ rebalance_env = {'PATH': params.hadoop_bin_dir}
|
|
|
+
|
|
|
+ if params.security_enabled:
|
|
|
+ # Create the kerberos credentials cache (ccache) file and set it in the environment to use
|
|
|
+ # when executing HDFS rebalance command. Use the md5 hash of the combination of the principal and keytab file
|
|
|
+ # to generate a (relatively) unique cache filename so that we can use it as needed.
|
|
|
+ # TODO: params.tmp_dir=/var/lib/ambari-agent/data/tmp. However hdfs user doesn't have access to this path.
|
|
|
+ # TODO: Hence using /tmp
|
|
|
+ ccache_file_name = "hdfs_rebalance_cc_" + _md5(format("{hdfs_principal_name}|{hdfs_user_keytab}")).hexdigest()
|
|
|
+ ccache_file_path = os.path.join(tempfile.gettempdir(), ccache_file_name)
|
|
|
+ rebalance_env['KRB5CCNAME'] = ccache_file_path
|
|
|
+
|
|
|
+ # If there are no tickets in the cache or they are expired, perform a kinit, else use what
|
|
|
+ # is in the cache
|
|
|
+ klist_cmd = format("{klist_path_local} -s {ccache_file_path}")
|
|
|
+ kinit_cmd = format("{kinit_path_local} -c {ccache_file_path} -kt {hdfs_user_keytab} {hdfs_principal_name}")
|
|
|
+ if os.system(klist_cmd) != 0:
|
|
|
+ Execute(kinit_cmd, user=params.hdfs_user)
|
|
|
+
|
|
|
def calculateCompletePercent(first, current):
|
|
|
return 1.0 - current.bytesLeftToMove/first.bytesLeftToMove
|
|
|
|
|
|
|
|
|
- def startRebalancingProcess(threshold):
|
|
|
+ def startRebalancingProcess(threshold, rebalance_env):
|
|
|
rebalanceCommand = format('hdfs --config {hadoop_conf_dir} balancer -threshold {threshold}')
|
|
|
- return as_user(rebalanceCommand, params.hdfs_user, env={'PATH': params.hadoop_bin_dir})
|
|
|
+ return as_user(rebalanceCommand, params.hdfs_user, env=rebalance_env)
|
|
|
|
|
|
- command = startRebalancingProcess(threshold)
|
|
|
+ command = startRebalancingProcess(threshold, rebalance_env)
|
|
|
|
|
|
basedir = os.path.join(env.config.basedir, 'scripts')
|
|
|
if(threshold == 'DEBUG'): #FIXME TODO remove this on PROD
|
|
@@ -220,6 +251,10 @@ class NameNode(Script):
|
|
|
logoutput = False,
|
|
|
)
|
|
|
|
|
|
+ if params.security_enabled and os.path.exists(ccache_file_path):
|
|
|
+ # Delete the kerberos credentials cache (ccache) file
|
|
|
+ os.remove(ccache_file_path)
|
|
|
+
|
|
|
def _print(line):
|
|
|
sys.stdout.write(line)
|
|
|
sys.stdout.flush()
|